WaitGroup 概述
WaitGroup
在go语言中,用于线程同步,单从字面意思理解,wait
等待的意思,group
组、团队的意思,WaitGroup
就是指等待一组,等待一个系列执行完成后才会继续向下执行。
WatiGroup
是sync
包中的一个struct
类型,用来收集需要等待执行完成的goroutine
。下面是它的定义:
// WaitGroup用于等待一组线程的结束。
// 父线程调用Add方法来设定应等待的线程的数量。
// 每个被等待的线程在结束时应调用Done方法。同时,主线程里可以调用Wait方法阻塞至所有线程结束。
type WaitGroup struct {
// 包含隐藏或非导出字段
}
// Add方法向内部计数加上delta,delta可以是负数;
// 如果内部计数器变为0,Wait方法阻塞等待的所有线程都会释放,如果计数器小于0,方法panic。
// 注意Add加上正数的调用应在Wait之前,否则Wait可能只会等待很少的线程。
// 一般来说本方法应在创建新的线程或者其他应等待的事件之前调用。
func (wg *WaitGroup) Add(delta int)
// Done方法减少WaitGroup计数器的值,应在线程的最后执行。
func (wg *WaitGroup) Done()
// Wait方法阻塞直到WaitGroup计数器减为0。
func (wg *WaitGroup) Wait()
sync.WaitGroup 有 3 个方法
- Add():每次激活想要被等待完成的
goroutine
之前,先调用Add(),用来设置或添加要等待完成的goroutine
数量
例如Add(2) 或者两次调用Add(1) 都会设置等待计数器的值为2,表示要等待2个
goroutine
完成
-
Done():每次需要等待的
goroutine
在真正完成之前,应该调用该方法来人为表示goroutine
完成了,该方法会对等待计数器减1 -
Wait():在等待计数器减为0之前,Wait() 会一直阻塞当前的
goroutine
综上所述:
Add() 用来增加要等待的
goroutine
的数量Done() 用来表示
goroutine
已经完成了,减少一次计数器Wait() 用来等待所有需要等待的
goroutine
完成。
使用示例
一个常见的使用场景是:批量发出 RPC 或者 HTTP 请求:
requests := []*Request{...}
wg := sync.WaitGroup{}
wg.Add(len(requests))
for _, request := range requests {
go func(r *Request) {
defer wg.Done()
// res, err := service.call(r)
}(request)
}
wg.Wait()
我们可以通过 sync.WaitGroup 将原本顺序执行的代码在多个Goroutine中并发,如下图:
结构体
sync.WaitGroup 结构体中只包含两个成员:
type WaitGroup struct {
noCopy noCopy
state1 [3]uint32
}
-
noCopy —— 保证 sync.WaitGroup 不会被开发者通过再赋值的方式赋值
-
state1 —— 存储状态和信号量
sync.noCopy 是一个特殊的私有结构体,源码包中的分析器会在编译期间检查被复制的变量中是否含有 sync.noCopy 或者实现了 Lock 和 Unlock 方法,如果包含有该结构体或者实现了对应的方法,就会抛出错误:
func main(){
wg := sync.WaitGorup{}
yawg := wg
fmt.Println(wg, yawg)
}tGorp
$ go vet proc.go
./proc.go:10:10: assignment copies lock value to yawg: sync.WaitGroup
除了 sync.noCopy,sync.WaitGroup 结构体中还包含一个总共占用 12 字节的数组,该数组会存储当前结构体的状态,在 64 位与 32 位机器中表现也不同,如下图:
sync.WaitGroup 提供的私有方法 sync.WaitGroup.state 能够帮我们从 state1 字段中取出对应的状态和信号量。
接口
sync.WaitGroup 对外总共暴露3个方法:
- sync.WaitGroup.Add
- sync.WaitGroup.Wait
- sync.WaitGroup.Done
因 sync.WaitGroup.Done 只是向 sync.WaitGroup.Add 方法中传入了 -1,因此咱们主要分析另外两个方法:sync.WaitGroup.Add 、sync.WaitGroup.Wait
func (wg *sync.WaitGroup) Add (delta int){
statep, semap := wg.state()
state := atomic.AddUint64(statep, uint64(delta) > 32)
w := uint32(state)
if v 0 || w == 0 {
return
}
*statep = 0
for ; w != 0; w-- {
runtime_Semrelease(semap, false, 0)
}
}
sync.WaitGroup.Add 可以更新 sync.WaitGroup 中的计数器 counter 。
虽然 sync.WaitGroup.Add 方法传入的参数可以为负数,但是计数器只能是非负数,一旦出现负数程序就会崩溃。
当调用的计数器归零,即所有任务都执行完成时,才会通过 runtime_Semrelease 唤醒处于等待状态的 Goroutine。
sync.WaitGroup 的另一个方法 sync.WaitGroup.Wait 会在计数器大于0 并且不存在等待的Goroutine时,调用runtime.sync_runtime_Semacquire 陷入睡眠状态:
func (wg *sync.WaitGroup) Wait () {
statep, semap := wg.state()
for {
state := atomic.LoadUint64(statep)
v := int32(state >> 32)
if v == 0 {
return
}
if atomic.CompareAndSwapUint64(statep, state, state + 1) {
runtime_Semacquire(semap)
if +statep != 0 {
panic("sync: WaitGroup is resused before previous Wait has returned")
}
return
}
}
}
当sync.WaitGroup的计数器归零时,陷入睡眠状态的Goroutine会被唤醒,sync.WaitGroup.Wait方法也会立刻放回。
小结
通过 sync.WaitGroup 的分析和研究,可以得出以下结论:
- sync.WaitGroup 必须在 sync.WaitGroup.Wait 方法返回之后才能重新使用
- sync.WaitGroup.Done 只是对 sync.WaitGroup.Add 方法的简单封装,我们可以通过 sync.WaitGroup.Add 方法传入任意负数(需要保证计数器非负),快速将计数器归零以唤醒等待的Goroutine
- 可以同时有多个Goroutine等待当前的sync.WaitGroup计数器归零,这些Goroutine会被同时唤醒