简单理解 Python EventLoop 事件循环

2023年 10月 8日 92.7k 0

简介

在 python 3中,加入了 asyncio 模块,来实现协程,其中一个很重要的概念是事件循环,整个异步流程都是事件循环推动的。下面自己实现一个相对简单的EventLoop,了解一下事件循环是如何进行运转的。

事件循环

下面看一下整个流程的实现过程

将以下代码写入 spider_event_loop.py 文件:

# spider_event_loop.py

import time
import os
import socket
from urllib.parse import urlparse
from collections import deque
from selectors import DefaultSelector, EVENT_WRITE, EVENT_READ

# selector = DefaultSelector()
urls = ['https://p9-juejin.byteimg.com/tos-cn-i-k3u1fbpfcp/9bc51bc53f634bf79b5de5c8b9810817~tplv-k3u1fbpfcp-watermark.image',
        'https://p1-juejin.byteimg.com/tos-cn-i-k3u1fbpfcp/2b95ac8571ba403180743495ed56e492~tplv-k3u1fbpfcp-watermark.image',
        'https://p6-juejin.byteimg.com/tos-cn-i-k3u1fbpfcp/459c6c73887a4206a33b13ef23988809~tplv-k3u1fbpfcp-watermark.image',
        'https://p6-juejin.byteimg.com/tos-cn-i-k3u1fbpfcp/80701ade82d345cd8aa0b08fef008fe1~tplv-k3u1fbpfcp-watermark.image',
        'https://p6-juejin.byteimg.com/tos-cn-i-k3u1fbpfcp/99855ed1f79c456f9fdcd83ce5cff4f2~tplv-k3u1fbpfcp-watermark.image',
        'https://p1-juejin.byteimg.com/tos-cn-i-k3u1fbpfcp/0c4bd38c2fe3434587025748d051362e~tplv-k3u1fbpfcp-watermark.image',
        'https://p1-juejin.byteimg.com/tos-cn-i-k3u1fbpfcp/80cd2a1d165b43beb6234d893ce391e2~tplv-k3u1fbpfcp-watermark.image',
        'https://p3-juejin.byteimg.com/tos-cn-i-k3u1fbpfcp/ea18df95f26f4314a0a36d11d4a067d0~tplv-k3u1fbpfcp-watermark.image',
        'https://p1-juejin.byteimg.com/tos-cn-i-k3u1fbpfcp/095c342a796c411cad1800396746bdaf~tplv-k3u1fbpfcp-watermark.image',
        'https://p6-juejin.byteimg.com/tos-cn-i-k3u1fbpfcp/d3769e4d468a4f64a1e2879f94ad742b~tplv-k3u1fbpfcp-watermark.image'
]


