Go并发编程 — I/O聚合优化(动画讲解)

2024年 3月 6日 11.6k 0

背景提要

在存储系统中,在确保功能不受损的前提下,尽量的减少读写I/O的次数是优化的一个重要方向,也就是聚合I/O的场景。读写操作虽然都有聚合I/O的需求,但各自的重点和实现方法却有所不同。接下来,我们将分别探讨读和写请求的聚合优化方法。

读请求的聚合

以读操作中,缓存优化是一种常见的优化手段。具体做法是将读取的数据存储在内存中,并通过一个唯一的Key来索引这些数据。当读请求来到时,如果该Key在缓存中没有命中,那么就需要从后端存储获取。用户请求直接穿透到后端存储,如果并发很大,这可能是一个很大的风险。

例如,对于 Key:“test”,如果缓存中没有相应的数据,并且突然出现大量并发读取请求,每个请求都会发现缓存未命中。如果这些请求全部直接访问后端存储,可能会给后端存储带来巨大压力。

为了应对这种情况,我们其实可以只允许一个读请求去后端读取数据,而其他并发请求则等待这个请求的结果。这就是读请求聚合的基本原理。

在Go语言中,可以使用singleflight 这类第三方库完成上述需求。singleflight的设计理念是“单一请求执行”,即针对同一个Key,在多个并发请求中只允许一个请求访问后端。

01 - 读请求聚合的使用姿势

下面是一个使用 singleflight 的示例,展现了如何通过传入特定的Key和闭包函数来聚合并发请求。

package main

import (
  // ...
 "golang.org/x/sync/singleflight"
)

func main() {
   var g singleflight.Group
   var wg sync.WaitGroup

   // 模拟多个 goroutine 并发请求相同的资源
   for i := 0; i < 5; i++ {
      wg.Add(1)
      go func(idx int) {
          defer wg.Done()
          v, err, shared := g.Do("objectkey", func() (interface{}, error) {
              fmt.Printf("协程ID:%v 正在执行...n", idx)
              time.Sleep(2 * time.Second)
              return "objectvalue", nil
          })
          if err != nil {
              log.Fatalf("err:%v", err)
          }
          fmt.Printf("协程ID:%v 请求结果: %v, 是否共享结果: %vn", idx, v, shared)
      }(i)
   }
   wg.Wait()
}

在这个例子中,多个Goroutine并发地请求Key为“objectkey”的资源。通过singleflight,我们确保只有一个Goroutine去执行实际的数据加载操作,而其他请求则等待这个操作的结果。接下来,我们将探讨 singleflight 的原理。

02 - singleflight的原理

singleflight 库提供了一个Group结构体,用于管理不同的请求,意图在内部实现聚合的效果。定义如下:

type Group struct {
   mu sync.Mutex       // 互斥锁,包含下面的映射表
   m  map[string]*call // 正在执行请求的映射表
}

Group结构的核心就是这个map结构。每个正在执行的请求被封装在 call 结构中,定义如下:

type call struct {
   wg sync.WaitGroup // 用于同步并发的请求
   val interface{}   // 用于存放执行的结果
   err error         // 存放执行的结果
   dups  int         // 用于计数聚合的请求
    // ...其他字段用于处理特殊情况和提高容错性
}

Group结构的Do方法实现了聚合去重的核心逻辑,代码实现如下所示:

func (g *Group) Do(key string, fn func() (interface{}, error)) (v interface{}, err error, shared bool) {
   g.mu.Lock()
   if g.m == nil {
      g.m = make(map[string]*call)
   }
   // 用 map 结构,来判断是否已经有对应 Key 正在执行的请求
   if c, ok := g.m[key]; ok {
      c.dups++
      // 如果有对应 Key 的请求正在执行,那么等待结果即可。
      g.mu.Unlock()
      c.wg.Wait()
      // ...
      return c.val, c.err, true
   }
   // 创建一个代表执行请求的结构,和 Key 关联起来,存入map中
   c := new(call)
   c.wg.Add(1)
   g.m[key] = c
   g.mu.Unlock()
   g.doCall(c, key, fn) // 真正执行请求
   return c.val, c.err, c.dups > 0
}

func (g *Group) doCall(c *call, key string, fn func() (interface{}, error)) {
    defer func() {
      // ...省略异常处理
      c.wg.Done()
    }()
    func() {
        // 真正执行请求
         c.val, c.err = fn()
    }()
    // ...
}

通过上述代码,singleflight的Group结构体利用map记录了正在执行的请求,关联了请求的Key和执行体。当新的请求到来时,先检查是否有相同Key的正在执行的请求,如果有,则等待起结果,从而避免重复执行相同的请求。

