介绍
sync.Pool
是 sync 包提供的一个数据类型,是 go语言中的 临时对象池,它的值是用来存储一组可以独立访问的临时对象,它通过池化减少申请新对象,提升程序的性能。
type Pool struct {
New func() interface{}
}
// Put save x
func (p *Pool) Put(x interface{}) {}
// Get fetch x from pool
func (p *Pool) Get() interface{} {}
区分一下 Cache 和 Pool:
- 缓存解决的是一个资源的获取代价(通常是网络延迟或复杂计算)比较昂贵,利用 localCache 或 远程缓存来减少 CPU 的开销
- Pool 的场景是有相同内存结构的数据,被频繁创建和销毁,更多的是减少内存分配和 GC 的开销。
sync.Pool 使用场景
sync.Pool 的定位不是做类似连接池的东西,仅仅是增加对象重用的几率,减少 gc 的负担。
- gin 框架在 gin.go:L144/L346 中,通过 sync.Pool 对 Context 做了缓存
- fmt.Printf 中通过 sync.Pool 缓存了 pp struct(newPrinter 函数中)
- logrus 框架中缓存了 logger entry 对象
// https://github.com/gin-gonic/gin/blob/master/gin.go
// New init a new gin engine
func New() *Engine {
debugPrintWARNINGNew()
engine := &Engine{
// init some fields
}
engine.pool = sync.Pool{}
engine.pool.New = func() any {
return engine.allocateContext()
}
return engine
}
// ServeHTTP conforms to the http.Handler interface.
func (engine *Engine) ServeHTTP(w http.ResponseWriter, req *http.Request) {
c := engine.pool.Get().(*Context)
c.writermem.reset(w)
c.Request = req
c.reset()
engine.handleHTTPRequest(c)
engine.pool.Put(c)
}
Pool 设计思路
设计目标:
- 内存复用,避免相同结构在并发场景被频繁构建和销毁
- 减小竞争,尽可能减小锁粒度,甚至无锁
介绍官方实现之前,先想想如果我们自己实现,大概的思路吧:
按照上面这几步,已经能够实现一个开销相对较小的 Pool 了,但是比较粗糙,还是有一些细节问题没有被解决,比如
- 那么分配多少个 shard 合适呢?
- Goroutine 去哪个 shard 取数据呢,按 gouroutine ID 取模?
- 如果 shard 上请求分布不均,旱的旱死,涝的涝死,如何解决?
- shard 如何扩缩容?
- 有没有办法进一步减少竞争?
- 如何回收?
Go 官方的实现思路大致和上面一致,不过有非常多巧妙的设计和实现细节,下面就来详细探索。
sync.Pool overview
下面是 sync.Pool 结构的概览:
大致描述下:
- Local 是一个指针,指向一个全局的数组,也就是数组的每一个元素,都是一个 shard 分片,在代码的结构体是 poolLocal
- poolLocal 指向一个链表,存储了表头和表尾,其中链表是双向链表,tail 代表最老的数据,head 代表最新的数据
- 链表每个元素是一个由 slice 实现的 ring buffer,长度是 2 的 n 次幂,默认从 8 开始。如果放满了元素,那么从链表的 head 插入一个新的 ring buffer,长度是上一个的两倍。
- localSize 代表 shard 的个数
- Victim/VictimSize 结构 和 Local/LocalSize一样,用于淘汰策略
接下来对几个有意思的点讨论下:
- GPM Processor 缓存
- 双端队列
- Ring buffer
- nocopy
- Pad
Go Processor 缓存
先看下 golang 的 GPM 调度模型
左边是 go 1.13 之前的调度版本,右边是增加了【Processor】概念的调度版本。
每个 P 上维护了一个 goroutine 的队列,这样在 goroutine 调度的时候,先争取粒度较小的 P 的锁,失败了再去争取全局的锁,达到减少竞争的作用。并且 P 的设计思想本身也是作为一层缓存,所以可以利用 P 去承载不同的 shard。
P 和 G (goroutine) 还有一层关系,那就是同一时间只会有一个 G 被挂在 P 上去执行,这方面语言层面做了保证,也就是说,如果 shard 放在 P 对象上面,那么当 G 被一组 P 和 M 执行到的时候,相当于已经独占了 P 的资源,对于 shard 的读取不用额外加锁了!
双端队列
Go 官方大致思路和第一节一样,不过 sync.Pool 处理了不同 shard 分配不均的情况,这就要提到 sync.Pool 里用到的双端队列
G 会优先从当前关联的 P 上获取数据,当 G 在 当前 P 上的数据用完了,会优先去其他 P 上“偷取”,示意如下:
这里有个需要强调的点是,每个 shard (双端队列) 同一时间只会有一个生产者和多个消费者,且生产者可以从队列的 head push 和 pop,消费者只能从队列的 tail pop 数据。
并且这里生产者和消费者的身份有限制,生产者只能是挂在 shard 所在 P 上的 G,消费者只能是其他 P 上的 G。
这样设计的原因也比较好理解,大多数情况下每个 shard 上的数据是够用的,不用去“借用”其他 shard 的数据,毕竟距离远,开销大,并且可能出现竞争
Ring buffer
到这里你可能会问:
- 为什么每个 shard 上既用了链表,又用了数组,解决了什么问题?
pool 要解决的问题是,避免内存的频繁申请和回收,如果只使用一个链表,那么链表的新增和删除也是需要内存申请和回收的。如果只使用数组,那么数组长度应该是多少呢,扩容的时候会导致大量内存被拷贝,老数组的内存被回收。所以采用了综合的方式,减少了动态内存的分配和回收,又保证了可扩容。
- 为什么链表元素是一个 ring buffer,本质上也是个 slice,和普通 slice 的区别?
为了减少竞争,前面说了为了减小竞争,把大锁拆成了几个小锁,但小锁的竞争也可能很大,官方在这里采用了无锁编程的思想。
无锁编程
首先要声明的是,无锁,不代表没有竞争。有锁,指的是有互斥,golang 里通常是 mutex。
最常用的无锁的方式是,采用 CAS 原子指令,去保证并行执行的原子性。伪代码如下:
for {
oldValue = AtomLoad(&pointer)
newValue = oldValue + 1
ok := CAS(&pointer, oldValue, newValue)
if ok {
break
}
}
这不就是乐观锁吗!没错,它的粒度很小,不会主动交出 CPU 执行权,goroutine 在 mutex 阻塞等待的时候,会被调度器置为休眠状态,从而交出 CPU 执行权。
虽然 原子操作粒度小,但不意味着性能一定强于互斥锁,还是要分场景!更多参考这里
无锁数据结构
无锁编程的精髓就是,把复杂的互斥操作,变成对少数关键字段的原子操作 。
Ring buffer 是一个典型的无锁数据结构,限制了只能对数组的头和尾进行操作,通过对头和尾的两个下标做原子操作,保证了并发安全和性能。
Nocopy
sync.Pool 里有这么个字段,在标准库其他包里也比较常见,意义很明确,就是防止结构体被复制。为什么会有这样的需求呢?其实关键在于,go 的传参传值都是值传递,参考如下case:
type X struct {
data []int
lock mutex
...
}
func (x X)Do(){
x.lock.Lock()
// do sth with data
}
func Test() {
a := X{}
Dosth(a)
}
很明显,a 和 b 两个对象的底层数据是同一份,但是本应该限制 data 操作的锁,变成了两个不同的锁,两个锁不知道对方的存在,导致对 data 的操作不再是并发安全的。
Golang 里的 nocopy 只是作为一个标识,并不会影响执行,只是用 govet 工具做静态检查的时候可以检查出包含 nocopy 对象的操作是否 ok。
Pad
在前面图里 poolLocal 结构中出现的 pad,是一段字节数组,但是没有任何代码用到了这个变量。那么为什么要放一个看起来毫无用处的变量呢?
这里涉及到伪共享的问题,具体和 CPU 三级缓存有关:
简化示意图:
CPU 读取内存的最小单位是一个缓存行,大小通常是 64 byte(也有 128 的),如果两个变量的内存地址正好在同一个缓存行,且这两个变量分别同时被两个 core 修改,那么各自 cache line 的数据都不是最新的,会导致数据不一致的问题。实际上不同 core 在修改缓存行的时候,会先和其他 core 确认当前缓存行的所有权,core 直接会发送消息去同步 cache line 的状态。
像这样因为缓存带来的额外的竞争,被称作伪共享。平时的业务代码一般不会去考虑这个问题,但在一些高性能的组件或框架设计的时候,需要考虑这个问题。解决方案比较简单粗暴,就是把可能被大量计算的,且内存紧密排布的变量,用一些额外的内存隔开,保证他们不在同一个 cache line。
关于伪共享的更多细节,可以参考这里
小结
sync.Pool 减少竞争的手段
- 正常把数据分片,增加了锁的个数,减少了每把锁的竞争压力。sync.Pool 借助了 GMP 调度级别的策略,使得大多数情况下只有 1 个 goroutine访问当前分片,大大减少了竞争
- 双端队列,当前 goroutine 读写 head 节点,其他 goroutine 读写 tail 节点,天然隔离,不需要锁
- 使用 ringBuffer,即使当前 goroutine 和 其他 goroutine 竞争同一个节点的数据(在前面几点的作用下,走到这里的概率已经非常小了),在竞争较小时,ringbuffer cas 机制效率远远高于锁
源码解析
Pool
type Pool struct {
noCopy noCopy
local unsafe.Pointer // local fixed-size per-P pool, actual type is [P]poolLocal
localSize uintptr // size of the local array
victim unsafe.Pointer // local from previous cycle
victimSize uintptr // size of victims array
// New optionally specifies a function to generate
// a value when Get would otherwise return nil.
// It may not be changed concurrently with calls to Get.
New func() interface{}
}
// Local per-P Pool appendix.
type poolLocalInternal struct {
private interface{} // Can be used only by the respective P.
shared poolChain // Local P can pushHead/popHead; any P can popTail.
}
type poolLocal struct {
poolLocalInternal
// Prevents false sharing on widespread platforms with
// 128 mod (cache line size) = 0 .
pad [128 - unsafe.Sizeof(poolLocalInternal{})%128]byte
}
理解上节的逻辑,这里的代码也很好理解。需要补充的细节就是,每个 shard 上都会有一个私有的对象 private,因为同一时刻只有一个 G 会占有当前 P,所以这个 private 对象是不需要竞争的,获取的成本非常小。private 为空,再去 share 的 ring buffer 里取获取数据。
Get
根据 overview 和 双端队列两个图,其实已经能大致推出 pool.Get 的流程逻辑了:
看看代码的更多细节吧:
func (p *Pool) Get() interface{} {
// 这段可以忽略
if race.Enabled {
race.Disable()
}
// 获取当前 P 的 ID 和 shard,并且会把当前 G 和 P 做一个强绑定,避免 P 被其他 G 抢占调度
l, pid := p.pin()
x := l.private // 先检查 private 有没有,如果有,返回 private 指向的对象,并把 private 指针清空
l.private = nil
if x == nil {
// Try to pop the head of the local shard. We prefer
// the head over the tail for temporal locality of
// reuse.
// 如果 private 获取不到,就去当前 shard 的链表上依次取数据
x, _ = l.shared.popHead()
if x == nil {
// 当前 P 的 shard 没有数据,那么去其他 P 上‘偷取’
x = p.getSlow(pid)
}
}
runtime_procUnpin() // 取消 G 和 P 的强绑定,其他 G 可以抢占当前 P 了
if race.Enabled {
race.Enable()
if x != nil {
race.Acquire(poolRaceAddr(x))
}
}
if x == nil && p.New != nil {
x = p.New() // 所有 P 上都没有数据,新生成一个
}
return x
}
func (c *poolChain) popHead() (interface{}, bool) {
// 遍历链表,从每个链表元素里的 ring buffer 取数据
d := c.head
for d != nil {
if val, ok := d.popHead(); ok {
return val, ok
}
// 获取链表下一个元素
d = loadPoolChainElt(&d.prev)
}
return nil, false
}
Ring buffer
type poolDequeue struct {
// head 和 tail 放到同一个字段里,各占 32 bit,操作的时候通过原子操作读写
headTail uint64
// 每个 ring buffer 一旦生成,长度固定,不会扩容,是 sync.Pool 的底层存储
vals []eface
}
func (d *poolDequeue) unpack(ptrs uint64) (head, tail uint32) {
const mask = 1 dequeueBits) & mask)
tail = uint32(ptrs & mask)
return
}
func (d *poolDequeue) pack(head, tail uint32) uint64 {
const mask = 1