# 事件循环实现
class EventLoop:

    def __init__(self):
     	# 整个流程是否
        self._stopped = False
        self._ready = deque()
        self._selector = DefaultSelector()

    # 向 _ready 队列中加入回调方法
    def call_soon(self, callback, *args):
        # 将事件添加到队列里
        handle = Handle(callback, self, *args)
        self._ready.append(handle)

    # 套接字注册读事件
    def add_writer(self, fd, callback, *args):
        # 将回调方法封装成Handle
        handle = Handle(callback, self, *args)
        
        try:
            # 检查有没有注册过
            key = self._selector.get_key(fd)
        except KeyError:
            # 没有注册过,进行写事件注册
            self._selector.register(fd, EVENT_WRITE, handle)
        else:
            # 注册过,添加写事件
            mask = key.events
            self._selector.modify(fd, mask | EVENT_WRITE, handle)
    
    # 套接字注销读事件
    def remove_writer(self, fd):
        try:
            # 检查有没有注册过
            self._selector.get_key(fd)
        except KeyError:
            # 没有,直接返回
            return
        # 注销事件
        self._selector.unregister(fd)

    # 套接字注册写事件
    def add_reader(self, fd, callback, *args):
        # 将回调方法,封装成Handle
        handle = Handle(callback, self, *args)
        try:
            # 检查是否注册过
            key = self._selector.get_key(fd)
        except KeyError:
            # 没有注册过,进行读事件注册
            self._selector.register(fd, EVENT_READ, handle)
        else:
            # 注册过,添加读事件
            mask = key.events
            self._selector.modify(fd, mask | EVENT_READ, handle)

    # 套接字注销写事件
    def remove_reader(self, fd):
        try:
            # 检查有没有注册过
            self._selector.get_key(fd)
        except KeyError:
            # 没有,直接返回
            return
        # 注销事件
        self._selector.unregister(fd)

    # 处理事件
    def _process_events(self, event_list):
        for key, mask in event_list:
            fileobj, handle = key.fileobj, key.data
            if mask & EVENT_READ:
                self.remove_reader(fileobj)
            if mask & EVENT_WRITE:
                self.remove_writer(fileobj)
            # 注册的事件发生,将注册时的handle放入队列中
            self._ready.append(handle)

    # 运行事件队列,执行回调方法
    def run_once(self):
        # 获取发生事件的所有文件描述符
        event_list = self._selector.select()
        self._process_events(event_list)

        while self._ready:
            # 将队列中的handle取出,执行回调方法
            handle = self._ready.popleft()
            handle.run()

    # 无限循环,直到stopped
    def run_forever(self):
        while True:
            # 执行事件
            self.run_once()
            if self._stopped:
                break
    
    # 执行协程方法,直到事件循环执行结束
    def run_until_complete(self, future):
        # 在 future 中添加回调方法,在 future set_value 的时候执行这个回调方法
        future.add_done_callback(self._run_until_complete_cb)
        # 开始无限循环
        self.run_forever()
        # 返回 future 的值
        return future.value

    # 结束时的回调方法
    def _run_until_complete_cb(self, future):
        # 结束无限循环
        self.close()
    
    # 关闭方法
    def close(self):
        self._stopped = True

    # 创建 Future 实例
    def create_future(self):
        return Future(loop=self)

    # 创建 Task 实例
    def create_task(self, coro):
        return Task(coro, loop=self)

以上类是一个简单是事件循环,事件循环基本的操作都已经包含,注册注销事件,无限循环获取文件描述符的事件,执行Handle队列,这里是使用的 _ready 队列,表示已经准备好的 Handle,由于没有延时事件所以省略了调度队列

# Handle 类,对回调方法做封装
class Handle:

    def __init__(self, callback, loop, *args):
        self._callback = callback
        self._args = args
    
    # 执行回调方法
    def run(self):
        self._callback(*self._args)

Handle 类对回调方法进行封装

# Future 类
class Future:
    def __init__(self, loop=None):
        self.value = None
        # 将要执行的回调方法
        self._step_func = []
        # 和事件循环关联
        if loop:
            self._loop = loop
        else:
            self._loop = get_event_loop()

    def add_done_callback(self, func):
        # 添加回调方法
        self._step_func.append(func)

    def set_value(self, value):
        # 设置值
        self.value = value
        for func in self._step_func:
            # 将回调方法添加到事件循环的 _ready 队列中
            # 在下次事件循环中执行回调方法
            self._loop.call_soon(func, self)

    # 实现 __iter__ 方法,Future 类的实例为可迭代对象
    def __iter__(self):
        # 该语句起到暂停协程的作用,并返回实例本身
        yield self
        # 该语句定义的返回值会赋给 yield from 语句等号前面的变量
        return self.value

Future类和以前的实现,区分不是很大,只是和 loop 做关联,将回调方法放入事件循环的队列中,依靠事件循环执行回调方法

# Task 类,继承 Future 类, 也是可迭代的
class Task(Future):
    def __init__(self, coro, loop=None):
        super().__init__(loop=loop)
        self.coro = coro
        # step 方法直接放入事件循环的队列中进行执行
        # 激活协程方法运行
        self._loop.call_soon(self.step, self)

    def step(self, future):
        try:
            # 向协程发送数据,驱动协程执行
            # 直到遇到 yield ,返回新的 Future实例
            new_futrue = self.coro.send(future.value)
        except StopIteration as exc:
            # 在协程方法执行到结束,或者 return 之后,抛出 StopIteration 异常
            # 由于 Task 也是 Future, 在协程执行完之后,最后 Task 执行自己的回调方法
            self.set_value(exc.value)
            return
        # 将 step 加入回调列表,等待下次驱动执行
        # 在 Future 执行 set_value 时,又会执行 step 方法,再次驱动协程执行
        new_futrue.add_done_callback(self.step)

