Go 并发编程中的经验教训

2024年 7月 18日 45.3k 0

通过学习如何定位并发处理的陷阱来避免未来处理这些问题时的困境。

Go 并发编程中的经验教训-1

在复杂的分布式系统进行任务处理时,你通常会需要进行并发的操作。在 Mode.net 公司,我们每天都要和实时、快速和灵活的软件打交道。而没有一个高度并发的系统,就不可能构建一个毫秒级的动态地路由数据包的全球专用网络。这个动态路由是基于网络状态的,尽管这个过程需要考虑众多因素,但我们的重点是链路指标。在我们的环境中,链路指标可以是任何跟网络链接的状态和当前属性(如链接延迟)有关的任何内容。

并发探测链接监控

我们的动态路由算法 H.A.L.O.( 逐跳自适应链路状态最佳路由 Hop-by-Hop Adaptive Link-State Optimal Routing )部分依赖于链路指标来计算路由表。这些指标由位于每个 PoP( 存活节点 Point of Presence )上的独立组件收集。PoP 是表示我们的网络中单个路由实体的机器,通过链路连接并分布在我们的网络拓扑中的各个位置。某个组件使用网络数据包探测周围的机器,周围的机器回复数据包给前者。从接收到的探测包中可以获得链路延迟。由于每个 PoP 都有不止一个临近节点,所以这种探测任务实质上是并发的:我们需要实时测量每个临近连接点的延迟。我们不能串行地处理;为了计算这个指标,必须尽快处理每个探测。

Go 并发编程中的经验教训-2

序列号和重置:一个重新排列场景

我们的探测组件互相发送和接收数据包,并依靠序列号进行数据包处理。这旨在避免处理重复的包或顺序被打乱的包。我们的第一个实现依靠特殊的序列号 0 来重置序列号。这个数字仅在组件初始化时使用。主要的问题是我们考虑了递增的序列号总是从 0 开始。在该组件重启后,包的顺序可能会重新排列,某个包的序列号可能会轻易地被替换成重置之前使用过的值。这意味着,后继的包都会被忽略掉,直到排到重置之前用到的序列值。

UDP 握手和有限状态机

这里的问题是该组件重启前后的序列号是否一致。有几种方法可以解决这个问题,经过讨论,我们选择了实现一个带有清晰状态定义的三步握手协议。这个握手过程在初始化时通过链接建立会话。这样可以确保节点通过同一个会话进行通信且使用了适当的序列号。

为了正确实现这个过程,我们必须定义一个有清晰状态和过渡的有限状态机。这样我们就可以正确管理握手过程中的所有极端情况。

Go 并发编程中的经验教训-3

