Go sync.Pool

2023年 9月 2日 34.5k 0

介绍

sync.Pool 是 sync 包提供的一个数据类型,是 go语言中的 临时对象池,它的值是用来存储一组可以独立访问的临时对象,它通过池化减少申请新对象,提升程序的性能。

type Pool struct {

 New func() interface{}

}

// Put save x

func (p *Pool) Put(x interface{}) {}

// Get fetch x from pool

func (p *Pool) Get() interface{} {}
区分一下 Cache 和 Pool:
  • 缓存解决的是一个资源的获取代价(通常是网络延迟或复杂计算)比较昂贵,利用 localCache 或 远程缓存来减少 CPU 的开销
  • Pool 的场景是有相同内存结构的数据,被频繁创建和销毁,更多的是减少内存分配和 GC 的开销。

sync.Pool 使用场景

sync.Pool 的定位不是做类似连接池的东西,仅仅是增加对象重用的几率,减少 gc 的负担。

  • gin 框架在 gin.go:L144/L346 中,通过 sync.Pool 对 Context 做了缓存
  • fmt.Printf 中通过 sync.Pool 缓存了 pp struct(newPrinter 函数中)
  • logrus 框架中缓存了 logger entry 对象
// https://github.com/gin-gonic/gin/blob/master/gin.go
// New init a new gin engine
func New() *Engine {
        debugPrintWARNINGNew()
        engine := &Engine{
            // init some fields
        }
        engine.pool = sync.Pool{}
        engine.pool.New = func() any {
            return engine.allocateContext()
        }
        return engine
}

// ServeHTTP conforms to the http.Handler interface.

func (engine *Engine) ServeHTTP(w http.ResponseWriter, req *http.Request) {
        c := engine.pool.Get().(*Context)
        c.writermem.reset(w)
        c.Request = req
        c.reset()
        engine.handleHTTPRequest(c)
        engine.pool.Put(c)
}

Pool 设计思路

设计目标:

  • 内存复用,避免相同结构在并发场景被频繁构建和销毁
  • 减小竞争,尽可能减小锁粒度,甚至无锁