Task 类继承 Future 类,类的实例也是可迭代对象,在 step 方法会不断触发协程继续执行,在协程执行结束之后,抛出 StopIteration 异常,最后 Task 类再执行回调方法,主要做一些清理工作或者收集结果。

class AsyncSocket:
    def __init__(self, loop=None):
        # 绑定事件循环
        if loop:
            self._loop = loop
        else:
            self._loop = get_event_loop()
        self.sock = socket.socket()
        self.sock.setblocking(False)

    # 该方法用于向服务器发送连接请求并注册监听套接字的可写事件
    def connect(self, address):
        # 由 loop 创建 Future 实例
        f = self._loop.create_future()
        try:
            self.sock.connect(address)
        except BlockingIOError:
            pass
        # 注册写事件,在连接成功事件发生之后,调用回调方法
        self._loop.add_writer(self.sock.fileno(), self._connect_cb, f)
        # 暂停执行,等待写事件发生
        yield from f

    # 回调方法,连接事件发生
    def _connect_cb(self, future):
        # 设置 Future 值,并且驱动协程继续执行
        future.set_value(None)

    # 向服务器发送获取图片的请求
    def send(self, data):
        self.sock.send(data)

    # 该方法会多次执行,以获取服务器返回的数据片段
    def read(self):
        f = self._loop.create_future()
        # 注册读事件,在可读事件发生之后,调用回调方法
        self._loop.add_reader(self.sock.fileno(), self._read_cb, f, self.sock)

        # 暂停执行,等待读事件发生
        yield from f
        # 返回最后的值
        return f.value

    # 回调方法,读事件发生
    def _read_cb(self, future, sock):
        # 套接字读取 4096 字节的数据,设置 Future 值,并且驱动协程继续执行
        future.set_value(sock.recv(4096))
        
    # 读取所有数据
    def read_all(self):
        data = b''
        while True:
            # 不断读取 sock 的数据
            value = yield from self.read()
            if value:
                data += value
            else:
                return data

    # 关闭客户端套接字
    def close(self):
        self.sock.close()

AsyncSocket 类是对 Socket 做的简单封装,绑定事件循环 Loop ,读写事件都是在 Loop 中注册,由事件循环来驱动 Socket 的读写。

# 爬虫类
class Crawler:
    def __init__(self, url):
        self._url = url
        self.url = urlparse(url)
        self.response = b''

    def fetch(self):
        self.time = time.time()
        # AsyncSocket 类的实例对象负责完成数据获取的工作
        sock = AsyncSocket()
        # 向服务器发送连接请求,协程会暂停到嵌套协程中的某个 yield from 处
        yield from sock.connect((self.url.netloc, 80))
        data = 'GET {0} HTTP/1.1\r\nHost: {1}\r\nConnection: close\r\n\r\n \
                '.format(self.url.path, self.url.netloc)
        # 发送请求数据
        sock.send(data.encode())
        
        # 读取全部的数据
        self.response = yield from sock.read_all()
        # 关闭 socket
        sock.close()
        # 将下载的图片写入文件
        with open('pic/{}'.format(self.url.path[1:].split('/')[-1]), 'wb') as f:
            f.write(self.response.split(b'\r\n\r\n')[1])
        return "URL: {0}, 耗时: {1:.3f}s".format(self._url, time.time() - self.time)

Crawler 完成图片爬取工作,在 fetch 方法中,AsyncSocket 完成请求连接,发送数据,接收数据的工作,整个流程非常的清晰,和同步阻塞模式流程基本相同,但是性能会有大量提升

# 事件循环,全局变量
_event_loop = None

# 获取事件循环,这里是要获取同一个全局实例
def get_event_loop():
    global _event_loop
    if _event_loop is None:
        # 生成一个新的事件循环实例
        _event_loop = EventLoop()
    return _event_loop

# 收集所有的 task
def gather(tasks, loop=None):
    # 使用 Future 类 收集 所有 tasks 的结果
    outer = Future(loop=loop)
    # tasks 数量
    count = len(tasks)
    nfinished = 0
    # 收集结果
    results = []

    # 回调方法
    def _gather_cb(f):
        nonlocal nfinished
        # 完成数量
        nfinished += 1
        # 收集结果
        results.append(f.value)
        if nfinished == count:
            # 都完成之后,outer 设置值,同时执行回调方法
            outer.set_value(results)

    for task in tasks:
        # 所有的 task 都添加回调方法
        # task 中协程方法执行完成时, 在 step 方法中会抛出 StopIteration 异常
        # 这时候 task 会执行 set_value 方法,同时会执行 _gather_cb 回调方法
        task.add_done_callback(_gather_cb)
     
    # 将 Future 实例返回
    return outer