动画示意图:

图片图片

对于读操作,singleflight通过这种方式有效地减少了重复工作。然而,对于写操作,处理逻辑会有所不同,它需要额外的机制来保证数据落盘的时序。

写请求的聚合

我们先回忆一下写操作的姿势。首先通过Write系统调用来写入数据,默认情况下此时数据可能仅驻留在PageCache中,为了确保数据安全落盘,此时我们需要手动调用一次 Sync 系统调用。

然而,Sync操作的成本相当大,并且它除了数据,还会同步元数据等其他信息到磁盘上。对于性能影响巨大。并且,在机械盘的场景下,串行化的执行Sync是更好的实践。

因此,我们面临的一个问题是:如果在不牺牲数据安全性的前提下,能否减少Sync的次数呢?

对于同一个文件的写操作,合并Sync操作是可行的。

文件的Sync会将当前时刻文件在内存中的全部数据一次性同步到磁盘。无论之前执行过多少次Write调用,一次Sync就能全部刷盘。这正是聚合写请求以优化性能的关键所在。

01 - 写聚合的原理

假设对同一个文件写了三次数据,每一次都是Write+Sync的操作。那么在合适的时机,三次Sync调用可以优化成一次。如下图所示:

图片图片

请求 C 的 Sync 操作是在所有请求的 Write 之后才发起的,所以它必定能保证在此之前的所有变更的数据都安全落盘。这就是写操作聚合的根本原理。

接下来我们来思考两个问题。

问题一:有童鞋可能会问,读写聚合优化感觉有一点相似?那能否用 singleflight 聚合写操作呢?

例如,当并发调用 Sync 的时候,如果发现有正在执行的Sync,能否共享这次Sync请求呢?

答案是:不可以。使用singleflight来优化写无法保证数据的安全性。

我们必须要保证的是,Sync操作一定要在Write完成之后发起。只要两者存在并发的可能性,那么Sync就不能保证携带了这次Write操作的数据,也就无法保证安全性。

示意图:

图片图片

还是以上面的图为例来说明,当请求 B 完成 Write 操作后,看到请求 A 已经发起了 Sync 操作。此时它是无法判断请求 A 的 Sync 操作是否包含了请求 B 的数据。从图示我们也很清晰的看到,请求B的 Write 和请求 A 的 Sync 在时间上存在重叠。

因此,当Write完成后,如果发现有一个Sync正在执行,我们不能简单地复用这个Sync。我们需要启动一个新的Sync操作。

问题二:那么聚合的时机在哪里呢?

对于读请求的聚合,其时机相对直观:一旦发现有针对同一个 Key 的请求,就可以等待这次的结果并复用该结果。但写请求的聚合时机则不是,它的聚合时机是在等待中遇到“志同道合“的请求。

