1 背景
在分布式场景下,为了保障系统的可用性和数据的最终一致性,采用基于消息队列(MQ)的重试机制是一种常见的解决方案。伪代码如下:
/**
* 需要保证最终一致性的函数
*/
public void doSomething(Object args) {
try {
// 执行事务的操作
executeTransaction();
// 提交事务
commitTransaction();
} catch (Exception e) {
// 回滚事务
rollbackTransaction();
// 记录日志
log.error(e);
// 序列化参数
byte[] body = serialize(args);
// 构建消息, 指定Topic、Body
Message msg = new Message("doSomethingTopic", body);
// 发送失败重试消息
mq.send(msg);
}
}
/**
* 消费者,用于失败重试处理
*/
@Consumer(topic = "doSomethingTopic")
public void consume(Message msg) {
// 反序列化
Object args = msg.deserialize();
// 重试
doSomething(args);
}
在上述示例中,我们需要编写一系列与业务无关的代码来实现业务逻辑的重试机制。为了减轻开发人员的负担并让其专注于核心业务,我们可以对这些无关代码进行抽象和优化,以提高开发效率和代码质量。
2 方案
通过如下步骤,我们对重试逻辑进行了封装,开发人员只需要在需要保证最终一致性的函数上标注一个重试注解,便拥有基于MQ的分布式重试能力。
1. 使用注解与AOP: 通过使用注解与面向切面编程(AOP)的技术,将重试逻辑模块与业务代码解耦。开发人员可以在需要保证最终一致性的业务方法上添加注解,通过AOP将重试逻辑应用到目标方法中,从而自动触发重试机制。
2. 提供配置化选项:为重试逻辑提供可配置化的选项,例如设置最大重试次数、重试间隔时间等。这样,开发人员可以根据具体业务需求进行调整,而无需修改代码。
3. 异常处理和日志记录:在重试逻辑中合理地处理异常,并在必要时记录相关日志。这样可以帮助开发人员及时发现问题并进行排查。
4. 提供可视化监控工具:开发一个可视化的监控工具,用于实时跟踪重试操作和相关指标。这样可以帮助开发人员更好地理解重试的执行情况,并进行故障排查和性能优化。
图片
3 效果
我们引入了@MQRetry注解用于标记业务逻辑函数,一旦该函数发生异常,该注解会将服务名、类的完整名称、方法名称以及实际参数列表发送到消息队列(MQ)中。同时系统会注册一个MQ消费者来消费这些消息,并进行重试处理。
举个例子,假设我们有一个名为doSomething的函数,它包含了需要保证最终一致性执行的业务逻辑。仅需在该函数上添加@MQRetry注解,当函数出现异常时,框架会自动发送一条MQ重试消息。这条消息可以被当前服务的任意一台服务器消费,并重新执行doSomething函数。
@Service
class Service {
@MQRetry
public void doSomething(String params1, String params2, List params3) {
//throw new RuntimeException(); 抛异常将重试
//RetryContext.markRetryLater(); 标记为需要下次重试
//int retryCount = RetryContext.getRetryCount(); 获取重试次数
}
}
@Controller
class Controller {
@Autowired
private Service service;
service.doSomething("1", "2", Arrays.asList("3", "4"));
}
4 可选项
除此之外,我们还为开发人员提供了一些可选项,提供一些可配置的能力。
/**
* 基于MQ的分布式重试组件
*/
@Target(ElementType.METHOD)
@Retention(RetentionPolicy.RUNTIME)
@Documented
public @interface MQRetry {
/**
* 最大重试次数,默认与上限为16次
*/
int maxAttempts() default 16;
/**
* 忽略的异常类列表,默认所有异常都重试
*/
Class