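This is weihubeats. If you find the article useful, follow my WeChat official account 小奏技术, where it is published first. No marketing accounts, no clickbait.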
Background
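Recently I kept running into errors when resetting consumer offsets in RocketMQ, so I decided to dig into how RocketMQ actually resets a consumer group's consume offset.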
Versions
- RocketMQ: 5.1.0
- Dashboard: latest code from the GitHub master branch
Source code analysis
Dashboard
Entry point
The feature is accessed mainly from the RocketMQ Dashboard.
Click the skip accumulate button and check which endpoint it calls. The network request shows that it calls the skipAccumulate.do endpoint.
The request parameters are:
{
"resetTime": -1,
"consumerGroupList": [
"gid-xiao-zou-topic"
],
"topic": "xiao-zou-topic",
"force": true
}
The request carries a consumer group (gid), a topic, and force set to true.
We will come back to force during the source analysis.
A global search for the endpoint shows the classic MVC layering: controller, service, serviceImpl.
Let's go straight to the implementation class, ConsumerServiceImpl.
Its core logic calls the resetOffsetByTimestamp method of org.apache.rocketmq.tools.admin.MQAdminExt:
Map<MessageQueue, Long> resetOffsetByTimestamp(String topic, String group, long timestamp, boolean isForce)
    throws RemotingException, MQBrokerException, InterruptedException, MQClientException;
These parameters correspond to the fields of the JSON request above. topic and group need no explanation. The timestamp is special: the Dashboard passes -1. isForce is true.
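isForce controls whether the consume progress is reset forcibly.
When force is true, the offset is reset to the one matching the specified timestamp, whether that moves the consumer forward or backward.
When force is false, the offset is reset only if the target offset is smaller than the consumer's current offset, i.e. the consumer has already moved past the specified timestamp. Otherwise the current offset is kept, so no unconsumed messages are skipped.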
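The per-queue rule behind force fits in a single comparison. Below is a minimal Java sketch, assuming the target offset has already been resolved from the timestamp; ResetDecision and resolve are hypothetical names, and the condition mirrors the isForce comment in the broker's resetOffset code analyzed below.

```java
// Hypothetical sketch of the per-queue force rule; not RocketMQ code.
public class ResetDecision {

    /**
     * Returns the offset the consumer should continue from.
     *
     * @param isForce       the force flag from the reset request
     * @param targetOffset  offset resolved from the requested timestamp
     * @param currentOffset the group's current consume offset for the queue
     */
    static long resolve(boolean isForce, long targetOffset, long currentOffset) {
        // Forced: always jump to the target, forward or backward.
        // Not forced: only move backward (re-consume), never skip unconsumed messages.
        return (isForce || targetOffset < currentOffset) ? targetOffset : currentOffset;
    }

    public static void main(String[] args) {
        // Skip backlog: target is the max offset (500), the consumer is at 120.
        System.out.println(resolve(true, 500, 120));  // 500
        System.out.println(resolve(false, 500, 120)); // 120, the forward jump is refused
        // Rewind: target 80 is behind the current offset 120, allowed without force.
        System.out.println(resolve(false, 80, 120));  // 80
    }
}
```

For skip accumulate the target is the queue's max offset, which is normally ahead of the current offset, so a non-forced reset would leave the offset unchanged. That is presumably why the Dashboard sends force=true.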
RocketMQ
Having found the entry point, let's switch to the RocketMQ source and look at the implementation of resetOffsetByTimestamp behind org.apache.rocketmq.tools.admin.MQAdminExt:
public Map<MessageQueue, Long> resetOffsetByTimestamp(String topic, String group, long timestamp, boolean isForce,
    boolean isC) throws RemotingException, MQBrokerException, InterruptedException, MQClientException {
    TopicRouteData topicRouteData = this.examineTopicRouteInfo(topic);
    List<BrokerData> brokerDatas = topicRouteData.getBrokerDatas();
    Map<MessageQueue, Long> allOffsetTable = new HashMap<>();
    if (brokerDatas != null) {
        for (BrokerData brokerData : brokerDatas) {
            String addr = brokerData.selectBrokerAddr();
            if (addr != null) {
                Map<MessageQueue, Long> offsetTable = this.mqClientInstance.getMQClientAPIImpl().invokeBrokerToResetOffset(addr, topic, group, timestamp, isForce, timeoutMillis, isC);
                if (offsetTable != null) {
                    allOffsetTable.putAll(offsetTable);
                }
            }
        }
    }
    return allOffsetTable;
}
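Let's walk through this code. It does three things:
1. Fetch the topic's route metadata, topicRouteData
2. Get the brokers hosting the topic, List<BrokerData> brokerDatas
3. Send a reset-offset request to each broker
Steps 1 and 2 are not our focus, so let's look at the source of step 3.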
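The fan-out above can be modeled without RocketMQ at all. A simplified Java (16+) sketch under stated assumptions: Queue stands in for MessageQueue, BrokerResetCall stands in for the RPC made by invokeBrokerToResetOffset, and all names are hypothetical; only the loop-and-merge structure follows the resetOffsetByTimestamp code.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical model of the admin-side fan-out: ask every broker hosting the topic
// to reset, then merge the per-queue results into one table.
public class ResetFanOut {

    // Stand-in for MessageQueue: broker name + queue id (topic omitted for brevity).
    record Queue(String brokerName, int queueId) {}

    // Stand-in for the per-broker RPC.
    interface BrokerResetCall {
        Map<Queue, Long> reset(String brokerAddr, String topic, String group, long timestamp, boolean isForce);
    }

    static Map<Queue, Long> resetOnAllBrokers(List<String> brokerAddrs, BrokerResetCall call,
                                              String topic, String group, long timestamp, boolean isForce) {
        Map<Queue, Long> all = new HashMap<>();
        for (String addr : brokerAddrs) {
            if (addr == null) {
                continue; // mirrors the null check on selectBrokerAddr()
            }
            Map<Queue, Long> part = call.reset(addr, topic, group, timestamp, isForce);
            if (part != null) {
                all.putAll(part); // keys include the broker name, so brokers do not collide
            }
        }
        return all;
    }

    public static void main(String[] args) {
        BrokerResetCall fake = (addr, topic, group, ts, force) -> {
            String broker = addr.startsWith("10.0.0.1") ? "broker-a" : "broker-b";
            return Map.of(new Queue(broker, 0), 100L, new Queue(broker, 1), 200L);
        };
        Map<Queue, Long> merged = resetOnAllBrokers(List.of("10.0.0.1:10911", "10.0.0.2:10911"),
            fake, "xiao-zou-topic", "gid-xiao-zou-topic", -1, true);
        System.out.println(merged.size()); // 4
    }
}
```

Because each broker only returns the queues it owns, merging with putAll is enough to build the final table the admin tool returns.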
public Map<MessageQueue, Long> invokeBrokerToResetOffset(final String addr, final String topic, final String group,
    final long timestamp, final boolean isForce, final long timeoutMillis, boolean isC)
    throws RemotingException, MQClientException, InterruptedException {
    ResetOffsetRequestHeader requestHeader = new ResetOffsetRequestHeader();
    requestHeader.setTopic(topic);
    requestHeader.setGroup(group);
    requestHeader.setTimestamp(timestamp);
    requestHeader.setForce(isForce);
    // offset is -1 means offset is null
    requestHeader.setOffset(-1L);
    RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.INVOKE_BROKER_TO_RESET_OFFSET, requestHeader);
    if (isC) {
        request.setLanguage(LanguageCode.CPP);
    }
    RemotingCommand response = this.remotingClient.invokeSync(MixAll.brokerVIPChannel(this.clientConfig.isVipChannelEnabled(), addr),
        request, timeoutMillis);
    assert response != null;
    switch (response.getCode()) {
        case ResponseCode.SUCCESS: {
            if (response.getBody() != null) {
                ResetOffsetBody body = ResetOffsetBody.decode(response.getBody(), ResetOffsetBody.class);
                return body.getOffsetTable();
            }
            // no break: a SUCCESS response without a body falls through and ends in the exception below
        }
        default:
            break;
    }
    throw new MQClientException(response.getCode(), response.getRemark());
}
The real logic must live on the broker, so we search for the request code INVOKE_BROKER_TO_RESET_OFFSET to find the broker-side handler:
- AdminBrokerProcessor
public RemotingCommand resetOffset(ChannelHandlerContext ctx,
    RemotingCommand request) throws RemotingCommandException {
    final ResetOffsetRequestHeader requestHeader =
        (ResetOffsetRequestHeader) request.decodeCommandCustomHeader(ResetOffsetRequestHeader.class);
    LOGGER.info("[reset-offset] reset offset started by {}. topic={}, group={}, timestamp={}, isForce={}",
        RemotingHelper.parseChannelRemoteAddr(ctx.channel()), requestHeader.getTopic(), requestHeader.getGroup(),
        requestHeader.getTimestamp(), requestHeader.isForce());
    if (this.brokerController.getBrokerConfig().isUseServerSideResetOffset()) {
        String topic = requestHeader.getTopic();
        String group = requestHeader.getGroup();
        int queueId = requestHeader.getQueueId();
        long timestamp = requestHeader.getTimestamp();
        Long offset = requestHeader.getOffset();
        return resetOffsetInner(topic, group, queueId, timestamp, offset);
    }
    boolean isC = false;
    LanguageCode language = request.getLanguage();
    switch (language) {
        case CPP:
            isC = true;
            break;
    }
    return this.brokerController.getBroker2Client().resetOffset(requestHeader.getTopic(), requestHeader.getGroup(),
        requestHeader.getTimestamp(), requestHeader.isForce(), isC);
}
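When useServerSideResetOffset is not enabled, the handler delegates to this.brokerController.getBroker2Client().resetOffset. Its implementation is a bit long, so let's go through it piece by piece.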
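public RemotingCommand resetOffset(String topic, String group, long timeStamp, boolean isForce,
    boolean isC) {
    final RemotingCommand response = RemotingCommand.createResponseCommand(null);
    // Look up the topic's config (metadata) on this broker
    TopicConfig topicConfig = this.brokerController.getTopicConfigManager().selectTopicConfig(topic);
    // Fail if the topic does not exist on this broker
    if (null == topicConfig) {
        log.error("[reset-offset] reset offset failed, no topic in this broker. topic={}", topic);
        response.setCode(ResponseCode.SYSTEM_ERROR);
        response.setRemark("[reset-offset] reset offset failed, no topic in this broker. topic=" + topic);
        return response;
    }
    Map<MessageQueue, Long> offsetTable = new HashMap<>();
    // Compute the new offset for every write queue of the topic
    for (int i = 0; i < topicConfig.getWriteQueueNums(); i++) {
        MessageQueue mq = new MessageQueue();
        mq.setBrokerName(this.brokerController.getBrokerConfig().getBrokerName());
        mq.setTopic(topic);
        mq.setQueueId(i);
        // Query the group's current consume offset for this queue
        long consumerOffset =
            this.brokerController.getConsumerOffsetManager().queryOffset(group, topic, i);
        // -1 means the group has no consume progress for this queue
        if (-1 == consumerOffset) {
            response.setCode(ResponseCode.SYSTEM_ERROR);
            response.setRemark(String.format("THe consumer group <%s> not exist", group));
            return response;
        }
        long timeStampOffset;
        // timestamp -1 means "skip accumulate": jump straight to the queue's max offset;
        // otherwise look up the offset for the given time
        if (timeStamp == -1) {
            timeStampOffset = this.brokerController.getMessageStore().getMaxOffsetInQueue(topic, i);
        } else {
            timeStampOffset = this.brokerController.getMessageStore().getOffsetInQueueByTime(topic, i, timeStamp);
        }
        // If the looked-up offset is invalid (negative), fall back to 0
        if (timeStampOffset < 0) {
            log.warn("reset offset is invalid. topic={}, queueId={}, timeStampOffset={}", topic, i, timeStampOffset);
            timeStampOffset = 0;
        }
        // With force, always use the target offset.
        // Without force, only move backward (target < current offset) so no messages are skipped
        if (isForce || timeStampOffset < consumerOffset) {
            offsetTable.put(mq, timeStampOffset);
        } else {
            offsetTable.put(mq, consumerOffset);
        }
    }

    // Build the RESET_CONSUMER_CLIENT_OFFSET request carrying the new offset table
    ResetOffsetRequestHeader requestHeader = new ResetOffsetRequestHeader();
    requestHeader.setTopic(topic);
    requestHeader.setGroup(group);
    requestHeader.setTimestamp(timeStamp);
    RemotingCommand request =
        RemotingCommand.createRequestCommand(RequestCode.RESET_CONSUMER_CLIENT_OFFSET, requestHeader);
    if (isC) {
        // C++ client
        ResetOffsetBodyForC body = new ResetOffsetBodyForC();
        List<MessageQueueForC> offsetList = convertOffsetTable2OffsetList(offsetTable);
        body.setOffsetTable(offsetList);
        request.setBody(body.encode());
    } else {
        // other languages
        ResetOffsetBody body = new ResetOffsetBody();
        body.setOffsetTable(offsetTable);
        request.setBody(body.encode());
    }

    ConsumerGroupInfo consumerGroupInfo =
        this.brokerController.getConsumerManager().getConsumerGroupInfo(group);
    // Notify every online consumer of the group
    if (consumerGroupInfo != null && !consumerGroupInfo.getAllChannel().isEmpty()) {
        ConcurrentMap<Channel, ClientChannelInfo> channelInfoTable =
            consumerGroupInfo.getChannelInfoTable();
        for (Map.Entry<Channel, ClientChannelInfo> entry : channelInfoTable.entrySet()) {
            int version = entry.getValue().getVersion();
            // Clients older than 3.0.7 do not support this request
            if (version >= MQVersion.Version.V3_0_7_SNAPSHOT.ordinal()) {
                try {
                    this.brokerController.getRemotingServer().invokeOneway(entry.getKey(), request, 5000);
                    log.info("[reset-offset] reset offset success. topic={}, group={}, clientId={}",
                        topic, group, entry.getValue().getClientId());
                } catch (Exception e) {
                    log.error("[reset-offset] reset offset exception. topic={}, group={} ,error={}",
                        topic, group, e.toString());
                }
            } else {
                response.setCode(ResponseCode.SYSTEM_ERROR);
                response.setRemark("the client does not support this feature. version="
                    + MQVersion.getVersionDesc(version));
                log.warn("[reset-offset] the client does not support this feature. channel={}, version={}",
                    RemotingHelper.parseChannelRemoteAddr(entry.getKey()), MQVersion.getVersionDesc(version));
                return response;
            }
        }
    } else {
        String errorInfo =
            String.format("Consumer not online, so can not reset offset, Group: %s Topic: %s Timestamp: %d",
                requestHeader.getGroup(),
                requestHeader.getTopic(),
                requestHeader.getTimestamp());
        log.error(errorInfo);
        response.setCode(ResponseCode.CONSUMER_NOT_ONLINE);
        response.setRemark(errorInfo);
        return response;
    }
    response.setCode(ResponseCode.SUCCESS);
    ResetOffsetBody resBody = new ResetOffsetBody();
    resBody.setOffsetTable(offsetTable);
    response.setBody(resBody.encode());
    return response;
}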
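So the broker reads the group's offsets on its side, computes the new offset for every queue, and then sends a request to all clients of the group telling them to update their local offsets. From the code above, a reset can therefore fail in several cases:
- the topic does not exist on the broker
- the consumer group has no consume progress on a queue (queryOffset returns -1)
- a connected client is older than V3_0_7_SNAPSHOT and does not support the request
- no consumer of the group is online (CONSUMER_NOT_ONLINE)
Note the last case in particular: even if the group has a message backlog, the reset fails when none of its consumers is connected to the broker.
In other words, on this path the reset works by telling every client to pull messages from the broker with the new offsets; the broker does not modify its stored consumer offsets itself.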
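To see the broker-side computation in isolation, here is an in-memory Java (16+) model of the per-queue loop. Assumptions: a queue is a list of message store timestamps indexed by offset, the time lookup is a plain binary search for the first message stored at or after the timestamp (a simplification of getOffsetInQueueByTime), and failures are exceptions instead of response codes. The client notification, and with it the client-version and CONSUMER_NOT_ONLINE failures, is left out. QueueState, ResetFailed and computeOffsetTable are hypothetical names.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical in-memory model of the per-queue loop in Broker2Client.resetOffset.
public class BrokerResetSketch {

    // committedOffset: -1 means the group has no progress on this queue.
    // storeTimestamps: store time of each message, index == offset, ascending.
    record QueueState(long committedOffset, List<Long> storeTimestamps) {
        long maxOffset() { return storeTimestamps.size(); }
    }

    static class ResetFailed extends RuntimeException {
        ResetFailed(String msg) { super(msg); }
    }

    // Simplified time lookup: first offset whose store time is >= timestamp (binary search).
    static long offsetByTime(QueueState q, long timestamp) {
        int lo = 0, hi = q.storeTimestamps().size();
        while (lo < hi) {
            int mid = (lo + hi) >>> 1;
            if (q.storeTimestamps().get(mid) < timestamp) lo = mid + 1; else hi = mid;
        }
        return lo;
    }

    // queues == null models "topic not on this broker"; list index == queue id.
    static Map<Integer, Long> computeOffsetTable(List<QueueState> queues, long timestamp, boolean isForce) {
        if (queues == null) {
            throw new ResetFailed("no topic in this broker");
        }
        Map<Integer, Long> table = new HashMap<>();
        for (int queueId = 0; queueId < queues.size(); queueId++) {
            QueueState q = queues.get(queueId);
            if (q.committedOffset() == -1) {
                throw new ResetFailed("consumer group not exist");
            }
            // -1 means "skip accumulate": jump to the queue's max offset.
            long target = timestamp == -1 ? q.maxOffset() : offsetByTime(q, timestamp);
            if (target < 0) {
                target = 0; // broker's fallback; cannot trigger in this model
            }
            table.put(queueId, (isForce || target < q.committedOffset()) ? target : q.committedOffset());
        }
        return table;
    }

    public static void main(String[] args) {
        List<QueueState> queues = List.of(
            new QueueState(1, List.of(1000L, 2000L, 3000L)),
            new QueueState(3, List.of(1000L, 2500L, 4000L, 5000L)));
        System.out.println(computeOffsetTable(queues, -1, true));    // {0=3, 1=4}
        System.out.println(computeOffsetTable(queues, 2000, false)); // {0=1, 1=1}
    }
}
```

The second call shows the non-forced rule per queue: queue 0 is already at the target and stays put, while queue 1 is rewound from 3 to 1.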
Now let's look at how the client finally handles the notification. The request code is 220:
public static final int RESET_CONSUMER_CLIENT_OFFSET = 220;
- ClientRemotingProcessor
this.mqClientFactory.resetOffset(requestHeader.getTopic(), requestHeader.getGroup(), offsetTable);
public synchronized void resetOffset(String topic, String group, Map<MessageQueue, Long> offsetTable) {
    DefaultMQPushConsumerImpl consumer = null;
    try {
        MQConsumerInner impl = this.consumerTable.get(group);
        if (impl instanceof DefaultMQPushConsumerImpl) {
            consumer = (DefaultMQPushConsumerImpl) impl;
        } else {
            log.info("[reset-offset] consumer dose not exist. group={}", group);
            return;
        }
        consumer.suspend();
        ConcurrentMap<MessageQueue, ProcessQueue> processQueueTable = consumer.getRebalanceImpl().getProcessQueueTable();
        for (Map.Entry<MessageQueue, ProcessQueue> entry : processQueueTable.entrySet()) {
            MessageQueue mq = entry.getKey();
            if (topic.equals(mq.getTopic()) && offsetTable.containsKey(mq)) {
                ProcessQueue pq = entry.getValue();
                pq.setDropped(true);
                pq.clear();
            }
        }
        try {
            TimeUnit.SECONDS.sleep(10);
        } catch (InterruptedException ignored) {
        }
        Iterator<MessageQueue> iterator = processQueueTable.keySet().iterator();
        while (iterator.hasNext()) {
            MessageQueue mq = iterator.next();
            Long offset = offsetTable.get(mq);
            if (topic.equals(mq.getTopic()) && offset != null) {
                try {
                    consumer.updateConsumeOffset(mq, offset);
                    consumer.getRebalanceImpl().removeUnnecessaryMessageQueue(mq, processQueueTable.get(mq));
                    iterator.remove();
                } catch (Exception e) {
                    log.warn("reset offset failed. group={}, {}", group, mq, e);
                }
            }
        }
    } finally {
        if (consumer != null) {
            consumer.resume();
        }
    }
}
The overall flow is:
1. Find the consumer whose offsets need resetting and suspend it.
2. Get the consumer's process queue table, processQueueTable, and iterate over its entries. For every message queue mq that satisfies topic.equals(mq.getTopic()) && offsetTable.containsKey(mq):
   a. mark its process queue pq as dropped: pq.setDropped(true)
   b. clear pq: pq.clear()
3. Sleep for 10 seconds.
4. Iterate over processQueueTable again. For every mq that satisfies topic.equals(mq.getTopic()) && offset != null:
   a. take the offset for mq from offsetTable
   b. update the consumer's offset to it: consumer.updateConsumeOffset(mq, offset)
   c. remove the queue through the consumer's rebalance implementation: consumer.getRebalanceImpl().removeUnnecessaryMessageQueue(mq, processQueueTable.get(mq))
   d. remove mq from processQueueTable with the iterator's remove()
5. Resume the consumer.
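The client-side steps can be sketched with plain collections. This is a simplified model, not the RocketMQ implementation: queues are integer ids of a single topic (so the topic check disappears), PQ stands in for ProcessQueue, localOffsets for the consumer's offset store, the wait is a parameter instead of a fixed 10 s, and removeUnnecessaryMessageQueue is reduced to removing the entry. All names are hypothetical.

```java
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.TimeUnit;

// Hypothetical model of the consumer-side reset in MQClientInstance.resetOffset.
public class ClientResetSketch {

    static class PQ {
        volatile boolean dropped;   // real pull/consume code checks this flag
        int bufferedMessages;
        void clear() { bufferedMessages = 0; }
    }

    final ConcurrentMap<Integer, PQ> processQueueTable = new ConcurrentHashMap<>();
    final Map<Integer, Long> localOffsets = new ConcurrentHashMap<>();
    volatile boolean suspended;

    synchronized void resetOffset(Map<Integer, Long> offsetTable, long waitMillis) {
        suspended = true; // 1. suspend the consumer
        try {
            // 2. mark affected process queues as dropped and clear their buffered messages
            for (Map.Entry<Integer, PQ> e : processQueueTable.entrySet()) {
                if (offsetTable.containsKey(e.getKey())) {
                    e.getValue().dropped = true;
                    e.getValue().clear();
                }
            }
            // 3. wait (10 s in RocketMQ, presumably so in-flight work sees the dropped flag)
            try {
                TimeUnit.MILLISECONDS.sleep(waitMillis);
            } catch (InterruptedException ie) {
                Thread.currentThread().interrupt(); // keep the interrupt status
            }
            // 4. write the new offset and remove the queue; rebalance recreates it later
            Iterator<Integer> it = processQueueTable.keySet().iterator();
            while (it.hasNext()) {
                Integer mq = it.next();
                Long offset = offsetTable.get(mq);
                if (offset != null) {
                    localOffsets.put(mq, offset);
                    it.remove();
                }
            }
        } finally {
            suspended = false; // 5. resume
        }
    }

    public static void main(String[] args) {
        ClientResetSketch c = new ClientResetSketch();
        c.processQueueTable.put(0, new PQ());
        c.processQueueTable.put(1, new PQ());
        c.localOffsets.put(0, 10L);
        c.localOffsets.put(1, 20L);
        c.resetOffset(Map.of(0, 99L), 10);
        System.out.println(c.localOffsets);               // {0=99, 1=20}
        System.out.println(c.processQueueTable.keySet()); // [1]
    }
}
```

One deliberate difference: the sketch restores the thread's interrupt flag instead of ignoring InterruptedException as the original does.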
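Summary
In short, when consumer offsets are managed by the client, a reset is initiated from the client side (the admin tool behind the Dashboard) and sent to the broker, and the broker in turn notifies every online consumer client of the group to update its local offsets.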