grpc 的单向流和双向流

2023年 7月 14日 86.5k 0

简单模式

客户端发起一次请求,服务端响应一次数据

服务端数据流模式

客户端发起一次请求,服务端返回一段连续的数据流

proto 文件定义

proto 文件中定义 GetStream 方法,返回的是 stream 类型

rpc GetStream(StreamReqData) returns (stream StreamResData);

server 端实现

服务端使用 Send 不停的发送数据

代码如下:

func (s *server) GetStream(req *proto.StreamReqData, res proto.Greeter_GetStreamServer) error {
  i := 0
  for {
    _ = res.Send(&proto.StreamResData{
      Data: fmt.Sprintf("%v", time.Now().Unix()),
    })
    time.Sleep(time.Second)
    if i > 10 {
      break
    }
    i++
  }
  return nil
}

client 端实现

客户端使用 Recv 不停的接收数据

func serverSteam(c proto.GreeterClient) {
  res, _ := c.GetStream(context.Background(), &proto.StreamReqData{
    Data: "uccs",
  })
  for {
    data, err := res.Recv() // 服务端用 send 发送,客户端用 recv 接收
    if err != nil {
      fmt.Println(err)
      break
    }
    fmt.Println(data)
  }
}

客户端数据流模式

右客户端源源不断的发送数据流,在发送结束后,由服务端返回一个响应

proto 文件定义

proto 文件中定义 PutStream 方法,接收的是 stream 类型

rpc PutStream(stream StreamReqData) returns (StreamResData);

server 端实现

服务端使用 Recv 不停的接收数据

func (s *server) PutStream(cliStr proto.Greeter_PutStreamServer) error {
  for {
    if data, err := cliStr.Recv(); err != nil {
      fmt.Println(err)
      break
    } else {
      fmt.Println(data)
    }
  }
  return nil
}

client 端实现

客户端使用 Send 不停的发送数据

func clientSteam(c proto.GreeterClient) {
  putRes, _ := c.PutStream(context.Background())
  i := 0
  for {
    i++
    _ = putRes.Send(&proto.StreamReqData{
      Data: "uccs",
    })
    time.Sleep(time.Second)
    if i > 10 {
      break
    }
  }
}

双向数据流模式

客户端和服务端可以向对方发送数据流,双方可以互相发送,也就是实时交互

proto 文件定义

proto 文件中定义 AllStream 方法,接收的是 stream 类型,返回的也是 stream 类型

rpc AllStream(stream StreamReqData) returns (stream StreamResData);

server 端实现

服务端使用 Recv 不停的接收数据,使用 Send 不停的发送数据,实现双向数据流

为了在接收数据时不阻塞,使用 goroutine

func (s *server) AllStream(allStr proto.Greeter_AllStreamServer) error {
  wg := sync.WaitGroup{}
  wg.Add(2)
  go func() {
    defer wg.Done()
    for {
      data, _ := allStr.Recv()
      fmt.Println("收到消息:" + data.Data)
    }
  }()
  go func() {
    defer wg.Done()
    for {
      _ = allStr.Send(&proto.StreamResData{Data: "我是服务器发来的数据"})
      time.Sleep(time.Second)
    }
  }()
  wg.Wait()
  return nil
}

client 端实现

客户端使用 Send 不停的发送数据,使用 Recv 不停的接收数据,实现双向数据流

为了在接收数据时不阻塞,使用 goroutine

func allSteam(c proto.GreeterClient) {
  allStr, _ := c.AllStream(context.Background())
  wg := sync.WaitGroup{}
  wg.Add(2)
  go func() {
    defer wg.Done()
    for {
      data, _ := allStr.Recv()
      fmt.Println("收到消息:" + data.Data)
    }
  }()
  go func() {
    defer wg.Done()
    for {
      _ = allStr.Send(&proto.StreamReqData{Data: "我是uccs"})
      time.Sleep(time.Second)
    }
  }()
  wg.Wait()
}

双向数据流模式、服务端数据流模式、客户端数据流模式 源码

往期文章

  • go 项目ORM、测试、api文档搭建
  • go 开发短网址服务笔记
  • go 实现统一加载资源的入口
  • go 语言编写简单的分布式系统
  • go 中 rpc 和 grpc 的使用
  • protocol 和 grpc 的基本使用
  • go 基础知识
  • 相关文章

    JavaScript2024新功能:Object.groupBy、正则表达式v标志
    PHP trim 函数对多字节字符的使用和限制
    新函数 json_validate() 、randomizer 类扩展…20 个PHP 8.3 新特性全面解析
    使用HTMX为WordPress增效:如何在不使用复杂框架的情况下增强平台功能
    为React 19做准备:WordPress 6.6用户指南
    如何删除WordPress中的所有评论

    发布评论