16.3. 协程与微线程

协程(coroutine),可以理解为是线程的优化,有的地方有称之为轻量级进程。它是一种比线程更节省资源、效率更高的系统调度机制。

它的特点是,在同时开启的多个任务中, 一次只执行一个。如果当前任务遭遇阻塞,才会切换到下一个任务继续执行。这种机制可以实现多任务的同步。 又能成功的避免线程中使用锁的复杂性,简化了开发。

早先的协程是使用生成器关键字yield 来实现的,代码特别复杂难懂。自从Python3.5之后,确定了协程的语法,使得创建协程的方式得到改善。
在Python中,能够实现协程的模块有多个,如asyncio、tornado 或gevento

1.协程的相关概念
这里以asyncio为例,先来了解一下创建协程所用到的概念。

event_loop(事件循环):是一个协程处理函数的调用机制。程序会开启一个无限循环,当事件发生时, 调用相应的协程函数。

coroutine(协程对象):指一个使用async关键字来定义的函数。调用该函数,会返回一个协程对象。该协程对象就是一个处于挂起状态的协程函数,需要注册到事件循环
event_loop中,由事件循环event_loop 进行调用。

task 任务: 是对协程的进一步封装。

future: 等同于tasko 代表执行任务的结果。

async/await关键字:Python3.5中有两个用于定义协程的关键字。async用于定义一个协程,await用于挂起阻塞的异步调用接口。

16.3.1. 协程的实现步骤

#!/usr/bin/env python
# -*- coding:utf8 -*-
# auther; 18793
# Date:2019/12/21 20:16
# filename: 01.协程的实现步骤.py

import asyncio  # 引入asyncio模块


async def do_some_work(x):  # 引入协程处理函数
    print(x)


coroutine = do_some_work("hello")  # 生成协程对象
loop = asyncio.get_event_loop()  # 获得事件循环对象
try:
    loop.run_until_complete(coroutine)  # 将协程注册到实现事件循环事件中,并开始运行输出hello
finally:
    loop.close()  # 程序结束关闭事件循环对象

'''
hello
'''

16.3.2. 使用协程实现任务提交和结果接收

#!/usr/bin/env python
# -*- coding:utf8 -*-
# auther; 18793
# Date:2019/12/21 20:22
# filename: 02.使用协程实现任务提交和结果接收.py
import asyncio


async def do_some_work(x):  # 定义协程处理函数
    print("任务:", x)
    return "任务:{}的返回结果".format(x)


def callback(futrue):  # 回调函数
    print("Callback: ", futrue.result())  # 返回任务结果


# 定义协程,并传入任务
coroutine = do_some_work("爬取当天股票")
loop = asyncio.get_event_loop()  # 获取事件循环对象
task = asyncio.ensure_future(coroutine)  # 获得任务对象(对协程进行封装)
task.add_done_callback(callback)  # 封装好的协程对象(任务)就可以绑定回调函数了
loop.run_until_complete(task)

# 第一行 接到处理任务, 第二行 返回收到处理任务后的结果
'''
任务:爬取当天股票
Callback: 任务:爬取当天股票的返回结果

'''

16.3.3. 使用协程批量修改文件扩展名

#!/usr/bin/env python
# -*- coding:utf8 -*-
# auther; 18793
# Date:2019/12/21 20:22
# filename: 03.使用协程批量修改文件扩展名.py
import asyncio
import os

path = r"D:\Users"


async def change_files(x):
    files = os.listdir(path)  # 列出当前目录下的所有文件
    for filename in files:
        postion = os.path.splitext(filename)  # 分离文件名和后缀
        print(postion)
        if postion[1] == ".txt":
            newname = postion[0] + ".sh"
            os.chdir(path)
            os.rename(filename, newname)
    return "{}任务完成".format(x)


def callback(futrue):  # 回调函数
    print("Callback: ", futrue.result())  # 返回任务结果


# 定义协程,并传入任务
coroutine = change_files("修改文件扩展名")
loop = asyncio.get_event_loop()  # 获取事件循环对象
task = asyncio.ensure_future(coroutine)  # 获得任务对象(对协程进行封装)
task.add_done_callback(callback)  # 封装好的协程对象(任务)就可以绑定回调函数了
loop.run_until_complete(task)

'''
('18793', '')
('Ansible的模块', '')
('apache-maven-3.6', '.2')
('apache-maven-3.6.2-bin', '.zip')
('lib', '')
('mysql', '.txt')
('pandoc-2.2.3', '.2')
('pandoc-2.2.3.2-windows-x86_64', '.zip')
Callback:  修改文件扩展名任务完成
'''

