从源码实现了解 etcd 事务

2023年 7月 14日 57.0k 0

在 etcd 中,事务是一组原子性操作,可以确保多个操作之间的原子性,并且可以确保一组操作在执行期间不会被其他操作中断。

下面是一个最简单的事务示例,txn 表示开启一个事务,在 compares 中,输入事务的执行条件,即 user1 = "bad",如果满足条件,则删除 user1,否则将 user1 设置为 "good"

etcdctl txn --interactive

compares:
value("user1") = "bad"

success requests (get, put, delete):
del user1

failure requests (get, put, delete):
put user1 good

Client 处理

根据 etcdctl 的处理我们知道,txn 命令对应的处理函数是 txnCommandFunc(),主要是将命令行输入转为 etcd 对应的操作方法并将事务请求发送给 Server。先看看大致流程:

transaction-client.png

与之前一样,txnCommandFunc() 中也是调用了 mustClientFromCmd() 将命令转为 Client 然后执行。

// file: etcdctl/ctlv3/command/txn_command.go
func txnCommandFunc(cmd *cobra.Command, args []string) {
    ...
    // 用于读取命令行输入
    reader := bufio.NewReader(os.Stdin)

    // 创建一个事务
    txn := mustClientFromCmd(cmd).Txn(context.Background())
    promptInteractive("compares:")
    // If 分支(compares)
    txn.If(readCompares(reader)...)
    // 打印到标准输出
    promptInteractive("success requests (get, put, del):")
    // Then 分支(success)
    txn.Then(readOps(reader)...)
    promptInteractive("failure requests (get, put, del):")
    // Else 分支(failure)
    txn.Else(readOps(reader)...)

    // 提交事务到 Server
    resp, err := txn.Commit()
    ...
}

客户端的 Txn() 方法创建了一个事务 txn 实例,实现了 Txn 接口。它包含了一套事务的完整方法。对于我们的示例来说,txn 结构体有三个主要成员:compssusfas,分别对应命令行中的 comparessuccessfailure

// file: client/v3/txn.go
type Txn interface {
    If(cs ...Cmp) Txn
    Then(ops ...Op) Txn
    Else(ops ...Op) Txn
    Commit() (*TxnResponse, error)
}

type txn struct {
    ...
    cmps []*pb.Compare // compares
    sus []*pb.RequestOp // success requests
    fas []*pb.RequestOp // failure requests
    ...
}

// file: client/v3/kv.go
func (kv *kv) Txn(ctx context.Context) Txn {
    return &txn{
        kv:       kv,
        ctx:      ctx,
        callOpts: kv.callOpts,
    }
}

If 分支中的 readCompares() 用于解析命令行中的 compares,将每一行的输入调用 ParseCompare() 转成一个 clientv3.Cmp 结构列表,ThenElse 分支则使用 readOps() 生成 clientv3.Op 结构列表。三个分支的核心方法就是将转换后的操作对象赋给自己对应的成员数组。

// file: client/v3/txn.go
func (txn *txn) If(cs ...Cmp) Txn {
    ...
    for i := range cs {
        txn.cmps = append(txn.cmps, (*pb.Compare)(&cs[i]))
    }
}

func (txn *txn) Then(ops ...Op) Txn {
    ...
    for _, op := range ops {
        txn.sus = append(txn.sus, op.toRequestOp())
    }
}

func (txn *txn) Else(ops ...Op) Txn {
    ...
    for _, op := range ops {
        txn.fas = append(txn.fas, op.toRequestOp())
    }
}

完成分支操作的处理后,会调用 txn.Commit() 发送 gRPC 请求将事务提交到服务端。

func (txn *txn) Commit() (*TxnResponse, error) {
    ...
    r := &pb.TxnRequest{Compare: txn.cmps, Success: txn.sus, Failure: txn.fas}

    var resp *pb.TxnResponse
    var err error
    // 调用 KVClient 的 Txn 方法发送 gRPC 请求
    resp, err = txn.kv.remote.Txn(txn.ctx, r, txn.callOpts...)
    if err != nil {
        return nil, toErr(txn.ctx, err)
    }
    return (*TxnResponse)(resp), nil
}

Server 处理

EtcdServer 实现了 KVServerTxn 方法,跟 Put 操作 一样,最终会发送一个 raft 请求等待其它节点完成响应并返回结果。

func (s *EtcdServer) Txn(ctx context.Context, r *pb.TxnRequest) (*pb.TxnResponse, error) {
    ...
    ctx = context.WithValue(ctx, traceutil.StartTimeKey, time.Now())
    // 发送 Raft 请求
    resp, err := s.raftRequest(ctx, pb.InternalRaftRequest{Txn: r})
    if err != nil {
        return nil, err
    }
    return resp.(*pb.TxnResponse), nil
}

Raft 节点收到的请求之后,会交由运行 etcd 服务时的调度器执行。

// file: server/etcdserver/server.go
func (s *EtcdServer) run() {
...
for {
select {
case ap :=

相关文章

JavaScript2024新功能:Object.groupBy、正则表达式v标志
PHP trim 函数对多字节字符的使用和限制
新函数 json_validate() 、randomizer 类扩展…20 个PHP 8.3 新特性全面解析
使用HTMX为WordPress增效:如何在不使用复杂框架的情况下增强平台功能
为React 19做准备:WordPress 6.6用户指南
如何删除WordPress中的所有评论

发布评论