fasthttp是如何做到比net/http快十倍的

2024年 3月 8日 64.1k 0

哇,性能太强了吧,话不多说,本期小许和大家一起看看fasthttp Server端的底层实现,来看看到底是如何做到性能如此之快的,有哪些优秀的特性值得我们学习和借鉴的!

Server端处理流程对比

在进行了解fasthttp底层代码实现之前,我们先对两者处理请求的方式进行一个回顾和对比,了解完两者的基本的情况之后,再对fasthttp的实现最进一步分析。

net/http处理流程

在小许文章《图文讲透Golang标准库 net/http实现原理 -- 服务端》中讲的比较详细了,这里再把大致流程整理以下,整体流程如下:

图片图片

  • 1. 将路由和对应的handler注册到一个 map 中,用做后续键值路由匹配
  • 2. 注册完之后就是开启循环监听连接,每获取到一个连接就会创建一个 Goroutine进行处理
  • 3. 在创建好的 Goroutine 里面会循环的等待接收请求数据,然后根据请求的地址去键值路由map中匹配对应的handler
  • 4. 执行匹配到的处理器handler
  • net/http 的实现是一个连接新建一个 goroutine,如果在连接数非常多的时候,,每个连接都会创建一个 Goroutine 就会给系统带来一定的压力。这也就造成了 net/http在处理高并发时的瓶颈。

    每次来了一个连接,都要实例化一个连接对象,这谁受得了,哈哈

    fasthttp处理流程

    再看看fasthttp处理请求的流程:

    图片图片

  • 1. 启动监听
  • 2. 循环监听端口获取连接,建立workerPool
  • 3. 循环尝试获取连接 net.Conn,先会去 ready 队列里获取 workerChan,获取不到就会去对象池获取
  • 4. 将获取到的的连接net.Conn 发送到 workerChan 的 channel 中
  • 5. 开启一个 Goroutine 一直循环获取 workerChan 这个 channel 中的数据
  • 6. 获取到channel中的net.Conn之后就会对请求进行处理
  • workerChan 其实就是一个连接处理对象,这个对象里面有一个 channel 用来传递连接;每个 workerChan 在后台都会有一个 Goroutine 循环获取 channel 中的连接,然后进行处理。

    fasthttp为什么快

    fasthttp的优化主要有以下几个点:

    • • 连接复用,如slice中有可复用的workerChan就从ready这个slice中获取,没有可复用的就在workerChanPool创建一个,万一池子满了(默认是 256 * 1024个)就报错。
    • • 对于内存复用,就是大量使用了sync.Pool(你知道的,sync.Pool复用对象有啥好处),有人统计过,用了整整30个sync.Pool,context、request对象、header、response对象都用了sync.Pool ....
    • • 利用unsafe.Pointer指针进行[]byte 和 string 转换,避免[]byte到string转换时带来的内存分配和拷贝带来的消耗 。

    知道了fasthttp为什么快,接下来我们看下它是如何处理监听处理请求的,在哪些地方用到了这些特性。

    底层实现

    简单案例

    import (
        "github.com/buaazp/fasthttprouter"
        "github.com/valyala/fasthttp"
        "log"
    )
    
    func main() {
        //创建路由
        r := fasthttprouter.New()
        r.GET("/", Index)
        if err := fasthttp.ListenAndServe(":8083", r.Handler); err != nil {
            log.Fatalf("ListenAndServe fatal: %s", err)
        }
    
    }
    func Index(ctx *fasthttp.RequestCtx) {
        ctx.WriteString("hello xiaou code!")
    }

    这个案例同样是几样代码就启动了一个服务。

    创建路由、为不同的路由执行关联不同的处理函数handler,接着跟net/http一样调用 ListenAndServe 函数进行启动服务监听,等待请求进行处理。

    workerPool结构

    workerpool 对象表示 连接处理 工作池,这样可以控制连接建立后的处理方式,而不是像标准库 net/http 一样,对每个请求连接都启动一个 goroutine 处理, 内部的 ready 字段存储空闲的 workerChan 对象,workerChanPool 字段表示管理 workerChan 的对象池。

    workerPool结构体如下:

    type workerPool struct {
        //匹配请求对应的handler
        WorkerFunc ServeHandler
        //最大同时处理的请求数
        MaxWorkersCount int
        
        LogAllErrors bool
        //最大空闲工作时间
        MaxIdleWorkerDuration time.Duration
    
        Logger Logger
        //互斥锁
        lock         sync.Mutex
        //work数量
        workersCount int
        mustStop     bool
        // 空闲的 workerChan
        ready []*workerChan
        //是否关闭workerPool
        stopCh chan struct{}
        //sync.Pool  workerChan 的对象池
        workerChanPool sync.Pool
    
        connState func(net.Conn, ConnState)
    }

    WorkerFunc :这个属性挺重要的,因为给它赋值的是Server.serveConn

    ready:存储了空闲的workerChan

    workerChanPool:是workerChan 的对象池,在sync.Pool中存取临时对象,可减少内存分配

    启动服务

    ListenAndServe是启动服务监听的入口,内部的调用过程如下:

    图片图片

    Server.Serve

    Serve方法为来自给监听到的连接提供处理服务,直到超过了最大限制(256 * 1024)才会报错。

    func (s *Server) Serve(ln net.Listener) error {
        //最大连接处理数
        maxWorkersCount := s.getConcurrency()
    
        s.mu.Lock()
        s.ln = append(s.ln, ln)
        if s.done == nil {
            s.done = make(chan struct{})
        }
        if s.concurrencyCh == nil {
            s.concurrencyCh = make(chan struct{}, maxWorkersCount)
        }
        s.mu.Unlock()
        //workerPool进行初始化
        wp := &workerPool{
            WorkerFunc:            s.serveConn,
            MaxWorkersCount:       maxWorkersCount,
            LogAllErrors:          s.LogAllErrors,
            MaxIdleWorkerDuration: s.MaxIdleWorkerDuration,
            Logger:                s.logger(),
            connState:             s.setState,
        }
        //开启协程,处理协程池的清理工作
        wp.Start()
        atomic.AddInt32(&s.open, 1)
        defer atomic.AddInt32(&s.open, -1)
    
        for {
            // 阻塞等待,获取连接net.Conn
            if c, err = acceptConn(s, ln, &lastPerIPErrorTime); err != nil {
                ...
                return err
            }
            s.setState(c, StateNew)
            atomic.AddInt32(&s.open, 1)
            //处理获取到的连接net.Conn
            if !wp.Serve(c) {
                //未能处理,说明已达到最大worker限制
                ...
            }
            c = nil
        }
    }

    从上面的注释中我们可以看出 Server 方法主要做了以下几件事:

  • 1. 初始化 worker Pool,并启动
  • 2. net.Listener循环接收请求
  • 3. 将接收到的请求交给workerChan 处理
  • 注意:这里如果超过了设定的最大连接数(默认是 256 * 1024个)就直接报错了

    Start开启协程池

    workerPool进行初始化之后接着就调用Start开启,这里主要是指定sync.Pool变量workerChanPool的创建函数。

    接着开启一个协程,该Goroutine的目的是进行定时清理 workerPool 中的 ready 中保存的空闲 workerChan,清理频率为每 10s 启动一次。

    🚩清理规则是使用二进制搜索算法找出最近可以清理的工作者的索引

    func (wp *workerPool) Start() {
        //wp的关闭channel是否为空
        if wp.stopCh != nil {
            return
        }
        wp.stopCh = make(chan struct{})
        stopCh := wp.stopCh
        //指定workerChanPool的创建函数
        wp.workerChanPool.New = func() interface{} {
            return &workerChan{
                ch: make(chan net.Conn, workerChanCap),
            }
        }
        //开启协程
        go func() {
            var scratch []*workerChan
            for {
                //清理空闲超时的 workerChan
                wp.clean(&scratch)
                select {
                case  0 {
                pic := wrapPerIPConn(s, c)
                if pic == nil {
                    ...
                    continue
                }
                c = pic
            }
            return c, nil
        }
    }

    获取 workerChan

    func (wp *workerPool) Serve(c net.Conn) bool {
    //获取 workerChan
    ch := wp.getCh()
    if ch == nil {
    return false
    }
    //将连接放到channel中
    ch.ch

    相关文章

    JavaScript2024新功能:Object.groupBy、正则表达式v标志
    PHP trim 函数对多字节字符的使用和限制
    新函数 json_validate() 、randomizer 类扩展…20 个PHP 8.3 新特性全面解析
    使用HTMX为WordPress增效:如何在不使用复杂框架的情况下增强平台功能
    为React 19做准备:WordPress 6.6用户指南
    如何删除WordPress中的所有评论

    发布评论