在 etcd 中,事务是一组原子性操作,可以确保多个操作之间的原子性,并且可以确保一组操作在执行期间不会被其他操作中断。
下面是一个最简单的事务示例,txn
表示开启一个事务,在 compares
中,输入事务的执行条件,即 user1 = "bad"
,如果满足条件,则删除 user1
,否则将 user1
设置为 "good"
。
etcdctl txn --interactive
compares:
value("user1") = "bad"
success requests (get, put, delete):
del user1
failure requests (get, put, delete):
put user1 good
Client 处理
根据 etcdctl 的处理我们知道,txn
命令对应的处理函数是 txnCommandFunc()
,主要是将命令行输入转为 etcd 对应的操作方法并将事务请求发送给 Server。先看看大致流程:
与之前一样,txnCommandFunc()
中也是调用了 mustClientFromCmd()
将命令转为 Client
然后执行。
// file: etcdctl/ctlv3/command/txn_command.go
func txnCommandFunc(cmd *cobra.Command, args []string) {
...
// 用于读取命令行输入
reader := bufio.NewReader(os.Stdin)
// 创建一个事务
txn := mustClientFromCmd(cmd).Txn(context.Background())
promptInteractive("compares:")
// If 分支(compares)
txn.If(readCompares(reader)...)
// 打印到标准输出
promptInteractive("success requests (get, put, del):")
// Then 分支(success)
txn.Then(readOps(reader)...)
promptInteractive("failure requests (get, put, del):")
// Else 分支(failure)
txn.Else(readOps(reader)...)
// 提交事务到 Server
resp, err := txn.Commit()
...
}
客户端的 Txn()
方法创建了一个事务 txn
实例,实现了 Txn
接口。它包含了一套事务的完整方法。对于我们的示例来说,txn
结构体有三个主要成员:comps
、sus
和 fas
,分别对应命令行中的 compares
、success
、failure
。
// file: client/v3/txn.go
type Txn interface {
If(cs ...Cmp) Txn
Then(ops ...Op) Txn
Else(ops ...Op) Txn
Commit() (*TxnResponse, error)
}
type txn struct {
...
cmps []*pb.Compare // compares
sus []*pb.RequestOp // success requests
fas []*pb.RequestOp // failure requests
...
}
// file: client/v3/kv.go
func (kv *kv) Txn(ctx context.Context) Txn {
return &txn{
kv: kv,
ctx: ctx,
callOpts: kv.callOpts,
}
}
If
分支中的 readCompares()
用于解析命令行中的 compares
,将每一行的输入调用 ParseCompare()
转成一个 clientv3.Cmp
结构列表,Then
和 Else
分支则使用 readOps()
生成 clientv3.Op
结构列表。三个分支的核心方法就是将转换后的操作对象赋给自己对应的成员数组。
// file: client/v3/txn.go
func (txn *txn) If(cs ...Cmp) Txn {
...
for i := range cs {
txn.cmps = append(txn.cmps, (*pb.Compare)(&cs[i]))
}
}
func (txn *txn) Then(ops ...Op) Txn {
...
for _, op := range ops {
txn.sus = append(txn.sus, op.toRequestOp())
}
}
func (txn *txn) Else(ops ...Op) Txn {
...
for _, op := range ops {
txn.fas = append(txn.fas, op.toRequestOp())
}
}
完成分支操作的处理后,会调用 txn.Commit()
发送 gRPC 请求将事务提交到服务端。
func (txn *txn) Commit() (*TxnResponse, error) {
...
r := &pb.TxnRequest{Compare: txn.cmps, Success: txn.sus, Failure: txn.fas}
var resp *pb.TxnResponse
var err error
// 调用 KVClient 的 Txn 方法发送 gRPC 请求
resp, err = txn.kv.remote.Txn(txn.ctx, r, txn.callOpts...)
if err != nil {
return nil, toErr(txn.ctx, err)
}
return (*TxnResponse)(resp), nil
}
Server 处理
EtcdServer
实现了 KVServer
的 Txn
方法,跟 Put 操作 一样,最终会发送一个 raft 请求等待其它节点完成响应并返回结果。
func (s *EtcdServer) Txn(ctx context.Context, r *pb.TxnRequest) (*pb.TxnResponse, error) {
...
ctx = context.WithValue(ctx, traceutil.StartTimeKey, time.Now())
// 发送 Raft 请求
resp, err := s.raftRequest(ctx, pb.InternalRaftRequest{Txn: r})
if err != nil {
return nil, err
}
return resp.(*pb.TxnResponse), nil
}
Raft 节点收到的请求之后,会交由运行 etcd 服务时的调度器执行。
// file: server/etcdserver/server.go
func (s *EtcdServer) run() {
...
for {
select {
case ap :=