举一反 N,解读 etcd watch 源码实现

2023年 7月 15日 59.8k 0

etcd 内置了 Watcher 机制,允许应用程序监视 etcd 中存储的键值对的更改,并在发生更改时接收通知。这种监视模式是实现分布式系统中的自动化和协同操作的重要组成部分。通过 Watcher 机制,应用程序可以实现很多功能,如实时通知、动态配置、负载均衡等。

一个最基础的 watch 命令如下,client1 开启一个 watch,监听一个名为 stock 的 key,client2 对 store 进行对 key put 操作,client1 会收到 key 上产生的 put 事件:

# client1
etcdctl watch stock
# ...等待事件到来

# client2
etcdctl put stock 100

# client1
etcdctl watch stock
# 收到 PUT 事件
PUT
stock
100

下面围绕这个基础例子看看 Watch 是如何实现的。

Client 处理

在分析客户端实现之前,首先需要的是,客户端与服务端通信使用了 gRPC 的双向流模式 ,这种模式支持 gRPC 客户端与服务端的双向读写,一个伪代码例子如下:

// 调用 gRPC 方法获取一个双向的流
stream, err := client.GetAStream(context.Background())
waitc := make(chan struct{})
go func() {
    for {
        // 从流中读取数据
        in, err := stream.Recv()
        if err == io.EOF {
          close(waitc)
          return
        }
        log.Printf("Got message %s", in)
    }
}()
for _, note := range notes {
	// 写数据到流中
    if err := stream.Send(note); err != nil {
        log.Fatalf("Failed to send a note: %v", err)
    }
}
stream.CloseSend()
 c.Watch() 函数来获取一个接收 Watch 结果的 channel,然后读取 channel 中的结果,并将 watch 产生的事件输出。

// file: etcdctl/ctlv3/command/watch_command.go
func watchCommandFunc(cmd *cobra.Command, args []string) {
    // 创建客户端
    c := mustClientFromCmd(cmd)
    // 获取 Watch 结果的 Channel
    wc, err := getWatchChan(c, watchArgs)

    // 读取 channel 并输出结果
    printWatchCh(c, wc, execArgs)
    ...
}

func getWatchChan(c *clientv3.Client, args []string) (clientv3.WatchChan, error) {
    ...
    return c.Watch(clientv3.WithRequireLeader(context.Background()), key, opts...), nil
}

Watch() 在一个给定的 key 上进行 watch 操作,返回一个 WatchChan 用于接收事件通知。在一个主循环中,先获取一个锁,然后检查当前是否已经有一个双向流与给定的 ctx 相关联。如果没有,就调用 newWatcherGrpcStream() 函数创建一个新的流,并将其与 ctx 相关联,随后将请求提交并等待接收 WatchChan 结果的 channel,收到结果后函数返回。在 etcdctl 中,返回后的结果也就被上面的printWatchCh() 读取并输出结果。

// file: client/v3/watch.go
func (w *watcher) Watch(ctx context.Context, key string, opts ...OpOption) WatchChan {
// watch 请求
wr := &watchRequest{...}
...
for {
w.mu.Lock()
...
wgs := w.streams[ctxKey]
if wgs == nil {
// 为 watcher 创建一个 gRPC 双向流
wgs = w.newWatcherGrpcStream(ctx)
w.streams[ctxKey] = wgs
}
// 用于接收已就绪的请求
reqc := wgs.reqc
w.mu.Unlock()

select {
// 提交请求
case reqc

相关文章

JavaScript2024新功能:Object.groupBy、正则表达式v标志
PHP trim 函数对多字节字符的使用和限制
新函数 json_validate() 、randomizer 类扩展…20 个PHP 8.3 新特性全面解析
使用HTMX为WordPress增效:如何在不使用复杂框架的情况下增强平台功能
为React 19做准备:WordPress 6.6用户指南
如何删除WordPress中的所有评论

发布评论