Do not communicate by sharing memory; instead, share memory by communicating.
通过通信来共享内存,而不是共享内存来通信
安全访问共享变量是并发编程的一个难点,在 Golang 语言中,倡导通过通信共享内存,实际上就是使用 channel 传递共享变量,在任何给定时间,只有一个 goroutine 可以访问该变量的值,从而避免发生数据竞争。
本文关键是对Channel 实现原理进行分析,并附带源码解读,基于源码分析能更加理解Channel实现的过程与原因,对于源码关键步骤及变量给出了注释,不需要完全读懂源码的每个变量及函数,但可以从代码的异常处理角度来理解Channel,就能明白为什么channel的创建、写入、读取、关闭等流程需要分为多种情况。
1.Channel 数据结构
1.1 hchan结构体
读 channel 的源码,可以发现 channel 的数据结构是 hchan 结构体,包含以下字段,每个字段的含义已注释:
type hchan struct {
qcount uint // 当前 channel 中存在多少个元素;
dataqsiz uint // 当前 channel 能存放的元素容量;
buf unsafe.Pointer // channel 中用于存放元素的环形缓冲区;
elemsize uint16 //channel 元素类型的大小;
closed uint32 //标识 channel 是否关闭;
elemtype *_type // channel 元素类型;
sendx uint // 发送元素进入环形缓冲区的 index;
recvx uint // 接收元素所处的环形缓冲区的 index;
recvq waitq // 因接收而陷入阻塞的协程队列;
sendq waitq // 因发送而陷入阻塞的协程队列;
lock mutex //互斥锁,保证同一时间只有一个协程读写 channel
}
通过阅读 channel 的数据结构,可以发现 channel 是使用环形队列作为 channel 的缓冲区,datasize 环形队列的长度是在创建 channel 时指定的,通过 sendx 和 recvx 两个字段分别表示环形队列的队尾和队首,其中,sendx 表示数据写入的位置,recvx 表示数据读取的位置。
字段 recvq 和 sendq 分别表示等待接收的协程队列和等待发送的协程队列,当 channel 缓冲区为空或无缓冲区时,当前协程会被阻塞,分别加入到 recvq 和 sendq 协程队列中,等待其它协程操作 channel 时被唤醒。其中,读阻塞的协程被写协程唤醒,写阻塞的协程被读协程唤醒。
字段 elemtype 和 elemsize 表示 channel 中元素的类型和大小,需要注意的是,一个 channel 只能传递一种类型的值,如果需要传递任意类型的数据,可以使用 interface{} 类型。
字段 lock 是保证同一时间只有一个协程读写 channel。
1.2 阻塞协程队列waitq与sudog结构体
在hchan中我们可以看到 recvq与sendq都是waitq类型,这代表协程等待队列。这个队列维护阻塞在一个channel上的所有协程。first和last是指向sudog结构体类型的指针,表示队列的头和尾。waitq里面连接的是一个sudog双向链表,保存的是等待的goroutine。队列中的sudog也是一个结构体,代表一个协程/sync.Mutex等待队列中的节点,包含了协程和数据的信息。waitq与sudog结构体包含以下字段,每个字段的含义已注释:
type waitq struct { //阻塞的协程队列
first *sudog //队列头部
last *sudog //队列尾部
}
type sudog struct { //sudog:包装协程的节点
g *g //goroutine,协程;
next *sudog //队列中的下一个节点;
prev *sudog //队列中的前一个节点;
elem unsafe.Pointer //读取/写入 channel 的数据的容器;
isSelect bool //标识当前协程是否处在 select 多路复用的流程中;
c *hchan //标识与当前 sudog 交互的 chan.
}
2.Channel构造器函数
2.1 Channel常见类型
-
无缓冲型Channel:常用于同步的场景,比如协调两个或多个并发goroutine之间的执行,传递临界资源等。
-
有缓冲的 struct 型Channel:常用于单向传输数据流,例如将producer和consumer分开,这样可以避免不必要的等待时间。
-
有缓冲的 pointer 型Channel:有缓冲的 pointer 型Channel位于管道中的元素是指针类型的变量。它常被用于异步数据传输,将消费者的读取数据和生产者的填充数据分离。
2.2 Channel构造器函数源码分析
func makechan(t *chantype, size int) *hchan {
elem := t.elem //Channel中元素类型
// 每个元素的内存大小为elem.size,channel的容量为size,计算出总内存mem
mem, overflow := math.MulUintptr(elem.size, uintptr(size))
if overflow || mem > maxAlloc-hchanSize || size < 0 {
panic(plainError("makechan: size out of range"))
}
var c *hchan
switch {
case mem == 0: //无缓冲型Channel
//hchanSize默认为96
c = (*hchan)(mallocgc(hchanSize, nil, true))
// 竞争检测器使用此位置进行同步。
c.buf = c.raceaddr()
case elem.ptrdata == 0: //有缓冲的 struct 型Channel
c = (*hchan)(mallocgc(hchanSize+mem, nil, true))
c.buf = add(unsafe.Pointer(c), hchanSize)
default: //有缓冲的 pointer 型Channel
c = new(hchan)
c.buf = mallocgc(mem, elem, true)
}
// 初始化hchan
c.elemsize = uint16(elem.size) //每个元素在内存中占用的字节数
c.elemtype = elem //元素类型
c.dataqsiz = uint(size) //队列中元素的数量上限
lockInit(&c.lock, lockRankHchan) //初始化读写保护锁
return c
}
这段代码的作用是创建一个 channel,并初始化 channel 中的各个字段。
t.elem.size
,channel的容量是size
,所需要分配的总内存大小为mem
。mem
的值判断是否需要分配内存:分为 无缓冲型、有缓冲元素为 struct 型、有缓冲元素为 pointer 型 channel;
- 倘若为无缓冲型channel,则仅申请一个大小为默认值 hchanSize即96 的空间;
- 如若有缓冲的 struct 型channel,则一次性分配好 96 + mem 大小的空间,并且调整 chan 的 buf 指向 mem 的起始位置;
- 倘若为有缓冲的 pointer 型channel,则分别申请 chan 和 buf 的空间,两者无需连续;
3.channel写操作实现原理
3.1 channel写异常处理
func chansend1(c *hchan, elem unsafe.Pointer) {
chansend(c, elem, true, getcallerpc())
}
func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {
if c == nil {
gopark(nil, nil, waitReasonChanSendNilChan, traceEvGoStop, 2)
throw("unreachable")
}
lock(&c.lock)
if c.closed != 0 {
unlock(&c.lock)
panic(plainError("send on closed channel"))
}
// ...
- 对于未初始化即为空的 chan,写入操作会引发死锁“unreachable”;
- 对于已关闭的 chan,写入操作会引发 panic"send on closed channel";
3.2 channel写时存在阻塞读协程——此时环形缓冲区内元素个数为0
func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {
// ...
lock(&c.lock) // 加锁
// ...
//从阻塞度协程队列中取出一个 goroutine 的封装对象 sudog
if sg := c.recvq.dequeue(); sg != nil {
//在 send 方法中,基于 memmove 方法,直接将元素拷贝交给 sudog 对应的读协程sg,并完成解锁动作
send(c, sg, ep, func() { unlock(&c.lock) }, 3)
return true
}
// ...
写入前利用channel 的lock进行加锁,如果在channel写入时,如果 channel 中存在阻塞的读协程,那么此时channel内一定没有元素,于是将这个读携程 唤醒,并为了提高效率,直接将要发送的数据传递给它,而不需要存储到缓冲区中。
3.3channel写时无阻塞读协程且环形缓冲区仍有空间
func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {
// ...
lock(&c.lock) //加锁
// ...
if c.qcount < c.dataqsiz { //判断环形缓冲区是否有空间
qp := chanbuf(c, c.sendx) //将当前元素添加到环形缓冲区 sendx 对应的位置
//memmove(dst, src, t.size) 进行数据的转移,本质上是一个内存拷贝
//将发送的数据直接拷贝到 x = 0 {
// 获取到 recvx 对应位置的元素
qp := chanbuf(c, c.recvx)
if ep != nil {
//typedmemmove(dst, src, size):从 src 指向的地址复制 size 字节的数据到 dst 指向的地址。
//将channel缓冲区或发送队列中读取到的目标元素(即 qp 指针)写入到接收端点的目标内存地址(即 ep 指针)中
typedmemmove(c.elemtype, ep, qp)
}
//typedmemclr(ptr, size):从 ptr 开始的地址上清空 size 字节的数据,将要清空的内存空间设置成数据类型的零值。
//清空刚才从 channel 缓冲区或发送队列中取出的元素
typedmemclr(c.elemtype, qp)
c.recvx++
if c.recvx == c.dataqsiz {
c.recvx = 0
}
c.qcount--
unlock(&c.lock)
return true, true
}
// ...
读时无阻塞写协程且缓冲区有元素,为一般情况,则直接读取环形缓冲区对应的元素
4.5 读时无阻塞写协程且缓冲区无元素
func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) {
// ...
//加锁
lock(&c.lock)
// ...
//构造封装当前 goroutine 的 sudog 对象
gp := getg()
mysg := acquireSudog()
//完成指针指向,建立 sudog、goroutine、channel 之间的指向关系
mysg.elem = ep
gp.waiting = mysg
mysg.g = gp
mysg.c = c
gp.param = nil
//把 sudog 添加到当前 channel 的阻塞读协程队列中
c.recvq.enqueue(mysg)
atomic.Store8(&gp.parkingOnChan, 1)
//park 挂起当前读协程
gopark(chanparkcommit, unsafe.Pointer(&c.lock), waitReasonChanReceive, traceEvGoBlockRecv, 2)
//倘若协程从 park 中被唤醒,则回收 sudog(sudog能被唤醒,其对应的元素必然已经被写入)
gp.waiting = nil
success := mysg.success
gp.param = nil
mysg.c = nil
releaseSudog(mysg)
//解锁,返回
return true, success
}
读时无阻塞写协程且缓冲区无元素,那么直接通过 gopark 函数,将当前 goroutine 驱动进入休眠状态,等待其他 写goroutine push 数据、close channel 或者 delete 当前 goroutine 的唤醒,被唤醒后数据已被其他协程处理,故直接回收空间。
4.6 channel读流程总结
- 若有阻塞的写协程, 说明环形缓冲区为无缓冲型或已被写满,故判断channel是否为无缓冲型
- 若 channel为无缓冲型,则直接读取写协程元素,并唤醒写协程;
- 若 channel 为有缓冲型,则读取环形缓冲区头部元素,并将写协程元素写入缓冲区尾部后唤醒写协程,更新读写索引;
- 若没有阻塞的写协程,则判断环形缓冲区是否有空间
- 若环形缓冲区有空间,则直接将当前元素添加到环形缓冲区 sendx的位置,并更新写入位置sendx与通道元素个数qcount,解锁后返回函数。
- 若环形缓冲区无空间,将当前协程加入阻塞写协程队列中,阻塞协程,等待被读协程唤醒,并完成解锁
4.7 两种读 channel 的协议
读取 channel 时,我们会发现若通道关闭且无元素会返回零值,故我们需要判断进行读channel时是真的读到零值还是由于通道关闭读到零值,故源码中定义了两种读 channel 的协议。分别如下:
got1 :=