SpringTxAsync组件是仓储平台组(WMS6)自主研发的一个专门用于解决可靠异步调用问题的组件。
通过使用SpringTxAsync组件,我们成功地解决了在仓储平台(WMS6)中的异步调用需求。经过近二年多的实践并经历了两次618活动以及两次双11活动,该组件已经在我们的所有应用中稳定运行并成功应用于各种业务场景。
该组件的主要功能是实现可靠的异步调用。在异步任务的执行过程中,我们能够确保任务的可靠性,即使在出现异常情况或重要机器重启等不确定因素时,仍能保证任务的正常执行,并且能够满足我们的业务需求。
SpringTxAsync组件的成功应用为我们的仓储平台(WMS6)提供了稳定可靠的异步调用支持。在接下来的内容中,我们将详细介绍该组件的设计原理和技术特点,以帮助读者更好地理解和应用该组件。
异步调用的场景
异步的本质就是一种Fire-And-Forget模式,它在编程中常用于两种场景的应用:
异步编程模式的使用可以带来多种好处,例如提升系统性能、改善用户体验、增强系统的稳定性和可扩展性等。
简单举例说明一下:
用户注册完成后,需要发送注册成功的邮件通知。
// transaction begin
User user = UserService.create(UserCreateRequest reqeust)//本地事务
NotificationService.sendRegisterEmail(user)// 本地调用
// do other business operations
//transaction commit
//NotificationService.sendRegisterEmail(user) 实现
@TransactionAsync
public void sendRegisterEmail(User user){
EmailService.send(...);//远程调用
}
可选技术方案
类似如下场景:
- 用户下单成功后给用户发一个email
- 用户注册成功后,发布一个事件(Message)到消息中件间
- 在DDD开发模式下中处理聚合根之间的领域事件通知
整体示意图如下:
我们的核心需求
异步任务的可靠性,与本地事务的一致性。
可靠性
可靠性是指在创建完成异步任务并成功提交本地事务后,确保异步任务一定会被执行,无论是否出现异常情况或机器重启等意外因素,始终能够保证达到At-Least-Once语义。
一致性
一致性指的是异步方法内部业务逻辑与调用异步方法时的事务上下文保持一致。当本地事务提交时,异步任务执行;而当本地事务回滚时,异步任务则不会执行。
基于以上两点需求,我们做了能满足上述场景的OutboxPattern实现。
SpringTxAsync方案
核心逻辑
为了对使用@TransactionAsync标注为异步的方法进行拦截,我们可以采用AOP方式。
在拦截过程中,将Invocation封装成一个异步任务,并将其持久化到数据库的异步任务表中,该操作需要在当前事务域内完成。
对于已经保存在本地表中的异步任务的处理,我们可以考虑两种实现方案。
方案1
首先,异步任务表被视为只读表,只会插入任务记录,不会进行修改操作。对于任务的处理,我们可以通过监听表的binlog来实现。当收到insert事件时,将其转换成消息队列(mq)消息,在应用中有一个监听器(listener)负责消费这些消息。
方案2
为了有效追踪异步任务的生命周期,我们对异步任务进行状态化管理。
当有新任务生成时,我们将其插入到异步任务表中。在当前事务提交后,任务会立即提交到线程池中执行,而不是从数据库中获取任务。只有发生异常的异步任务,才会定时从数据库中获取任务来处理。 这种方案可以确保异步任务能够及时地被提交和执行,同时在异常情况下也能够保证任务的处理。
两种方案的取舍:我们选择第二种方案
- 减少依赖的中间件,降低组件的整体复杂度以及团队接入组件后运维成本
- 降低任务数据的调用链长度,从binlog到mq这中间增加了很多不确定性
异步任务状态机
异步任务的生命周期共有五个状态:READY(就绪)、RUNNING(运行中)、EXCEPTION(异常)、SUCCESS(成功)、FAIL(失败)
核心代码
异步方法注解,用来声明异步任务
/**
* 标记在Spring 容器管理的bean的public方法上,以实现本地事务级别的可靠异步调用。
*
* 对于加注了该注解的方法:
*
1. 如果当前处理事务中,则对异步方法的调用是在事务提交之后进行的,这样是为了保证异步调用与本地事务的一致性,
* 假如事务回滚,则异步调用不会执行。
*
2. 如果当前调用不在事务中,则可靠性退化为与{@link org.springframework.scheduling.annotation.Async}一致。
*
*
对于异步逻辑的调用,能保证At Least Once的语义。
*
在极端场景下,比如down机,或线程池被打爆的情况下,是通过重度来保证调用的可靠性的,所以有可能对异步逻辑的执行大于一次,所以要求异步逻辑本身是幂等的。
*
*
注意:在同一个bean内部调用时不起作用,这是因为Spring AOP的Proxy代理机制导致。
*
*/
@Target({ElementType.METHOD})
@Retention(RetentionPolicy.RUNTIME)
public @interface TransactionAsync {
public static int UN_DEFINED = -1;
/***
* -1 as not set. default 5 min
* @return timeout value, in seconds
*
*/
long timeout() default UN_DEFINED;
/**
* 用来指定当前异步任务执行的线程池,这里指定线程池的Bean名称,运行时会从Spring容器中根据名称查询。
* 如果未指定,则使用一个默认线程池,全局共享一个。
* 不同的异步任务优先级高低不同,如果想要隔离,可以给不同任务指定不同的线程池。
* 注意:如果要自定义线程池,要使用Spring的ThreadPoolTaskExecutor,这个线程池的实现,支持运行时调整线程数。
* @return executor bean name
*/
String executor() default "";
/**
* -1 as not set. default 3 times
*
* @return max retry attempt
*/
int maxAttempt() default UN_DEFINED;
}
拦截@TransactionAsync,在AOP中做切面逻辑。伪代码如下:
TransactionAsync.AOP.invoke(MethodInvocation) {
AsyncInvocation asyncInvocation = 根据当前方法以及注解里的属性(支持SpringEL)确定落库的各个字段值
wms_async_task.insert(asyncInvocation, status=READY) // 当前事务里塞入一个insert,如果无事务auto_commit
ThreadPoolTaskExecutor executor = determineAsyncExecutor(method) // 根据方法获取异步执行的线程池
if isInTransaction() {
// 注册事务提交后的hook
TransactionSynchronizationManager.registerSynchronization(new TransactionSynchronization() {
void afterCommit() { // 仍然在当前业务线程里,但事务外
executor.submit(asyncInvocation) // 转为异步线程,原事务数据变化可见
}
})
} else {
executor.submit(asyncInvocation) // redo log writen, invoke task best effort
}
}
方案完备性
错误重试
为了处理异步任务发生异常的情况,我们引入了重试机制。默认情况下,异步任务将进行3次重试,并且可以在注解中自定义重试次数。重试的间隔采用指数级递增的方式,直到达到最大重试间隔300秒后,将以固定间隔进行重试。当任务达到最大重试次数后,任务将被标记为DeadTask,同时触发报警并需要人工介入处理。
重试机制的实现依赖于Spring框架的本地调度器@Schedule。我们通过@Schedule注解定义一个定时任务,该任务会从异步任务表中根据任务的状态获取任务,并调用相应的处理方法。需要注意的是,使用乐观锁来管理任务的状态,以避免多个本地Spring Schedule任务执行相同的失败任务。
通过以上的重试机制,我们能够及时处理异步任务发生异常的情况,并尽可能地进行重试,以提高任务的成功率。在达到最大重试次数后,及时报警并进行人工介入,以确保任务的及时处理和系统的稳定运行。
超时保护
有三种类型的超时:等待超时、执行超时和失败超时。
队列管理
一旦异步任务被创建并持久化到数据库,当创建异步任务所在的事务提交后,异步任务会立即提交给线程池进行处理。由于仓储平台的大部分应用都是长时间运行的任务,所以我们默认配置了较大的执行线程池,并使用阻塞队列作为缓冲。 线程池默认采用ThreadPoolTaskExecutor来执行任务,队列使用LinkedBlockingQueue来实现有界队列。当任务提交到队列无法进入时,采用丢弃策略(DiscardPolicy)并触发报警。需要注意的是,被丢弃的任务并不会真正丢失,而是会被当做等待超时的处理方式。
隔离性
通过使用线程池来实现任务的隔离是一种有效的方法。在声明异步任务时,可以在注解中明确指定所使用的线程池,以实现任务的隔离。举例来说,对于不同的优先级任务,可以使用不同的线程池来处理;而对于不同特征的任务,则可以配置不同的线程池;另外,还可以将短时任务与长时任务分别隔离开,分别放入不同的线程池中,以保证任务能够得到有效地处理。
伸缩性
在某些情况下,一个节点可能会产生大量的异步任务,而这些任务只能在该节点上执行,无法分配给其他节点执行。这会导致该节点负载过重,而其他节点却处于闲置状态。
为了解决这个问题,我们需要合理设计异步任务的粒度。如果一个节点产生的异步任务过于庞大,可以考虑将其拆分为更小的任务,并分配给其他节点执行。这样做的好处是,能够有效地平衡各个节点的负载,提高系统的性能和可伸缩性。
此外,还需要考虑创建异步任务本身的请求是否能够进行负载均衡。如果一个节点的异步任务请求过于频繁,可以通过一些负载均衡的策略来分担请求压力,例如使用负载均衡器或者分布式任务调度器来进行任务的分发和调度。
因此,在设计异步任务时,需要综合考虑任务粒度和请求负载均衡两个方面,以实现合理的任务分配和节点负载均衡,从而保证整个系统的稳定性和可伸缩性。
可配置项
提供了大量的可选配置项以满足不同的业务场景
核心配置选项如下:
- 默认最大重试次数
- 默认的失败重试间隔
- 默认执行超时时间
- 默认异步任务线程池相关核心参数
扩展点
通过采用SPI(Service Provider Interface)方式,系统提供了一些可扩展的接入点,以增强异步任务执行过程中的灵活性。这些接入点可以根据具体业务场景,自定义增加相应的逻辑。 以下是几个重要的扩展点和其作用:
通过以上提供的扩展点,系统提供了灵活性和扩展性,便于根据业务特定需求进行定制化开发。使用者可以根据自身业务场景和需求,选择相应的扩展点,并编写逻辑来满足具体的业务要求。这种SPI扩展机制使得系统更具可定制性和代码的可维护性。
管理API
- 任务的查询
- 任务的重置
监控点
通过UMP(京东自研监控平台) SDK上报到UMP平台已经内置的监控如下所示,还可以通过预留的SPI增加自定义监控。
- 任务执行异常监控
- 线程池拒绝线程监控
- 任务积压监控
- 组件内部异常监控
参考资料
OutboxPattern www.kamilgrzybek.com/design/the-…
作者:京东物流 张涛
来源:京东云开发者社区 自猿其说Tech 转载请注明来源