Golang Channel 实现原理与源码分析

2023年 8月 13日 116.4k 0

Do not communicate by sharing memory; instead, share memory by communicating.
通过通信来共享内存,而不是共享内存来通信

安全访问共享变量是并发编程的一个难点,在 Golang 语言中,倡导通过通信共享内存,实际上就是使用 channel 传递共享变量,在任何给定时间,只有一个 goroutine 可以访问该变量的值,从而避免发生数据竞争。
本文关键是对Channel 实现原理进行分析,并附带源码解读,基于源码分析能更加理解Channel实现的过程与原因,对于源码关键步骤及变量给出了注释,不需要完全读懂源码的每个变量及函数,但可以从代码的异常处理角度来理解Channel,就能明白为什么channel的创建、写入、读取、关闭等流程需要分为多种情况。

1.Channel 数据结构

1.1 hchan结构体

读 channel 的源码,可以发现 channel 的数据结构是 hchan 结构体,包含以下字段,每个字段的含义已注释:

type hchan struct {
 qcount   uint           // 当前 channel 中存在多少个元素;
 dataqsiz uint           // 当前 channel 能存放的元素容量;
 buf      unsafe.Pointer // channel 中用于存放元素的环形缓冲区;
 elemsize uint16        //channel 元素类型的大小;
 closed   uint32		//标识 channel 是否关闭;
 elemtype *_type 		// channel 元素类型;
 sendx    uint   		// 发送元素进入环形缓冲区的 index;
 recvx    uint   		// 接收元素所处的环形缓冲区的 index;
 recvq    waitq  		// 因接收而陷入阻塞的协程队列;
 sendq    waitq  		// 因发送而陷入阻塞的协程队列;
 lock mutex				//互斥锁,保证同一时间只有一个协程读写 channel
}

通过阅读 channel 的数据结构,可以发现 channel 是使用环形队列作为 channel 的缓冲区,datasize 环形队列的长度是在创建 channel 时指定的,通过 sendx 和 recvx 两个字段分别表示环形队列的队尾和队首,其中,sendx 表示数据写入的位置,recvx 表示数据读取的位置。

字段 recvq 和 sendq 分别表示等待接收的协程队列和等待发送的协程队列,当 channel 缓冲区为空或无缓冲区时,当前协程会被阻塞,分别加入到 recvq 和 sendq 协程队列中,等待其它协程操作 channel 时被唤醒。其中,读阻塞的协程被写协程唤醒,写阻塞的协程被读协程唤醒。

字段 elemtype 和 elemsize 表示 channel 中元素的类型和大小,需要注意的是,一个 channel 只能传递一种类型的值,如果需要传递任意类型的数据,可以使用 interface{} 类型。

字段 lock 是保证同一时间只有一个协程读写 channel。
在这里插入图片描述

1.2 阻塞协程队列waitq与sudog结构体

在hchan中我们可以看到 recvq与sendq都是waitq类型,这代表协程等待队列。这个队列维护阻塞在一个channel上的所有协程。first和last是指向sudog结构体类型的指针,表示队列的头和尾。waitq里面连接的是一个sudog双向链表,保存的是等待的goroutine。队列中的sudog也是一个结构体,代表一个协程/sync.Mutex等待队列中的节点,包含了协程和数据的信息。waitq与sudog结构体包含以下字段,每个字段的含义已注释:

type waitq struct {		//阻塞的协程队列
    first *sudog 		//队列头部
    last  *sudog		//队列尾部
}
type sudog struct {		//sudog:包装协程的节点
    g *g				//goroutine,协程;

    next *sudog			//队列中的下一个节点;
    prev *sudog			//队列中的前一个节点;
    elem unsafe.Pointer //读取/写入 channel 的数据的容器;
    
    isSelect bool		//标识当前协程是否处在 select 多路复用的流程中;
    
    c        *hchan 	//标识与当前 sudog 交互的 chan.
}

在这里插入图片描述

2.Channel构造器函数

2.1 Channel常见类型

  • 无缓冲型Channel:常用于同步的场景,比如协调两个或多个并发goroutine之间的执行,传递临界资源等。

  • 有缓冲的 struct 型Channel:常用于单向传输数据流,例如将producer和consumer分开,这样可以避免不必要的等待时间。

  • 有缓冲的 pointer 型Channel:有缓冲的 pointer 型Channel位于管道中的元素是指针类型的变量。它常被用于异步数据传输,将消费者的读取数据和生产者的填充数据分离。

2.2 Channel构造器函数源码分析

