19编写类postman后端功能

2023年 7月 26日 92.4k 0

写在前面

本次章节,由于要完成postman的主要发送接口功能,因此篇幅预计在一万字上下。且有部分代码不进行注释,只在后面针对核心方法进行讲解,对水平要求偏高,望提前知晓!

回顾

接上篇,上次我们在前端项目中,补全了前端的自定义导航栏。要知道,咱们这是一个接口测试平台。一个接口自动化平台,最核心的当然是对api的请求操作,所以咱们刻不容缓,加快进度,趁热打铁,来点干货吧。用aiohttp来协助我们完成接口自动化请求。

技术选型
  • 异步支持:aiohttp是一个异步的HTTP客户端库,而requests是同步的。在异步编程中,使用异步库可以充分利用单线程处理多个并发请求,提高程序的性能和吞吐量。在高并发场景下,aiohttp通常能够更好地处理大量的并发请求。
  • 非阻塞IO:aiohttp使用非阻塞的IO模型,在发起请求时不会阻塞主线程,可以在等待响应的同时继续处理其他任务。这在IO密集型的场景下特别有优势,比如在网络请求过程中可以同时进行其他计算任务,提高了程序的运行效率。
  • Python异步生态系统:aiohttp是Python异步生态系统的一部分,与其他异步库(如asyncio)很好地集成在一起,可以方便地编写异步代码。而requests是同步的,如果在异步环境中使用,可能需要借助额外的库来实现异步功能,增加了复杂性。
  • 对WebSocket的支持:aiohttp还提供了对WebSocket的原生支持,可以方便地处理WebSocket连接,而requests并不直接支持WebSocket。
  • Python 3.5+兼容性:aiohttp支持Python 3.5及以上版本,而requests虽然也可以在Python 3.5+上运行,但更早的Python版本可能需要使用较老的requests版本,导致不同Python版本的代码兼容性较差。
  • 需要注意的是,选择使用aiohttp还要考虑项目的实际需求和技术栈。如果项目对异步处理要求较高,有大量的并发请求或需要与异步IO库集成,那么aiohttp可能是更好的选择。但如果项目规模较小,对并发要求不高,且更喜欢简单易用的同步请求库,那么requests也是一个很好的选择。

    综上所述,在此技术选型选择aiohttp。

    设计思路

    由于后期我们不只是需要支持http请求,还有一系列的rpc、ws等请求,因此我准备将核心方法新增一个目录进行管理,名字就叫requestpages。

    编码

    新增文件abandon-server/src/app/requestpages/AsyncHttpClient.py

    import json
    import time
    from urllib.parse import urlencode
    
    import aiohttp
    from aiohttp import FormData
    
    
    # 定义一个异步请求类AsyncRequest
    class AsyncRequest(object):
    
        def __init__(self, url: str, timeout=15, **kwargs):
            self.url = url
            self.kwargs = kwargs
            self.timeout = aiohttp.ClientTimeout(total=timeout)
    
        def get_cookie(self, session):
            """
            获取Cookie的方法,接收一个session对象作为参数
            :param session:
            :return:
            """
            cookies = session.cookie_jar.filter_cookies(self.url)
            return {k: v.value for k, v in cookies.items()}
    
        def get_data(self, kwargs):
            """
            获取请求的数据的方法,接收一个kwargs字典作为参数
            :param kwargs:
            :return:
            """
            if kwargs.get("json") is not None:
                return kwargs.get("json")
            return kwargs.get("data")
    
        async def invoke(self, method: str):
            """
            发送异步请求的方法,接收一个method字符串作为参数
            :param method: str
            :return:
            """
            start = time.time()
            async with aiohttp.ClientSession(cookie_jar=aiohttp.CookieJar(unsafe=True)) as session:
                async with session.request(method, self.url, timeout=self.timeout, **self.kwargs) as resp:
                    if resp.status != 200:
                        # 修复bug,当http状态码不为200的时候给出提示
                        return await self.collect(False, self.get_data(self.kwargs), resp.status, msg="http状态码不为200")
                    cost = "%.0fms" % ((time.time() - start) * 1000)
                    response, json_format = await AsyncRequest.get_resp(resp)
                    cookie = self.get_cookie(session)
                    return await self.collect(True, self.get_data(self.kwargs), resp.status, response,
                                              resp.headers, resp.request_info.headers, elapsed=cost,
                                              cookies=cookie, json_format=json_format)
    
        @staticmethod
        async def client(url: str, timeout=15, **kwargs):
            """
            用于创建AsyncRequest对象并返回
            :param url: str
            :param timeout: int
            :param kwargs:
            :return:
            """
            if not url.startswith(("http://", "https://")):
                raise Exception("请输入正确的url, 记得带上http哦")
            headers = kwargs.get("headers")
            body = kwargs.get("body", {})
            if body is None:
                r = AsyncRequest(url, headers=headers, timeout=timeout)
            elif body.get("body_type") == "none":
                r = AsyncRequest(url, headers=headers, timeout=timeout)
            elif body.get("body_type") == "json":
                if "Content-Type" not in headers:
                    headers['Content-Type'] = "application/json; "
                try:
                    body_data = body["body"]
                    body_data = json.loads(json.dumps(body_data))
                except json.JSONDecodeError as e:
                    raise Exception(f"json格式不正确: {e}")
                r = AsyncRequest(url, headers=headers, timeout=timeout,
                                 json=body_data)
            elif body.get("body_type") == "formdata":
                try:
                    body_data = body.get("body", [])
                    form_data = FormData()
                    for item in body_data:
                        # 如果是文本类型,直接添加key-value
                        if item.get("type") == 'TEXT':
                            form_data.add_field(item.get("key"), item.get("value", ''))
                        # todo 后期可能会改写add_file方法,暂时先注释掉基础写法
                        # else:
                        #     # 如果是文件类型,使用add_file方法添加文件
                        #     file_content = await file.read()
                        #     form_data.add_file(item.get("key"), file_content, filename=item.get("value"))
                    r = AsyncRequest(url, headers=headers, data=form_data, timeout=timeout)
                except Exception as e:
                    raise Exception(f"解析form-data失败: {str(e)}")
            elif body["body_type"] == "xform":
                body_data = kwargs.get("body", "{}")
                body_encoded = urlencode(body_data)
                r = AsyncRequest(url, headers=headers, data=body_encoded, timeout=timeout)
            else:
                # 暂时未支持其他类型
                r = AsyncRequest(url, headers=headers, timeout=timeout, data=kwargs.get("body", {})["body"])
            return r
    
        @staticmethod
        async def get_resp(resp):
            """
            用于获取响应的数据,返回响应的文本内容和一个布尔值,表示是否为json格式
            :param resp: str
            :return:
            """
            try:
                data = await resp.json(encoding='utf-8')
                # 说明是json格式
                return json.dumps(data, ensure_ascii=False, indent=4), True
            except:
                data = await resp.text()
                # 说明不是json格式,我们不做loads操作了
                return data, False
    
        @staticmethod
        def get_request_data(body):
            """
            用于获取请求的数据
            :param body: str
            :return:
            """
            request_body = body
            if isinstance(body, bytes):
                request_body = request_body.decode()
            if isinstance(body, FormData):
                request_body = str(body)
            if isinstance(request_body, str) or request_body is None:
                return request_body
            return json.dumps(request_body, ensure_ascii=False, indent=4)
    
        @staticmethod
        async def collect(status, request_data, status_code=200, response=None, response_headers=None,
                          request_headers=None, cookies=None, elapsed=None, msg="Request Successful!", **kwargs):
            """
            用于收集http返回的数据,接收多个参数,并将它们封装成字典形式返回
            :param status: 请求状态
            :param request_data: 请求入参
            :param status_code: 状态码
            :param response: 相应
            :param response_headers: 返回header
            :param request_headers:  请求header
            :param cookies:  cookie
            :param elapsed: 耗时
            :param msg: 报错信息
            :return:
            """
            request_headers = json.dumps({k: v for k, v in request_headers.items()} if request_headers is not None else {},
                                         ensure_ascii=False)
            response_headers = json.dumps(
                {k: v for k, v in response_headers.items()} if response_headers is not None else {},
                ensure_ascii=False)
            cookies = {k: v for k, v in cookies.items()} if cookies is not None else {}
            cookies = json.dumps(cookies, ensure_ascii=False)
            return {
                "status": status, "response_data": response, "status_code": status_code,
                "request_data": AsyncRequest.get_request_data(request_data),
                "response_headers": response_headers, "request_headers": request_headers,
                "msg": msg, "elapsed_time": elapsed, "cookies": cookies, **kwargs,
            }
    

    新增HttpRequestForm的结构体,新增abandon-server/src/app/schema/http.py

    from typing import Optional, Union
    from pydantic import BaseModel, validator
    
    from src.app.exception.error import ParamsError
    
    
    # 定义一个数据模型,用于接收HTTP请求的相关信息
    class HttpRequestForm(BaseModel):
        method: str
        url: str
        # 定义HTTP请求的请求体,可以是字典或列表类型,可选参数,默认为None
        body: Optional[Union[dict, list]] = None
        # 定义HTTP请求的请求头,可以是字典类型,可选参数,默认为一个空字典
        headers: Optional[dict] = {}
    
        # 使用pydantic的validator装饰器,对method和url字段进行验证
        @validator('method', 'url')
        def name_not_empty(cls, v):
            # 验证方法:检查字符串是否为空或仅包含空格
            if isinstance(v, str) and len(v.strip()) == 0:
                # 如果为空,抛出自定义异常ParamsError,提示不能为空
                raise ParamsError("不能为空")
            # 如果验证通过,返回原始值v
            return v
    
    
    核心方法讲解

    本次功能,核心方法大概分为invoke函数和client

    • 首先讲解的是client函数:

    这个函数方法client用于创建AsyncRequest对象并返回。它的作用是根据传入的URL、超时时间和其他参数来创建不同类型的AsyncRequest对象,用于进行异步的HTTP请求。

    下面对这个函数方法的具体实现进行详细解释:

  • async def client(url: str, timeout=15, **kwargs)::这是一个异步函数方法,接收三个参数:url为字符串类型的HTTP请求URL,timeout为超时时间,默认为15秒,**kwargs为可变关键字参数。

  • 首先,通过url.startswith(("http://", "https://"))来判断URL是否以http://https://开头,如果不是,就抛出异常,提示请输入正确的URL。

  • 然后,从**kwargs中获取headersbody的值。headers是HTTP请求的头部信息,body是HTTP请求的请求体,它是一个字典类型,默认为空字典{}

  • 根据body.get("body_type")来判断body中的body_type字段的值,从而决定创建何种类型的AsyncRequest对象:

    a. 如果body_type不存在(即body.get("body_type")返回None),或者body_type的值是"none",则创建一个不带请求体的AsyncRequest对象,只包含URL和请求头。

    b. 如果body_type的值是"json",则首先检查请求头headers中是否包含"Content-Type"字段,如果不包含,则将"Content-Type"设置为"application/json; charset=UTF-8"。然后尝试将body["body"]的值转换为JSON格式,如果转换失败(出现JSONDecodeError异常),则抛出异常提示"json格式不正确"。最终创建一个带有JSON格式的请求体的AsyncRequest对象。

    c. 如果body_type的值是"formdata",则将body["body"]中的数据处理成FormData格式,逐个添加到form_data对象中,形成请求体。其中,如果item的"key"对应的是文本类型,则直接添加key-value到请求体中;如果是文件类型,暂时注释掉"else"部分,后期可能会添加更改的方法。最终创建一个带有FormData格式的请求体的AsyncRequest对象。

    d. 如果body_type的值是"xform",则将kwargs["body"]进行URL编码,得到一个字符串形式的请求体,并创建一个带有x-www-form-urlencoded格式的AsyncRequest对象。

    e. 如果body_type不属于上述几种类型,暂时未支持其他类型,创建一个空的请求体({})的AsyncRequest对象。

  • 最后,根据不同的body_type的值,创建不同类型的AsyncRequest对象,并将其返回。

    • invoke函数

    这个函数方法invoke是用于发送异步请求的方法,它接收一个字符串类型的method参数作为HTTP请求的方法(如GET、POST等),然后进行异步的HTTP请求,并返回请求结果。

    下面对这个函数方法的具体实现进行详细解释:

  • async def invoke(self, method: str)::这是一个异步函数方法,接收一个名为method的字符串参数,表示HTTP请求的方法(GET、POST等)。

  • 首先,记录请求开始时间start = time.time()

  • 创建一个异步的ClientSession对象session,用于发送HTTP请求。cookie_jar=aiohttp.CookieJar(unsafe=True)表示使用不安全的CookieJar,即可以在httphttps之间共享cookie。

  • 使用sessionrequest方法发送HTTP请求,传入methodself.url(请求URL)、timeout(超时时间)和**self.kwargs(其他HTTP请求参数,如请求头和请求体)。

  • 判断响应的状态码resp.status是否为200,如果不为200,则通过self.collect方法收集请求失败的信息,并返回False表示请求不成功,同时附带HTTP状态码不为200的提示信息。

  • 如果响应的状态码为200,则计算请求耗时cost = "%.0fms" % ((time.time() - start) * 1000),将耗时信息格式化成毫秒单位的字符串。

  • 调用AsyncRequest.get_resp(resp)方法,解析响应内容,返回一个元组,第一个元素是响应内容response,第二个元素是一个布尔值json_format,表示响应是否为JSON格式。

  • 使用sessioncookie_jar属性获取请求中的cookie信息,并将其转换成字典格式,保存在cookie变量中。

  • 最后,通过self.collect方法收集请求成功的信息,并将请求结果、状态码、响应内容、响应头、请求头、耗时、cookie信息以及是否为JSON格式的标记等信息一并返回。其中,状态为True表示请求成功。

  • 注册http请求相关路由

    新增路由相关abandon-server/src/app/routes/request/http.py

    from fastapi import APIRouter
    
    from src.app.customized.customized_response import AbandonJSONResponse
    from src.app.requestpages.AsyncHttpClient import AsyncRequest
    from src.app.schema.http import HttpRequestForm
    
    router = APIRouter(prefix="/request")
    
    
    @router.post("/http")
    async def http_request(data: HttpRequestForm):
        try:
            r = await AsyncRequest.client(data.url, headers=data.headers, body=data.body)
            response = await r.invoke(data.method)
            return AbandonJSONResponse.success(response)
        except Exception as e:
            return AbandonJSONResponse.failed(e)
    
    
    @router.post("/posttest")
    async def post_none():
        return AbandonJSONResponse.success('success')
    

    注册路由

    image.png

    验证
  • GET 空接口
  • image.png

  • POST 空接口
  • image.png
    3. POST none接口

    后续只发curl,不截图了

    curl --location 'http://127.0.0.1:9923/request/http' 
    --header 'Content-Type: application/json' 
    --data '{
        "method": "POST",
        
        "url": "http://127.0.0.1:9923/request/posttest",
        "body": {"body_type": "none"}
        
        
        
    }'
    
  • POST json接口
  • curl --location 'http://127.0.0.1:9923/request/http' 
    --header 'Content-Type: application/json' 
    --data '{
        "method": "POST",
        
        "url": "http://127.0.0.1:9923/request/posttest",
        "headers": {"Content-Type": "application/json"},
        "body": {"body_type": "json", "body":{"data":"2"}}
        
        
        
        
    }'
    
  • POST form-data接口
  • curl --location 'http://127.0.0.1:9923/request/http' 
    --header 'Content-Type: application/json' 
    --data '{
        "method": "POST",
        
        "url": "http://127.0.0.1:9923/request/posttest",
        
        "headers": {"Content-Type": "application/json"},
        
        "body": {"body_type": "formdata", "body":[{"key":"s","value":"v","type":"TEXT"},{"key":"s","value":"v","type":"FILE"}]}
        
    }'
    
  • POST x-www-form-urlencoded接口
  • curl --location 'http://127.0.0.1:9923/request/http' 
    --header 'Content-Type: application/json' 
    --data '{
        "method": "POST",
        
        "url": "http://127.0.0.1:9923/request/posttest",
        
        
        "headers": {"Content-Type": "application/json"},
        
        "body": {"body_type": "xform", "body":{"data":"2"}}
    }'
    

    相关文章

    JavaScript2024新功能:Object.groupBy、正则表达式v标志
    PHP trim 函数对多字节字符的使用和限制
    新函数 json_validate() 、randomizer 类扩展…20 个PHP 8.3 新特性全面解析
    使用HTMX为WordPress增效:如何在不使用复杂框架的情况下增强平台功能
    为React 19做准备:WordPress 6.6用户指南
    如何删除WordPress中的所有评论

    发布评论