-
孔通信是站着编码而穿长衫的唯一的人。他对人说话,总是满口编码哲学,教人半懂不懂的。因为他姓孔,别人便从描红纸上的“不要通过共享内存进行通信;通过通信共享内存”这半懂不懂的话里,替他取下一个绰号,叫作孔通信。孔通信一到店,他们就故意的高声嚷道,“你一定又给人家洗脑了!”孔通信睁大眼睛说,“你怎么这样凭空污人清白……”“什么清白?我前天亲眼见何家写的代码,if err != nil。”孔通信便涨红了脸,额上的青筋条条绽出,争辩道,“互相请教不能算洗……互相请教!……coder的事,能算洗么?”接连便是难懂的话,什么“少即是多”、“所见即所得”,什么“功能正交”之类,引得众人都哄笑起来:店内外充满了快活的空气。
-
在Go中进行网络编程是十分简单的。每当接收到一个新连接时,直接开一个协程去处理,这就是goroutine-per-connection模式,开发者使用的是同步的模式去编写异步的逻辑,极大地降低了开发者编写网络应用时的心智负担,且借助于 Go runtime scheduler 对 goroutines 的高效调度,这个原生网络模型不论从适用性还是性能上都足以满足绝大部分的应用场景。这就是大道至简,让我们来一步一步揭开他的面纱。
希望通过这篇文章,能为你解答以下问题
go通过不到40行代码即可实现一个echo-server,且性能也不算差,足够应付大多数场景
// main starts a TCP echo server on :8888 using the goroutine-per-connection
// model: every accepted connection is served by its own goroutine.
func main() {
	ln, err := net.Listen("tcp", ":8888")
	if err != nil {
		panic(err)
	}
	for {
		conn, err := ln.Accept()
		if err != nil {
			// A failed Accept is logged and the server keeps accepting.
			log.Println(err)
			continue
		}
		go echo(conn)
	}
}

