[Pitaya Demo解读笔记]3.cluster_grpc demo

2023年 10月 13日 22.3k 0

项目目录:examples/demo/cluster_grpc

这个 demo 跟前一个其实差不多,功能是一样的,只是使用的 rpc 不同。

cluster 使用的是默认的 rpc 实现,即 NatsRPCServer,这个使用的是 GRPCServer

我们先把项目跑起来看一下

运行

运行前端、后端服务器

  • 运行前端服务

    前端服务器默认指定了 grpc port 为 3434

    go run main.go
    
  • 运行后端服务

    默认的 grpc port 已经被使用了,这里需要指定新的 grpc,每个服务器都会开启一个 rpc server

    go run main.go -type=room -frontend=false -rpcsvport=3435
    
  • 当后端服务开启的时候,发现前端服务打印出了这条信息:

    level=debug msg="[grpc client] added server 16d4f9c5-bab0-4dc5-829f-ea9af0d27f9a at 127.0.0.1:3435" source=pitaya
    

    通过服务发现,前端服务识别到了这个后端服务,并且可以获取他的 grpc port,这就为之后的 RPCCall 做好了准备

    运行 pitaya-cli 客户端

    其他的功能跟前面的 demo 一样,这里我们只测试一下远程调用,grpc 跟 nats 的结果是一样的:

    > pitaya-cli
    Pitaya REPL Client
    >>> connect 127.0.0.1:3250
    Using json client
    connected!
    >>> request room.room.entry
    >>> sv->{"code":0,"result":"ok"}
    >>> request room.room.sendrpc {"server_id":"905a0b01-51db-4d0d-bde0-d4987ff75215", "route":"connector.connectorremote.remotefunc", "msg":"this is a remote call"}
    >>> sv->{"Msg":"received msg: thisisaremotecall"}
    

    代码分析

    如何创建一个基于 grpc 的 Pitaya App

    接下来让我们看一下, 以 grpc 来实现远程调用的服务,是怎样构建的

    // main.go
    func main() {
        ...
        meta := map[string]string{
    		constants.GRPCHostKey: "127.0.0.1",
    		constants.GRPCPortKey: strconv.Itoa(*rpcServerPort),
    	}
    
    	var bs *modules.ETCDBindingStorage
    	app, bs = createApp(*port, *isFrontend, *svType, meta, *rpcServerPort)
        ...
        app.Start()
    }
    
    func createApp(port int, isFrontend bool, svType string, meta map[string]string, rpcServerPort int) (pitaya.Pitaya, *modules.ETCDBindingStorage) {
    	builder := pitaya.NewDefaultBuilder(isFrontend, svType, pitaya.Cluster, meta, *config.NewDefaultBuilderConfig())
    
    	grpcServerConfig := config.NewDefaultGRPCServerConfig()
    	grpcServerConfig.Port = rpcServerPort
    	gs, err := cluster.NewGRPCServer(*grpcServerConfig, builder.Server, builder.MetricsReporters)
    	if err != nil {
    		panic(err)
    	}
    	builder.RPCServer = gs
    	builder.Groups = groups.NewMemoryGroupService(*config.NewDefaultMemoryGroupConfig())
    
    	bs := modules.NewETCDBindingStorage(builder.Server, builder.SessionPool, *config.NewDefaultETCDBindingConfig())
    
    	gc, err := cluster.NewGRPCClient(
    		*config.NewDefaultGRPCClientConfig(),
    		builder.Server,
    		builder.MetricsReporters,
    		bs,
    		cluster.NewInfoRetriever(*config.NewDefaultInfoRetrieverConfig()),
    	)
    	if err != nil {
    		panic(err)
    	}
    	builder.RPCClient = gc
    
    	if isFrontend {
    		tcp := acceptor.NewTCPAcceptor(fmt.Sprintf(":%d", port))
    		builder.AddAcceptor(tcp)
    	}
    
    	return builder.Build(), bs
    }
    

    这部分代码明显比之前那个 demo 要复杂的多,我们只关注与 grpc 有关的逻辑:

  • 设定 grpc 监听端口
  • 创建 GRPCServer NewGRPCServer,并指定给 Builder
  • 创建 GRPCClient NewGRPCClient,并指定给 Builder,且需要设置 ETCDBindingStorage,这个 Storage 是一个 Module,后面需要注册到 app
  • 首先需要知道,RPCServer 和 RPCClient 是成对出现的,如果你开启了一个 GRPCServer,那么对应使用的应该是 GRPCClient,所以上面行39我们指定了对应的 GRPCClient。

    GRPCServer 是对 RPCServer interface 的其中一个实现,另一个实现是默认的 NatsRPCServer,Client 端也是一样。

    默认的使用的 Nats 和需要手动创建一堆东西的 grpc 有什么不同呢,我们依次简单看看他们的实现

    grpc

    GRPCServer

    grpcServerConfig := config.NewDefaultGRPCServerConfig()
    grpcServerConfig.Port = rpcServerPort
    gs, err := cluster.NewGRPCServer(*grpcServerConfig, builder.Server, builder.MetricsReporters)
    if err != nil {
        panic(err)
    }
    builder.RPCServer = gs
    

    GRPCServerConfig 指定了 GRPCServer 要监听的端口,默认是 3434

    type GRPCServerConfig struct {
    	Port int `mapstructure:"port"`
    }
    

    GRPCServer 在创建的时候并没有开启监听,只是做了一些变量初始化,真正开启服务是在他的 Init 函数里,这个函数是最后 app.Start 时候调用的。实际上 RPCServer 会 Start 时先被包装成一个 ModuleWrapper,然后 startModules 统一调用了 Module 的初始化,这部分细节之后在看。这里先关注一下 GRPCServer 的定义:

    type GRPCServer struct {
        server           *Server				// 持有外层(顶层)的Server
    	port             int					// 监听端口
    	metricsReporters []metrics.Reporter		// 监控指标
    	grpcSv           *grpc.Server			// 实际开启的 grpc 服务
    	pitayaServer     protos.PitayaServer    // 即 RemoteService,用来处理服务器间通信
    }
    

    我们主要看其中的两个变量:

    • grpc.Server 是实际提供服务的 Server,来自于 google 的 grpc 包,GRPCServer 其实就是对他的封装
    • protos.PitayaServer 指向了一个 RemoteService,这个服务是 App 唯一的,用来处理与其他服务器间的通信

    再看一下 GRPC 的真正启动,是在 Init 里开启了一个 goroutine:

    func (gs *GRPCServer) Init() error {
    	port := gs.port
    	lis, err := net.Listen("tcp", fmt.Sprintf(":%d", port))
    	if err != nil {
    		return err
    	}
    	gs.grpcSv = grpc.NewServer()
    	protos.RegisterPitayaServer(gs.grpcSv, gs.pitayaServer)
    	go gs.grpcSv.Serve(lis)
    	return nil
    }
    

    行8调用的函数 RegisterPitayaServer 是在 pitaya.pb.go 里定义的(源文件 pitaya.proto 在 pitaya-protos 目录下,是 git 仓库的 submodule,如果是以压缩包的形式下载,或者拉取仓库后没有同步 submodule,则没有这个目录数据,可以手动下载:topfreegames/pitaya-protos: this repository contains the proto files for pitaya projects (github.com))。最终调用的是 grpc 的 RegisterService:

    // RegisterService registers a service and its implementation to the gRPC
    // server. It is called from the IDL generated code. This must be called before
    // invoking Serve. If ss is non-nil (for legacy code), its type is checked to
    // ensure it implements sd.HandlerType.
    func (s *Server) RegisterService(sd *ServiceDesc, ss interface{})
    

    将服务及其实现注册到grpc服务器,且必须在 Serve 方法前调用,这是 grpc 的用法,有机会再研究吧,这里不再深入。

    GRPCClient

    Client 和 Server 一样,仍然需要自己创建,再指定给 builder:

    bs := modules.NewETCDBindingStorage(builder.Server, builder.SessionPool, *config.NewDefaultETCDBindingConfig())
    
    gc, err := cluster.NewGRPCClient(
        *config.NewDefaultGRPCClientConfig(),
        builder.Server,
        builder.MetricsReporters,
        bs,
        cluster.NewInfoRetriever(*config.NewDefaultInfoRetrieverConfig()),
    )
    if err != nil {
        panic(err)
    }
    builder.RPCClient = gc
    

    RPCClient 也是一个 Module,包装后由 startModules 统一调用初始化,GRPCClient Init 没有什么逻辑,主要是 NatsRPCClient 那边需要初始化,之后我们讲到了再看。

    还是看一下结构体定义:

    type GRPCClient struct {
    	bindingStorage   interfaces.BindingStorage	// 记录了用户绑定在哪个前端服务器中
    	clientMap        sync.Map					// 连接到不同 GRPCServer 的 grpcClient 字典,key 为 serverID
    	dialTimeout      time.Duration				// 建立连接的超时时间
    	infoRetriever    InfoRetriever				// 信息过滤,用来做 server 分区
    	lazy             bool						// 是否使用懒连接,即仅在向服务器发出请求时才连接服务器
    	metricsReporters []metrics.Reporter			// 监控指标
    	reqTimeout       time.Duration				// 进行 RPC 调用的请求超时时间
    	server           *Server					// 持有外层(顶层)的Server
    }
    
    clientMap

    一个字典,记录了所有的 grpcClient,不像 GRPCServer 是对一个 grpcServer 的封装,GRPCClient 是对一组 grpcClient 的管理。

    当一个新的 Server 被发现时(一个 Server 连接到了 etcd),有如下调用链:etcdServiceDiscovery.notifyListeners -> SDListener.AddServer, GRPCClient 就实现了 SDListener 接口。所以,一个新的 Server 加入后,有如下逻辑:

    func (gs *GRPCClient) AddServer(sv *Server) {
    	...
    	address := fmt.Sprintf("%s:%s", host, port)
    	client := &grpcClient{address: address}
    	if !gs.lazy {
    		if err := client.connect(); err != nil {
    			logger.Log.Errorf("[grpc client] unable to connect to server %s at %s: %v", sv.ID, address, err)
    		}
    	}
        gs.clientMap.Store(sv.ID, client)
        ...
    }
    

    即,创建一个新的 grpcClient 存储在线程安全的 clientMap 中,lazy 为 true 时则是懒连接,只有在向 Server 发出请求时才会连接。

    BindingStorage

    该字段来自 main 中创建的 ETCDBindingStorage,用来记录用户绑定在哪个前端服务器中,提供了接口 GetUserFrontendID,可以通过 uid 获取用户连接到的前端服务器 ID。

    ETCDBindingStorage 也是一个 Module,在 main 中被显式注册到 app 里:

    app.RegisterModule(bs, "bindingsStorage")
    

    遵循 Module 的规则,该模块实现了 Init 接口:

    func (b *ETCDBindingStorage) Init() error {
    	var cli *clientv3.Client
    	var err error
    	if b.cli == nil {
    		cli, err = clientv3.New(clientv3.Config{
    			Endpoints:   b.etcdEndpoints,
    			DialTimeout: b.etcdDialTimeout,
    		})
    		if err != nil {
    			return err
    		}
    		b.cli = cli
    	}
    	// namespaced etcd :)
    	b.cli.KV = namespace.NewKV(b.cli.KV, b.etcdPrefix)
    	err = b.bootstrapLease()
    	if err != nil {
    		return err
    	}
    
    	if b.thisServer.Frontend {
    		b.setupOnSessionCloseCB()
    		b.setupOnAfterSessionBindCB()
    	}
    
    	return nil
    }
    

    注意看一下行21,当这个服务是前端服务器的时候,设置了两个 Session Callback,用户连接上来时,将 uid-serverType:serverId 键值对存储到 etcd,当用户断开连接时,移除数据。所以接口 GetUserFrontendID 可以从 etcd 获取玩家连接到的前端服务器ID,使得 grpcClient 可以推送数据到对应的玩家。

    NatsRPC

    如果不显式指定,框架默认使用的是基于 Nats 实现的 rpc,这个 demo 里并没有使用到,既然我们已经看了 grpc,也顺便对比一下 NatsRPC 的实现,看看有何不同。

    既然是基于 Nats 的 rpc 实现,那我们先简单了解一下什么是 Nats

    Nats

    Nats 是 go 开发的开源、高效的消息中间件,支持 pub/sub 消息订阅、request/reply 消息回复等,其中对 replay 的支持就有了 rpc 的感觉。

    在第一篇的准备工作里,我们已经开启了 nats 服务,接下来看看 Pitaya 是如何使用 Nats 来实现 rpc 服务的。

    NatsRPCServer

    这个结构体定义相比较于前面的就很复杂了:

    type NatsRPCServer struct {
    	service                int					// 用来处理消息的 goroutine 数量
    	connString             string				// nats 服务器地址
    	connectionTimeout      time.Duration		// 与 nats 服务器建立连接的超时时间
    	maxReconnectionRetries int					// 与 nats 服务器重连的最大次数
    	server                 *Server				// 持有外层(顶层)的Server
    	conn                   *nats.Conn			// 与 nats 服务器建立的连接,即客户端
    	pushBufferSize         int					// 推送消息的缓冲区大小
    	messagesBufferSize     int					// 接受消息的缓冲区大小
    	stopChan               chan bool			// 用于实现优雅关闭
    	subChan                chan *nats.Msg 		// 接收消息的通道
    	bindingsChan           chan *nats.Msg 		// 接收通知:用户绑定到其他服务器时(绑定用户到session)
    	unhandledReqCh         chan *protos.Request	// 从 subChan 接收到的消息,处理解析成 protos.Request 放入这个管道,等待处理
    	responses              []*protos.Response	// 对 unhandledReqCh 遍历得到请求消息进行处理,结果作为回复消息放在这个切片
        											// 切片长度为 service,即有多少个 goroutine 处理消息,切片就有多长,每个元素对应一个 goroutine
    	requests               []*protos.Request	// 从 unhandledReqCh 遍历得到的请求,切片长度为 service,与 goroutine 的对应关系和 responses 一致
    	userPushCh             chan *protos.Push	// 要推送给玩家的消息写入这里
    	userKickCh             chan *protos.KickMsg	// 要踢玩家下线的消息写入这里
    	sub                    *nats.Subscription	// 对订阅topic的一个描述,可以获得该订阅的相关信息
    	dropped                int					// 该订阅已经丢弃的消息数
    	pitayaServer           protos.PitayaServer	// 传入的 RemoteService,用来处理服务器间通信
    	metricsReporters       []metrics.Reporter	// 监控指标
    	sessionPool            session.SessionPool	// 用户会话池,用于注册 OnSessionBind,当有新用户绑定 session 时,会调用到 NatsRPCServer.onSessionBind,
        											// onSessionBind 方法用于实现推送消息到用户,包括 Push 和 Kick,之前的 userPushCh 和 userKickCh 也是这个用处
    	appDieChan             chan bool			// 实现优雅关闭
    }
    

    这里只对每个变量做了简单注释,具体细节写得很巧妙,之后单独开一篇来学习(有空的话)。

    简单概括一下:

    服务器在 Nats 上 Subscribe 了一个 topic,格式为:pitaya/serer/{serverType}/{serverID},服务器只关心自己这个 ServerID 的主题。当这个主题有消息时会自动写入 subChan,有一个 goroutine 在遍历该管道,并解析成 protos.Reqeust 格式写到 unhandledReqCh。待处理的消息管道有多个消费者,初始化时就给他分配了 service 个 goroutine 来读取管道,读取到的请求写入 requests[threadID] 中,该切片的每个元素对应了一个 goroutine,避免了锁竞争。取出的请求交给逻辑层处理(pitayaServer.Call),回复数据存放在 responses[threadID],编码后 Publish 到 Nats,由对端处理。自此,就完成了 订阅 - 处理 - 发布 的流程,也就通过 pub/sub 实现了 RPC。

    NatsRPCClient

    RPC 的实现是成对出现的,有 Nats 实现的 Server 端,肯定还需要再对应一个 Client 端。

    type NatsRPCClient struct {
    	conn                   *nats.Conn			// 与 nats 服务器建立的连接,即客户端
    	connString             string				// nats 服务器地址
    	connectionTimeout      time.Duration		// 与 nats 服务器建立连接的超时时间
    	maxReconnectionRetries int					// 与 nats 服务器重连的最大次数
    	reqTimeout             time.Duration		// 请求超时时间
    	running                bool					// 是否在运行中
    	server                 *Server				// 持有外层(顶层)的Server
    	metricsReporters       []metrics.Reporter	// 监控指标
    	appDieChan             chan bool			// 实现优雅关闭
    }
    

    与 GRPCClient 不同,grpc 的实现要求客户端必须连接上所有的 GRPCServer,才能互相通信,所以 GRPCClient 是管理了一组面向 GRPCServer 的连接。

    而基于 Nats 实现的 RPC,只需要保持一个连接,无论是 Client 还是 Server 都是连接上 Nats 服务器,通过对某一主题的订阅来实现通信。

    关于通信,NatsPRCClient 提供了以下接口:

    BroadcastSessionBind(uid string) error
    
    Send(topic string, data []byte) error
    
    SendPush(userID string, frontendSv *Server, push *protos.Push) error
    
    SendKick(userID string, serverType string, kick *protos.KickMsg) error
    
    Call(ctx context.Context, rpcType protos.RPCType, route *route.Route, session session.Session, msg *message.Message, server *Server) (*protos.Response, error)
    

    前四个都只是推送消息,无需回复,只是简单的将数据 Publish 到某一主题,其他端订阅了这个主题就会处理这些消息。

    最后一个 Call 才是真正的 RPC:

    func (ns *NatsRPCClient) Call(
    	ctx context.Context,
    	rpcType protos.RPCType,
    	route *route.Route,
    	session session.Session,
    	msg *message.Message,
    	server *Server,
    ) (*protos.Response, error) {
        ...
        marshalledData, err := proto.Marshal(&req)
        ...
        m, err = ns.conn.Request(getChannel(server.Type, server.ID), marshalledData, ns.reqTimeout)
        ...
        res := &protos.Response{}
    	err = proto.Unmarshal(m.Data, res)
        ...
        return res, nil
    }
    

    Nats 虽然定位是一个消息中间件,但是也提供了 request/reply 模式,这里就是基于 Nats 提供的 Request API 实现了远程调用。

    异同

    值得一提的是 GRPC 并没有实现 Send 方法:

    image-20231012160718445.png

    Send 方法的声明如下:

    Send(route string, data []byte) error
    

    这个方法是发送消息到某一路由,NatsRPC 都连接到了 Nats 服务器上,可以通过订阅主题来实现。而 GRPC 是服务之间互相连接的,没有一个中心点来处理这种形式的广播。

    如果要发送消息到某一个玩家,可以调用 SendPush。再稍微看下他的实现:

    // GRPCClient
    func (gs *GRPCClient) SendPush(userID string, frontendSv *Server, push *protos.Push) error {
    	if frontendSv.ID != "" {
    		svID = frontendSv.ID
    	} else {
    		if gs.bindingStorage == nil {
    			return constants.ErrNoBindingStorageModule
    		}
    		svID, err = gs.bindingStorage.GetUserFrontendID(userID, frontendSv.Type)
    		if err != nil {
    			return err
    		}
    	}
    	if c, ok := gs.clientMap.Load(svID); ok {
    		ctxT, done := context.WithTimeout(context.Background(), gs.reqTimeout)
    		defer done()
    		err := c.(*grpcClient).pushToUser(ctxT, push)
    		return err
    	}
    	return constants.ErrNoConnectionToServer
    }
    
    // NatsRPCClient
    func (ns *NatsRPCClient) SendPush(userID string, frontendSv *Server, push *protos.Push) error {
    	topic := GetUserMessagesTopic(userID, frontendSv.Type)
    	msg, err := proto.Marshal(push)
    	if err != nil {
    		return err
    	}
    	return ns.Send(topic, msg)
    }
    

    代码比较简单啊,不一步步看了,直接说结论:

    不需要知道玩家具体在哪个前端服务器上,只需要提供 userID 和服务器类型,就可以自动定位到。grpc 是通过 BindgStorage 来实现的,Nats 更简单,反正直接推这个主题上去就行了,谁关心,谁消费。

    相关文章

    JavaScript2024新功能:Object.groupBy、正则表达式v标志
    PHP trim 函数对多字节字符的使用和限制
    新函数 json_validate() 、randomizer 类扩展…20 个PHP 8.3 新特性全面解析
    使用HTMX为WordPress增效:如何在不使用复杂框架的情况下增强平台功能
    为React 19做准备:WordPress 6.6用户指南
    如何删除WordPress中的所有评论

    发布评论