项目目录:examples/demo/cluster_grpc
这个 demo 跟前一个其实差不多,功能是一样的,只是使用的 rpc 不同。
cluster 使用的是默认的 rpc 实现,即 NatsRPCServer
,这个使用的是 GRPCServer
我们先把项目跑起来看一下
运行
运行前端、后端服务器
运行前端服务
前端服务器默认指定了 grpc port 为 3434
go run main.go
运行后端服务
默认的 grpc port 已经被使用了,这里需要指定新的 grpc,每个服务器都会开启一个 rpc server
go run main.go -type=room -frontend=false -rpcsvport=3435
当后端服务开启的时候,发现前端服务打印出了这条信息:
level=debug msg="[grpc client] added server 16d4f9c5-bab0-4dc5-829f-ea9af0d27f9a at 127.0.0.1:3435" source=pitaya
通过服务发现,前端服务识别到了这个后端服务,并且可以获取他的 grpc port,这就为之后的 RPCCall 做好了准备
运行 pitaya-cli 客户端
其他的功能跟前面的 demo 一样,这里我们只测试一下远程调用,grpc 跟 nats 的结果是一样的:
> pitaya-cli
Pitaya REPL Client
>>> connect 127.0.0.1:3250
Using json client
connected!
>>> request room.room.entry
>>> sv->{"code":0,"result":"ok"}
>>> request room.room.sendrpc {"server_id":"905a0b01-51db-4d0d-bde0-d4987ff75215", "route":"connector.connectorremote.remotefunc", "msg":"this is a remote call"}
>>> sv->{"Msg":"received msg: thisisaremotecall"}
代码分析
如何创建一个基于 grpc 的 Pitaya App
接下来让我们看一下, 以 grpc 来实现远程调用的服务,是怎样构建的
// main.go
func main() {
...
meta := map[string]string{
constants.GRPCHostKey: "127.0.0.1",
constants.GRPCPortKey: strconv.Itoa(*rpcServerPort),
}
var bs *modules.ETCDBindingStorage
app, bs = createApp(*port, *isFrontend, *svType, meta, *rpcServerPort)
...
app.Start()
}
func createApp(port int, isFrontend bool, svType string, meta map[string]string, rpcServerPort int) (pitaya.Pitaya, *modules.ETCDBindingStorage) {
builder := pitaya.NewDefaultBuilder(isFrontend, svType, pitaya.Cluster, meta, *config.NewDefaultBuilderConfig())
grpcServerConfig := config.NewDefaultGRPCServerConfig()
grpcServerConfig.Port = rpcServerPort
gs, err := cluster.NewGRPCServer(*grpcServerConfig, builder.Server, builder.MetricsReporters)
if err != nil {
panic(err)
}
builder.RPCServer = gs
builder.Groups = groups.NewMemoryGroupService(*config.NewDefaultMemoryGroupConfig())
bs := modules.NewETCDBindingStorage(builder.Server, builder.SessionPool, *config.NewDefaultETCDBindingConfig())
gc, err := cluster.NewGRPCClient(
*config.NewDefaultGRPCClientConfig(),
builder.Server,
builder.MetricsReporters,
bs,
cluster.NewInfoRetriever(*config.NewDefaultInfoRetrieverConfig()),
)
if err != nil {
panic(err)
}
builder.RPCClient = gc
if isFrontend {
tcp := acceptor.NewTCPAcceptor(fmt.Sprintf(":%d", port))
builder.AddAcceptor(tcp)
}
return builder.Build(), bs
}
这部分代码明显比之前那个 demo 要复杂的多,我们只关注与 grpc 有关的逻辑:
NewGRPCServer
,并指定给 BuilderNewGRPCClient
,并指定给 Builder,且需要设置 ETCDBindingStorage
,这个 Storage 是一个 Module
,后面需要注册到 app首先需要知道,RPCServer 和 RPCClient 是成对出现的,如果你开启了一个 GRPCServer,那么对应使用的应该是 GRPCClient,所以上面行39我们指定了对应的 GRPCClient。
GRPCServer 是对 RPCServer
interface 的其中一个实现,另一个实现是默认的 NatsRPCServer
,Client 端也是一样。
默认的使用的 Nats 和需要手动创建一堆东西的 grpc 有什么不同呢,我们依次简单看看他们的实现
grpc
GRPCServer
grpcServerConfig := config.NewDefaultGRPCServerConfig()
grpcServerConfig.Port = rpcServerPort
gs, err := cluster.NewGRPCServer(*grpcServerConfig, builder.Server, builder.MetricsReporters)
if err != nil {
panic(err)
}
builder.RPCServer = gs
GRPCServerConfig
指定了 GRPCServer 要监听的端口,默认是 3434
type GRPCServerConfig struct {
Port int `mapstructure:"port"`
}
GRPCServer
在创建的时候并没有开启监听,只是做了一些变量初始化,真正开启服务是在他的 Init
函数里,这个函数是最后 app.Start
时候调用的。实际上 RPCServer 会 Start 时先被包装成一个 ModuleWrapper,然后 startModules
统一调用了 Module 的初始化,这部分细节之后在看。这里先关注一下 GRPCServer
的定义:
type GRPCServer struct {
server *Server // 持有外层(顶层)的Server
port int // 监听端口
metricsReporters []metrics.Reporter // 监控指标
grpcSv *grpc.Server // 实际开启的 grpc 服务
pitayaServer protos.PitayaServer // 即 RemoteService,用来处理服务器间通信
}
我们主要看其中的两个变量:
grpc.Server
是实际提供服务的 Server,来自于 google 的 grpc 包,GRPCServer 其实就是对他的封装protos.PitayaServer
指向了一个RemoteService
,这个服务是 App 唯一的,用来处理与其他服务器间的通信
再看一下 GRPC 的真正启动,是在 Init
里开启了一个 goroutine:
func (gs *GRPCServer) Init() error {
port := gs.port
lis, err := net.Listen("tcp", fmt.Sprintf(":%d", port))
if err != nil {
return err
}
gs.grpcSv = grpc.NewServer()
protos.RegisterPitayaServer(gs.grpcSv, gs.pitayaServer)
go gs.grpcSv.Serve(lis)
return nil
}
行8调用的函数 RegisterPitayaServer
是在 pitaya.pb.go 里定义的(源文件 pitaya.proto 在 pitaya-protos 目录下,是 git 仓库的 submodule,如果是以压缩包的形式下载,或者拉取仓库后没有同步 submodule,则没有这个目录数据,可以手动下载:topfreegames/pitaya-protos: this repository contains the proto files for pitaya projects (github.com))。最终调用的是 grpc 的 RegisterService
:
// RegisterService registers a service and its implementation to the gRPC
// server. It is called from the IDL generated code. This must be called before
// invoking Serve. If ss is non-nil (for legacy code), its type is checked to
// ensure it implements sd.HandlerType.
func (s *Server) RegisterService(sd *ServiceDesc, ss interface{})
将服务及其实现注册到grpc服务器,且必须在 Serve
方法前调用,这是 grpc 的用法,有机会再研究吧,这里不再深入。
GRPCClient
Client 和 Server 一样,仍然需要自己创建,再指定给 builder:
bs := modules.NewETCDBindingStorage(builder.Server, builder.SessionPool, *config.NewDefaultETCDBindingConfig())
gc, err := cluster.NewGRPCClient(
*config.NewDefaultGRPCClientConfig(),
builder.Server,
builder.MetricsReporters,
bs,
cluster.NewInfoRetriever(*config.NewDefaultInfoRetrieverConfig()),
)
if err != nil {
panic(err)
}
builder.RPCClient = gc
RPCClient
也是一个 Module,包装后由 startModules 统一调用初始化,GRPCClient
Init 没有什么逻辑,主要是 NatsRPCClient
那边需要初始化,之后我们讲到了再看。
还是看一下结构体定义:
type GRPCClient struct {
bindingStorage interfaces.BindingStorage // 记录了用户绑定在哪个前端服务器中
clientMap sync.Map // 连接到不同 GRPCServer 的 grpcClient 字典,key 为 serverID
dialTimeout time.Duration // 建立连接的超时时间
infoRetriever InfoRetriever // 信息过滤,用来做 server 分区
lazy bool // 是否使用懒连接,即仅在向服务器发出请求时才连接服务器
metricsReporters []metrics.Reporter // 监控指标
reqTimeout time.Duration // 进行 RPC 调用的请求超时时间
server *Server // 持有外层(顶层)的Server
}
clientMap
一个字典,记录了所有的 grpcClient
,不像 GRPCServer
是对一个 grpcServer 的封装,GRPCClient
是对一组 grpcClient 的管理。
当一个新的 Server 被发现时(一个 Server 连接到了 etcd),有如下调用链:etcdServiceDiscovery.notifyListeners
-> SDListener.AddServer
, GRPCClient 就实现了 SDListener 接口。所以,一个新的 Server 加入后,有如下逻辑:
func (gs *GRPCClient) AddServer(sv *Server) {
...
address := fmt.Sprintf("%s:%s", host, port)
client := &grpcClient{address: address}
if !gs.lazy {
if err := client.connect(); err != nil {
logger.Log.Errorf("[grpc client] unable to connect to server %s at %s: %v", sv.ID, address, err)
}
}
gs.clientMap.Store(sv.ID, client)
...
}
即,创建一个新的 grpcClient 存储在线程安全的 clientMap 中,lazy
为 true 时则是懒连接,只有在向 Server 发出请求时才会连接。
BindingStorage
该字段来自 main 中创建的 ETCDBindingStorage
,用来记录用户绑定在哪个前端服务器中,提供了接口 GetUserFrontendID
,可以通过 uid 获取用户连接到的前端服务器 ID。
ETCDBindingStorage 也是一个 Module,在 main 中被显式注册到 app 里:
app.RegisterModule(bs, "bindingsStorage")
遵循 Module 的规则,该模块实现了 Init 接口:
func (b *ETCDBindingStorage) Init() error {
var cli *clientv3.Client
var err error
if b.cli == nil {
cli, err = clientv3.New(clientv3.Config{
Endpoints: b.etcdEndpoints,
DialTimeout: b.etcdDialTimeout,
})
if err != nil {
return err
}
b.cli = cli
}
// namespaced etcd :)
b.cli.KV = namespace.NewKV(b.cli.KV, b.etcdPrefix)
err = b.bootstrapLease()
if err != nil {
return err
}
if b.thisServer.Frontend {
b.setupOnSessionCloseCB()
b.setupOnAfterSessionBindCB()
}
return nil
}
注意看一下行21,当这个服务是前端服务器的时候,设置了两个 Session Callback,用户连接上来时,将 uid-serverType:serverId 键值对存储到 etcd,当用户断开连接时,移除数据。所以接口 GetUserFrontendID
可以从 etcd 获取玩家连接到的前端服务器ID,使得 grpcClient 可以推送数据到对应的玩家。
NatsRPC
如果不显式指定,框架默认使用的是基于 Nats 实现的 rpc,这个 demo 里并没有使用到,既然我们已经看了 grpc,也顺便对比一下 NatsRPC 的实现,看看有何不同。
既然是基于 Nats 的 rpc 实现,那我们先简单了解一下什么是 Nats
Nats
Nats 是 go 开发的开源、高效的消息中间件,支持 pub/sub 消息订阅、request/reply 消息回复等,其中对 replay 的支持就有了 rpc 的感觉。
在第一篇的准备工作里,我们已经开启了 nats 服务,接下来看看 Pitaya 是如何使用 Nats 来实现 rpc 服务的。
NatsRPCServer
这个结构体定义相比较于前面的就很复杂了:
type NatsRPCServer struct {
service int // 用来处理消息的 goroutine 数量
connString string // nats 服务器地址
connectionTimeout time.Duration // 与 nats 服务器建立连接的超时时间
maxReconnectionRetries int // 与 nats 服务器重连的最大次数
server *Server // 持有外层(顶层)的Server
conn *nats.Conn // 与 nats 服务器建立的连接,即客户端
pushBufferSize int // 推送消息的缓冲区大小
messagesBufferSize int // 接受消息的缓冲区大小
stopChan chan bool // 用于实现优雅关闭
subChan chan *nats.Msg // 接收消息的通道
bindingsChan chan *nats.Msg // 接收通知:用户绑定到其他服务器时(绑定用户到session)
unhandledReqCh chan *protos.Request // 从 subChan 接收到的消息,处理解析成 protos.Request 放入这个管道,等待处理
responses []*protos.Response // 对 unhandledReqCh 遍历得到请求消息进行处理,结果作为回复消息放在这个切片
// 切片长度为 service,即有多少个 goroutine 处理消息,切片就有多长,每个元素对应一个 goroutine
requests []*protos.Request // 从 unhandledReqCh 遍历得到的请求,切片长度为 service,与 goroutine 的对应关系和 responses 一致
userPushCh chan *protos.Push // 要推送给玩家的消息写入这里
userKickCh chan *protos.KickMsg // 要踢玩家下线的消息写入这里
sub *nats.Subscription // 对订阅topic的一个描述,可以获得该订阅的相关信息
dropped int // 该订阅已经丢弃的消息数
pitayaServer protos.PitayaServer // 传入的 RemoteService,用来处理服务器间通信
metricsReporters []metrics.Reporter // 监控指标
sessionPool session.SessionPool // 用户会话池,用于注册 OnSessionBind,当有新用户绑定 session 时,会调用到 NatsRPCServer.onSessionBind,
// onSessionBind 方法用于实现推送消息到用户,包括 Push 和 Kick,之前的 userPushCh 和 userKickCh 也是这个用处
appDieChan chan bool // 实现优雅关闭
}
这里只对每个变量做了简单注释,具体细节写得很巧妙,之后单独开一篇来学习(有空的话)。
简单概括一下:
服务器在 Nats 上 Subscribe 了一个 topic,格式为:pitaya/serer/{serverType}/{serverID}
,服务器只关心自己这个 ServerID 的主题。当这个主题有消息时会自动写入 subChan
,有一个 goroutine 在遍历该管道,并解析成 protos.Reqeust 格式写到 unhandledReqCh
。待处理的消息管道有多个消费者,初始化时就给他分配了 service
个 goroutine 来读取管道,读取到的请求写入 requests[threadID]
中,该切片的每个元素对应了一个 goroutine,避免了锁竞争。取出的请求交给逻辑层处理(pitayaServer.Call
),回复数据存放在 responses[threadID]
,编码后 Publish 到 Nats,由对端处理。自此,就完成了 订阅 - 处理 - 发布 的流程,也就通过 pub/sub 实现了 RPC。
NatsRPCClient
RPC 的实现是成对出现的,有 Nats 实现的 Server 端,肯定还需要再对应一个 Client 端。
type NatsRPCClient struct {
conn *nats.Conn // 与 nats 服务器建立的连接,即客户端
connString string // nats 服务器地址
connectionTimeout time.Duration // 与 nats 服务器建立连接的超时时间
maxReconnectionRetries int // 与 nats 服务器重连的最大次数
reqTimeout time.Duration // 请求超时时间
running bool // 是否在运行中
server *Server // 持有外层(顶层)的Server
metricsReporters []metrics.Reporter // 监控指标
appDieChan chan bool // 实现优雅关闭
}
与 GRPCClient 不同,grpc 的实现要求客户端必须连接上所有的 GRPCServer,才能互相通信,所以 GRPCClient 是管理了一组面向 GRPCServer 的连接。
而基于 Nats 实现的 RPC,只需要保持一个连接,无论是 Client 还是 Server 都是连接上 Nats 服务器,通过对某一主题的订阅来实现通信。
关于通信,NatsPRCClient 提供了以下接口:
BroadcastSessionBind(uid string) error
Send(topic string, data []byte) error
SendPush(userID string, frontendSv *Server, push *protos.Push) error
SendKick(userID string, serverType string, kick *protos.KickMsg) error
Call(ctx context.Context, rpcType protos.RPCType, route *route.Route, session session.Session, msg *message.Message, server *Server) (*protos.Response, error)
前四个都只是推送消息,无需回复,只是简单的将数据 Publish 到某一主题,其他端订阅了这个主题就会处理这些消息。
最后一个 Call
才是真正的 RPC:
func (ns *NatsRPCClient) Call(
ctx context.Context,
rpcType protos.RPCType,
route *route.Route,
session session.Session,
msg *message.Message,
server *Server,
) (*protos.Response, error) {
...
marshalledData, err := proto.Marshal(&req)
...
m, err = ns.conn.Request(getChannel(server.Type, server.ID), marshalledData, ns.reqTimeout)
...
res := &protos.Response{}
err = proto.Unmarshal(m.Data, res)
...
return res, nil
}
Nats 虽然定位是一个消息中间件,但是也提供了 request/reply 模式,这里就是基于 Nats 提供的 Request
API 实现了远程调用。
异同
值得一提的是 GRPC 并没有实现 Send
方法:
Send
方法的声明如下:
Send(route string, data []byte) error
这个方法是发送消息到某一路由,NatsRPC 都连接到了 Nats 服务器上,可以通过订阅主题来实现。而 GRPC 是服务之间互相连接的,没有一个中心点来处理这种形式的广播。
如果要发送消息到某一个玩家,可以调用 SendPush
。再稍微看下他的实现:
// GRPCClient
func (gs *GRPCClient) SendPush(userID string, frontendSv *Server, push *protos.Push) error {
if frontendSv.ID != "" {
svID = frontendSv.ID
} else {
if gs.bindingStorage == nil {
return constants.ErrNoBindingStorageModule
}
svID, err = gs.bindingStorage.GetUserFrontendID(userID, frontendSv.Type)
if err != nil {
return err
}
}
if c, ok := gs.clientMap.Load(svID); ok {
ctxT, done := context.WithTimeout(context.Background(), gs.reqTimeout)
defer done()
err := c.(*grpcClient).pushToUser(ctxT, push)
return err
}
return constants.ErrNoConnectionToServer
}
// NatsRPCClient
func (ns *NatsRPCClient) SendPush(userID string, frontendSv *Server, push *protos.Push) error {
topic := GetUserMessagesTopic(userID, frontendSv.Type)
msg, err := proto.Marshal(push)
if err != nil {
return err
}
return ns.Send(topic, msg)
}
代码比较简单啊,不一步步看了,直接说结论:
不需要知道玩家具体在哪个前端服务器上,只需要提供 userID 和服务器类型,就可以自动定位到。grpc 是通过 BindgStorage
来实现的,Nats 更简单,反正直接推这个主题上去就行了,谁关心,谁消费。