概述
Kafka的强大功能之一是每个分区都有一个Consumer的偏移值。该偏移值是消费者将读取的下一条消息的值。可以自动或手动增加该值。如果我们由于错误而无法处理消息并想重试,我们可以选择手动管理,并在成功的情况下增加偏移量。但是,这会暂时阻止队列消息的处理。我们可以选择异步方法。
为什么我们需要它?
如果发生错误,而不是停止队列消息的处理;我们可以将错误消息转移到不同的主题并再次处理。
如果在处理 Kafka 消息时出现错误,可以使用 RetryableTopic 注解以一定的时间间隔和一定的次数再次处理消息。如果完成尝试次数后错误仍然存在,则消息将发送到 DLT 队列。
如何使用?
我们首先回顾一下RetryableTopic注解可以取的一些值,以便您可以做出最适合您的设置:
attempts:尝试处理消息的次数。它的默认值为 3。如果完成所有尝试后仍然收到错误,则消息将发送到 DLT 队列。
backoff:用于确定处理消息的时间间隔。从 Backoff 类获取一个值。您可以在下面找到退避的详细示例。
排除/排除名称:允许您排除指定的异常类。当您添加到列表中的任何错误被抛出时,重试机制将不会被激活。
include / includeNames:仅当抛出指定的异常时才会激活重试机制。
kafkaTemplate:虽然您可以给出现有 kafkaTemplate bean 的名称,但您也可以为特定于重试的 Kafka 模板定义不同的 bean。
autoCreateTopics:决定是否自动创建Retry和DLT主题。
retryTopicSuffix / dltTopicSuffix:用于确定要添加到自动创建的主题末尾的后缀。
dltStrategy:如果不需要DLT,可以定义为NO_DLT。
SameIntervalTopicReuseStrategy/fixedDelayTopicStrategy(3.0.4之前):用于确定要创建的重试主题策略。创建 (SINGLE_TOPIC) 或尽可能多的尝试值 (MULTIPLE_TOPICS) 重试主题。
Backoff的示例:
- 具有固定的增量值
Backoff(delay = 600000 ) // 每 10 分钟
- 具有指数价值
Backoff(delay = 60000 , multiplier = 2 ) // 1、2、4、8... 分钟后重复。
- 用占位符定义值
Backoff(delayExpression = "${delay}", multiplierExpression = "${multiplier}")
@RetryableTopic 示例:
@RetryableTopic(
backoff = @Backoff(delay = 300000),
attempts = 12,
sameIntervalTopicReuseStrategy =
SameIntervalTopicReuseStrategy.SINGLE_TOPIC,
kafkaTemplate = "kafkaRetryableTopicTemplate",
exclude = { SerializationException.class,
DeserializationException.class,
NullPointerException.class
}
)
@KafkaListener(topics = "my-topic")
public void processMessage(RetryableDto retryableDto) {
log.info("Retrying process RetryableDto : {}", retryableDto);
// process message
}
在上面的例子中,消息将每5分钟重新处理一次,总共12次,即1小时。如果任何尝试均顺利完成,则试用将终止。
由于定义了 SINGLE_TOPIC,因此将创建单个主题以进行重试。如果没有进行此定义,则会创建 12 个重试主题。
如果抛出了排除中定义的任何错误,则不会执行重做。
如果需要,您可以编写自己的 RetryableException 并在包含中定义此值,以便仅在引发此错误时才重试。
DLT队列处理
如果完成了定义的尝试次数并且继续收到错误,则消息将发送到 DLT 队列。如果要处理这些消息,可以使用DltHandler注解。
用法示例:
@DltHandler
public void handleDltMessage (RetryableDto retryableDto) {
log.error("DLT处理程序消息:{}", retryableDto);
}
注意事项
虽然使用 RetryableTopic 的异步处理优势为我们带来了性能提升,但这种使用也有一些缺点。
使用RetryableTopic可能会破坏消息的处理顺序。
让我们用一个例子来解释这种情况:当主主题在时间 t 处理时,一条消息出错并被发送到重试主题。在时间 t + 1 时,另一条消息来到主主题并成功处理。让我们在重试主题中的消息在时间 t + 2 时被成功处理。在这种情况下,第一条传入消息将在第二条消息之后处理。如果订购对您很重要,我建议您在消息处理过程中进行必要的检查。
另一个缺点是消息双重处理的风险。您可以通过考虑这种可能性来进行改进。