一、引言
Python异步开发已经非常流行了,一些主流的组件像MySQL、Redis、RabbitMQ等都提供了异步的客户端,再处理耗时的时候不会堵塞住主线程,不但可以提高并发能力,也能减少多线程带来的内存资源消耗。但在业务开发的时候一些第三方库没有异步的处理方式,例如OSS、CV、其他第三方提供的SDK以及自己封装的函数有耗时等,此时还是需要借助线程来加速,再异步中就不会堵塞主线程,因此封装一个异步装饰器可以更好的处理异步,让代码更简洁。
常用组件异步库
序号 | 组件名 | 异步库 | 说明 |
---|---|---|---|
1 | MySQL | aiomysql github.com/aio-libs/ai… | 基于asyncio的MySQL驱动,用asyncio实现异步IO。这是最常用的Python异步MySQL驱动。 |
2 | Redis | aioredis github.com/aio-libs-ab… | aioredis因为性能好、使用广泛,是Python最主流的Redis异步驱动之一。 之前同步库的redis.py 后面再4.2.0rc1+版本也支持aioredis |
3 | RabbitMQ | aio-pika github.com/mosquito/ai… celery github.com/celery/cele… | 基于asyncio的RabbitMQ异步客户端,是最常用的Python异步RabbitMQ客户端之一。 celery是分布式任务队列框架,也支持常用消息中间件的异步封装 |
4 | Kafka | aio-kafka github.com/aio-libs/ai… | kafka常用的异步客户端 |
5 | http客户端 | httpx github.com/encode/http… aiohttp github.com/aio-libs/ai… | httpx一个现代的异步HTTP客户端,支持asyncio和操作语法与 Requests 库的API一致。 aiohttp也是十分常用的异步客户端,性能也不错 |
6 | 文件处理 | aiofiles github.com/Tinche/aiof… | 基于asyncio的异步文件操作库,提供类文件对象接口 |
这里就简单列举一些常用的异步库,可以发现好多异步库都在 github 的 aio-libs 中慢慢孵化,质量都挺不错的。大家有什么好用的异步库欢迎评论区推荐。
github.com/aio-libs
二、功能分析
支持同步函数使用线程加速
异、同步函数需支持 await 语法等待返回结果
异、同步函数需支持后台任务,无需等待
同步函数使用线程加速
同步函数使用线程,这还是挺简单的使用,内置库的 threading.Thread 就可以实现
import time
import threading
def task1(name):
print(f"Hello {name}")
time.sleep(1)
print(f"Completed {name}")
t1 = threading.Thread(target=task1, args=("hui",))
t2 = threading.Thread(target=task1, args=("wang",))
t1.start()
t2.start()
t1.join()
t2.join()
>>> out
Hello hui
Hello wang
Completed hui
Completed wang
- start()方法用于启动线程执行函数。
- join()方法用于等待线程执行结束。
但这样直接开线程的方式比较暴力,也不太好管理,因此可以想到线程池,进行线程复用与管理。Python内置的 concurrent.futures 模块提供了线程池和进程池的实现与封装。
import time
from concurrent.futures import ThreadPoolExecutor
def task2(name):
print(f"Hello {name}")
time.sleep(1)
return f"Completed {name}"
with ThreadPoolExecutor(max_workers=2) as executor:
future1 = executor.submit(task2, "hui")
future2 = executor.submit(task2, "zack")
print("ret1", future1.result())
print("ret2", future2.result())
>>> out
Hello hui
Hello zack
ret1 Completed hui
ret2 Completed zack
异、同步函数需支持 await 语法
异、同步函数需支持 await 语法等待返回结果,异步函数本身就支持 await语法,这里主要是实现同步函数支持
await 语法,在python中可以await语法的对象有如下几大类:
综上,凡是实现了__await__魔术方法的对象或者封装了协程/任务的对象,都可以被await,await会自动把对象交给事件循环运行,等待其完成。
常见的可await对象包括协程函数、任务对象、Future、被@coroutine装饰的函数等,这可以使异步代码更简洁。await对象可以暂停当前协程,等待异步操作完成后再继续执行。
import asyncio
async def coro_demo():
print("await coroutine demo")
async def task_demo():
print("await task demo")
async def coro():
print("in coro task")
# 创建 Task 对象
task = asyncio.create_task(coro())
await task
async def future_demo():
print("await future demo")
future = asyncio.Future()
await future
# 这个装饰器已经过时
@asyncio.coroutine
def coro_decorated_demo():
print("await decorated function demo")
async def main():
await coro_demo()
await task_demo()
await future_demo()
await coro_decorated_demo()
if __name__ == '__main__':
asyncio.run(main())
>>> out
DeprecationWarning: "@coroutine" decorator is deprecated since Python 3.8, use "async def" instead
def coro_decorated_demo():
await coroutine demo
await task demo
in coro task
await future demo
这个 @asyncio.coroutine 协程装饰器已经过时了,都是使用 async、await 语法替代。
下面是实现 await 方法的demo
import asyncio
class AsyncDownloader:
def __init__(self, url):
self.url = url
self.download_ret = None
def __await__(self):
print(f'Starting download of {self.url}')
loop = asyncio.get_event_loop()
future = loop.run_in_executor(None, self.download)
yield from future.__await__()
return self
def download(self):
print(f'Downloading {self.url}...')
# 模拟下载过程
import time
time.sleep(2)
self.download_ret = f'{self.url} downloaded ok'
async def main():
print('Creating downloader...')
downloader = AsyncDownloader('https://www.ithui.top/file.zip')
print('Waiting for download...')
downloader_obj = await downloader
print(f'Download result: {downloader_obj.download_ret}')
if __name__ == '__main__':
asyncio.run(main())
>>> out
Creating downloader...
Waiting for download...
Starting download of https://www.ithui.top/file.zip
Downloading https://www.ithui.top/file.zip...
Download result: https://www.ithui.top/file.zip downloaded ok
用 yield from 来迭代 future对象(符合__await__逻辑),并在结束时return self
异、同步函数需支持后台任务
异步后台任务的好处与场景
减少主程序的等待时间
异步函数可以通过后台任务的方式执行一些耗时操作,如IO操作、网络请求等,而主程序无需等待这些操作完成,可以继续执行其他任务,从而减少程序总体的等待时间。
提高程序响应性能
后台任务的异步执行,可以避免主程序被长时间阻塞,从而改善程序的整体响应性能。用户无需长时间等待才能得到响应。
解决IO密集型任务阻塞问题
对于网络、文件IO等密集型任务,使用同步执行可能会导致长时间阻塞,而异步后台任务可以很好地解决这个问题,避免资源浪费。
良好的用户体验
后台任务的异步处理,给用户的感觉是多个任务同时在执行,实际上CPU在切换处理,这相比线性等待任务完成,可以提供更好的用户体验。
适用于不需要实时结果的任务
邮件发送、数据批处理、文件处理等不需要用户即时等待结果的任务非常适合通过异步方式在后台完成。
在python中同异步函数实现后台任务
-
异步函数可以通过 asyncio.create_task 方法实现后台任务
-
同步函数可以通过线程、线程池来实现
import asyncio
import time
from threading import Thread
from concurrent.futures import ThreadPoolExecutor
async def async_bg_task():
print('async bg task running')
await asyncio.sleep(3)
print('async bg task completed')
def sync_bg_task():
print('sync bg task running')
time.sleep(3)
print('sync bg task completed')
async def main():
print('Starting main program')
# 异步函数的后台任务
asyncio.create_task(async_bg_task())
# 同步函数的后台任务
# with ThreadPoolExecutor() as executor:
# executor.submit(sync_bg_task)
# Thread(target=sync_bg_task).start()
loop = asyncio.get_running_loop()
loop.run_in_executor(executor=ThreadPoolExecutor(), func=sync_bg_task)
print('Main program continues')
await asyncio.sleep(5)
if __name__ == '__main__':
asyncio.run(main())
>>> ThreadPoolExecutor out
Starting main program
sync bg task running
sync bg task completed
Main program continues
async bg task running
async bg task completed
>>> Thread out
Starting main program
sync bg task running
Main program continues
async bg task running
sync bg task completed
async bg task completed
>>> run_in_executor out
Starting main program
sync bg task running
Main program continues
async bg task running
async bg task completed
sync bg task completed
看输出结果可以发现在同步函数使用直接使用线程池 ThreadPoolExecutor 执行还是堵塞了主线程,然后 Thread 没有,通过 loop.run_in_executor 也不会阻塞。后面发现 是 with 语法导致的堵塞,with 的根本原因就是它会等待线程池内的所有线程任务完成并回收,所以主线程必须等同步函数结束后才能继续。一开始我还一以为是线程池使用了主线程的线程后面打印线程名称看了下不是然后调试下就发现了with的问题。
import asyncio
import time
import threading
from concurrent.futures import ThreadPoolExecutor
async def async_bg_task():
print(f"async_bg_task In thread: {threading.current_thread().name}")
print('async bg task running')
await asyncio.sleep(3)
print('async bg task completed')
def sync_bg_task(num):
print(f"sync_bg_task{num} In thread: {threading.current_thread().name}")
print(f'sync bg task{num} running')
time.sleep(3)
print(f'sync bg task{num} completed')
async def main():
print('Starting main program')
# 异步函数的后台任务
asyncio.create_task(async_bg_task())
# 同步函数的后台任务
thread_pool = ThreadPoolExecutor()
# with thread_pool as pool:
# for i in range(5):
# pool.submit(sync_bg_task, i)
for i in range(5):
thread_pool.submit(sync_bg_task, i)
threading.Thread(target=sync_bg_task, args=["thread"]).start()
loop = asyncio.get_running_loop()
loop.run_in_executor(ThreadPoolExecutor(), sync_bg_task, "loop.run_in_executor")
print('Main program continues')
print(f"Main program In thread: {threading.current_thread().name}")
await asyncio.sleep(5)
if __name__ == '__main__':
asyncio.run(main())
三、具体封装实现
import asyncio
from concurrent.futures import ThreadPoolExecutor, Executor
def run_on_executor(executor: Executor = None, background: bool = False):
"""
异步装饰器
- 支持同步函数使用 executor 加速
- 异步函数和同步函数都可以使用 `await` 语法等待返回结果
- 异步函数和同步函数都支持后台任务,无需等待
Args:
executor: 函数执行器, 装饰同步函数的时候使用
background: 是否后台执行,默认False
Returns:
"""
def _run_on_executor(func):
@functools.wraps(func)
async def async_wrapper(*args, **kwargs):
if background:
return asyncio.create_task(func(*args, **kwargs))
else:
return await func(*args, **kwargs)
@functools.wraps(func)
def sync_wrapper(*args, **kwargs):
loop = asyncio.get_event_loop()
task_func = functools.partial(func, *args, **kwargs) # 支持关键字参数
return loop.run_in_executor(executor, task_func)
# 异步函数判断
wrapper_func = async_wrapper if asyncio.iscoroutinefunction(func) else sync_wrapper
return wrapper_func
return _run_on_executor
封装成了带参数的装饰器
-
executor: 函数执行器, 装饰同步函数的时候使用
- 可以传递指定的线程池,默认None 根据系统cpu核心数动态创建线程的数量
-
background: 用于标识是否后台执行,默认False
- 有点诟病同步函数的后台任务没有用到这个参数而是使用 await 语法控制,但在使用装饰器时候可以起到后台任务标识作用,也可以的,别人一看有这个参数就知道是后台任务就不用细看函数业务逻辑
- 后续再看看怎么优化,大家有没有比较好建议
-
loop.run_in_executor(executor, task_func) 方法不支持关键字参数的传递,故而采用 task_func = functools.partial(func, *args, **kwargs) ,来构造一个不带参数的函数就可以方便使用了
测试demo
import asyncio
import time
from concurrent.futures import ThreadPoolExecutor
from py_tools.decorators.base import run_on_executor
from loguru import logger
thread_executor = ThreadPoolExecutor(max_workers=3)
@run_on_executor(background=True)
async def async_func_bg_task():
logger.debug("async_func_bg_task start")
await asyncio.sleep(1)
logger.debug("async_func_bg_task running")
await asyncio.sleep(1)
logger.debug("async_func_bg_task end")
return "async_func_bg_task ret end"
@run_on_executor()
async def async_func():
logger.debug("async_func start")
await asyncio.sleep(1)
logger.debug("async_func running")
await asyncio.sleep(1)
return "async_func ret end"
@run_on_executor(background=True, executor=thread_executor)
def sync_func_bg_task():
logger.debug("sync_func_bg_task start")
time.sleep(1)
logger.debug("sync_func_bg_task running")
time.sleep(1)
logger.debug("sync_func_bg_task end")
return "sync_func_bg_task end"
@run_on_executor()
def sync_func():
logger.debug("sync_func start")
time.sleep(1)
logger.debug("sync_func running")
time.sleep(1)
return "sync_func ret end"
async def main():
ret = await async_func()
logger.debug(ret)
async_bg_task = await async_func_bg_task()
logger.debug(f"async bg task {async_bg_task}")
logger.debug("async_func_bg_task 等待后台执行中")
loop = asyncio.get_event_loop()
for i in range(3):
loop.create_task(async_func())
ret = await sync_func()
logger.debug(ret)
sync_bg_task = sync_func_bg_task()
logger.debug(f"sync bg task {sync_bg_task}")
logger.debug("sync_func_bg_task 等待后台执行")
await asyncio.sleep(10)
if __name__ == '__main__':
asyncio.run(main())
测试详细输出
async_func start
async_func running
async_func ret end
async bg task