// echo writes everything it reads from conn back to conn. It returns, closing
// conn, as soon as a read or a write fails or a write is short.
func echo(conn net.Conn) {
	defer conn.Close()

	var buf [0x4000]byte // 16 KB buffer
	for {
		// rn, wn and err are local to this goroutine; nothing is shared
		// with the accept loop.
		rn, err := conn.Read(buf[:])
		if err != nil {
			return
		}
		wn, err := conn.Write(buf[:rn])
		if err != nil || wn != rn {
			return
		}
	}
}
暂时无法在飞书文档外展示此内容
在深入源码之前,先来了解一下go原生网络模型在Linux平台使用的io模型——epoll。简单介绍一下它的API:
#include <sys/epoll.h>
int epoll_create(int size);
int epoll_create1(int flags);
int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event);
int epoll_wait(int epfd, struct epoll_event *events, int maxevents, int timeout);
epoll_create和epoll_create1都是创建一个epoll实例并返回对应的fd
epoll_ctl则是注册想要监听fd等待的IO事件(EPOLLIN、EPOLLOUT)
epoll_wait会阻塞监听 epoll 实例上所有的fd的 I/O 事件,它接收一个用户空间上的一块内存地址 (events 数组),kernel 会在有 I/O 事件发生的时候把文件描述符列表复制到这块内存地址上,然后 epoll_wait 解除阻塞并返回,最后用户空间上的程序就可以对相应的 fd 进行读写了
linux环境下,go网络底层能力都是基于epoll封装的;从实现上来讲,所有的网络操作都以网络描述符 netFD 为中心实现。netFD 与底层 PollDesc 结构绑定,当在一个 netFD 上读写遇到 EAGAIN 错误时,就将当前 goroutine 存储到这个 netFD 对应的 PollDesc 中,同时调用 gopark 把当前 goroutine 给 挂起,直到这个 netFD 上再次发生读写事件,才将此 goroutine 给 ready 激活重新运行。
以下出现源码版本为go1.19 linux/amd64
netFD && poll.FD
netFD 是一个网络描述符,是go对于fd的高层级封装,包含一系列网络属性和poll.FD;poll.FD是go对fd的低层级封装,主要包含两个重要的数据结构 Sysfd 和 pollDesc,前者是真正的系统文件描述符,后者是对底层事件驱动的封装,读写操作都是通过它来实现的。
// Network file descriptor. type netFD struct {
pfd poll.FD
// immutable until Close family int
sotype int
isConnected bool // handshake completed or use of association with peer net string
laddr Addr
raddr Addr
}
// FD is a file descriptor. The net and os packages use this type as a // field of a larger type representing a network connection or OS file. type FD struct {
// Lock sysfd and serialize access to Read and Write methods. fdmu fdMutex
// System file descriptor. Immutable until Close. Sysfd int
// I/O poller. pd pollDesc
// Writev cache. iovecs *[]syscall.Iovec
// Semaphore signaled when file is closed. csema uint32
// Non-zero if this file has been set to blocking mode. isBlocking uint32
// Whether this is a streaming descriptor, as opposed to a // packet-based descriptor like a UDP socket. Immutable. IsStream bool
// Whether a zero byte read indicates EOF. This is false for a // message based socket connection. ZeroReadIsEOF bool
// Whether this is a file rather than a network socket. isFile bool
}
// pollDesc is the poller handle embedded in FD. runtimeCtx stores the value
// returned by runtime_pollOpen; a zero runtimeCtx makes wait fail with
// "waiting for unsupported file type".
type pollDesc struct {
runtimeCtx uintptr
}
// Network poller descriptor. // // No heap pointers. // //go:notinheap type pollDesc struct {
link *pollDesc // in pollcache , protected by pollcache.lock fd uintptr // constant for pollDesc usage lifetime
// atomicInfo holds bits from closing, rd, and wd, // which are only ever written while holding the lock, // summarized for use by netpollcheckerr, // which cannot acquire the lock. // After writing these fields under lock in a way that // might change the summary, code must call publishInfo // before releasing the lock. // Code that changes fields and then calls netpollunblock // (while still holding the lock) must call publishInfo // before calling netpollunblock, because publishInfo is what // stops netpollblock from blocking anew // (by changing the result of netpollcheckerr). // atomicInfo also holds the eventErr bit, // recording whether a poll event on the fd got an error; // atomicInfo is the only source of truth for that bit. atomicInfo atomic.Uint32 // atomic pollInfo
// rg, wg are accessed atomically and hold g pointers. // (Using atomic.Uintptr here is similar to using guintptr elsewhere.) rg atomic.Uintptr // pdReady, pdWait, G waiting for read or nil wg atomic.Uintptr // pdReady, pdWait, G waiting for write or nil
lock mutex // protects the following fields closing bool
user uint32 // user settable cookie rseq uintptr // protects from stale read timers rt timer // read deadline timer (set if rt.f != nil) rd int64 // read deadline (a nanotime in the future, -1 when expired) wseq uintptr // protects from stale write timers wt timer // write deadline timer wd int64 // write deadline (a nanotime in the future, -1 when expired) self *pollDesc // storage for indirect interface. See (*pollDesc).makeArg. }
type pollCache struct {
lock mutex
first *pollDesc
// PollDesc objects must be type-stable, // because we can get ready notification from epoll/kqueue // after the descriptor is closed/reused. // Stale notifications are detected using seq variable, // seq is incremented when deadlines are changed or descriptor is reused. }
// alloc pops a pollDesc off the cache's free list. When the list is empty it
// first refills it with a freshly allocated batch of descriptors.
func (c *pollCache) alloc() *pollDesc {
	lock(&c.lock)
	if c.first == nil {
		const pdSize = unsafe.Sizeof(pollDesc{})
		n := pollBlockSize / pdSize
		if n == 0 {
			n = 1
		}
		// Must be in non-GC memory because can be referenced
		// only from epoll/kqueue internals.
		// Article note: the first allocation reserves roughly 4 KB.
		mem := persistentalloc(n*pdSize, 0, &memstats.other_sys)
		for i := uintptr(0); i < n; i++ {
			// Carve the block into pdSize-sized descriptors and push
			// each one onto the free list.
			pd := (*pollDesc)(add(mem, i*pdSize))
			pd.link = c.first
			c.first = pd
		}
	}
	pd := c.first
	c.first = pd.link
	lockInit(&pd.lock, lockRankPollDesc)
	unlock(&c.lock)
	return pd
}
// free returns pd to the cache by pushing it onto the head of the free list
// (c.first) while holding c.lock.
func (c *pollCache) free(pd *pollDesc) {
lock(&c.lock)
pd.link = c.first
c.first = pd
unlock(&c.lock)
}
Server Listen
暂时无法在飞书文档外展示此内容
调用 net.Listen 之后,底层会通过 Linux 的系统调用 socket 方法创建一个 fd 分配给 listener,并用它来初始化 listener 的 netFD ,接着调用 netFD 的 listenStream 方法完成对 socket 的 bind&listen 操作以及对 netFD 的初始化,在首次创建时会通过runtime调用epoll的初始化:
// Listen announces on the local network address. // // See func Listen for a description of the network and address // parameters. func (lc *ListenConfig) Listen(ctx context.Context, network, address string) (Listener, error) {
addrs, err := DefaultResolver.resolveAddrList(ctx, "listen", network, address, nil)
if err != nil {
return nil, &OpError{Op: "listen", Net: network, Source: nil, Addr: nil, Err: err}
}
sl := &sysListener{
ListenConfig: *lc,
network: network,
address: address,
}
var l Listener
la := addrs.first(isIPv4)
switch la := la.(type) {
case *TCPAddr:
l, err = sl.listenTCP(ctx, la)
case *UnixAddr:
l, err = sl.listenUnix(ctx, la)
default:
return nil, &OpError{Op: "listen", Net: sl.network, Source: nil, Addr: la, Err: &AddrError{Err: "unexpected address type", Addr: address}}
}
if err != nil {
return nil, &OpError{Op: "listen", Net: sl.network, Source: nil, Addr: la, Err: err} // l is non-nil interface containing nil pointer }
return l, nil
}
// listenTCP creates a SOCK_STREAM socket for laddr through internetSocket
// (passing the "listen" mode and the ListenConfig's Control hook) and wraps the
// resulting descriptor in a TCPListener. Any error is returned unchanged.
func (sl *sysListener) listenTCP(ctx context.Context, laddr *TCPAddr) (*TCPListener, error) {
fd, err := internetSocket(ctx, sl.network, laddr, nil, syscall.SOCK_STREAM, 0, "listen", sl.ListenConfig.Control)
if err != nil {
return nil, err
}
return &TCPListener{fd: fd, lc: sl.ListenConfig}, nil
}
// socket returns a network file descriptor that is ready for // asynchronous I/O using the network poller. func socket(ctx context.Context, net string, family, sotype, proto int, ipv6only bool, laddr, raddr sockaddr, ctrlFn func(string, string, syscall.RawConn) error) (fd *netFD, err error) {
// 通过系统调用创建一个socket
s, err := sysSocket(family, sotype, proto)
if err != nil {
return nil, err
}
if err = setDefaultSockopts(s, family, sotype, ipv6only); err != nil {
poll.CloseFunc(s)
return nil, err
}
// 初始化netFD基本属性
if fd, err = newFD(s, family, sotype, net); err != nil {
poll.CloseFunc(s)
return nil, err
}
// This function makes a network file descriptor for the // following applications: // // - An endpoint holder that opens a passive stream // connection, known as a stream listener // // - An endpoint holder that opens a destination-unspecific // datagram connection, known as a datagram listener // // - An endpoint holder that opens an active stream or a // destination-specific datagram connection, known as a // dialer // // - An endpoint holder that opens the other connection, such // as talking to the protocol stack inside the kernel // // For stream and datagram listeners, they will only require // named sockets, so we can assume that it's just a request // from stream or datagram listeners when laddr is not nil but // raddr is nil. Otherwise we assume it's just for dialers or // the other connection holders.
if laddr != nil && raddr == nil {
switch sotype {
case syscall.SOCK_STREAM, syscall.SOCK_SEQPACKET:
if err := fd.listenStream(laddr, listenerBacklog(), ctrlFn); err != nil {
fd.Close()
return nil, err
}
return fd, nil
case syscall.SOCK_DGRAM:
if err := fd.listenDatagram(laddr, ctrlFn); err != nil {
fd.Close()
return nil, err
}
return fd, nil
}
}
if err := fd.dial(ctx, laddr, raddr, ctrlFn); err != nil {
fd.Close()
return nil, err
}
return fd, nil
}
// listenStream turns fd into a listening stream socket. In order it: applies
// the default listener socket options, converts laddr to a syscall.Sockaddr,
// runs the optional ctrlFn hook on a raw connection, binds, listens with the
// given backlog, registers the descriptor through fd.init, and finally records
// the local address reported by Getsockname. The first failing step's error is
// returned; bind and listen failures are wrapped with os.NewSyscallError.
func (fd *netFD) listenStream(laddr sockaddr, backlog int, ctrlFn func(string, string, syscall.RawConn) error) error {
var err error
if err = setDefaultListenerSockopts(fd.pfd.Sysfd); err != nil {
return err
}
var lsa syscall.Sockaddr
if lsa, err = laddr.sockaddr(fd.family); err != nil {
return err
}
if ctrlFn != nil {
c, err := newRawConn(fd)
if err != nil {
return err
}
if err := ctrlFn(fd.ctrlNetwork(), laddr.String(), c); err != nil {
return err
}
}
if err = syscall.Bind(fd.pfd.Sysfd, lsa); err != nil {
return os.NewSyscallError("bind", err)
}
if err = listenFunc(fd.pfd.Sysfd, backlog); err != nil {
return os.NewSyscallError("listen", err)
}
if err = fd.init(); err != nil {
return err
}
// The Getsockname error is deliberately discarded here.
lsa, _ = syscall.Getsockname(fd.pfd.Sysfd)
fd.setAddr(fd.addrFunc()(lsa), nil)
return nil
}
// init initializes the embedded poll.FD for fd.net with pollable set to true,
// i.e. it asks for the descriptor to be managed by the runtime netpoller.
func (fd *netFD) init() error {
return fd.pfd.Init(fd.net, true)
}
// Init initializes the FD. The Sysfd field should already be set. // This can be called multiple times on a single FD. // The net argument is a network name from the net package (e.g., "tcp"), // or "file". // Set pollable to true if fd should be managed by runtime netpoll. func (fd *FD) Init(net string, pollable bool) error {
// We don't actually care about the various network types. if net == "file" {
fd.isFile = true
}
if !pollable {
fd.isBlocking = 1
return nil
}
err := fd.pd.init(fd)
if err != nil {
// If we could not initialize the runtime poller, // assume we are using blocking mode. fd.isBlocking = 1
}
return err
}
// init registers fd.Sysfd with the runtime poller and stores the returned
// handle in pd.runtimeCtx. A non-zero errno from runtime_pollOpen is converted
// to an error and returned.
func (pd *pollDesc) init(fd *FD) error {
// serverInit.Do runs runtime_pollServerInit at most once; per the article
// the runtime keeps a single epoll instance.
serverInit.Do(runtime_pollServerInit)
// Register the newly created fd with epoll.
ctx, errno := runtime_pollOpen(uintptr(fd.Sysfd))
if errno != 0 {
return errnoErr(syscall.Errno(errno))
}
pd.runtimeCtx = ctx
return nil
}
//go:linkname poll_runtime_pollServerInit internal/poll.runtime_pollServerInit func poll_runtime_pollServerInit() {
netpollGenericInit()
}
// netpollGenericInit runs netpollinit once. It first reads netpollInited
// atomically; if it is still zero it takes netpollInitLock, re-checks the flag
// under the lock, calls netpollinit, and then publishes netpollInited = 1.
func netpollGenericInit() {
if atomic.Load(&netpollInited) == 0 {
lockInit(&netpollInitLock, lockRankNetpollInit)
lock(&netpollInitLock)
if netpollInited == 0 {
netpollinit()
atomic.Store(&netpollInited, 1)
}
unlock(&netpollInitLock)
}
}
// netpollinit creates the epoll instance (epfd) and the pipe used to
// interrupt a poll. Any failure is fatal: it prints the error number and
// throws.
func netpollinit() {
// Prefer epoll_create1 with the close-on-exec flag; if that fails, fall back
// to epollcreate and set close-on-exec separately.
epfd = epollcreate1(_EPOLL_CLOEXEC)
if epfd < 0 {
epfd = epollcreate(1024)
if epfd < 0 {
println("runtime: epollcreate failed with", -epfd)
throw("runtime: netpollinit failed")
}
closeonexec(epfd)
}
// Create a non-blocking pipe with a system call; the article describes it as
// the channel used to communicate with epoll.
r, w, errno := nonblockingPipe()
if errno != 0 {
println("runtime: pipe failed with", -errno)
throw("runtime: pipe failed")
}
ev := epollevent{
events: _EPOLLIN,
}
// Tag the event with the address of netpollBreakRd so the pipe's events can be
// told apart from pollDesc events, then watch the pipe's read end.
*(**uintptr)(unsafe.Pointer(&ev.data)) = &netpollBreakRd
errno = epollctl(epfd, _EPOLL_CTL_ADD, r, &ev)
if errno != 0 {
println("runtime: epollctl failed with", -errno)
throw("runtime: epollctl failed")
}
netpollBreakRd = uintptr(r)
netpollBreakWr = uintptr(w)
}
//go:linkname poll_runtime_pollOpen internal/poll.runtime_pollOpen func poll_runtime_pollOpen(fd uintptr) (*pollDesc, int) {
pd := pollcache.alloc()
lock(&pd.lock)
wg := pd.wg.Load()
if wg != 0 && wg != pdReady {
throw("runtime: blocked write on free polldesc")
}
rg := pd.rg.Load()
if rg != 0 && rg != pdReady {
throw("runtime: blocked read on free polldesc")
}
pd.fd = fd
pd.closing = false
pd.setEventErr(false)
pd.rseq++
pd.rg.Store(0)
pd.rd = 0
pd.wseq++
pd.wg.Store(0)
pd.wd = 0
pd.self = pd
pd.publishInfo()
unlock(&pd.lock)
errno := netpollopen(fd, pd)
if errno != 0 {
pollcache.free(pd)
return nil, int(errno)
}
return pd, 0
}
// netpollopen adds fd to the epoll instance epfd with the event mask
// _EPOLLIN|_EPOLLOUT|_EPOLLRDHUP|_EPOLLET and stores pd in the event's data
// field, which is how netpoll later maps an event back to its pollDesc.
// It returns the negated result of epollctl.
func netpollopen(fd uintptr, pd *pollDesc) int32 {
var ev epollevent
ev.events = _EPOLLIN | _EPOLLOUT | _EPOLLRDHUP | _EPOLLET
*(**pollDesc)(unsafe.Pointer(&ev.data)) = pd
return -epollctl(epfd, _EPOLL_CTL_ADD, int32(fd), &ev)
}
Server Accept TCPConn
通过系统调用创建的fd添加到runtime中的epoll实例
暂时无法在飞书文档外展示此内容
// Accept implements the Accept method in the Listener interface; it // waits for the next call and returns a generic Conn. func (l *TCPListener) Accept() (Conn, error) {
if !l.ok() {
return nil, syscall.EINVAL
}
c, err := l.accept()
if err != nil {
return nil, &OpError{Op: "accept", Net: l.fd.net, Source: nil, Addr: l.fd.laddr, Err: err}
}
return c, nil
}
// accept takes the next connection from the listener's netFD and wraps it in
// a TCPConn. When the ListenConfig's KeepAlive is not negative it enables
// keep-alive on the new descriptor, using defaultTCPKeepAlive as the period if
// KeepAlive is zero. The results of setKeepAlive and setKeepAlivePeriod are
// not checked.
func (ln *TCPListener) accept() (*TCPConn, error) {
fd, err := ln.fd.accept()
if err != nil {
return nil, err
}
tc := newTCPConn(fd)
if ln.lc.KeepAlive >= 0 {
setKeepAlive(fd, true)
ka := ln.lc.KeepAlive
if ln.lc.KeepAlive == 0 {
ka = defaultTCPKeepAlive
}
setKeepAlivePeriod(fd, ka)
}
return tc, nil
}
// accept accepts one connection on the listening descriptor and returns a
// fully initialized netFD for it. Errors from the poll layer are wrapped with
// the failing syscall name when one is reported. The accepted descriptor is
// closed again if building or initializing its netFD fails.
func (fd *netFD) accept() (netfd *netFD, err error) {
d, rsa, errcall, err := fd.pfd.Accept()
if err != nil {
if errcall != "" {
err = wrapSyscallError(errcall, err)
}
return nil, err
}
if netfd, err = newFD(d, fd.family, fd.sotype, fd.net); err != nil {
poll.CloseFunc(d)
return nil, err
}
// Same as the Listen path: register the new descriptor with the poller.
if err = netfd.init(); err != nil {
netfd.Close()
return nil, err
}
// The Getsockname error is deliberately discarded here.
lsa, _ := syscall.Getsockname(netfd.pfd.Sysfd)
netfd.setAddr(netfd.addrFunc()(lsa), netfd.addrFunc()(rsa))
return netfd, nil
}
// Accept wraps the accept network call. func (fd *FD) Accept() (int, syscall.Sockaddr, string, error) {
if err := fd.readLock(); err != nil {
return -1, nil, "", err
}
defer fd.readUnlock()
if err := fd.pd.prepareRead(fd.isFile); err != nil {
return -1, nil, "", err
}
for {
// 因为socket设置的NONBLOCK,会立马返回结果
s, rsa, errcall, err := accept(fd.Sysfd)
if err == nil {
return s, rsa, "", err
}
switch err {
case syscall.EINTR:
continue
case syscall.EAGAIN:
if fd.pd.pollable() {
// 如果是返回EAGAIN的error,说明暂时还没有链接进来
// 会通过pollDesc的能力让当前的goroutine挂起
if err = fd.pd.waitRead(fd.isFile); err == nil {
continue
}
}
case syscall.ECONNABORTED:
// This means that a socket on the listen // queue was closed before we Accept()ed it; // it's a silly error, so try again. continue
}
return -1, nil, errcall, err
}
}
// Wrapper around the accept system call that marks the returned file // descriptor as nonblocking and close-on-exec. func accept(s int) (int, syscall.Sockaddr, string, error) {
ns, sa, err := Accept4Func(s, syscall.SOCK_NONBLOCK|syscall.SOCK_CLOEXEC)
// On Linux the accept4 system call was introduced in 2.6.28 // kernel and on FreeBSD it was introduced in 10 kernel. If we // get an ENOSYS error on both Linux and FreeBSD, or EINVAL // error on Linux, fall back to using accept. switch err {
case nil:
return ns, sa, "", nil
default: // errors other than the ones listed return -1, sa, "accept4", err
case syscall.ENOSYS: // syscall missing case syscall.EINVAL: // some Linux use this instead of ENOSYS case syscall.EACCES: // some Linux use this instead of ENOSYS case syscall.EFAULT: // some Linux use this instead of ENOSYS }
// See ../syscall/exec_unix.go for description of ForkLock. // It is probably okay to hold the lock across syscall.Accept // because we have put fd.sysfd into non-blocking mode. // However, a call to the File method will put it back into // blocking mode. We can't take that risk, so no use of ForkLock here. ns, sa, err = AcceptFunc(s)
if err == nil {
syscall.CloseOnExec(ns)
}
if err != nil {
return -1, nil, "accept", err
}
if err = syscall.SetNonblock(ns, true); err != nil {
CloseFunc(ns)
return -1, nil, "setnonblock", err
}
return ns, sa, "", nil
}
Read && Write
通过上面的源码阅读,不难看出整个调用链就像套娃:Listener->netFD->poll.FD->systemCall;Read和Write操作也不例外,而且和Accept处理未来到的链接一样,当无数据可读可写时,也是通过pollDesc来挂起当前的goroutine
// wait blocks through runtime_pollWait until the descriptor is ready for mode
// ('r' or 'w', as passed by waitRead and waitWrite) and converts the result
// code to an error with convertErr. It fails immediately when the descriptor
// has no runtime context (runtimeCtx == 0).
func (pd *pollDesc) wait(mode int, isFile bool) error {
if pd.runtimeCtx == 0 {
return errors.New("waiting for unsupported file type")
}
res := runtime_pollWait(pd.runtimeCtx, mode)
return convertErr(res, isFile)
}
// waitRead waits in read mode ('r').
func (pd *pollDesc) waitRead(isFile bool) error {
return pd.wait('r', isFile)
}
// waitWrite waits in write mode ('w').
func (pd *pollDesc) waitWrite(isFile bool) error {
return pd.wait('w', isFile)
}
// poll_runtime_pollWait, which is internal/poll.runtime_pollWait, // waits for a descriptor to be ready for reading or writing, // according to mode, which is 'r' or 'w'. // This returns an error code; the codes are defined above. // //go:linkname poll_runtime_pollWait internal/poll.runtime_pollWait func poll_runtime_pollWait(pd *pollDesc, mode int) int {
errcode := netpollcheckerr(pd, int32(mode))
if errcode != pollNoError {
return errcode
}
// As for now only Solaris, illumos, and AIX use level-triggered IO. if GOOS == "solaris" || GOOS == "illumos" || GOOS == "aix" {
netpollarm(pd, mode)
}
for !netpollblock(pd, int32(mode), false) {
errcode = netpollcheckerr(pd, int32(mode))
if errcode != pollNoError {
return errcode
}
// Can happen if timeout has fired and unblocked us, // but before we had a chance to run, timeout has been reset. // Pretend it has not happened and retry. }
return pollNoError
}
// returns true if IO is ready, or false if timedout or closed // waitio - wait only for completed IO, ignore errors // Concurrent calls to netpollblock in the same mode are forbidden, as pollDesc // can hold only a single waiting goroutine for each mode. func netpollblock(pd *pollDesc, mode int32, waitio bool) bool {
// pollDesc中的rg和wg存放的是对应goroutine
// gopark完成就会把goroutine的数据结构存在rg或wg上
// rg 代表的是读操作;wg代表的是写操作
gpp := &pd.rg
if mode == 'w' {
gpp = &pd.wg
}
// set the gpp semaphore to pdWait for {
// 如果当前的协程状态已经是pdReady了
// 说明io事件已经来了,可以直接返回 if gpp.CompareAndSwap(pdReady, 0) {
return true
}
// 如果没有达到pdReady状态,说明无事发生
// 尝试把当前协程置位pdWait状态,并退出循环
if gpp.CompareAndSwap(0, pdWait) {
break
}
// Double check that this isn't corrupt; otherwise we'd loop // forever. if v := gpp.Load(); v != pdReady && v != 0 {
throw("runtime: double wait")
}
}
// need to recheck error states after setting gpp to pdWait // this is necessary because runtime_pollUnblock/runtime_pollSetDeadline/deadlineimpl // do the opposite: store to closing/rd/wd, publishInfo, load of rg/wg if waitio || netpollcheckerr(pd, mode) == pollNoError {
gopark(netpollblockcommit, unsafe.Pointer(gpp), waitReasonIOWait, traceEvGoBlockNet, 5)
}
// be careful to not lose concurrent pdReady notification old := gpp.Swap(0)
if old > pdWait {
throw("runtime: corrupted polldesc")
}
return old == pdReady
}
// Puts the current goroutine into a waiting state and calls unlockf on the // system stack. // // If unlockf returns false, the goroutine is resumed. // // unlockf must not access this G's stack, as it may be moved between // the call to gopark and the call to unlockf. // // Note that because unlockf is called after putting the G into a waiting // state, the G may have already been readied by the time unlockf is called // unless there is external synchronization preventing the G from being // readied. If unlockf returns false, it must guarantee that the G cannot be // externally readied. // // Reason explains why the goroutine has been parked. It is displayed in stack // traces and heap dumps. Reasons should be unique and descriptive. Do not // re-use reasons, add new ones. func gopark(unlockf func(*g, unsafe.Pointer) bool, lock unsafe.Pointer, reason waitReason, traceEv byte, traceskip int) {
if reason != waitReasonSleep {
checkTimeouts() // timeouts may expire while two goroutines keep the scheduler busy }
mp := acquirem()
gp := mp.curg
status := readgstatus(gp)
if status != _Grunning && status != _Gscanrunning {
throw("gopark: bad g status")
}
mp.waitlock = lock
mp.waitunlockf = unlockf
gp.waitreason = reason
mp.waittraceev = traceEv
mp.waittraceskip = traceskip
releasem(mp)
// can't do anything that might move the G between Ms here. mcall(park_m)
}
// park continuation on g0. func park_m(gp *g) {
_g_ := getg()
if trace.enabled {
traceGoPark(_g_.m.waittraceev, _g_.m.waittraceskip)
}
casgstatus(gp, _Grunning, _Gwaiting)
dropg(gp)
if fn := _g_.m.waitunlockf; fn != nil {
// 调用 netpollblockcommit
ok := fn(gp, _g_.m.waitlock)
_g_.m.waitunlockf = nil
_g_.m.waitlock = nil
if !ok {
if trace.enabled {
traceGoUnpark(gp, 2)
}
casgstatus(gp, _Gwaiting, _Grunnable)
execute(gp, true) // Schedule it back, never returns. }
}
schedule()
}
// netpollblockcommit is the unlock callback passed to gopark by netpollblock.
// It reports whether it managed to replace pdWait in the slot gpp with gp.
func netpollblockcommit(gp *g, gpp unsafe.Pointer) bool {
	// Save the current goroutine in the pollDesc's rg or wg slot.
	r := atomic.Casuintptr((*uintptr)(gpp), pdWait, uintptr(unsafe.Pointer(gp)))
	if r {
		// Bump the count of goroutines waiting for the poller.
		// The scheduler uses this to decide whether to block
		// waiting for the poller if there is nothing else to do.
		atomic.Xadd(&netpollWaiters, 1)
	}
	return r
}
net操作总结
runtime
代码过到这里,网络连接的创建和读写都已经梳理完成,现在只剩一个问题没有解决,谁来唤醒挂起的goroutine,唤醒逻辑又是如何,让我们接着往下探索。
唤醒挂起的goroutine依赖runtime.netpoll的执行,这意味着服务在运行的过程中netpoll一定会周期性的执行;go主要是通过runtime.sysmon系统监控线程和runtime.schedule()的循环调用实现的。
// Always runs without a P , so write barriers are not allowed. // //go:nowritebarrierrec func sysmon() {
...
// poll network if not polled for more than 10ms lastpoll := int64(atomic.Load64(&sched.lastpoll))
// 如果两次唤醒时间间隔大于10ms就会执行一次netpoll
// sysmonNetpollThreshold = 10ms
if netpollinited() && lastpoll != 0 && lastpoll+sysmonNetpollThreshold < now {
atomic.Cas64(&sched.lastpoll, uint64(lastpoll), uint64(now))
list := netpoll(0) // non-blocking - returns list of goroutines if !list.empty() {
// Need to decrement number of idle locked M's // (pretending that one more is running) before injectglist. // Otherwise it can lead to the following situation: // injectglist grabs all P's but before it starts M's to run the P's, // another M returns from syscall, finishes running its G, // observes that there is no work to do and no other running M's // and reports deadlock. incidlelocked(-1)
// 把当前获取的g列表放到全局的队列中或者当前的P上
injectglist(&list)
incidlelocked(1)
}
...
}
// One round of scheduler: find a runnable goroutine and execute it. // Never returns. func schedule() {
...
gp, inheritTime, tryWakeP := findRunnable() // blocks until work is available ...
}
// Finds a runnable goroutine to execute. // Tries to steal from other P's, get g from local or global queue, poll network. // tryWakeP indicates that the returned goroutine is not normal (GC worker, trace // reader) so the caller should try to wake a P. func findRunnable() (gp *g, inheritTime, tryWakeP bool) {
...
// Poll network. // This netpoll is only an optimization before we resort to stealing. // We can safely skip it if there are no waiters or a thread is blocked // in netpoll already. If there is any kind of logical race with that // blocked thread (e.g. it has already returned from netpoll, but does // not set lastpoll yet), this thread will do blocking netpoll below // anyway.
// early return 如果当前有闲置的goroutine且存在io事件待执行就返回g if netpollinited() && atomic.Load(&netpollWaiters) > 0 && atomic.Load64(&sched.lastpoll) != 0 {
if list := netpoll(0); !list.empty() { // non-blocking gp := list.pop()
injectglist(&list)
casgstatus(gp, _Gwaiting, _Grunnable)
if trace.enabled {
traceGoUnpark(gp, 0)
}
return gp, false, false
}
}
...
...
// Poll network until next timer. if netpollinited() && (atomic.Load(&netpollWaiters) > 0 || pollUntil != 0) && atomic.Xchg64(&sched.lastpoll, 0) != 0 {
atomic.Store64(&sched.pollUntil, uint64(pollUntil))
if _g_.m.p != 0 {
throw("findrunnable: netpoll with p")
}
if _g_.m.spinning {
throw("findrunnable: netpoll with spinning")
}
// Refresh now. now = nanotime()
delay := int64(-1)
if pollUntil != 0 {
delay = pollUntil - now
if delay pollUntil {
netpollBreak()
}
}
stopm()
goto top
}
// netpoll checks for ready network connections. // Returns list of goroutines that become runnable. // delay 0: block for up to that many nanoseconds func netpoll(delay int64) gList {
if epfd == -1 {
return gList{}
}
var waitms int32
if delay < 0 {
waitms = -1
} else if delay == 0 {
waitms = 0
} else if delay < 1e6 {
waitms = 1
} else if delay < 1e15 {
waitms = int32(delay / 1e6)
} else {
// An arbitrary cap on how long to wait for a timer. // 1e9 ms == ~11.5 days. waitms = 1e9
}
var events [128]epollevent
retry:
n := epollwait(epfd, &events[0], int32(len(events)), waitms)
if n 0 {
return gList{}
}
goto retry
}
var toRun gList
var pollers gQueue
for i := int32(0); i < n; i++ {
ev := &events[i]
if ev.events == 0 {
continue
}
if *(**uintptr)(unsafe.Pointer(&ev.data)) == &netpollBreakRd {
if ev.events != _EPOLLIN {
println("runtime: netpoll: break fd ready for", ev.events)
throw("runtime: netpoll: break fd ready for something unexpected")
}
if delay != 0 {
// netpollBreak could be picked up by a // nonblocking poll. Only read the byte // if blocking. var tmp [16]byte
read(int32(netpollBreakRd), noescape(unsafe.Pointer(&tmp[0])), int32(len(tmp)))
atomic.Store(&netpollWakeSig, 0)
}
continue
}
var mode int32
if ev.events&(_EPOLLIN|_EPOLLRDHUP|_EPOLLHUP|_EPOLLERR) != 0 {
mode += 'r'
}
if ev.events&(_EPOLLOUT|_EPOLLHUP|_EPOLLERR) != 0 {
mode += 'w'
}
if mode != 0 {
pd := *(**pollDesc)(unsafe.Pointer(&ev.data))
pd.setEventErr(ev.events == _EPOLLERR)
netpollready(&toRun, &pollers, pd, mode)
}
}
toRun.pushAll(pollers)
return toRun
}
// netpollready is called by the platform-specific netpoll function. // It declares that the fd associated with pd is ready for I/O. // The toRun argument is used to build a list of goroutines to return // from netpoll. The mode argument is 'r', 'w', or 'r'+'w' to indicate // whether the fd is ready for reading or writing or both. // // This may run while the world is stopped, so write barriers are not allowed. // //go:nowritebarrier func netpollready(toRun *gList, pollers *gQueue, pd *pollDesc, mode int32) {
var rg, wg *g
if mode == 'r' || mode == 'r'+'w' {
rg = netpollunblock(pd, 'r', true)
}
if mode == 'w' || mode == 'r'+'w' {
wg = netpollunblock(pd, 'w', true)
}
if rg != nil {
if rg.adaptPoller && pollers != nil {
pollers.push(rg)
} else {
toRun.push(rg)
}
}
if wg != nil {
toRun.push(wg)
}
}
// netpollunblock updates the rg (mode 'r') or wg (mode 'w') slot of pd and
// returns the goroutine that was parked there, or nil when none was. With
// ioready set the slot becomes pdReady; otherwise it is cleared to 0.
func netpollunblock(pd *pollDesc, mode int32, ioready bool) *g {
	gpp := &pd.rg
	if mode == 'w' {
		gpp = &pd.wg
	}

	for {
		old := gpp.Load()
		if old == pdReady {
			return nil
		}
		if old == 0 && !ioready {
			// Only set pdReady for ioready. runtime_pollWait
			// will check for timeout/cancel before waiting.
			return nil
		}
		var new uintptr
		if ioready {
			new = pdReady
		}
		if gpp.CompareAndSwap(old, new) {
			// pdWait means a waiter announced itself but has not
			// stored its g pointer yet, so there is nothing to return.
			if old == pdWait {
				old = 0
			}
			return (*g)(unsafe.Pointer(old))
		}
	}
}
暂时无法在飞书文档外展示此内容
go为了让我们方便的进行网络编程,runtime做了大量工作,牢牢掌控网络IO。
他真的,我哭死😭
在某些极端场景下,例如:一百万的连接,这种goroutine-per-connection的模式就至少要启动一百万个goroutine,这对资源的消耗也是极大的。针对不同的操作系统和不同的Go版本,一个goroutine所使用的最小的栈大小是2KB ~ 8 KB (go stack),如果在每个goroutine中再分配byte buffer用以从连接中读写数据,几十G的内存轻轻松松就分配出去了。
如果我们想控制协程数量,简单使用协程池是不能解决问题的,在网络阻塞读写操作时,协程池不会释放出来goroutine去Accept新的链接;这个问题如何解呢?go官方留了原生库的口子:
cs.opensource.google/go/x/sys/+/…:
下面是一个简单的例子
// epoll tracks the connections registered with one epoll instance.
type epoll struct {
	// fd is the file descriptor of the epoll instance.
	fd int
	// lock guards connections.
	lock *sync.RWMutex
	// connections maps a socket descriptor to its connection.
	connections map[int]net.Conn
}
// MkEpoll creates a new epoll instance and returns a tracker for it with an
// empty connection table. The error from epoll_create1 is returned unchanged.
func MkEpoll() (*epoll, error) {
	// EPOLL_CLOEXEC keeps the epoll descriptor from leaking into child
	// processes across exec, matching what the Go runtime does for its own
	// epoll instance.
	fd, err := unix.EpollCreate1(unix.EPOLL_CLOEXEC)
	if err != nil {
		return nil, err
	}
	return &epoll{
		fd:          fd,
		lock:        &sync.RWMutex{},
		connections: make(map[int]net.Conn),
	}, nil
}
// Add registers conn with the epoll instance for readable and hang-up events
// and records it in the connection table. It returns the epoll_ctl error, in
// which case the table is left untouched.
func (e *epoll) Add(conn net.Conn) error {
	// Extract file descriptor associated with the connection
	fd := socketFD(conn)

	// Take the write lock before registering: Wait looks connections up under
	// the read lock, so it can never observe an event for a descriptor that is
	// registered with the kernel but not yet present in the table.
	e.lock.Lock()
	defer e.lock.Unlock()

	ev := unix.EpollEvent{Events: unix.EPOLLIN | unix.EPOLLHUP, Fd: int32(fd)}
	if err := unix.EpollCtl(e.fd, syscall.EPOLL_CTL_ADD, fd, &ev); err != nil {
		return err
	}
	e.connections[fd] = conn
	if len(e.connections)%100 == 0 {
		log.Printf("total number of connections: %v", len(e.connections))
	}
	return nil
}
// Remove unregisters conn from the epoll instance and drops it from the
// connection table. The table entry is removed even when epoll_ctl fails, so a
// descriptor the kernel no longer tracks cannot linger in the table; the
// epoll_ctl error is still returned to the caller.
func (e *epoll) Remove(conn net.Conn) error {
	fd := socketFD(conn)

	e.lock.Lock()
	defer e.lock.Unlock()

	err := unix.EpollCtl(e.fd, syscall.EPOLL_CTL_DEL, fd, nil)
	delete(e.connections, fd)
	if err != nil {
		return err
	}
	if len(e.connections)%100 == 0 {
		log.Printf("total number of connections: %v", len(e.connections))
	}
	return nil
}
// Wait polls the epoll instance for up to 100 ms and returns the connections
// that have pending events (at most 100 per call). EINTR is retried; any other
// epoll_wait error is returned. Events for descriptors that are no longer in
// the connection table are skipped, so the result never contains nil entries.
func (e *epoll) Wait() ([]net.Conn, error) {
	events := make([]unix.EpollEvent, 100)

	var n int
	for {
		var err error
		n, err = unix.EpollWait(e.fd, events, 100)
		if err == nil {
			break
		}
		if err != unix.EINTR {
			return nil, err
		}
	}

	e.lock.RLock()
	defer e.lock.RUnlock()

	var connections []net.Conn
	for i := 0; i < n; i++ {
		// A missing entry means the connection was removed after the
		// kernel queued the event.
		if conn, ok := e.connections[int(events[i].Fd)]; ok {
			connections = append(connections, conn)
		}
	}
	return connections, nil
}
// socketFD returns the operating-system file descriptor behind conn, or -1
// when conn does not expose one (it does not implement syscall.Conn, or the
// raw connection cannot be obtained, e.g. because it is already closed).
//
// It goes through the public syscall.Conn API instead of reflecting on
// unexported fields of the net package, so it does not depend on the internal
// layout of net.TCPConn and does not panic for other connection types.
func socketFD(conn net.Conn) int {
	sc, ok := conn.(syscall.Conn)
	if !ok {
		return -1
	}
	rc, err := sc.SyscallConn()
	if err != nil {
		return -1
	}
	fd := -1
	if err := rc.Control(func(s uintptr) { fd = int(s) }); err != nil {
		return -1
	}
	return fd
}
// main runs the epoll-based server; all of the work happens in startEpoll.
func main() {
startEpoll()
}
// startEpoll listens on :8972, starts the event loop in a separate goroutine
// and then accepts connections forever, handing each one to the epoll tracker.
// It panics if the listener or the epoll instance cannot be created, and
// returns on the first accept error that is not temporary.
func startEpoll() {
	ln, err := reuseport.Listen("tcp", ":8972")
	if err != nil {
		panic(err)
	}

	epoller, err := MkEpoll()
	if err != nil {
		panic(err)
	}

	go start(epoller)

	for {
		conn, acceptErr := ln.Accept()
		if acceptErr == nil {
			// A connection that cannot be registered is closed right away.
			if addErr := epoller.Add(conn); addErr != nil {
				log.Printf("failed to add connection %v", addErr)
				conn.Close()
			}
			continue
		}

		ne, ok := acceptErr.(net.Error)
		if !ok || !ne.Temporary() {
			log.Printf("accept err: %v", acceptErr)
			return
		}
		log.Printf("accept temp err: %v", ne)
	}
}
// start is the event loop: it waits for ready connections and, for each one,
// sleeps for *iotime, echoes 8 bytes back to the peer and marks one operation
// on opsRate. A connection whose echo fails is removed from the epoll tracker
// and closed. Errors from Wait are logged and the loop continues.
func start(epoller *epoll) {
	for {
		connections, err := epoller.Wait()
		if err != nil {
			log.Printf("failed to epoll wait %v", err)
			continue
		}
		for _, conn := range connections {
			if conn == nil {
				// A nil entry only means this one connection is gone;
				// skip it and keep serving the rest of the batch.
				continue
			}

			time.Sleep(*iotime)
			_, err = io.CopyN(conn, conn, 8)
			if err != nil {
				if err := epoller.Remove(conn); err != nil {
					log.Printf("failed to remove %v", err)
				}
				conn.Close()
			}

			opsRate.Mark(1)
		}
	}
}
开源世界的高性能网络库基本上都是按照上面这个路子来的,包括 evio、gnet、字节的netpoll,当然还有亿点点细节