学习 etcd 存储的第一步,从 Etcd 框架和 Put 操作源码说起(详细版)

2023年 7月 13日 102.9k 0

etcd 是一个分布式、高可用的键值存储系统,它被设计为可靠的、安全的、快速的,并具有简单的API。

etcd 使用 Go 语言开发,基于 Raft 算法实现了分布式一致性。它可以用于存储集群中的关键配置信息、服务发现、锁等。

etcd 的数据模型类似于一个简单的文件系统,支持 PUT、GET、DELETE 等操作,每个节点的数据会自动同步到其他节点上,因此可以实现高可用、自动故障转移等功能。etcd 还支持 Watch 机制,可以监控数据变化并触发相应的操作。

PS:本文前大半部分都是在讲一些 etcdctl、etcd server、raft 节点间数据流转与处理相关的内容,如不感兴趣,可直接跳转到 迎接请求到来 部分阅读 Put 操作流程。

下面是一个最简单的 etcd put 操作:

# 启动 etcd server
$ etcd

$ etcdctl put gretting "hello"

$ etcdctl get gretting
gretting
hello

当我们发送执行一个 put 操作时,etcdctl 内部会创建一个 etcd 的客户端,向 etcd 服务端发送相应的 grpc 请求来完成操作。

etcdctl

etcd 的源码位于 github.com/etcd-io/etc… 接下来以 etcdctl 为切入点开始源码解读。为了保持简单,我们只关心示例操作的主要流程,比如这篇里我们只关注与 Put 操作相关的流程。

etcdctl

etcdctl 用于向 etcd server 发送请求,下面是 etcdctl 工具的入口方法。

// file: etcdctl/main.go
func main() {
    ctlv3.MustStart()
    return
}

入口函数内调用了 ctlv3.MustStart(),在 ctlv3 包下,初始化了命令行参数及命令处理函数,MustStart() 方法最终会调用到命令行工具的 Execute() 方法执行命令。

// file: etcdctl/ctlv3/ctl.go
...
var (
	rootCmd = &cobra.Command{
		Use:        cliName,
		Short:      cliDescription,
		SuggestFor: []string{"etcdctl"},
	}
)

func init() {
  // 初始化并解析命令行参数
	rootCmd.PersistentFlags().StringSliceVar(&globalFlags.Endpoints, "endpoints", []string{"127.0.0.1:2379"}, "gRPC endpoints")
	rootCmd.PersistentFlags().BoolVar(&globalFlags.Debug, "debug", false, "enable client-side debug logging")
  ...
}

// 注册各类操作方法
rootCmd.AddCommand(
    command.NewGetCommand(),
    command.NewPutCommand(),
    command.NewDelCommand(),
    ...
)

func Start() error {
  ...
  // 执行命令
	return rootCmd.Execute()
}

func MustStart() {
	if err := Start(); err != nil {
    ...
	}
}

可以看到,etcd 默认的 endpoints127.0.0.1:2379,也就是说我们最开始启动的 etcd 服务默认就监听在这个地址。

由于我们是一次 PUT 操作,在 command.NewPutCommand() 中注册了 PUT 命令,命令的处理函数是 putCommandFunc,该函数中真正执行了 PUT 操作。

// file: etcdctl/ctlv3/command/put_command.go
func NewPutCommand() *cobra.Command {
    cmd := &cobra.Command{
    ...
        Run: putCommandFunc,
    }
    ...
    return cmd
}

func putCommandFunc(cmd *cobra.Command, args []string) {
    ...
    resp, err := mustClientFromCmd(cmd).Put(ctx, key, value, opts...)
    ...
}

mustClientFromCmd() 实现了将命令行操作转换为 etcd 客户端。在 golang 中,我们通常使用 clientv3.Client 来作为 etcd 客户端。

clientv3.New(clientv3.Config{
    Endpoints:   conf.AppConf.Etcd.Hosts,
    DialTimeout: 5 * time.Second,
    ...
})

mustClientFromCmd() 方法中会先调用 clientConfigFromCmd() 根据命令行参数生成必要的配置信息,然后调用 mustClient 创建客户端。

// file: etcdctl/ctlv3/command/global.go
func mustClientFromCmd(cmd *cobra.Command) *clientv3.Client {
    cfg := clientConfigFromCmd(cmd)
    return mustClient(cfg)
}

func clientConfigFromCmd(cmd *cobra.Command) *clientv3.ConfigSpec {
    ...
    // 对应 clientv3 的 Endpoints 和 DialTimeout 属性
    cfg := &clientv3.ConfigSpec{}
    cfg.Endpoints, err = endpointsFromCmd(cmd)
    cfg.DialTimeout = dialTimeoutFromCmd(cmd)
    ...
}

func mustClient(cc *clientv3.ConfigSpec) *clientv3.Client {
    ...
    // 创建客户端
    client, err := clientv3.New(*cfg)
    ...
    return client
}

可以看到,etcdctl 工具实际上就是使用 clientv3.New 在内部创建了一个客户端来进行各种操作。我们先来大概了解一下创建客户端时做了些什么。

clientv3.Client 结构体中,组合了一些接口,这些接口负责执行各种 etcd 的操作,可以允许我们直接使用 Client 的实例来进行如 PutGrantWatch 等操作。

// file: client/v3/client.go
type Client struct {
    KV
    Lease
    Watcher
    ...
}

// file: client/v3/kv.go
type KV interface {
    Put(ctx context.Context, key, val string, opts ...OpOption) (*PutResponse, error)
    ...
}

// file: client/v3/lease.go
type Lease interface {
    Grant(ctx context.Context, ttl int64) (*LeaseGrantResponse, error)
    ...
}