以上获取事件循环单例,所有的事件都是在一个循环中执行。gather 方法使多个 task 并发执行,并且收集所有 task 的结果

def main():
    os.system('mkdir -p pic')
    start = time.time()
    loop = get_event_loop()
    tasks = []
    for url in urls:
        # 爬虫实例
        crawler = Crawler(url)
        # 将 fetch 方法封装成 task 实例
        tasks.append(Task(crawler.fetch()))        
     
    # gather 方法将收集所有的 task 结果,并且返回 Futrue 实例,
    # Futrue 实例在 run_until_complete 方法中添加了 _run_until_complete_cb 回调方法
    # 在所有的 task 执行结束之后,Future 实例执行 set_value 方法,同时执行回调方法 _run_until_complete_cb
    # 在 _run_until_complete_cb 方法中执行了 close 方法,无限循环结束,整个流程结束
    # 返回 results 的值
    results = loop.run_until_complete(gather(tasks))
    for res in results:
        print(res)

if __name__ == '__main__':
    main()

执行 spider_event_loop.py 文件

输出:

$ python3 spider_event_loop.py
URL: https://p6-juejin.byteimg.com/tos-cn-i-k3u1fbpfcp/459c6c73887a4206a33b13ef23988809~tplv-k3u1fbpfcp-watermark.image, 耗时: 0.055s
URL: https://p9-juejin.byteimg.com/tos-cn-i-k3u1fbpfcp/9bc51bc53f634bf79b5de5c8b9810817~tplv-k3u1fbpfcp-watermark.image, 耗时: 0.101s
URL: https://p1-juejin.byteimg.com/tos-cn-i-k3u1fbpfcp/80cd2a1d165b43beb6234d893ce391e2~tplv-k3u1fbpfcp-watermark.image, 耗时: 0.157s
URL: https://p1-juejin.byteimg.com/tos-cn-i-k3u1fbpfcp/0c4bd38c2fe3434587025748d051362e~tplv-k3u1fbpfcp-watermark.image, 耗时: 0.158s
URL: https://p6-juejin.byteimg.com/tos-cn-i-k3u1fbpfcp/80701ade82d345cd8aa0b08fef008fe1~tplv-k3u1fbpfcp-watermark.image, 耗时: 0.162s
URL: https://p3-juejin.byteimg.com/tos-cn-i-k3u1fbpfcp/ea18df95f26f4314a0a36d11d4a067d0~tplv-k3u1fbpfcp-watermark.image, 耗时: 0.164s
URL: https://p6-juejin.byteimg.com/tos-cn-i-k3u1fbpfcp/d3769e4d468a4f64a1e2879f94ad742b~tplv-k3u1fbpfcp-watermark.image, 耗时: 0.163s
URL: https://p1-juejin.byteimg.com/tos-cn-i-k3u1fbpfcp/095c342a796c411cad1800396746bdaf~tplv-k3u1fbpfcp-watermark.image, 耗时: 0.164s
URL: https://p1-juejin.byteimg.com/tos-cn-i-k3u1fbpfcp/2b95ac8571ba403180743495ed56e492~tplv-k3u1fbpfcp-watermark.image, 耗时: 0.224s
URL: https://p6-juejin.byteimg.com/tos-cn-i-k3u1fbpfcp/99855ed1f79c456f9fdcd83ce5cff4f2~tplv-k3u1fbpfcp-watermark.image, 耗时: 0.492s
总共耗时: 0.495s

使用事件循环,异步执行爬取数据,消耗时间很少,性能很高。

