Python学习(6) 爬虫 爬虫系统架构设计

2023年 10月 8日 36.4k 0

主要业务流程

  • 初始请求
  • 请求过滤器
  • 请求队列
  • 响应下载器
  • 数据解析器
  • 数据清洗器
  • 存储器
  • 设计图

    rowin90.github.io/images/py/爬…

    • master + slave:master控制队列,过滤,传递任务;slave负责执行

      • 缺点:master和slave端交互数据频繁,slave的数据进出,都给master去调度,对master端相当于成倍数据,并发比较大
    • 升级策略2:分离响应异步下载与异步处理,避免一方阻塞另一方

      • 如果解析,和清洗也会处理很长时间,并发量就会下降,也可以在中间加入队列,解耦任务
        • 如果没有耗时操作,也没必要新加一个队列来做
        • 具体在哪些环节之间加入队列,取决你分析业务需求,在哪些环节会出现耗时操作
      • 对于一个任务来说,是共享一个进程的,这个队列可以直接用Queue(内存队列),共享一个进程中数据,
    • 升级策略3:日志监控捕获错误,并实时通报。ELK

      • 先对日志进行埋点,针对Error错误日志进行报告
    • 还有一种,master 只负责过滤重复请求;slave自己负责维护自己的队列,只需要 slave 执行任务前询问 master是否有重复值即可

      • 减轻了master的负担,但是slave自己维护自己队列,彼此独立

    系统架构组件

  • 队列组件

    • 队列类型
      • FIFO
    • 内存队列 - 一般实现单机版的队列
      • Python内置队列
      • Asyncio中的队列
    • 持久化队列:分布式,断点续爬
      • Redis队列
      • 消息队列:Kafka,Rabbitmq
  • 过滤器组件

    • 指纹过滤器(redis等): 千万级数据去重
    • simhash过滤器,相似文本去重
    • 布隆过滤器(redis),亿级数据去重(存在极小概率误判),,占的空间比较小,性能高
  • 下载器组件

    • urllib/requests
    • aiohttp
    • tomada.httpclient
  • 异步组件

    • asyncio
    • celery + eventlet/gevent
    • selenium + chrome-headless Pool(多个浏览器实例)
    • appium + android-app Pool (多台设备)
  • 数据解析提取组件

    • 语法规则
      • 正则
      • Xpath
    • 解析提取工具
      • re
      • lxml
      • lxml + bs4
      • lxml + pyquery
  • 数据清洗组件

    • 自定义清洗规则
  • 数据存储组件

    • 存储介质
      • file:csv/json
      • DB:mysql/mongondb
    • 存储工具
      • csv、json
      • sqlalchemy/mongoengine
  • 程序监控组件

    • ELK
      • elasticsearch:日志数据存储
      • logstash: 日志收集工具
      • kibana: 日志可视化
  • 可视化控制组件

    • web界面
    • GUI界面
  • 异步改造并发代码

    同步请求

  • 下载器中,开始使用的是 requests 同步发请求,没有异步
    • 下载器(同步请求)
    import requests
    from spiderSystem.response import Response
    
    
    class RequestsDownloader(object):
        """根据request发起请求,构建response对象"""
    
        def fetch(self, request):
            if request.method.upper() == "GET":
                resp = requests.get(request.with_query_url, headers=request.headers)
            elif request.method.upper() == "POST":
                resp = requests.post(request.with_query_url, headers=request.headers, body=request.body)
            else:
                raise Exception('only support GET or POST Method')
    
            return Response(request, status_code=resp.status_code, url=resp.url, headers=resp.headers, body=resp.content)
    
    • 请求的 Slave 客户端
    from .request_manager import RequestManager
    from .request_manager.utils.redis_tools import get_redis_queue_cls
    from .downloader import RequestsDownloader
    
    from .request import Request
    
    FIFO_QUEUE = get_redis_queue_cls('fifo')
    
    
    class Slave(object):
        def __init__(self, spiders, project_name, request_manager_config):
            self.filter_queue = FIFO_QUEUE("filter_queue", host="localhost")
            self.request_manager = RequestManager(**request_manager_config) 
            self.downloader = RequestsDownloader()   # 用 requests 同步请求的下载器
            self.spiders = spiders
            self.project_name = project_name
    
        def handle_request(self):
            # 1. 获取一个请求
            request = self.request_manager.get_request(self.project_name)
    
            # 2. 发起请求
            response = self.downloader.fetch(request)  # 每次都同步去请求 !!!
    
            # 3. 获取爬虫对象
            spider = self.spiders[request.name]()
    
            # 4. 处理 response
            for result in spider.parse(response):
                if result is None:
                    raise Exception('不允许返回None')
                elif isinstance(result, Request):
                    self.filter_queue.put(result)
                else:
                    # 意味着是一个数据
                    new_result = spider.data_clean(result)
                    spider.data_save(new_result)
    
        def run(self):
            while True:
                self.handle_request() 
    
    

    异步请求改造

    • 通过 tornado 的异步请求
  • 下载器(异步)
  • from tornado.httpclient import HTTPClient, HTTPRequest, AsyncHTTPClient
    
    from spiderSystem.response import Response
    
    
    # tornado 也有同步请求方式 (可以忽略)
    class TornadoDownloader(object):
    
        def __init__(self):
            self.httpclient = HTTPClient()
    
        def fetch(self, request):
            print("tornado 同步客户端发的请求")
            tornado_request = HTTPRequest(request.with_query_url, method=request.method.upper(), headers=request.headers)
            tornado_response = self.httpclient.fetch(tornado_request)
            return Response(request=request, status_code=tornado_response.code, url=tornado_response.effective_url,
                            body=tornado_response.buffer.read())
    
        """
        同步的请求,不能复用,需要用完后关闭
        """
    
        def __del__(self):
            self.httpclient.close()
    
    
    # tornado 也有异步请求方式
    class AsyncTornadoDownloader(object):
    
        def __init__(self):
            self.async_http_client = AsyncHTTPClient()
    
        async def fetch(self, request): # 开启协程
            print("tornado 异步客户端发的请求")
            tornado_request = HTTPRequest(request.with_query_url, method=request.method.upper(), headers=request.headers)
            tornado_response = await self.async_http_client.fetch(tornado_request) # 等待
            return Response(request=request, status_code=tornado_response.code, url=tornado_response.effective_url,
                            headers=request.headers,
                            body=tornado_response.buffer.read())
    
    
    • Slave 调用方
    import asyncio
    import tornado.ioloop
    
    from .request_manager import RequestManager
    from .request_manager.utils.redis_tools import get_redis_queue_cls
    from .downloader import RequestsDownloader, TornadoDownloader, AsyncTornadoDownloader
    
    from .request import Request
    
    FIFO_QUEUE = get_redis_queue_cls('fifo')
    
    
    class Slave(object):
        def __init__(self, spiders, project_name, request_manager_config):
            self.filter_queue = FIFO_QUEUE("filter_queue", host="localhost")
            self.request_manager = RequestManager(**request_manager_config) 
            self.downloader = AsyncTornadoDownloader()  # 异步下载器
            self.spiders = spiders
            self.project_name = project_name
    
        async def handle_request(self):
            
            # request = self.request_manager.get_request(self.project_name)  阻塞改异步
            io_loop = tornado.ioloop.IOLoop.current()
            # 1. 获取一个请求
            future = io_loop.run_in_executor(None, self.request_manager.get_request,self.project_name)  # 不支持协程的函数,可以自己获取事件循环,去定义执行,让其支持协程
            request = await future
    
    
            # 2. 发起请求
            response = await self.downloader.fetch(request)
    
            # 3. 获取爬虫对象
            spider = self.spiders[request.name]()
    
            # 4. 处理 response
            for result in spider.parse(response):
                if result is None:
                    raise Exception('不允许返回None')
                elif isinstance(result, Request):
                    # self.filter_queue.put(result)  可能阻塞,改异步
                    await io_loop.run_in_executor(None, self.filter_queue.put,result)   
                else:
                    # 意味着是一个数据
                    new_result = spider.data_clean(result)
                    spider.data_save(new_result)
    
        async def run(self):
            while True:
                # 不能写成 await self.handle_request(),否则,也是相当于同步请求了
                await asyncio.wait([
                    self.handle_request(),
                    self.handle_request(),
                ])
    
    
    • 启动方式
    if __name__ == '__main__':
        spiders = {BaiduSpider.name: BaiduSpider}
    
        # 同步请求,用 requests 发请求
        # Slave(spiders, project_name=PROJECT_NAME, request_manager_config=REQUEST_MANAGER_CONFIG).run()
    
        # 要用异步方式去请求
        slave = Slave(spiders, project_name=PROJECT_NAME, request_manager_config=REQUEST_MANAGER_CONFIG)
        io_loop = tornado.ioloop.IOLoop.current()
        io_loop.run_sync(slave.run)
    

    tornado库
    io_loop.run_sync 用于将阻塞函数转换为同步函数并在 IOLoop 上执行,它会阻塞当前协程。
    io_loop.run_in_executor 用于在指定的线程池中异步执行耗时的、阻塞的操作,不会阻塞当前协程,并允许 IOLoop 继续处理其他事件。

    asyncio库
    实现类似于 run_sync 的效果:您可以使用 loop.run_until_complete 方法来运行一个协程并等待其完成。这个方法会阻塞当前线程,直到协程执行完毕
    实现类似于 run_in_executor 的效果:您可以使用 loop.run_in_executor 方法将耗时的、阻塞的操作转移到一个线程池中执行,以避免阻塞事件循环。

    async 异步协程改造重点!!!

  • 下载器中,用到的所有异步的地方,必须是协程 async 定义
  • await 后面跟着的,一定是支持协程的方法,要不是一个 协程对象,future 或者 task 对象,比如 self.async_http_client.fetch ,如果不支持协程,会报错
  • 连带着的,所有调用 async 的方法,也必须是协程函数
  • 对于不支持协程的函数,可以自己获取事件循环,去定义执行,让其支持协程;如果一个函数是一个协程函数后,如果这个协程函数中,有任意可以阻塞的,或耗时操作,都应该改成异步的 await ,不然可能会阻塞整个线程
  • 
    # self.request_manager.get_request 本身不支持异步,或者改造成异步,嵌套要改的太深,可以用 io_loop.run_in_executor 来替代
        io_loop = tornado.ioloop.IOLoop.current()
        # 1. 获取一个请求
        future = io_loop.run_in_executor(None, self.request_manager.get_request,self.project_name) 
        request = await future
    
  • 在最开始调用的地方,比如 run ,启动的方式,必须是 用 asyncio.wait 或用其他方式启动(asyncio.gather 或 asyncio.as_completed)
  • # 开启2个协程,去执行。asyncio.wait 能让其变成一个异步关系
    async def run(self):
        while True:
            # 不能写成 await self.handle_request(),否则,也是相当于同步请求了
            await asyncio.wait([
                self.handle_request(),
                self.handle_request(),
            ])
    

    Master 进程用多线程改造

    • master 的启动方法,这两个可以用两个线程去做,不然以前的写法是同步的执行方式
      def run(self):
            # self.run_start_requests()
            # self.run_filter_queue()
            # 两个线程去做
            threading.Thread(target=self.run_start_requests).start()
            threading.Thread(target=self.run_filter_queue).start()
    

    自己封装的SpiderSystem模块安装成内置环境中

  • 在模块目录添加 setup.py 脚本
  • ├── setup.py
    ├── spiderSystem
    ├── README.md
    
    
  • 执行 pip3 setup.py install 即可
  • 查看包信息 pip3 show spiderSystem
  • from setuptools import setup, find_packages
    
    setup(
        name="spiderSystem",
        version="0.1",
        description="spiderSystem module",
        author='raoju',
        url="url",
        license="license",
        packages=find_packages(exclude=[]), # 当前所有模块都安装
        install_requires=[
            "tornado >= 5.1",
            "pycurl",
        ]
    
    )
    
    

    相关文章

    如何删除WordPress中的所有评论
    检查WordPress服务器磁盘使用情况的7种简便方法(查找大文件和数据)
    如何更改WordPress常量FS_METHOD
    如何为区块编辑器、Elementor等构建WordPress文章模板
    如何彻底地删除WordPress主题及相关内容
    如何使用WordPress搭建一个内网

    发布评论