etcd 是一个分布式、高可用的键值存储系统,它被设计为可靠的、安全的、快速的,并具有简单的API。
etcd 使用 Go 语言开发,基于 Raft 算法实现了分布式一致性。它可以用于存储集群中的关键配置信息、服务发现、锁等。
etcd 的数据模型类似于一个简单的文件系统,支持 PUT、GET、DELETE 等操作,每个节点的数据会自动同步到其他节点上,因此可以实现高可用、自动故障转移等功能。etcd 还支持 Watch 机制,可以监控数据变化并触发相应的操作。
PS:本文前大半部分都是在讲一些 etcdctl、etcd server、raft 节点间数据流转与处理相关的内容,如不感兴趣,可直接跳转到 迎接请求到来 部分阅读 Put 操作流程。
下面是一个最简单的 etcd put 操作:
# 启动 etcd server
$ etcd
$ etcdctl put gretting "hello"
$ etcdctl get gretting
gretting
hello
当我们发送执行一个 put 操作时,etcdctl 内部会创建一个 etcd 的客户端,向 etcd 服务端发送相应的 grpc 请求来完成操作。
etcd 的源码位于 github.com/etcd-io/etc… 接下来以 etcdctl 为切入点开始源码解读。为了保持简单,我们只关心示例操作的主要流程,比如这篇里我们只关注与 Put 操作相关的流程。
etcdctl
etcdctl
用于向 etcd server 发送请求,下面是 etcdctl 工具的入口方法。
// file: etcdctl/main.go
func main() {
ctlv3.MustStart()
return
}
入口函数内调用了 ctlv3.MustStart()
,在 ctlv3
包下,初始化了命令行参数及命令处理函数,MustStart()
方法最终会调用到命令行工具的 Execute()
方法执行命令。
// file: etcdctl/ctlv3/ctl.go
...
var (
rootCmd = &cobra.Command{
Use: cliName,
Short: cliDescription,
SuggestFor: []string{"etcdctl"},
}
)
func init() {
// 初始化并解析命令行参数
rootCmd.PersistentFlags().StringSliceVar(&globalFlags.Endpoints, "endpoints", []string{"127.0.0.1:2379"}, "gRPC endpoints")
rootCmd.PersistentFlags().BoolVar(&globalFlags.Debug, "debug", false, "enable client-side debug logging")
...
}
// 注册各类操作方法
rootCmd.AddCommand(
command.NewGetCommand(),
command.NewPutCommand(),
command.NewDelCommand(),
...
)
func Start() error {
...
// 执行命令
return rootCmd.Execute()
}
func MustStart() {
if err := Start(); err != nil {
...
}
}
可以看到,etcd 默认的 endpoints
为 127.0.0.1:2379
,也就是说我们最开始启动的 etcd 服务默认就监听在这个地址。
由于我们是一次 PUT 操作,在 command.NewPutCommand()
中注册了 PUT 命令,命令的处理函数是 putCommandFunc
,该函数中真正执行了 PUT 操作。
// file: etcdctl/ctlv3/command/put_command.go
func NewPutCommand() *cobra.Command {
cmd := &cobra.Command{
...
Run: putCommandFunc,
}
...
return cmd
}
func putCommandFunc(cmd *cobra.Command, args []string) {
...
resp, err := mustClientFromCmd(cmd).Put(ctx, key, value, opts...)
...
}
mustClientFromCmd()
实现了将命令行操作转换为 etcd 客户端。在 golang 中,我们通常使用 clientv3.Client
来作为 etcd 客户端。
clientv3.New(clientv3.Config{
Endpoints: conf.AppConf.Etcd.Hosts,
DialTimeout: 5 * time.Second,
...
})
在 mustClientFromCmd()
方法中会先调用 clientConfigFromCmd()
根据命令行参数生成必要的配置信息,然后调用 mustClient
创建客户端。
// file: etcdctl/ctlv3/command/global.go
func mustClientFromCmd(cmd *cobra.Command) *clientv3.Client {
cfg := clientConfigFromCmd(cmd)
return mustClient(cfg)
}
func clientConfigFromCmd(cmd *cobra.Command) *clientv3.ConfigSpec {
...
// 对应 clientv3 的 Endpoints 和 DialTimeout 属性
cfg := &clientv3.ConfigSpec{}
cfg.Endpoints, err = endpointsFromCmd(cmd)
cfg.DialTimeout = dialTimeoutFromCmd(cmd)
...
}
func mustClient(cc *clientv3.ConfigSpec) *clientv3.Client {
...
// 创建客户端
client, err := clientv3.New(*cfg)
...
return client
}
可以看到,etcdctl
工具实际上就是使用 clientv3.New
在内部创建了一个客户端来进行各种操作。我们先来大概了解一下创建客户端时做了些什么。
在 clientv3.Client
结构体中,组合了一些接口,这些接口负责执行各种 etcd 的操作,可以允许我们直接使用 Client
的实例来进行如 Put
、Grant
、Watch
等操作。
// file: client/v3/client.go
type Client struct {
KV
Lease
Watcher
...
}
// file: client/v3/kv.go
type KV interface {
Put(ctx context.Context, key, val string, opts ...OpOption) (*PutResponse, error)
...
}
// file: client/v3/lease.go
type Lease interface {
Grant(ctx context.Context, ttl int64) (*LeaseGrantResponse, error)
...
}
在调用 clientv3.New()
时,会与 Endpoints
所在 etcd 服务建立连接,接着创建这些接口实现的实例。
// file: client/v3/client.go
func New(cfg Config) (*Client, error) {
...
return newClient(&cfg)
}
func newClient(cfg *Config) (*Client, error) {
...
// 建立连接
conn, err := client.dialWithBalancer()
...
// 创建接口实例
client.KV = NewKV(client)
client.Lease = NewLease(client)
...
}
dialWithBalancer()
方法向 etcd 服务发起了一个 grpc 连接请求,到这里我们知道,etcd 的客户端和服务端使用 grpc 进行通信。注意,客户端选择了 endpoints[0]
的地址发送连接请求,这是因为 Endpoints 列表中的第一个地址通常是 raft 协议的 leader 节点,Raft 协议规定只有 leader 节点才能处理写请求,因此客户端应该优先连接 leader 节点,这里不再细述。
// file: client/v3/client.go
func (c *Client) dialWithBalancer(dopts ...grpc.DialOption) (*grpc.ClientConn, error) {
...
return c.dial(creds, opts...)
}
func (c *Client) dial(creds grpccredentials.TransportCredentials, dopts ...grpc.DialOption) (*grpc.ClientConn, error) {
...
target := fmt.Sprintf("%s://%p/%s", resolver.Schema, c, authority(c.endpoints[0]))
conn, err := grpc.DialContext(dctx, target, opts...)
...
}
再来看创建接口实例部分,由于我们进行的是最普通的 Put 操作,由 KV
接口定义,这里主要看 NewKV()
方法。方法逻辑非常简单,创建了一个 kv
结构体,该结构体实现了 KV
接口定义的所有方法。
// file: client/v3/kv.go
func NewKV(c *Client) KV {
api := &kv{remote: RetryKVClient(c)}
...
return api
}
RetryKVClient()
创建了一个 grpc 客户端,由于是 grpc,我们知道,存在 KVClient
就会存在一个对应的 KVServer
,这个我们放到 etcd 服务启动的时候再来回顾。
// file: client/v3/retry.go
func RetryKVClient(c *Client) pb.KVClient {
return &retryKVClient{
kc: pb.NewKVClient(c.conn),
}
}
// file: api/etcdserverpb/rpc.pb.go
func NewKVClient(cc *grpc.ClientConn) KVClient {
return &kVClient{cc}
}
Put
处理完创建完客户端的逻辑之后,紧接着接着调用了 Put()
方法,开始发送 Put 请求。
resp, err := mustClientFromCmd(cmd).Put(ctx, key, value, opts...)
从前面我们知道,Client
调用 Put 方法会执行到 kv
结构体内的 Put()
。Put 方法内调用了 kv.Do()
,传入了一个 Put Op
类型。kv.Do()
方法逻辑很简单,根据对应的 Op
类型向 etcd 发送一个 grpc 请求。在前面 NewKv(client)
时我们知道,kv.remote
就是 grpc 的客户端 KVClient
,所以这里就是直接发送了一个 grpc put 请求。
// file: client/v3/kv.go
func (kv *kv) Put(ctx context.Context, key, val string, opts ...OpOption) (*PutResponse, error) {
r, err := kv.Do(ctx, OpPut(key, val, opts...))
return r.put, toErr(ctx, err)
}
func (kv *kv) Do(ctx context.Context, op Op) (OpResponse, error) {
var err error
switch op.t {
...
case tPut:
var resp *pb.PutResponse
// 创建请求 Message
r := &pb.PutRequest{Key: op.key, Value: op.val, Lease: int64(op.leaseID), PrevKv: op.prevKV, IgnoreValue: op.ignoreValue, IgnoreLease: op.ignoreLease}
// 调用 KVClient.Put() 方法
resp, err = kv.remote.Put(ctx, r, kv.callOpts...)
if err == nil {
return OpResponse{put: (*PutResponse)(resp)}, nil
}
...
}
return OpResponse{}, toErr(ctx, err)
}
目前为止,我们了解了使用 etcdctl 的一次 put 请求的核心流程,接下来看看 etcd 服务端是如何处理一个 Put 请求的。
etcd 服务启动
etcd server 的入口方法如下:
// file: server/main.go
func main() {
etcdmain.Main(os.Args)
}
由于我们没有给 etcd 传入任何参数,直接进入 startEtcdOrProxyV2()
流程。
// file: server/etcdmain/main.go
func Main(args []string) {
...
startEtcdOrProxyV2(args)
}
先关注最简单的 etcd
命令启动,在 startEtcdOrProxyV2()
中,主要流程就是初始化一些配置、日志,检查部分参数然后启动 etcd,最后阻塞等待 etcd 抛出错误或停止然后退出。
// file: server/etcdmain/etcd.go
func startEtcdOrProxyV2(args []string) {
...
// 配置初始化
cfg := newConfig()
...
// 日志初始化
lg := cfg.ec.GetLogger()
if lg == nil {
...
lg, zapError = logutil.CreateDefaultZapLogger(zap.InfoLevel)
}
...
// 启动
stopped, errc, err = startEtcd(&cfg.ec)
...
// 阻塞进程
select {
case lerr :=