会话 ID 由握手的初始化程序生成。一个完整的交换顺序如下:

  • 发送者发送一个 SYN(ID) 数据包。
  • 接收者存储接收到的 ID 并发送一个 SYN-ACK(ID)
  • 发送者接收到 SYN-ACK(ID) 并发送一个 ACK(ID)。它还发送一个从序列号 0 开始的数据包。
  • 接收者检查最后接收到的 ID,如果 ID 匹配,则接受 ACK(ID)。它还开始接受序列号为 0 的数据包。
  • 处理状态超时

    基本上,每种状态下你都需要处理最多三种类型的事件:链接事件、数据包事件和超时事件。这些事件会并发地出现,因此你必须正确处理并发。

    • 链接事件包括网络连接或网络断开的变化,相应的初始化一个链接会话或断开一个已建立的会话。
    • 数据包事件是控制数据包(SYN/SYN-ACK/ACK)或只是探测响应。
    • 超时事件在当前会话状态的预定超时时间到期后触发。

    这里面临的最主要的问题是如何处理并发的超时到期和其他事件。这里很容易陷入死锁和资源竞争的陷阱。

    第一种方法

    本项目使用的语言是 Golang。它确实提供了原生的同步机制,如自带的通道和锁,并且能够使用轻量级线程来进行并发处理。

    Go 并发编程中的经验教训-4

    gopher 们聚众狂欢

    首先,你可以设计两个分别表示我们的会话和超时处理程序的结构体。

    type Session struct {  
      State SessionState  
      Id SessionId  
      RemoteIp string  
    }
    
    type TimeoutHandler struct {  
      callback func(Session)  
      session Session  
      duration int  
      timer *timer.Timer  
    }
    

    Session 标识连接会话,内有表示会话 ID、临近的连接点的 IP 和当前会话状态的字段。

    TimeoutHandler 包含回调函数、对应的会话、持续时间和指向调度计时器的指针。

    每一个临近连接点的会话都包含一个保存调度 TimeoutHandler 的全局映射。

    SessionTimeout map[Session]*TimeoutHandler
    

    下面方法注册和取消超时:

    // schedules the timeout callback function.  
    func (timeout* TimeoutHandler) Register() {  
      timeout.timer = time.AfterFunc(time.Duration(timeout.duration) * time.Second, func() {  
        timeout.callback(timeout.session)  
      })  
    }
    
    func (timeout* TimeoutHandler) Cancel() {  
      if timeout.timer == nil {  
        return  
      }  
      timeout.timer.Stop()  
    }
    

    你可以使用类似下面的方法来创建和存储超时:

    func CreateTimeoutHandler(callback func(Session), session Session, duration int) *TimeoutHandler {  
      if sessionTimeout[session] == nil {  
        sessionTimeout[session] := new(TimeoutHandler)  
      }  
       
      timeout = sessionTimeout[session]  
      timeout.session = session  
      timeout.callback = callback  
      timeout.duration = duration  
      return timeout  
    }
    

    超时处理程序创建后,会在经过了设置的 duration 时间(秒)后执行回调函数。然而,有些事件会使你重新调度一个超时处理程序(与 SYN 状态时的处理一样,每 3 秒一次)。

    为此,你可以让回调函数重新调度一次超时:

    func synCallback(session Session) {  
      sendSynPacket(session)
    
      // reschedules the same callback.  
      newTimeout := NewTimeoutHandler(synCallback, session, SYN_TIMEOUT_DURATION)  
      newTimeout.Register()
    
      sessionTimeout[state] = newTimeout  
    }
    

    这次回调在新的超时处理程序中重新调度自己,并更新全局映射 sessionTimeout

    数据竞争和引用

    你的解决方案已经有了。可以通过检查计时器到期后超时回调是否执行来进行一个简单的测试。为此,注册一个超时,休眠 duration 秒,然后检查是否执行了回调的处理。执行这个测试后,最好取消预定的超时时间(因为它会重新调度),这样才不会在下次测试时产生副作用。

    令人惊讶的是,这个简单的测试发现了这个解决方案中的一个问题。使用 cancel 方法来取消超时并没有正确处理。以下顺序的事件会导致数据资源竞争:

  • 你有一个已调度的超时处理程序。
  • 线程 1:
  • 你接收到一个控制数据包,现在你要取消已注册的超时并切换到下一个会话状态(如发送 SYN 后接收到一个 SYN-ACK
  • 你调用了 timeout.Cancel(),这个函数调用了 timer.Stop()。(请注意,Golang 计时器的停止不会终止一个已过期的计时器。)
  • 线程 2:
  • 在取消调用之前,计时器已过期,回调即将执行。
  • 执行回调,它调度一次新的超时并更新全局映射。
  • 线程 1:
  • 切换到新的会话状态并注册新的超时,更新全局映射。
  • 两个线程并发地更新超时映射。最终结果是你无法取消注册的超时,然后你也会丢失对线程 2 重新调度的超时的引用。这导致处理程序在一段时间内持续执行和重新调度,出现非预期行为。

    锁也解决不了问题

    使用锁也不能完全解决问题。如果你在处理所有事件和执行回调之前加锁,它仍然不能阻止一个过期的回调运行:

    func (timeout* TimeoutHandler) Register() {  
      timeout.timer = time.AfterFunc(time.Duration(timeout.duration) * time._Second_, func() {  
        stateLock.Lock()  
        defer stateLock.Unlock()
    
        timeout.callback(timeout.session)  
      })  
    }
    

    现在的区别就是全局映射的更新是同步的,但是这还是不能阻止在你调用 timeout.Cancel() 后回调的执行 —— 这种情况出现在调度计时器过期了但是还没有拿到锁的时候。你还是会丢失一个已注册的超时的引用。

    使用取消通道

    你可以使用取消通道,而不必依赖不能阻止到期的计时器执行的 golang 函数 timer.Stop()

    这是一个略有不同的方法。现在你可以不用再通过回调进行递归地重新调度;而是注册一个死循环,这个循环接收到取消信号或超时事件时终止。

    新的 Register() 产生一个新的 go 线程,这个线程在超时后执行你的回调,并在前一个超时执行后调度新的超时。返回给调用方一个取消通道,用来控制循环的终止。

    func (timeout *TimeoutHandler) Register() chan struct{} {
    cancelChan := make(chan struct{})

    go func () {
    select {
    case _ =

    相关文章

    Linux 命令行的聊天工具 CenterIM
    Linux 桌面年仍未到来 但 Linux 移动之年已到来
    12 个在线学习 Linux 技能网站
    Linux Mint : 会是另一个新的 Ubuntu 吗?
    W3Conf 开发者大会将于下周召开
    Ubuntu 10.04 ARM 处理器上网本版本结束服务期

    发布评论