在调用 clientv3.New() 时,会与 Endpoints 所在 etcd 服务建立连接,接着创建这些接口实现的实例。

// file: client/v3/client.go
func New(cfg Config) (*Client, error) {
    ...
    return newClient(&cfg)
}

func newClient(cfg *Config) (*Client, error) {
    ...
    // 建立连接
    conn, err := client.dialWithBalancer()
    ... 
    // 创建接口实例
    client.KV = NewKV(client)
    client.Lease = NewLease(client)
    ...
}

dialWithBalancer() 方法向 etcd 服务发起了一个 grpc 连接请求,到这里我们知道,etcd 的客户端和服务端使用 grpc 进行通信。注意,客户端选择了 endpoints[0] 的地址发送连接请求,这是因为 Endpoints 列表中的第一个地址通常是 raft 协议的 leader 节点,Raft 协议规定只有 leader 节点才能处理写请求,因此客户端应该优先连接 leader 节点,这里不再细述。

// file: client/v3/client.go
func (c *Client) dialWithBalancer(dopts ...grpc.DialOption) (*grpc.ClientConn, error) {
    ...
    return c.dial(creds, opts...)
}

func (c *Client) dial(creds grpccredentials.TransportCredentials, dopts ...grpc.DialOption) (*grpc.ClientConn, error) {
    ...
    target := fmt.Sprintf("%s://%p/%s", resolver.Schema, c, authority(c.endpoints[0]))
    conn, err := grpc.DialContext(dctx, target, opts...)
    ...
}

再来看创建接口实例部分,由于我们进行的是最普通的 Put 操作,由 KV 接口定义,这里主要看 NewKV() 方法。方法逻辑非常简单,创建了一个 kv 结构体,该结构体实现了 KV 接口定义的所有方法。

// file: client/v3/kv.go
func NewKV(c *Client) KV {
    api := &kv{remote: RetryKVClient(c)}
    ...
    return api
}

RetryKVClient() 创建了一个 grpc 客户端,由于是 grpc,我们知道,存在 KVClient 就会存在一个对应的 KVServer,这个我们放到 etcd 服务启动的时候再来回顾。

// file: client/v3/retry.go
func RetryKVClient(c *Client) pb.KVClient {
    return &retryKVClient{
        kc: pb.NewKVClient(c.conn),
    }
}

// file: api/etcdserverpb/rpc.pb.go
func NewKVClient(cc *grpc.ClientConn) KVClient {
    return &kVClient{cc}
}

Put

处理完创建完客户端的逻辑之后,紧接着接着调用了 Put() 方法,开始发送 Put 请求。

resp, err := mustClientFromCmd(cmd).Put(ctx, key, value, opts...)

从前面我们知道,Client 调用 Put 方法会执行到 kv 结构体内的 Put() 。Put 方法内调用了 kv.Do(),传入了一个 Put Op 类型。kv.Do() 方法逻辑很简单,根据对应的 Op 类型向 etcd 发送一个 grpc 请求。在前面 NewKv(client) 时我们知道,kv.remote 就是 grpc 的客户端 KVClient,所以这里就是直接发送了一个 grpc put 请求。

// file: client/v3/kv.go
func (kv *kv) Put(ctx context.Context, key, val string, opts ...OpOption) (*PutResponse, error) {
    r, err := kv.Do(ctx, OpPut(key, val, opts...))
    return r.put, toErr(ctx, err)
}

func (kv *kv) Do(ctx context.Context, op Op) (OpResponse, error) {
    var err error
    switch op.t {
    ...
    case tPut:
    var resp *pb.PutResponse
        // 创建请求 Message
        r := &pb.PutRequest{Key: op.key, Value: op.val, Lease: int64(op.leaseID), PrevKv: op.prevKV, IgnoreValue: op.ignoreValue, IgnoreLease: op.ignoreLease}
        // 调用 KVClient.Put() 方法
        resp, err = kv.remote.Put(ctx, r, kv.callOpts...)
        if err == nil {
            return OpResponse{put: (*PutResponse)(resp)}, nil
        }
        ...
    }
    return OpResponse{}, toErr(ctx, err)
}

目前为止,我们了解了使用 etcdctl 的一次 put 请求的核心流程,接下来看看 etcd 服务端是如何处理一个 Put 请求的。

put

etcd 服务启动

etcd server 的入口方法如下:

// file: server/main.go
func main() {
    etcdmain.Main(os.Args)
}

由于我们没有给 etcd 传入任何参数,直接进入 startEtcdOrProxyV2() 流程。

// file: server/etcdmain/main.go
func Main(args []string) {
    ...
    startEtcdOrProxyV2(args)
}

先关注最简单的 etcd 命令启动,在 startEtcdOrProxyV2() 中,主要流程就是初始化一些配置、日志,检查部分参数然后启动 etcd,最后阻塞等待 etcd 抛出错误或停止然后退出。

// file: server/etcdmain/etcd.go
func startEtcdOrProxyV2(args []string) {
...
// 配置初始化
cfg := newConfig()
...

// 日志初始化
lg := cfg.ec.GetLogger()
if lg == nil {
...
lg, zapError = logutil.CreateDefaultZapLogger(zap.InfoLevel)
}
...

// 启动
stopped, errc, err = startEtcd(&cfg.ec)
...

// 阻塞进程
select {
case lerr :=

相关文章

JavaScript2024新功能:Object.groupBy、正则表达式v标志
PHP trim 函数对多字节字符的使用和限制
新函数 json_validate() 、randomizer 类扩展…20 个PHP 8.3 新特性全面解析
使用HTMX为WordPress增效:如何在不使用复杂框架的情况下增强平台功能
为React 19做准备:WordPress 6.6用户指南
如何删除WordPress中的所有评论

发布评论