func makechan(t *chantype, size int) *hchan {
    elem := t.elem	//Channel中元素类型
    
    // 每个元素的内存大小为elem.size,channel的容量为size,计算出总内存mem
    mem, overflow := math.MulUintptr(elem.size, uintptr(size))
    if overflow || mem > maxAlloc-hchanSize || size < 0 {
        panic(plainError("makechan: size out of range"))
    }

    var c *hchan
    switch {
    case mem == 0:				//无缓冲型Channel
   		 //hchanSize默认为96
        c = (*hchan)(mallocgc(hchanSize, nil, true))
        // 竞争检测器使用此位置进行同步。
        c.buf = c.raceaddr()
    case elem.ptrdata == 0:		//有缓冲的 struct 型Channel
        c = (*hchan)(mallocgc(hchanSize+mem, nil, true))
        c.buf = add(unsafe.Pointer(c), hchanSize)
    default:					//有缓冲的 pointer 型Channel
        c = new(hchan)
        c.buf = mallocgc(mem, elem, true)
    }

    // 初始化hchan
    c.elemsize = uint16(elem.size)		//每个元素在内存中占用的字节数
    c.elemtype = elem					//元素类型
    c.dataqsiz = uint(size)				//队列中元素的数量上限
    
    lockInit(&c.lock, lockRankHchan)	//初始化读写保护锁

    return c
}

