同步、异步无障碍:Python异步装饰器指南

2023年 8月 16日 94.7k 0

一、引言

Python异步开发已经非常流行了,一些主流的组件像MySQL、Redis、RabbitMQ等都提供了异步的客户端,再处理耗时的时候不会堵塞住主线程,不但可以提高并发能力,也能减少多线程带来的内存资源消耗。但在业务开发的时候一些第三方库没有异步的处理方式,例如OSS、CV、其他第三方提供的SDK以及自己封装的函数有耗时等,此时还是需要借助线程来加速,再异步中就不会堵塞主线程,因此封装一个异步装饰器可以更好的处理异步,让代码更简洁。

常用组件异步库

序号 组件名 异步库 说明
1 MySQL aiomysql github.com/aio-libs/ai… 基于asyncio的MySQL驱动,用asyncio实现异步IO。这是最常用的Python异步MySQL驱动。
2 Redis aioredis github.com/aio-libs-ab… aioredis因为性能好、使用广泛,是Python最主流的Redis异步驱动之一。 之前同步库的redis.py 后面再4.2.0rc1+版本也支持aioredis
3 RabbitMQ aio-pika github.com/mosquito/ai… celery github.com/celery/cele… 基于asyncio的RabbitMQ异步客户端,是最常用的Python异步RabbitMQ客户端之一。 celery是分布式任务队列框架,也支持常用消息中间件的异步封装
4 Kafka aio-kafka github.com/aio-libs/ai… kafka常用的异步客户端
5 http客户端 httpx github.com/encode/http… aiohttp github.com/aio-libs/ai… httpx一个现代的异步HTTP客户端,支持asyncio和操作语法与 Requests 库的API一致。 aiohttp也是十分常用的异步客户端,性能也不错
6 文件处理 aiofiles github.com/Tinche/aiof… 基于asyncio的异步文件操作库,提供类文件对象接口

这里就简单列举一些常用的异步库,可以发现好多异步库都在 github 的 aio-libs 中慢慢孵化,质量都挺不错的。大家有什么好用的异步库欢迎评论区推荐。

github.com/aio-libs