现在我们把整个流程梳理一下:

  • 从main方法开始执行
  • 获取 EventLoop 实例,loop 实例是全局变量
  • 将 urls 中的 url 分别进行处理,生成 Crawler 实例
  • Crawler 的 fetch 方法是协程,不会立即执行,Task 包装协程方法
  • 在 Task 的 __init__ 方法中, 会将 Task 的 step 方法,加入到 loop 的 队列中
  • step 方法会被封装成 Handle 方法,加入到 loop 的 _ready 的队列中
  • 在所有的 task 处理完成之后,加入到 tasks 列表中
  • 接下来执行 gather 方法,outer 是 Future 实例, 所有的 task 实例都添加 _gather_cb 回调方法
  • 在 _gather_cb 方法中,会收集 task 的结果
  • 接下来执行 loop 的 run_until_complete 方法,入参是 gather 方法返回的 outer 的 Future 实例
  • run_until_complete 方法中,outer 实例加入 _run_until_complete_cb 回调方法
  • 开始执行 run_forever 方法,进入循环
  • 执行 run_once 方法,获取所有的发生时间的文件描述符,由于现在还没有注册事件,所以事件列表为空
  • 将 _ready 队列中所有的 handle 取出,开始执行,由于 task 的 __init__ 方法中将 step 放入了队列中,所以这里会执行 task 的 step 方法
  • task 中的 coro 是 fetch 协程,所以在 step 方法中 coro 会执行 send 方法,由于 future 就是 task 实例,value 还是 None,所以会激活协程方法执行
  • fetch 方法开始真正执行,创建 AsyncSocket 实例 sock
  • 执行到 sock.connect((self.url.netloc, 80)) 会在 connect 方法中 创建新的 future 实例
  • 进行 sock 请求连接,再将网络套接字的文件描述符注册写事件,在 add_writer 方法中 _connect_cb 封装成 handle 实例,在 事件发生之后会执行 handle 的 run 方法
  • 暂停执行,返回 future 实例
  • 由于是在 task 的 step 方法中激活的协程执行,所以 new_future 就是返回的 future 实例
  • new_future 将 step 加入到回调列表中
  • 现在流程都暂停了,等待事件的发生
  • 在事件循环中,注册的文件描述符的写事件发生之后,_process_events 方法循环处理所有的事件,取出事件注册时候的 handle 方法,加入到 _ready 执行队列里面
  • 再从 _ready 的队列里面取出 handle 方法,此 handle 方法是注册时候封装的 _connect_cb 回调方法
  • 执行 _connect_cb 方法,参数是 future,也是返回的 new_future,这时候再执行 future 的 set_value 方法
  • 设置 future 的值,并且将回调队列里的回调方法取出,加入到事件循环的 _ready 队列里,由于 new_future 将 step 方法加入了回调队列,所以会再次执行 task 的 step 方法
  • step 又继续驱动协程执行,fetch 又开始了继续执行
  • sock 发送数据,然后注册读事件,由于读取数据时一次不会全部读完,会多次注册读事件,读取全部的数据
  • 读事件和写事件是相同的,在注册之后,就返回 future 对象,添加 step 回调方法,暂停执行。等待事件的发生
  • 在所有的数据读取之后,fetch 方法会执行结束,return 当前的数据
  • 由于 task 实例的 step 驱动的 fetch 方法,所以 step 方法会抛出 StopIteration 异常
  • task 继承的 Future,所以可以执行 set_value 方法,异常的值就是 fetch 返回的值
  • 在 gather 方法中,task 添加了 _gather_cb 回调方法,所以在 set_value 时,会调用回调方法
  • 将 task 的值收集到 results 列表中,等所有的 task 都执行结束之后,outer 实例开始执行 set_value
  • outer 添加了 _run_until_complete_cb 回调方法,所以这里同样会执行回调方法,在 _run_until_complete_cb 方法中调用事件循环的 close 方法,_stopped 设置为 True
  • run_forever 退出无限循环,整个流程执行结束,将 outer 的值返回

整个流程梳理完了,Task 不断驱动协程执行,EventLoop 监听事件循环,又不断驱动 Task 执行,Future 在协程的通道中传输数据,几个部分配合合作完成整个流程。

相关小册

Python 异步网络编程实战

相关文章

JavaScript2024新功能:Object.groupBy、正则表达式v标志
PHP trim 函数对多字节字符的使用和限制
新函数 json_validate() 、randomizer 类扩展…20 个PHP 8.3 新特性全面解析
使用HTMX为WordPress增效:如何在不使用复杂框架的情况下增强平台功能
为React 19做准备:WordPress 6.6用户指南
如何删除WordPress中的所有评论

发布评论