这段代码的作用是创建一个 channel,并初始化 channel 中的各个字段。

  • 计算总内存大小:每个元素占用空间是t.elem.size,channel的容量是size,所需要分配的总内存大小为mem
  • 根据mem的值判断是否需要分配内存:分为 无缓冲型、有缓冲元素为 struct 型、有缓冲元素为 pointer 型 channel;
    • 倘若为无缓冲型channel,则仅申请一个大小为默认值 hchanSize即96 的空间;
    • 如若有缓冲的 struct 型channel,则一次性分配好 96 + mem 大小的空间,并且调整 chan 的 buf 指向 mem 的起始位置;
    • 倘若为有缓冲的 pointer 型channel,则分别申请 chan 和 buf 的空间,两者无需连续;
  • 初始化channel:设置elemsize, elemtype, dataqsiz, lock等字段。其中elemsize标识每个元素在内存中占用的字节数,elemType包含元素类型(reflect.Type),dataqsiz存放队列中元素的数量上限(若是无缓冲通道,则默认为1), lock压缩对chan的读写操作进行保护的锁。
  • 最后返回创建的channel的指针c。
  • 3.channel写操作实现原理

    3.1 channel写异常处理

    func chansend1(c *hchan, elem unsafe.Pointer) {
        chansend(c, elem, true, getcallerpc())
    }
    
    func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {
        if c == nil {
            gopark(nil, nil, waitReasonChanSendNilChan, traceEvGoStop, 2)
            throw("unreachable")
        }
    
        lock(&c.lock)
    
        if c.closed != 0 {
            unlock(&c.lock)
            panic(plainError("send on closed channel"))
        }
        
        // ...
    
    • 对于未初始化即为空的 chan,写入操作会引发死锁“unreachable”;
    • 对于已关闭的 chan,写入操作会引发 panic"send on closed channel";

    3.2 channel写时存在阻塞读协程——此时环形缓冲区内元素个数为0

    func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {
        // ...
    	
        lock(&c.lock)	// 加锁
    
        // ...
    	//从阻塞度协程队列中取出一个 goroutine 的封装对象 sudog
        if sg := c.recvq.dequeue(); sg != nil {
    		//在 send 方法中,基于 memmove 方法,直接将元素拷贝交给 sudog 对应的读协程sg,并完成解锁动作
            send(c, sg, ep, func() { unlock(&c.lock) }, 3)
            return true
        }
        
        // ...
    

    写入前利用channel 的lock进行加锁,如果在channel写入时,如果 channel 中存在阻塞的读协程,那么此时channel内一定没有元素,于是将这个读携程 唤醒,并为了提高效率,直接将要发送的数据传递给它,而不需要存储到缓冲区中。
    在这里插入图片描述

    3.3channel写时无阻塞读协程且环形缓冲区仍有空间

    func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {
        // ...
        lock(&c.lock)	//加锁
        // ...
        if c.qcount < c.dataqsiz {	//判断环形缓冲区是否有空间
            qp := chanbuf(c, c.sendx)	//将当前元素添加到环形缓冲区 sendx 对应的位置
            //memmove(dst, src, t.size) 进行数据的转移,本质上是一个内存拷贝
            //将发送的数据直接拷贝到 x =  0 {
            // 获取到 recvx 对应位置的元素
            qp := chanbuf(c, c.recvx)
            if ep != nil {
            	//typedmemmove(dst, src, size):从 src 指向的地址复制 size 字节的数据到 dst 指向的地址。
            	//将channel缓冲区或发送队列中读取到的目标元素(即 qp 指针)写入到接收端点的目标内存地址(即 ep 指针)中
                typedmemmove(c.elemtype, ep, qp)
            }
            //typedmemclr(ptr, size):从 ptr 开始的地址上清空 size 字节的数据,将要清空的内存空间设置成数据类型的零值。
            //清空刚才从 channel 缓冲区或发送队列中取出的元素
            typedmemclr(c.elemtype, qp)
            c.recvx++
            if c.recvx == c.dataqsiz {
                c.recvx = 0
            }
            c.qcount--
            unlock(&c.lock)
            return true, true
        }
        // ...
    

    读时无阻塞写协程且缓冲区有元素,为一般情况,则直接读取环形缓冲区对应的元素
    在这里插入图片描述

    4.5 读时无阻塞写协程且缓冲区无元素

    func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) {
       // ...
       //加锁
       lock(&c.lock)
       // ...
       //构造封装当前 goroutine 的 sudog 对象
        gp := getg()
        mysg := acquireSudog()
        //完成指针指向,建立 sudog、goroutine、channel 之间的指向关系
        mysg.elem = ep
        gp.waiting = mysg
        mysg.g = gp
        mysg.c = c
        gp.param = nil
        //把 sudog 添加到当前 channel 的阻塞读协程队列中
        c.recvq.enqueue(mysg)
        atomic.Store8(&gp.parkingOnChan, 1)
         //park 挂起当前读协程
        gopark(chanparkcommit, unsafe.Pointer(&c.lock), waitReasonChanReceive, traceEvGoBlockRecv, 2)
    	//倘若协程从 park 中被唤醒,则回收 sudog(sudog能被唤醒,其对应的元素必然已经被写入)
        gp.waiting = nil
        success := mysg.success
        gp.param = nil
        mysg.c = nil
        releaseSudog(mysg)
        //解锁,返回
        return true, success
    }
    

    读时无阻塞写协程且缓冲区无元素,那么直接通过 gopark 函数,将当前 goroutine 驱动进入休眠状态,等待其他 写goroutine push 数据、close channel 或者 delete 当前 goroutine 的唤醒,被唤醒后数据已被其他协程处理,故直接回收空间。

    4.6 channel读流程总结

    在这里插入图片描述

  • 首先判断通道是否为nil即未初始化,若为空则引发死锁
  • 若通道非空,由于channel是共享资源,故需要对通道进行lock加锁
  • 继续判断通道是否关闭,若关闭,则判断环形缓冲区是否有元素,若无元素,则返回对应元素的零值。
  • 通道非空未关闭,则正式进入写入流程,首先判断是否有阻塞的写协程
    • 若有阻塞的写协程, 说明环形缓冲区为无缓冲型或已被写满,故判断channel是否为无缓冲型
      • 若 channel为无缓冲型,则直接读取写协程元素,并唤醒写协程;
      • 若 channel 为有缓冲型,则读取环形缓冲区头部元素,并将写协程元素写入缓冲区尾部后唤醒写协程,更新读写索引;
    • 若没有阻塞的写协程,则判断环形缓冲区是否有空间
      • 若环形缓冲区有空间,则直接将当前元素添加到环形缓冲区 sendx的位置,并更新写入位置sendx与通道元素个数qcount,解锁后返回函数。
      • 若环形缓冲区无空间,将当前协程加入阻塞写协程队列中,阻塞协程,等待被读协程唤醒,并完成解锁
  • 4.7 两种读 channel 的协议

    读取 channel 时,我们会发现若通道关闭且无元素会返回零值,故我们需要判断进行读channel时是真的读到零值还是由于通道关闭读到零值,故源码中定义了两种读 channel 的协议。分别如下:

    got1 :=

    相关文章

    JavaScript2024新功能:Object.groupBy、正则表达式v标志
    PHP trim 函数对多字节字符的使用和限制
    新函数 json_validate() 、randomizer 类扩展…20 个PHP 8.3 新特性全面解析
    使用HTMX为WordPress增效:如何在不使用复杂框架的情况下增强平台功能
    为React 19做准备:WordPress 6.6用户指南
    如何删除WordPress中的所有评论

    发布评论