etcd 内置了 Watcher 机制,允许应用程序监视 etcd 中存储的键值对的更改,并在发生更改时接收通知。这种监视模式是实现分布式系统中的自动化和协同操作的重要组成部分。通过 Watcher 机制,应用程序可以实现很多功能,如实时通知、动态配置、负载均衡等。
一个最基础的 watch 命令如下,client1 开启一个 watch,监听一个名为 stock 的 key,client2 对 store 进行对 key put 操作,client1 会收到 key 上产生的 put 事件:
# client1
etcdctl watch stock
# ...等待事件到来
# client2
etcdctl put stock 100
# client1
etcdctl watch stock
# 收到 PUT 事件
PUT
stock
100
下面围绕这个基础例子看看 Watch 是如何实现的。
Client 处理
在分析客户端实现之前,首先需要的是,客户端与服务端通信使用了 gRPC 的双向流模式 ,这种模式支持 gRPC 客户端与服务端的双向读写,一个伪代码例子如下:
// 调用 gRPC 方法获取一个双向的流
stream, err := client.GetAStream(context.Background())
waitc := make(chan struct{})
go func() {
for {
// 从流中读取数据
in, err := stream.Recv()
if err == io.EOF {
close(waitc)
return
}
log.Printf("Got message %s", in)
}
}()
for _, note := range notes {
// 写数据到流中
if err := stream.Send(note); err != nil {
log.Fatalf("Failed to send a note: %v", err)
}
}
stream.CloseSend()
c.Watch()
函数来获取一个接收 Watch 结果的 channel,然后读取 channel 中的结果,并将 watch 产生的事件输出。
// file: etcdctl/ctlv3/command/watch_command.go
func watchCommandFunc(cmd *cobra.Command, args []string) {
// 创建客户端
c := mustClientFromCmd(cmd)
// 获取 Watch 结果的 Channel
wc, err := getWatchChan(c, watchArgs)
// 读取 channel 并输出结果
printWatchCh(c, wc, execArgs)
...
}
func getWatchChan(c *clientv3.Client, args []string) (clientv3.WatchChan, error) {
...
return c.Watch(clientv3.WithRequireLeader(context.Background()), key, opts...), nil
}
Watch()
在一个给定的 key 上进行 watch 操作,返回一个 WatchChan
用于接收事件通知。在一个主循环中,先获取一个锁,然后检查当前是否已经有一个双向流与给定的 ctx
相关联。如果没有,就调用 newWatcherGrpcStream()
函数创建一个新的流,并将其与 ctx
相关联,随后将请求提交并等待接收 WatchChan 结果的 channel,收到结果后函数返回。在 etcdctl 中,返回后的结果也就被上面的printWatchCh()
读取并输出结果。
// file: client/v3/watch.go
func (w *watcher) Watch(ctx context.Context, key string, opts ...OpOption) WatchChan {
// watch 请求
wr := &watchRequest{...}
...
for {
w.mu.Lock()
...
wgs := w.streams[ctxKey]
if wgs == nil {
// 为 watcher 创建一个 gRPC 双向流
wgs = w.newWatcherGrpcStream(ctx)
w.streams[ctxKey] = wgs
}
// 用于接收已就绪的请求
reqc := wgs.reqc
w.mu.Unlock()
select {
// 提交请求
case reqc