让我们通过一个具体例子来说明(注意,以下所有的请求都是针对相同的文件):

  • t0 时刻:A 执行了 Write,并尝试发起Sync,由于此时没有其他请求在执行,A 便执行真正的Sync操作。
  • t1 时刻:B 执行了 Write,发现已经有请求在Sync了(即A),因此进入等待状态,直到A完成。
  • t2 时刻:C 执行了 Write,发现已经有请求在Sync了(即A),因此进入等待状态,直到A完成。
  • t3 时刻:D 执行了 Write,发现已经有请求在Sync了(即A),因此进入等待状态,直到A完成。
  • t4 时刻:A 的Sync操作终于完成。A随即通知 B、C、D 三位,告知它们可以进行Sync请求了。
  • t5 时刻:从B、C、D中选择一个来执行一次Sync操作。假设B被选中,则C、D请求则等待B完成Sync即可。B发起的Sync操作一定包含了B,C,D三者写的数据,确保了安全性。
  • t6:B 的Sync操作完成,C、D被通知操作已完成。如此一来,B、C、D三者的数据都确保落盘。
  • 正如上述所演示,写操作的聚合是在等待前一次Sync操作完成期间收集到的请求。本来需要4次Sync操作,现在仅需2次Sync就可以确保数据的安全性。

    在高并发的场景下,这种聚合方式的效益尤为显著。下面,我们将探讨这种策略的具体代码实现。

    02 - 写聚合的代码实现

    实现写操作聚合的关键在于确保数据安全的时序前提下进行聚合。以下是一种典型和实现方式,它是对 sync.Cond 和 sync.Once 的巧妙应用。首先,我们定义一个负责聚合的结构体,如下:

    // SyncJob 用于管理一个文件的 Sync 任务
    type SyncJob struct {
       *sync.Cond                         // 聚合 Sync 的关键
       holding    int32                   // 记录聚合的个数
       lastErr    error                   // 记录执行 Sync 结果
       syncPoint  *sync.Once              // 确保同一时间只有一个 Sync 执行
       syncFunc   func(interface{}) error // 实际执行 Sync 的函数
    }
    
    // SyncJob 的构建函数
    func NewSyncJob(fn func(interface{}) error) *SyncJob {
       return &SyncJob{
          Cond:      sync.NewCond(&sync.Mutex{}),
          syncFunc:  fn,
          syncPoint: &sync.Once{},
       }
    }

    接下来,我们为 SyncJob 定义一个执行聚合的方法,如下:

    func (s *SyncJob) Do(job interface{}) error {
     s.L.Lock()
     if s.holding > 0 {
      // 如果有请求在前面,则等待前一次请求完成。
        // 等待的过程中,会有"志同道合"之人
      s.Wait()
     }
     // 准备要下发请求了,增加计数
     s.holding += 1
     syncPoint := s.syncPoint
     s.L.Unlock()
    
     // "志同道合"的人一起来到这里,此时已经满足 Write 和 Sync 的时序关系。
      // 使用 sync.Once 确保只有请求者执行同步操作。
     syncPoint.Do(func() {
      // 执行实际的 Sync 操作
      s.lastErr = s.syncFunc(job)
    
      s.L.Lock()
        // holding 展示本批次有多少个请求
        fmt.Printf("holding:%vn", s.holding)
      // 本次请求执行完成,重置计数器,准备下一轮聚合
      s.holding = 0
      s.syncPoint = &sync.Once{}
      // 唤醒下一批的请求
      s.Broadcast()
      s.L.Unlock()
     })
     return s.lastErr
    }

    在这里,我们使用了一个Go的 sync.Cond 来阻塞和通知等待中的请求,并通过 sync.Once 确保同步操作同一时间、同一批只有一个在执行。

    • 其实在这个场景下,从代码实现来讲,sync.Cond 也可以使用 Go 的 Channel 来实现相同的效果,用 Ch← 来阻塞,用 close(Ch) 来通知。效果是一样的,感兴趣的童鞋可以改造试试。

    现在让我们来看看这段代码的实际运行效果:

    func main() {
     file, err := os.OpenFile("hello.txt", os.O_RDWR, 0700)
     if err != nil {
      log.Fatal(err)
     }
     defer file.Close()
    
     // 初始化 Sync 聚合服务
     syncJob := NewSyncJob(func(interface{}) error {
      fmt.Printf("do sync...n")
        time.Sleep(time.Second())
      return file.Sync()
     })
    
     wg := sync.WaitGroup{}
     for i := 0; i < 10; i++ {
      wg.Add(1)
      go func() {
       defer wg.Done()
       // 执行写操作 write ...
       fmt.Printf("write...n")
       // 触发 sync 操作
       syncJob.Do(file)
      }()
     }
     wg.Wait()
    }

    通过上述代码,我们讲对文件写入操作后的 Sync 调用进行有效的聚合。童鞋们可以多次运行程序,观察其行为。可以通过观察打印的 holding 字段获悉每一批聚合的请求是多少个。

    思考:从效果来讲,上面的代码无论怎么跑,最少要执行两次 Sync。你知道是为什么吗?

    动画示意图:

    图片图片

    总结

    上面介绍了读写聚合优化的两种实现。读和写的聚合是有区别的。

  • 读操作,核心是一个 map,只要有相同Key的读取正在执行,那么等待这份正在执行的请求的结果也是符合预期的。同步等待则用的是 sync.WaitGroup 来实现。
  • 写操作,核心是要先保证数据安全性。它必须保证 Sync 操作在 Write 操作之后。因此当发现有正在执行的Sync操作,那么就等待这次完成,然后必须重新开启一轮的 Sync 操作,等待的过程也是聚合的时机。我们可以使用 sync.Cond(或者 Channel )来实现阻塞和唤醒,使用 sync.Once 来保证同一时间单个执行。
  • 相关文章

    如何删除WordPress中的所有评论
    检查WordPress服务器磁盘使用情况的7种简便方法(查找大文件和数据)
    如何更改WordPress常量FS_METHOD
    如何为区块编辑器、Elementor等构建WordPress文章模板
    如何彻底地删除WordPress主题及相关内容
    如何使用WordPress搭建一个内网

    发布评论