二、功能分析

  • 支持同步函数使用线程加速

  • 异、同步函数需支持 await 语法等待返回结果

  • 异、同步函数需支持后台任务,无需等待

  • 同步函数使用线程加速

    同步函数使用线程,这还是挺简单的使用,内置库的 threading.Thread 就可以实现

    import time
    import threading
    
    
    def task1(name):
        print(f"Hello {name}")
        time.sleep(1)
        print(f"Completed {name}")
    
    
    t1 = threading.Thread(target=task1, args=("hui",))
    t2 = threading.Thread(target=task1, args=("wang",))
    
    t1.start()
    t2.start()
    
    t1.join()
    t2.join()
    
    
    >>> out
    Hello hui
    Hello wang
    Completed hui
    Completed wang
    
    • start()方法用于启动线程执行函数。
    • join()方法用于等待线程执行结束。

    但这样直接开线程的方式比较暴力,也不太好管理,因此可以想到线程池,进行线程复用与管理。Python内置的 concurrent.futures 模块提供了线程池和进程池的实现与封装。

    import time
    from concurrent.futures import ThreadPoolExecutor
    
    
    def task2(name):
        print(f"Hello {name}")
        time.sleep(1)
        return f"Completed {name}"
    
    
    with ThreadPoolExecutor(max_workers=2) as executor:
        future1 = executor.submit(task2, "hui")
        future2 = executor.submit(task2, "zack")
    
    print("ret1", future1.result())
    print("ret2", future2.result())
    
    >>> out
    Hello hui
    Hello zack
    ret1 Completed hui
    ret2 Completed zack
    

    异、同步函数需支持 await 语法

    异、同步函数需支持 await 语法等待返回结果,异步函数本身就支持 await语法,这里主要是实现同步函数支持

    await 语法,在python中可以await语法的对象有如下几大类:

  • 协程对象(coroutine):定义了__await__方法的对象,异步框架中的协程函数都是此类型。
  • 任务对象(Task):封装了协程的对象,如asyncio中的Task,trio中的Task。
  • Future对象:表示异步操作结果的对象,如concurrent.futures.Future。
  • 协程装饰器封装的对象:一些装饰器可以将普通函数或对象包装成可await的对象,如@asyncio.coroutine。
  • 综上,凡是实现了__await__魔术方法的对象或者封装了协程/任务的对象,都可以被await,await会自动把对象交给事件循环运行,等待其完成。

    常见的可await对象包括协程函数、任务对象、Future、被@coroutine装饰的函数等,这可以使异步代码更简洁。await对象可以暂停当前协程,等待异步操作完成后再继续执行。

    import asyncio
    
    
    async def coro_demo():
        print("await coroutine demo")
    
    
    async def task_demo():
        print("await task demo")
    
        async def coro():
            print("in coro task")
    
        # 创建 Task 对象
        task = asyncio.create_task(coro())
        await task
    
    
    async def future_demo():
        print("await future demo")
        future = asyncio.Future()
        await future
    
    
    # 这个装饰器已经过时
    @asyncio.coroutine
    def coro_decorated_demo():
        print("await decorated function demo")
    
    
    async def main():
        await coro_demo()
    
        await task_demo()
    
        await future_demo()
    
        await coro_decorated_demo()
    
    
    if __name__ == '__main__':
        asyncio.run(main())
        
        
    >>> out 
    DeprecationWarning: "@coroutine" decorator is deprecated since Python 3.8, use "async def" instead
      def coro_decorated_demo():
      
    await coroutine demo
    await task demo
    in coro task
    await future demo
    

    这个 @asyncio.coroutine 协程装饰器已经过时了,都是使用 async、await 语法替代。

    下面是实现 await 方法的demo

    import asyncio
    
    
    class AsyncDownloader:
    
        def __init__(self, url):
            self.url = url
            self.download_ret = None
    
        def __await__(self):
            print(f'Starting download of {self.url}')
            loop = asyncio.get_event_loop()
            future = loop.run_in_executor(None, self.download)
            yield from future.__await__()
            return self
    
        def download(self):
            print(f'Downloading {self.url}...')
            # 模拟下载过程
            import time
            time.sleep(2)
            self.download_ret = f'{self.url} downloaded ok'
    
    
    async def main():
        print('Creating downloader...')
        downloader = AsyncDownloader('https://www.ithui.top/file.zip')
        print('Waiting for download...')
        downloader_obj = await downloader
        print(f'Download result: {downloader_obj.download_ret}')
    
    
    if __name__ == '__main__':
        asyncio.run(main())
        
    
    >>> out
    Creating downloader...
    Waiting for download...
    Starting download of https://www.ithui.top/file.zip
    Downloading https://www.ithui.top/file.zip...
    Download result: https://www.ithui.top/file.zip downloaded ok    
    

    用 yield from 来迭代 future对象(符合__await__逻辑),并在结束时return self

    异、同步函数需支持后台任务

    异步后台任务的好处与场景

  • 减少主程序的等待时间

    异步函数可以通过后台任务的方式执行一些耗时操作,如IO操作、网络请求等,而主程序无需等待这些操作完成,可以继续执行其他任务,从而减少程序总体的等待时间。

  • 提高程序响应性能

    后台任务的异步执行,可以避免主程序被长时间阻塞,从而改善程序的整体响应性能。用户无需长时间等待才能得到响应。

  • 解决IO密集型任务阻塞问题

    对于网络、文件IO等密集型任务,使用同步执行可能会导致长时间阻塞,而异步后台任务可以很好地解决这个问题,避免资源浪费。

  • 良好的用户体验

    后台任务的异步处理,给用户的感觉是多个任务同时在执行,实际上CPU在切换处理,这相比线性等待任务完成,可以提供更好的用户体验。

  • 适用于不需要实时结果的任务

    邮件发送、数据批处理、文件处理等不需要用户即时等待结果的任务非常适合通过异步方式在后台完成。

  • 在python中同异步函数实现后台任务

    • 异步函数可以通过 asyncio.create_task 方法实现后台任务

    • 同步函数可以通过线程、线程池来实现

    import asyncio
    import time
    from threading import Thread
    from concurrent.futures import ThreadPoolExecutor
    
    
    async def async_bg_task():
        print('async bg task running')
        await asyncio.sleep(3)
        print('async bg task completed')
    
    
    def sync_bg_task():
        print('sync bg task running')
        time.sleep(3)
        print('sync bg task completed')
    
    
    async def main():
        print('Starting main program')
    
        # 异步函数的后台任务
        asyncio.create_task(async_bg_task())
    
        # 同步函数的后台任务
        # with ThreadPoolExecutor() as executor:
        #     executor.submit(sync_bg_task)
    
        # Thread(target=sync_bg_task).start()
    
        loop = asyncio.get_running_loop()
        loop.run_in_executor(executor=ThreadPoolExecutor(), func=sync_bg_task)
    
        print('Main program continues')
        await asyncio.sleep(5)
    
    
    if __name__ == '__main__':
        asyncio.run(main())
        
       
    >>> ThreadPoolExecutor out
    Starting main program
    sync bg task running
    sync bg task completed
    Main program continues
    async bg task running
    async bg task completed
    
    >>> Thread out
    Starting main program
    sync bg task running
    Main program continues
    async bg task running
    sync bg task completed
    async bg task completed
    
    >>> run_in_executor out
    Starting main program
    sync bg task running
    Main program continues
    async bg task running
    async bg task completed
    sync bg task completed
    

    看输出结果可以发现在同步函数使用直接使用线程池 ThreadPoolExecutor 执行还是堵塞了主线程,然后 Thread 没有,通过 loop.run_in_executor 也不会阻塞。后面发现 是 with 语法导致的堵塞,with 的根本原因就是它会等待线程池内的所有线程任务完成并回收,所以主线程必须等同步函数结束后才能继续。一开始我还一以为是线程池使用了主线程的线程后面打印线程名称看了下不是然后调试下就发现了with的问题。

    import asyncio
    import time
    import threading
    from concurrent.futures import ThreadPoolExecutor
    
    
    async def async_bg_task():
        print(f"async_bg_task In thread: {threading.current_thread().name}")
    
        print('async bg task running')
        await asyncio.sleep(3)
        print('async bg task completed')
    
    
    def sync_bg_task(num):
        print(f"sync_bg_task{num} In thread: {threading.current_thread().name}")
    
        print(f'sync bg task{num} running')
        time.sleep(3)
        print(f'sync bg task{num} completed')
    
    
    async def main():
        print('Starting main program')
    
        # 异步函数的后台任务
        asyncio.create_task(async_bg_task())
    
        # 同步函数的后台任务
        thread_pool = ThreadPoolExecutor()
        # with thread_pool as pool:
        #     for i in range(5):
        #         pool.submit(sync_bg_task, i)
    
        for i in range(5):
            thread_pool.submit(sync_bg_task, i)
    
        threading.Thread(target=sync_bg_task, args=["thread"]).start()
    
        loop = asyncio.get_running_loop()
        loop.run_in_executor(ThreadPoolExecutor(), sync_bg_task, "loop.run_in_executor")
    
        print('Main program continues')
        print(f"Main program In thread: {threading.current_thread().name}")
        await asyncio.sleep(5)
    
    
    if __name__ == '__main__':
        asyncio.run(main())
    

    三、具体封装实现

    import asyncio
    from concurrent.futures import ThreadPoolExecutor, Executor
    
    
    def run_on_executor(executor: Executor = None, background: bool = False):
        """
        异步装饰器
        - 支持同步函数使用 executor 加速
        - 异步函数和同步函数都可以使用 `await` 语法等待返回结果
        - 异步函数和同步函数都支持后台任务,无需等待
        Args:
            executor: 函数执行器, 装饰同步函数的时候使用
            background: 是否后台执行,默认False
    
        Returns:
        """
    
        def _run_on_executor(func):
            @functools.wraps(func)
            async def async_wrapper(*args, **kwargs):
                if background:
                    return asyncio.create_task(func(*args, **kwargs))
                else:
                    return await func(*args, **kwargs)
    
            @functools.wraps(func)
            def sync_wrapper(*args, **kwargs):
                loop = asyncio.get_event_loop()
                task_func = functools.partial(func, *args, **kwargs)    # 支持关键字参数
                return loop.run_in_executor(executor, task_func)
    
            # 异步函数判断
            wrapper_func = async_wrapper if asyncio.iscoroutinefunction(func) else sync_wrapper
            return wrapper_func
    
        return _run_on_executor
    

    封装成了带参数的装饰器

    • executor: 函数执行器, 装饰同步函数的时候使用

      • 可以传递指定的线程池,默认None 根据系统cpu核心数动态创建线程的数量
    • background: 用于标识是否后台执行,默认False

      • 有点诟病同步函数的后台任务没有用到这个参数而是使用 await 语法控制,但在使用装饰器时候可以起到后台任务标识作用,也可以的,别人一看有这个参数就知道是后台任务就不用细看函数业务逻辑
      • 后续再看看怎么优化,大家有没有比较好建议
    • loop.run_in_executor(executor, task_func) 方法不支持关键字参数的传递,故而采用 task_func = functools.partial(func, *args, **kwargs) ,来构造一个不带参数的函数就可以方便使用了

    测试demo

    import asyncio
    import time
    from concurrent.futures import ThreadPoolExecutor
    
    from py_tools.decorators.base import run_on_executor
    from loguru import logger
    
    thread_executor = ThreadPoolExecutor(max_workers=3)
    
    
    @run_on_executor(background=True)
    async def async_func_bg_task():
        logger.debug("async_func_bg_task start")
        await asyncio.sleep(1)
        logger.debug("async_func_bg_task running")
        await asyncio.sleep(1)
        logger.debug("async_func_bg_task end")
        return "async_func_bg_task ret end"
    
    
    @run_on_executor()
    async def async_func():
        logger.debug("async_func start")
        await asyncio.sleep(1)
        logger.debug("async_func running")
        await asyncio.sleep(1)
        return "async_func ret end"
    
    
    @run_on_executor(background=True, executor=thread_executor)
    def sync_func_bg_task():
        logger.debug("sync_func_bg_task start")
        time.sleep(1)
        logger.debug("sync_func_bg_task running")
        time.sleep(1)
        logger.debug("sync_func_bg_task end")
        return "sync_func_bg_task end"
    
    
    @run_on_executor()
    def sync_func():
        logger.debug("sync_func start")
        time.sleep(1)
        logger.debug("sync_func running")
        time.sleep(1)
        return "sync_func ret end"
    
    
    async def main():
        ret = await async_func()
        logger.debug(ret)
    
        async_bg_task = await async_func_bg_task()
        logger.debug(f"async bg task {async_bg_task}")
        logger.debug("async_func_bg_task 等待后台执行中")
    
        loop = asyncio.get_event_loop()
        for i in range(3):
            loop.create_task(async_func())
    
        ret = await sync_func()
        logger.debug(ret)
    
        sync_bg_task = sync_func_bg_task()
        logger.debug(f"sync bg task {sync_bg_task}")
        logger.debug("sync_func_bg_task 等待后台执行")
    
        await asyncio.sleep(10)
    
    if __name__ == '__main__':
        asyncio.run(main())
    

    测试详细输出

    async_func start
    async_func running
    async_func ret end

    async bg task

    相关文章

    JavaScript2024新功能:Object.groupBy、正则表达式v标志
    PHP trim 函数对多字节字符的使用和限制
    新函数 json_validate() 、randomizer 类扩展…20 个PHP 8.3 新特性全面解析
    使用HTMX为WordPress增效:如何在不使用复杂框架的情况下增强平台功能
    为React 19做准备:WordPress 6.6用户指南
    如何删除WordPress中的所有评论

    发布评论