落花人独立, 微雨燕双飞
一、前言
上篇文档主要给大家介绍了下RocketMQ消费者相关内容,其中提到过消费者底层和Broker进行通信的时候,有两种模式,分别是同步消费和异步消费,不知道大家有没有过类似的疑问,netty底层本身是基于java nio的,按理说都是异步非阻塞的,那么RocketMQ是如何实现同步阻塞调用的呢?对于异步调用,如何区分response对应的请求呢?看完今天这篇文章,以上问题,你都将会了然于胸。
二、同步发送
消息消费的入口 org.apache.rocketmq.client.impl.MQClientAPIImpl#pullMessage
同步发送,则调用的是RemotingClient#invokeSync方法,最终调用的是NettyRemotingAbstract#invokeSyncImpl方法:
可以看到,阻塞是通过调用responseFuture.waitResponse实现的,查看其代码如下:
内部其实是通过countDownLatch.await方法实现的阻塞。最终拿到服务端的response。
三、异步发送
异步发送调用的NettyRemotingAbstract#invokeAsyncImpl方法:
可以看到,异步调用和同步调用方法类似,都是通过向channel注册ChannelFutureListener,通过实现operationComplete用来处理异步响应。不过这里可以看到,invokeAsyncImpl方法是没有返回值的,所以异步的结果是怎么拿到的呢?同时,如果netty客户端向服务发送了100次请求,服务端返回了100个response,客户端是如何能区分某个response对应哪个request呢?其实这两个问题都很经典,如果大家开发过netty相关应用想必都会碰到过。
- 问题一:如何拿到异步响应?
想必大家都知道,netty是通过channel来处理客户端和服务端请求的,查看客户端channel实现如下:
是通过调用processMessageReceived方法来处理服务端的响应,查看实现如下:
这里查看的是客户端实现,所以我们关注处理响应的processResponseCommand方法即可:
这里可以看到,每个cmd都会附带一个int类型Opaque字段,该字段即标记着客户端的请求,和服务端的对应关系,每个request都会附带Opaque字段,服务端处理完业务逻辑发送响应再把Opaque字段发给客户端;同时客户端通过维护opaque和responseFuture的对应关系(resonseFuture内部会维护对应的invokeCallBack信息),收到响应后,即可调用对应的callback方法返回对应的结果。
三、小节
本文内容不是很多,但是却都是很经典的问题,如果大家用netty开发过实际应用,想必都会碰到以上场景,如何实现同步和异步的调用?异步调用如何绑定请求和响应的对应关系,可以先收藏,后面如果有用到可以直接copy即可。