概述
除标准库中提供的同步原语外,Go
语言还在子仓库sync
中提供了4种扩展原语:
golang/sync/errgroup.Group
golang/sync/semaphore.Weighted
golang/sync/singleflight.Group
golang/sync/syncmap.Map
其中 golang/sync/syncmap.Map
在 Go1.9
中移植到了标准库中。
接下来介绍Go
语言在扩展包中提供的3种同步原语 —— golang/sync/errgroup.Group
ErrGroup
golang/sync/errgroup.Group
为我们在一组 Goroutine
中提供了同步、错误传播以及上下文取消功能,我们可以使用这种方式并行获取网页数据:
var g errgroup.Group
var urls = []string{
"http://www.golang.org",
"http://www.google.com"
}
for i := range urls {
url := urls[i]
g.Go(func() error {
resp, err := http.Get(url)
if err == nil {
resp.Body.Close()
}
return err
})
}
if err := g.Wait(); err == nil {
fmt.Println("Successfully fetched all URLs.")
}
golang/sync/errgroup.Group.Go
方法能够创建一个 Goroutine
并在其中执行传入的函数,而 golang/sync/errgroup.Group.Wait
会等待所有 Goroutine
返回,该方法的不同返回结果有不同的含义:
- 如果返回错误 —— 这一组
Goroutine
最少返回一个错误 - 如果返回空值 —— 所有
Goroutine
都成功执行
结构体
golang/sync/errgroup.Group
结构体有 3
个比较重要的部分组成
cancel
—— 创建context.Context
时返回的取消函数,用于在多个Goroutine
之间同步取消信号wg
—— 用于等待一组Goroutine
完成子任务的同步原语errOnce
—— 用于保证只接收一个子任务
返回的错误
type Group struct {
cancel func()
wg sync.WaitGroup
errOnce sync.Once
err error
}
这些字段共同组成了 golang/sync/errgroup.Group
结构体并为我提供同步、错误传播以及上下文取消等功能。
接口
我们能通过 golang/sync/errgroup.WithContext
构造器创新的 golang/sync/errgroup.Group
结构体:
func WithContext(ctx context.Context) (*Group, context.Context) {
ctx, cancel := context.WithCancel(ctx)
return &Group{cancel: cancel}, ctx
}
运行新的并行子任务需要使用 golang/sync/errgroup.Group.Go
方法,这个方法的执行过程如下:
sync.WaitGroup.Add
增加待处理的任务Goroutine
并运行子任务cancel
并对 err
赋值,只有最早返回的错误才会被上游感知到,后续错误都会被舍弃。func (g *Group) Go(){
g.wg.Add(1)
go func() {
defer g.wg.Done()
if err := f(); err := nil {
g.errOnce.Do(func () {
g.err = err
if g.cancel != nil {
g.cancel()
}
})
}
}()
}
func (g *Group) Wait() error {
g.wg.Wait()
if g.cancel != nil {
g.cancel()
}
return g.err
}
另一个用于等待的 golang/sync/errgroup.Group.Wait
方法只是调用了 sync.WaitGroup.Wait
,在子任务全部完成时取消 context.Context 并返回可能出现的错误。
小结
golang/sync/errgroup.Group
的实现没有涉及底层和运行包中的API
,它只是封装了基本同步语义以提供更加复杂的功能。我们在使用它时需要注意两个问题:
golang/sync/errgroup.Group
在出现错误或者等待结束后,会调用context.Context
的cancel
方法同步取消信号- 只有第一个出现的错误才会被返回,剩余错误会被直接丢弃