介绍官方实现之前,先想想如果我们自己实现,大概的思路吧:

  • 由于大量相同结构,所以可以用 slice 去存储(当然链表也可以),用 data 表示
  • 需要支持并发,那么就需要对 data 加锁,来保证并发安全
  • 当 goroutine 足够多,都来竞争同一把锁,场面必定相当血腥。可以考虑拆分成多把小锁,即把 data 分成多个 shard,每个 shard 有一把锁
  • 每个 shard 上可以进一步采取方案降低锁的竞争,甚至可以采用无锁编程的思想
  • Goroutine 按照一定的规则去不同的 shard 取数据,锁的粒度减小了,竞争也减小了。
  • 按照上面这几步,已经能够实现一个开销相对较小的 Pool 了,但是比较粗糙,还是有一些细节问题没有被解决,比如

    • 那么分配多少个 shard 合适呢?
    • Goroutine 去哪个 shard 取数据呢,按 gouroutine ID 取模?
    • 如果 shard 上请求分布不均,旱的旱死,涝的涝死,如何解决?
    • shard 如何扩缩容?
    • 有没有办法进一步减少竞争?
    • 如何回收?

    Go 官方的实现思路大致和上面一致,不过有非常多巧妙的设计和实现细节,下面就来详细探索。

    sync.Pool overview

    下面是 sync.Pool 结构的概览:

    image.png

    大致描述下:

    • Local 是一个指针,指向一个全局的数组,也就是数组的每一个元素,都是一个 shard 分片,在代码的结构体是 poolLocal
      • poolLocal 指向一个链表,存储了表头和表尾,其中链表是双向链表,tail 代表最老的数据,head 代表最新的数据
      • 链表每个元素是一个由 slice 实现的 ring buffer,长度是 2 的 n 次幂,默认从 8 开始。如果放满了元素,那么从链表的 head 插入一个新的 ring buffer,长度是上一个的两倍。
    • localSize 代表 shard 的个数
    • Victim/VictimSize 结构 和 Local/LocalSize一样,用于淘汰策略

    接下来对几个有意思的点讨论下:

    • GPM Processor 缓存
    • 双端队列
    • Ring buffer
    • nocopy
    • Pad

    Go Processor 缓存

    先看下 golang 的 GPM 调度模型
    image.png
    左边是 go 1.13 之前的调度版本,右边是增加了【Processor】概念的调度版本。

    每个 P 上维护了一个 goroutine 的队列,这样在 goroutine 调度的时候,先争取粒度较小的 P 的锁,失败了再去争取全局的锁,达到减少竞争的作用。并且 P 的设计思想本身也是作为一层缓存,所以可以利用 P 去承载不同的 shard。

    P 和 G (goroutine) 还有一层关系,那就是同一时间只会有一个 G 被挂在 P 上去执行,这方面语言层面做了保证,也就是说,如果 shard 放在 P 对象上面,那么当 G 被一组 P 和 M 执行到的时候,相当于已经独占了 P 的资源,对于 shard 的读取不用额外加锁了!

    双端队列

    Go 官方大致思路和第一节一样,不过 sync.Pool 处理了不同 shard 分配不均的情况,这就要提到 sync.Pool 里用到的双端队列

    G 会优先从当前关联的 P 上获取数据,当 G 在 当前 P 上的数据用完了,会优先去其他 P 上“偷取”,示意如下:

    这里有个需要强调的点是,每个 shard (双端队列) 同一时间只会有一个生产者和多个消费者,且生产者可以从队列的 head push 和 pop,消费者只能从队列的 tail pop 数据。

    并且这里生产者和消费者的身份有限制,生产者只能是挂在 shard 所在 P 上的 G,消费者只能是其他 P 上的 G。

    这样设计的原因也比较好理解,大多数情况下每个 shard 上的数据是够用的,不用去“借用”其他 shard 的数据,毕竟距离远,开销大,并且可能出现竞争

    Ring buffer

    到这里你可能会问:

    • 为什么每个 shard 上既用了链表,又用了数组,解决了什么问题?

    pool 要解决的问题是,避免内存的频繁申请和回收,如果只使用一个链表,那么链表的新增和删除也是需要内存申请和回收的。如果只使用数组,那么数组长度应该是多少呢,扩容的时候会导致大量内存被拷贝,老数组的内存被回收。所以采用了综合的方式,减少了动态内存的分配和回收,又保证了可扩容。

    • 为什么链表元素是一个 ring buffer,本质上也是个 slice,和普通 slice 的区别?

    为了减少竞争,前面说了为了减小竞争,把大锁拆成了几个小锁,但小锁的竞争也可能很大,官方在这里采用了无锁编程的思想。

    无锁编程

    首先要声明的是,无锁,不代表没有竞争。有锁,指的是有互斥,golang 里通常是 mutex。
    最常用的无锁的方式是,采用 CAS 原子指令,去保证并行执行的原子性。伪代码如下:

    for {
        oldValue = AtomLoad(&pointer)
        newValue = oldValue + 1
        ok := CAS(&pointer, oldValue, newValue)
        if ok {
            break
        }
    }
    

    这不就是乐观锁吗!没错,它的粒度很小,不会主动交出 CPU 执行权,goroutine 在 mutex 阻塞等待的时候,会被调度器置为休眠状态,从而交出 CPU 执行权。

    虽然 原子操作粒度小,但不意味着性能一定强于互斥锁,还是要分场景!更多参考这里

    无锁数据结构

    无锁编程的精髓就是,把复杂的互斥操作,变成对少数关键字段的原子操作 。
    Ring buffer 是一个典型的无锁数据结构,限制了只能对数组的头和尾进行操作,通过对头和尾的两个下标做原子操作,保证了并发安全和性能。

    Nocopy

    sync.Pool 里有这么个字段,在标准库其他包里也比较常见,意义很明确,就是防止结构体被复制。为什么会有这样的需求呢?其实关键在于,go 的传参传值都是值传递,参考如下case:

    type X struct {
        data    []int
        lock    mutex
        ...
    }
    
    func (x X)Do(){
        x.lock.Lock()
        // do sth with data
    }
    
    func Test() {
        a := X{}
        Dosth(a)
    }
    

    image.png
    很明显,a 和 b 两个对象的底层数据是同一份,但是本应该限制 data 操作的锁,变成了两个不同的锁,两个锁不知道对方的存在,导致对 data 的操作不再是并发安全的。

    Golang 里的 nocopy 只是作为一个标识,并不会影响执行,只是用 govet 工具做静态检查的时候可以检查出包含 nocopy 对象的操作是否 ok。

    Pad

    在前面图里 poolLocal 结构中出现的 pad,是一段字节数组,但是没有任何代码用到了这个变量。那么为什么要放一个看起来毫无用处的变量呢?

    这里涉及到伪共享的问题,具体和 CPU 三级缓存有关:
    image.png
    简化示意图:
    image.png
    CPU 读取内存的最小单位是一个缓存行,大小通常是 64 byte(也有 128 的),如果两个变量的内存地址正好在同一个缓存行,且这两个变量分别同时被两个 core 修改,那么各自 cache line 的数据都不是最新的,会导致数据不一致的问题。实际上不同 core 在修改缓存行的时候,会先和其他 core 确认当前缓存行的所有权,core 直接会发送消息去同步 cache line 的状态。

    像这样因为缓存带来的额外的竞争,被称作伪共享。平时的业务代码一般不会去考虑这个问题,但在一些高性能的组件或框架设计的时候,需要考虑这个问题。解决方案比较简单粗暴,就是把可能被大量计算的,且内存紧密排布的变量,用一些额外的内存隔开,保证他们不在同一个 cache line。

    关于伪共享的更多细节,可以参考这里

    小结

    sync.Pool 减少竞争的手段

    • 正常把数据分片,增加了锁的个数,减少了每把锁的竞争压力。sync.Pool 借助了 GMP 调度级别的策略,使得大多数情况下只有 1 个 goroutine访问当前分片,大大减少了竞争
    • 双端队列,当前 goroutine 读写 head 节点,其他 goroutine 读写 tail 节点,天然隔离,不需要锁
    • 使用 ringBuffer,即使当前 goroutine 和 其他 goroutine 竞争同一个节点的数据(在前面几点的作用下,走到这里的概率已经非常小了),在竞争较小时,ringbuffer cas 机制效率远远高于锁

    源码解析

    Pool

    type Pool struct {
     noCopy noCopy
     local     unsafe.Pointer // local fixed-size per-P pool, actual type is [P]poolLocal
       
     localSize uintptr        // size of the local array
    
     victim     unsafe.Pointer // local from previous cycle
    
     victimSize uintptr        // size of victims array
    
     // New optionally specifies a function to generate
     // a value when Get would otherwise return nil.
     // It may not be changed concurrently with calls to Get.
     New func() interface{}
    
    }
    
     // Local per-P Pool appendix.
    type poolLocalInternal struct {
     private interface{} // Can be used only by the respective P.
     shared  poolChain   // Local P can pushHead/popHead; any P can popTail.
    }
    
    type poolLocal struct {
     poolLocalInternal
     // Prevents false sharing on widespread platforms with
     // 128 mod (cache line size) = 0 .
     
     pad [128 - unsafe.Sizeof(poolLocalInternal{})%128]byte
    }
    

    理解上节的逻辑,这里的代码也很好理解。需要补充的细节就是,每个 shard 上都会有一个私有的对象 private,因为同一时刻只有一个 G 会占有当前 P,所以这个 private 对象是不需要竞争的,获取的成本非常小。private 为空,再去 share 的 ring buffer 里取获取数据。

    Get

    根据 overview 和 双端队列两个图,其实已经能大致推出 pool.Get 的流程逻辑了:

  • 根据当前 G 获取当前 P,获取到 P 上的 shard(poolLocal)
  • 若当前 shard 上有私有的数据(private),则获取并返回,若如,从 shard 链表的 head 上获取
  • 找到 head,用 CAS 的方式无锁地从 ring buffer 获取数据
  • Head 上获取不到数据,则依次从链表的前一个节点取
  • 若当前 P 上的链表上都没有数据,那么从其他 P 上偷取,遍历其他所有 shard,不一样的是,从链表的 tail 到 head 的方向遍历
  • 若其他 shard 上也没有,那么用 new 方法新生成一个对象返回
  • 看看代码的更多细节吧:

    func (p *Pool) Get() interface{} {
       // 这段可以忽略
       if race.Enabled {
          race.Disable()
       }
    
       // 获取当前 P 的 ID 和 shard,并且会把当前 G 和 P 做一个强绑定,避免 P 被其他 G 抢占调度
       l, pid := p.pin() 
       x := l.private // 先检查 private 有没有,如果有,返回 private 指向的对象,并把 private 指针清空
       l.private = nil
       if x == nil {
          // Try to pop the head of the local shard. We prefer
          // the head over the tail for temporal locality of
          // reuse.
          // 如果 private 获取不到,就去当前 shard 的链表上依次取数据
          x, _ = l.shared.popHead() 
          if x == nil {
             // 当前 P 的 shard 没有数据,那么去其他 P 上‘偷取’
             x = p.getSlow(pid) 
          }
       }
    
       runtime_procUnpin() // 取消 G 和 P 的强绑定,其他 G 可以抢占当前 P 了
    
       if race.Enabled {
          race.Enable()
          if x != nil {
             race.Acquire(poolRaceAddr(x))
          }
       }
    
       if x == nil && p.New != nil {
          x = p.New() // 所有 P 上都没有数据,新生成一个
       }
       return x
    }
    
    func (c *poolChain) popHead() (interface{}, bool) {
       // 遍历链表,从每个链表元素里的 ring buffer 取数据
       d := c.head
       for d != nil {
          if val, ok := d.popHead(); ok {
             return val, ok
          }
          // 获取链表下一个元素
          d = loadPoolChainElt(&d.prev)
       }
       return nil, false
    }
    

    Ring buffer

    type poolDequeue struct {

    // head 和 tail 放到同一个字段里,各占 32 bit,操作的时候通过原子操作读写

    headTail uint64

    // 每个 ring buffer 一旦生成,长度固定,不会扩容,是 sync.Pool 的底层存储

    vals []eface

    }

    func (d *poolDequeue) unpack(ptrs uint64) (head, tail uint32) {

    const mask = 1 dequeueBits) & mask)

    tail = uint32(ptrs & mask)

    return

    }

    func (d *poolDequeue) pack(head, tail uint32) uint64 {

    const mask = 1

    相关文章

    JavaScript2024新功能:Object.groupBy、正则表达式v标志
    PHP trim 函数对多字节字符的使用和限制
    新函数 json_validate() 、randomizer 类扩展…20 个PHP 8.3 新特性全面解析
    使用HTMX为WordPress增效:如何在不使用复杂框架的情况下增强平台功能
    为React 19做准备:WordPress 6.6用户指南
    如何删除WordPress中的所有评论

    发布评论