16.3.4. gevent库为Python提供了比较完善的协程支持

gevent是一个基于协程的python网络库,在遇到IO阻塞时,程序会自动进行切换,可以让我们用同步的方式写异步IO代码。

Python通过yield提供了对协程的基本支持,但是不完全,而使用第三方gevent库是更好的选择, gevent提供了比较完善的协程支持。 gevent是一个基于协程的Python网络函数库,使用greenlet在libev事件循环顶部提供了一个有高级别并发性的API。

主要特性有以下几点:

·基于libev的快速事件循环,Linux上是epoll机制。
·基于greenlet的轻量级执行单元。
·API复用了Python标准库里的内容。
·支持SSL的协作式sockets。
·可通过线程池或c-ares实现DNS查询。
·通过monkey patching功能使得第三方模块变成协作式。

gevent对协程的支持,本质上是greenlet在实现切换工作。 greenlet工作流程如下:

假如进行访问网络的IO操作时,出现阻塞,greenlet就显式切换到另一段没有被阻塞的代码段执行,直到原先的阻塞状况消失以后,再自动切换回原来的代码段继续处理。因此,greenlet是一种合理安排的串行方式。

由于IO操作非常耗时,经常使程序处于等待状态,有了gevent为我们自动切换协程,就保证总有greenlet在运行,而不是等待IO,这就是协程一般比多线程效率高的原因。由于切换是在IO操作时自动完成,所以gevent需要修改Python自带的一些标准库,将一些常见的阻塞,如socket、select等地方实现协程跳转,这一过程在启动时通过monkey patch完成。

代码示例

#!/usr/bin/env python
# -*- coding:utf8 -*-
# auther; 18793
# Date:2020/2/26 11:32
# filename: sample01.py
from gevent import monkey

monkey.patch_all()
import gevent
import urllib.request


def run_task(url):
    print("Visit  ---> %s" % url)
    try:
        response = urllib.request.urlopen(url)
        data = response.read()
        print("%d bytes received from %s " % (len(data), url))
    except Exception as e:
        print(e)


if __name__ == '__main__':
    urls = ["https://github.com/", "https://www.python.org/", "http://www.cnblogs.com/"]
    greenlets = [gevent.spawn(run_task, url) for url in urls]
    gevent.joinall(greenlets)

"""
Visit  ---> https://github.com/
Visit  ---> https://www.python.org/
Visit  ---> http://www.cnblogs.com/
49178 bytes received from http://www.cnblogs.com/
135554 bytes received from https://github.com/


以上程序主要用了gevent中的spawn方法和joinall方法。
spawn方法可以看做是用来形成协程,
joinall方法就是添加这些协程任务,并且启动运行。
从运行结果来看,3个网络操作是并发执行的,而且结束顺序不同,但其实只有一个线程。

"""

gevent中还提供了对池的支持。当拥有动态数量的greenlet需要进行并发管理(限制并发数)时,就可以使用池, 这在处理大量的网络和IO操作时是非常需要的。 接下来使用gevent中pool对象,对上面的例子进行改写,程序如下:

#!/usr/bin/env python
# -*- coding:utf8 -*-
# auther; 18793
# Date:2020/2/26 11:32
# filename: sample01.py
from gevent import monkey

monkey.patch_all()
from gevent.pool import Pool
import urllib.request


def run_task(url):
    print("Visit  ---> %s" % url)
    try:
        response = urllib.request.urlopen(url)
        data = response.read()
        print("%d bytes received from %s " % (len(data), url))
    except Exception as e:
        print(e)
    return 'url:%s --->finish' % url


if __name__ == '__main__':
    pool = Pool(2)
    urls = ["https://github.com/", "https://www.python.org/", "http://www.cnblogs.com/"]
    results = pool.map(run_task, urls)
    print(results)

"""
Visit  ---> https://github.com/
Visit  ---> https://www.python.org/
135552 bytes received from https://github.com/
Visit  ---> http://www.cnblogs.com/
48710 bytes received from http://www.cnblogs.com/
48896 bytes received from https://www.python.org/
['url:https://github.com/ --->finish', 'url:https://www.python.org/ --->finish', 'url:http://www.cnblogs.com/ --->finish']

通过运行结果可以看出,Pool对象确实对协程的并发数量进行了管理,先访问了前两个网址,当其中一个任务完成时,才会执行第三个。

"""