[Pitaya Demo解读笔记]3.cluster_grpc demo
项目目录:examples/demo/cluster_grpc
这个 demo 跟前一个其实差不多,功能是一样的,只是使用的 rpc 不同。
cluster 使用的是默认的 rpc 实现,即 NatsRPCServer
,这个使用的是 GRPCServer
我们先把项目跑起来看一下
运行
运行前端、后端服务器
运行前端服务
前端服务器默认指定了 grpc port 为 3434
go run main.go
运行后端服务
默认的 grpc port 已经被使用了,这里需要指定新的 grpc,每个服务器都会开启一个 rpc server
go run main.go -type=room -frontend=false -rpcsvport=3435
当后端服务开启的时候,发现前端服务打印出了这条信息:
level=debug msg="[grpc client] added server 16d4f9c5-bab0-4dc5-829f-ea9af0d27f9a at 127.0.0.1:3435" source=pitaya
通过服务发现,前端服务识别到了这个后端服务,并且可以获取他的 grpc port,这就为之后的 RPCCall 做好了准备
运行 pitaya-cli 客户端
其他的功能跟前面的 demo 一样,这里我们只测试一下远程调用,grpc 跟 nats 的结果是一样的:
> pitaya-cli Pitaya REPL Client >>> connect 127.0.0.1:3250 Using json client connected! >>> request room.room.entry >>> sv->{"code":0,"result":"ok"} >>> request room.room.sendrpc {"server_id":"905a0b01-51db-4d0d-bde0-d4987ff75215", "route":"connector.connectorremote.remotefunc", "msg":"this is a remote call"} >>> sv->{"Msg":"received msg: thisisaremotecall"}
代码分析
如何创建一个基于 grpc 的 Pitaya App
接下来让我们看一下, 以 grpc 来实现远程调用的服务,是怎样构建的
// main.go func main() { ... meta := map[string]string{ constants.GRPCHostKey: "127.0.0.1", constants.GRPCPortKey: strconv.Itoa(*rpcServerPort), } var bs *modules.ETCDBindingStorage app, bs = createApp(*port, *isFrontend, *svType, meta, *rpcServerPort) ... app.Start() } func createApp(port int, isFrontend bool, svType string, meta map[string]string, rpcServerPort int) (pitaya.Pitaya, *modules.ETCDBindingStorage) { builder := pitaya.NewDefaultBuilder(isFrontend, svType, pitaya.Cluster, meta, *config.NewDefaultBuilderConfig()) grpcServerConfig := config.NewDefaultGRPCServerConfig() grpcServerConfig.Port = rpcServerPort gs, err := cluster.NewGRPCServer(*grpcServerConfig, builder.Server, builder.MetricsReporters) if err != nil { panic(err) } builder.RPCServer = gs builder.Groups = groups.NewMemoryGroupService(*config.NewDefaultMemoryGroupConfig()) bs := modules.NewETCDBindingStorage(builder.Server, builder.SessionPool, *config.NewDefaultETCDBindingConfig()) gc, err := cluster.NewGRPCClient( *config.NewDefaultGRPCClientConfig(), builder.Server, builder.MetricsReporters, bs, cluster.NewInfoRetriever(*config.NewDefaultInfoRetrieverConfig()), ) if err != nil { panic(err) } builder.RPCClient = gc if isFrontend { tcp := acceptor.NewTCPAcceptor(fmt.Sprintf(":%d", port)) builder.AddAcceptor(tcp) } return builder.Build(), bs }
这部分代码明显比之前那个 demo 要复杂的多,我们只关注与 grpc 有关的逻辑:
NewGRPCServer
,并指定给 BuilderNewGRPCClient
,并指定给 Builder,且需要设置 ETCDBindingStorage
,这个 Storage 是一个 Module
,后面需要注册到 app首先需要知道,RPCServer 和 RPCClient 是成对出现的,如果你开启了一个 GRPCServer,那么对应使用的应该是 GRPCClient,所以上面行39我们指定了对应的 GRPCClient。
GRPCServer 是对 RPCServer
interface 的其中一个实现,另一个实现是默认的 NatsRPCServer
,Client 端也是一样。
默认的使用的 Nats 和需要手动创建一堆东西的 grpc 有什么不同呢,我们依次简单看看他们的实现
grpc
GRPCServer
grpcServerConfig := config.NewDefaultGRPCServerConfig() grpcServerConfig.Port = rpcServerPort gs, err := cluster.NewGRPCServer(*grpcServerConfig, builder.Server, builder.MetricsReporters) if err != nil { panic(err) } builder.RPCServer = gs
GRPCServerConfig
指定了 GRPCServer 要监听的端口,默认是 3434
type GRPCServerConfig struct { Port int `mapstructure:"port"` }
GRPCServer
在创建的时候并没有开启监听,只是做了一些变量初始化,真正开启服务是在他的 Init
函数里,这个函数是最后 app.Start
时候调用的。实际上 RPCServer 会 Start 时先被包装成一个 ModuleWrapper,然后 startModules
统一调用了 Module 的初始化,这部分细节之后在看。这里先关注一下 GRPCServer
的定义:
type GRPCServer struct { server *Server // 持有外层(顶层)的Server port int // 监听端口 metricsReporters []metrics.Reporter // 监控指标 grpcSv *grpc.Server // 实际开启的 grpc 服务 pitayaServer protos.PitayaServer // 即 RemoteService,用来处理服务器间通信 }
我们主要看其中的两个变量:
grpc.Server
是实际提供服务的 Server,来自于 google 的 grpc 包,GRPCServer 其实就是对他的封装protos.PitayaServer
指向了一个RemoteService
,这个服务是 App 唯一的,用来处理与其他服务器间的通信
再看一下 GRPC 的真正启动,是在 Init
里开启了一个 goroutine:
func (gs *GRPCServer) Init() error { port := gs.port lis, err := net.Listen("tcp", fmt.Sprintf(":%d", port)) if err != nil { return err } gs.grpcSv = grpc.NewServer() protos.RegisterPitayaServer(gs.grpcSv, gs.pitayaServer) go gs.grpcSv.Serve(lis) return nil }
行8调用的函数 RegisterPitayaServer
是在 pitaya.pb.go 里定义的(源文件 pitaya.proto 在 pitaya-protos 目录下,是 git 仓库的 submodule,如果是以压缩包的形式下载,或者拉取仓库后没有同步 submodule,则没有这个目录数据,可以手动下载:topfreegames/pitaya-protos: this repository contains the proto files for pitaya projects (github.com))。最终调用的是 grpc 的 RegisterService
:
// RegisterService registers a service and its implementation to the gRPC // server. It is called from the IDL generated code. This must be called before // invoking Serve. If ss is non-nil (for legacy code), its type is checked to // ensure it implements sd.HandlerType. func (s *Server) RegisterService(sd *ServiceDesc, ss interface{})
将服务及其实现注册到grpc服务器,且必须在 Serve
方法前调用,这是 grpc 的用法,有机会再研究吧,这里不再深入。
GRPCClient
Client 和 Server 一样,仍然需要自己创建,再指定给 builder:
bs := modules.NewETCDBindingStorage(builder.Server, builder.SessionPool, *config.NewDefaultETCDBindingConfig()) gc, err := cluster.NewGRPCClient( *config.NewDefaultGRPCClientConfig(), builder.Server, builder.MetricsReporters, bs, cluster.NewInfoRetriever(*config.NewDefaultInfoRetrieverConfig()), ) if err != nil { panic(err) } builder.RPCClient = gc
RPCClient
也是一个 Module,包装后由 startModules 统一调用初始化,GRPCClient
Init 没有什么逻辑,主要是 NatsRPCClient
那边需要初始化,之后我们讲到了再看。
还是看一下结构体定义:
type GRPCClient struct { bindingStorage interfaces.BindingStorage // 记录了用户绑定在哪个前端服务器中 clientMap sync.Map // 连接到不同 GRPCServer 的 grpcClient 字典,key 为 serverID dialTimeout time.Duration // 建立连接的超时时间 infoRetriever InfoRetriever // 信息过滤,用来做 server 分区 lazy bool // 是否使用懒连接,即仅在向服务器发出请求时才连接服务器 metricsReporters []metrics.Reporter // 监控指标 reqTimeout time.Duration // 进行 RPC 调用的请求超时时间 server *Server // 持有外层(顶层)的Server }
clientMap
一个字典,记录了所有的 grpcClient
,不像 GRPCServer
是对一个 grpcServer 的封装,GRPCClient
是对一组 grpcClient 的管理。
当一个新的 Server 被发现时(一个 Server 连接到了 etcd),有如下调用链:etcdServiceDiscovery.notifyListeners
-> SDListener.AddServer
, GRPCClient 就实现了 SDListener 接口。所以,一个新的 Server 加入后,有如下逻辑:
func (gs *GRPCClient) AddServer(sv *Server) { ... address := fmt.Sprintf("%s:%s", host, port) client := &grpcClient{address: address} if !gs.lazy { if err := client.connect(); err != nil { logger.Log.Errorf("[grpc client] unable to connect to server %s at %s: %v", sv.ID, address, err) } } gs.clientMap.Store(sv.ID, client) ... }
即,创建一个新的 grpcClient 存储在线程安全的 clientMap 中,lazy
为 true 时则是懒连接,只有在向 Server 发出请求时才会连接。
BindingStorage
该字段来自 main 中创建的 ETCDBindingStorage
,用来记录用户绑定在哪个前端服务器中,提供了接口 GetUserFrontendID
,可以通过 uid 获取用户连接到的前端服务器 ID。
ETCDBindingStorage 也是一个 Module,在 main 中被显式注册到 app 里:
app.RegisterModule(bs, "bindingsStorage")
遵循 Module 的规则,该模块实现了 Init 接口:
func (b *ETCDBindingStorage) Init() error { var cli *clientv3.Client var err error if b.cli == nil { cli, err = clientv3.New(clientv3.Config{ Endpoints: b.etcdEndpoints, DialTimeout: b.etcdDialTimeout, }) if err != nil { return err } b.cli = cli } // namespaced etcd :) b.cli.KV = namespace.NewKV(b.cli.KV, b.etcdPrefix) err = b.bootstrapLease() if err != nil { return err } if b.thisServer.Frontend { b.setupOnSessionCloseCB() b.setupOnAfterSessionBindCB() } return nil }
注意看一下行21,当这个服务是前端服务器的时候,设置了两个 Session Callback,用户连接上来时,将 uid-serverType:serverId 键值对存储到 etcd,当用户断开连接时,移除数据。所以接口 GetUserFrontendID
可以从 etcd 获取玩家连接到的前端服务器ID,使得 grpcClient 可以推送数据到对应的玩家。
NatsRPC
如果不显式指定,框架默认使用的是基于 Nats 实现的 rpc,这个 demo 里并没有使用到,既然我们已经看了 grpc,也顺便对比一下 NatsRPC 的实现,看看有何不同。
既然是基于 Nats 的 rpc 实现,那我们先简单了解一下什么是 Nats
Nats
Nats 是 go 开发的开源、高效的消息中间件,支持 pub/sub 消息订阅、request/reply 消息回复等,其中对 replay 的支持就有了 rpc 的感觉。
在第一篇的准备工作里,我们已经开启了 nats 服务,接下来看看 Pitaya 是如何使用 Nats 来实现 rpc 服务的。
NatsRPCServer
这个结构体定义相比较于前面的就很复杂了:
type NatsRPCServer struct { service int // 用来处理消息的 goroutine 数量 connString string // nats 服务器地址 connectionTimeout time.Duration // 与 nats 服务器建立连接的超时时间 maxReconnectionRetries int // 与 nats 服务器重连的最大次数 server *Server // 持有外层(顶层)的Server conn *nats.Conn // 与 nats 服务器建立的连接,即客户端 pushBufferSize int // 推送消息的缓冲区大小 messagesBufferSize int // 接受消息的缓冲区大小 stopChan chan bool // 用于实现优雅关闭 subChan chan *nats.Msg // 接收消息的通道 bindingsChan chan *nats.Msg // 接收通知:用户绑定到其他服务器时(绑定用户到session) unhandledReqCh chan *protos.Request // 从 subChan 接收到的消息,处理解析成 protos.Request 放入这个管道,等待处理 responses []*protos.Response // 对 unhandledReqCh 遍历得到请求消息进行处理,结果作为回复消息放在这个切片 // 切片长度为 service,即有多少个 goroutine 处理消息,切片就有多长,每个元素对应一个 goroutine requests []*protos.Request // 从 unhandledReqCh 遍历得到的请求,切片长度为 service,与 goroutine 的对应关系和 responses 一致 userPushCh chan *protos.Push // 要推送给玩家的消息写入这里 userKickCh chan *protos.KickMsg // 要踢玩家下线的消息写入这里 sub *nats.Subscription // 对订阅topic的一个描述,可以获得该订阅的相关信息 dropped int // 该订阅已经丢弃的消息数 pitayaServer protos.PitayaServer // 传入的 RemoteService,用来处理服务器间通信 metricsReporters []metrics.Reporter // 监控指标 sessionPool session.SessionPool // 用户会话池,用于注册 OnSessionBind,当有新用户绑定 session 时,会调用到 NatsRPCServer.onSessionBind, // onSessionBind 方法用于实现推送消息到用户,包括 Push 和 Kick,之前的 userPushCh 和 userKickCh 也是这个用处 appDieChan chan bool // 实现优雅关闭 }
这里只对每个变量做了简单注释,具体细节写得很巧妙,之后单独开一篇来学习(有空的话)。
简单概括一下:
服务器在 Nats 上 Subscribe 了一个 topic,格式为:pitaya/serer/{serverType}/{serverID}
,服务器只关心自己这个 ServerID 的主题。当这个主题有消息时会自动写入 subChan
,有一个 goroutine 在遍历该管道,并解析成 protos.Reqeust 格式写到 unhandledReqCh
。待处理的消息管道有多个消费者,初始化时就给他分配了 service
个 goroutine 来读取管道,读取到的请求写入 requests[threadID]
中,该切片的每个元素对应了一个 goroutine,避免了锁竞争。取出的请求交给逻辑层处理(pitayaServer.Call
),回复数据存放在 responses[threadID]
,编码后 Publish 到 Nats,由对端处理。自此,就完成了 订阅 - 处理 - 发布 的流程,也就通过 pub/sub 实现了 RPC。
NatsRPCClient
RPC 的实现是成对出现的,有 Nats 实现的 Server 端,肯定还需要再对应一个 Client 端。
type NatsRPCClient struct { conn *nats.Conn // 与 nats 服务器建立的连接,即客户端 connString string // nats 服务器地址 connectionTimeout time.Duration // 与 nats 服务器建立连接的超时时间 maxReconnectionRetries int // 与 nats 服务器重连的最大次数 reqTimeout time.Duration // 请求超时时间 running bool // 是否在运行中 server *Server // 持有外层(顶层)的Server metricsReporters []metrics.Reporter // 监控指标 appDieChan chan bool // 实现优雅关闭 }
与 GRPCClient 不同,grpc 的实现要求客户端必须连接上所有的 GRPCServer,才能互相通信,所以 GRPCClient 是管理了一组面向 GRPCServer 的连接。
而基于 Nats 实现的 RPC,只需要保持一个连接,无论是 Client 还是 Server 都是连接上 Nats 服务器,通过对某一主题的订阅来实现通信。
关于通信,NatsPRCClient 提供了以下接口:
BroadcastSessionBind(uid string) error Send(topic string, data []byte) error SendPush(userID string, frontendSv *Server, push *protos.Push) error SendKick(userID string, serverType string, kick *protos.KickMsg) error Call(ctx context.Context, rpcType protos.RPCType, route *route.Route, session session.Session, msg *message.Message, server *Server) (*protos.Response, error)
前四个都只是推送消息,无需回复,只是简单的将数据 Publish 到某一主题,其他端订阅了这个主题就会处理这些消息。
最后一个 Call
才是真正的 RPC:
func (ns *NatsRPCClient) Call( ctx context.Context, rpcType protos.RPCType, route *route.Route, session session.Session, msg *message.Message, server *Server, ) (*protos.Response, error) { ... marshalledData, err := proto.Marshal(&req) ... m, err = ns.conn.Request(getChannel(server.Type, server.ID), marshalledData, ns.reqTimeout) ... res := &protos.Response{} err = proto.Unmarshal(m.Data, res) ... return res, nil }
Nats 虽然定位是一个消息中间件,但是也提供了 request/reply 模式,这里就是基于 Nats 提供的 Request
API 实现了远程调用。
异同
值得一提的是 GRPC 并没有实现 Send
方法:
Send
方法的声明如下:
Send(route string, data []byte) error
这个方法是发送消息到某一路由,NatsRPC 都连接到了 Nats 服务器上,可以通过订阅主题来实现。而 GRPC 是服务之间互相连接的,没有一个中心点来处理这种形式的广播。
如果要发送消息到某一个玩家,可以调用 SendPush
。再稍微看下他的实现:
// GRPCClient func (gs *GRPCClient) SendPush(userID string, frontendSv *Server, push *protos.Push) error { if frontendSv.ID != "" { svID = frontendSv.ID } else { if gs.bindingStorage == nil { return constants.ErrNoBindingStorageModule } svID, err = gs.bindingStorage.GetUserFrontendID(userID, frontendSv.Type) if err != nil { return err } } if c, ok := gs.clientMap.Load(svID); ok { ctxT, done := context.WithTimeout(context.Background(), gs.reqTimeout) defer done() err := c.(*grpcClient).pushToUser(ctxT, push) return err } return constants.ErrNoConnectionToServer } // NatsRPCClient func (ns *NatsRPCClient) SendPush(userID string, frontendSv *Server, push *protos.Push) error { topic := GetUserMessagesTopic(userID, frontendSv.Type) msg, err := proto.Marshal(push) if err != nil { return err } return ns.Send(topic, msg) }
代码比较简单啊,不一步步看了,直接说结论:
不需要知道玩家具体在哪个前端服务器上,只需要提供 userID 和服务器类型,就可以自动定位到。grpc 是通过 BindgStorage
来实现的,Nats 更简单,反正直接推这个主题上去就行了,谁关心,谁消费。