Seata源码(一)初始化

2023年 10月 12日 58.7k 0

Java极客  |  作者  /  铿然一叶
这是Java极客的第 92 篇原创文章

相关阅读:

萌新快速成长之路
JAVA编程思想(一)通过依赖注入增加扩展性
JAVA编程思想(二)如何面向接口编程
JAVA编程思想(三)去掉别扭的if,自注册策略模式优雅满足开闭原则
JAVA编程思想(四)Builder模式经典范式以及和工厂模式如何选?
Java编程思想(五)事件通知模式解耦过程
Java编程思想(六)事件通知模式解耦过程
Java编程思想(七)使用组合和继承的场景
JAVA基础(一)简单、透彻理解内部类和静态内部类
JAVA基础(二)内存优化-使用Java引用做缓存
JAVA基础(三)ClassLoader实现热加载
JAVA基础(四)枚举(enum)和常量定义,工厂类使用对比
JAVA基础(五)函数式接口-复用,解耦之利刃
HikariPool源码(二)设计思想借鉴
【极客源码】JetCache源码(一)开篇
【极客源码】JetCache源码(二)顶层视图
人在职场(一)IT大厂生存法则

1. 核心操作

1.初始化GlobalTransactionScanner

2.GlobalTransactionScanner识别启用全局事务的类,添加不同拦截器

3.初始化TM客户端和RM客户端

4.添加destroy钩子

2. 核心类结构

image.png

3. 自动装配

通过spring的SPI机制,自动实例化GlobalTransactionScanner。

引入不同的库使用的自动装配类不同,

groupId artifactId 规范
io.seata seata-spring-boot-starter SeataAutoConfiguration
com.alibaba.cloud spring-cloud-alibaba-seata GlobalTransactionAutoConfiguration

3.1 seata-spring-boot-starter

3.1.1 入口

SeataAutoConfiguration.java

@ConditionalOnProperty(prefix = SEATA_PREFIX, name = "enabled", havingValue = "true", matchIfMissing = true)
public class SeataAutoConfiguration {
    private static final Logger LOGGER = LoggerFactory.getLogger(SeataAutoConfiguration.class);

    @Bean(BEAN_NAME_FAILURE_HANDLER)
    @ConditionalOnMissingBean(FailureHandler.class)
    public FailureHandler failureHandler() {
        return new DefaultFailureHandlerImpl();
    }

    @Bean
    @DependsOn({BEAN_NAME_SPRING_APPLICATION_CONTEXT_PROVIDER, BEAN_NAME_FAILURE_HANDLER})
    @ConditionalOnMissingBean(GlobalTransactionScanner.class)
    public GlobalTransactionScanner globalTransactionScanner(SeataProperties seataProperties, FailureHandler failureHandler) {
        if (LOGGER.isInfoEnabled()) {
            LOGGER.info("Automatically configure Seata");
        }
        return new GlobalTransactionScanner(seataProperties.getApplicationId(), seataProperties.getTxServiceGroup(), failureHandler);
    }
}

3.1.2 SPI配置

在seata-spring-boot-starter模块的spring.factories文件中配置了SeataAutoConfiguration类使能自动装配。

# Auto Configure
org.springframework.boot.autoconfigure.EnableAutoConfiguration=
io.seata.spring.boot.autoconfigure.SeataPropertiesAutoConfiguration,
io.seata.spring.boot.autoconfigure.SeataDataSourceAutoConfiguration,
io.seata.spring.boot.autoconfigure.SeataAutoConfiguration,
io.seata.spring.boot.autoconfigure.HttpAutoConfiguration

3.1.3 SeataProperties参数初始化

GlobalTransactionScanner的构造器参数seataProperties:

GlobalTransactionScanner(seataProperties.getApplicationId(), seataProperties.getTxServiceGroup(), failureHandler);

SeataPropertiesAutoConfiguration.java

AutoConfigureBefore注解配置在SeataAutoConfiguration之前完成初始化。

@ConditionalOnProperty(prefix = SEATA_PREFIX, name = "enabled", havingValue = "true", matchIfMissing = true)
@ComponentScan(basePackages = "io.seata.spring.boot.autoconfigure.properties")
@AutoConfigureBefore({SeataAutoConfiguration.class, SeataDataSourceAutoConfiguration.class})
public class SeataPropertiesAutoConfiguration {
    static {

        PROPERTY_BEAN_MAP.put(SEATA_PREFIX, SeataProperties.class);

3.2 spring-cloud-alibaba-seata

3.2.1 入口

GlobalTransactionAutoConfiguration.java

一步到位把SeataProperties也初始化了,但是少了FailureHandler的初始化:

@Configuration
@EnableConfigurationProperties({SeataProperties.class})
public class GlobalTransactionAutoConfiguration {
    private final ApplicationContext applicationContext;
    private final SeataProperties seataProperties;

    public GlobalTransactionAutoConfiguration(ApplicationContext applicationContext, SeataProperties seataProperties) {
        this.applicationContext = applicationContext;
        this.seataProperties = seataProperties;
    }

    @Bean
    public GlobalTransactionScanner globalTransactionScanner() {
        String applicationName = this.applicationContext.getEnvironment().getProperty("spring.application.name");
        String txServiceGroup = this.seataProperties.getTxServiceGroup();
        if (StringUtils.isEmpty(txServiceGroup)) {
            txServiceGroup = applicationName + "-fescar-service-group";
            this.seataProperties.setTxServiceGroup(txServiceGroup);
        }

        return new GlobalTransactionScanner(applicationName, txServiceGroup);
    }
}

3.2.2 SPI配置

在spring-cloud-alibaba-seata模块的spring.factories文件中配置了GlobalTransactionAutoConfiguration类使能自动装配。

org.springframework.boot.autoconfigure.EnableAutoConfiguration=
com.alibaba.cloud.seata.rest.SeataRestTemplateAutoConfiguration,
com.alibaba.cloud.seata.web.SeataHandlerInterceptorConfiguration,
com.alibaba.cloud.seata.GlobalTransactionAutoConfiguration,
com.alibaba.cloud.seata.feign.SeataFeignClientAutoConfiguration,
com.alibaba.cloud.seata.feign.hystrix.SeataHystrixAutoConfiguration

4. 核心类GlobalTransactionScanner

主要作用:

1.实现InitializingBean接口完成bean初始化操作

2.实现ConfigurationChangeListener接口监听seata配置参数变化并做处理

3.实现DisposableBean接口做destroy处理

4.实现ApplicationContextAware接口用户注入ApplicationContext

5.继承AbstractAutoProxyCreator类,重载其方法,添加事务处理拦截器

4.1 初始化操作

1.注册参数变更监听器

2.初始化TM和RM

    @Override
    public void afterPropertiesSet() {
        // 来自service.disableGlobalTransaction配置参数
        if (disableGlobalTransaction) {
            if (LOGGER.isInfoEnabled()) {
                LOGGER.info("Global transaction is disabled.");
            }
            // 添加service.disableGlobalTransaction参数变更监听器,也就是它自己,如果参数变化则会动态初始化TM客户端和RM客户端
            ConfigurationCache.addConfigListener(ConfigurationKeys.DISABLE_GLOBAL_TRANSACTION,
                    (ConfigurationChangeListener)this);

            // 如果service.disableGlobalTransaction=false则不会初始化TM客户端和RM客户端
            return;
        }
        // 没有初始化则初始化(TM客户端和RM客户端)
        if (initialized.compareAndSet(false, true)) {
            initClient();
        }
    }

4.2 监听参数变化

监听参数为不启用全局事务变化到启用全局事务,变化后初始化RM和TM。

    @Override
    public void onChangeEvent(ConfigurationChangeEvent event) {
        if (ConfigurationKeys.DISABLE_GLOBAL_TRANSACTION.equals(event.getDataId())) {
            disableGlobalTransaction = Boolean.parseBoolean(event.getNewValue().trim());
            // 监听service.disableGlobalTransaction参数配置,如果启用全局事务并且没有做过初始化则做初始化
            if (!disableGlobalTransaction && initialized.compareAndSet(false, true)) {
                LOGGER.info("{} config changed, old value:{}, new value:{}", ConfigurationKeys.DISABLE_GLOBAL_TRANSACTION,
                        disableGlobalTransaction, event.getNewValue());
                initClient();
            }
        }
    }

4.3 添加拦截器

4.3.1 拦截器实例化

根据条件添加TCC模式和AT模式拦截器:

    @Override
    protected Object wrapIfNecessary(Object bean, String beanName, Object cacheKey) {
        try {
            synchronized (PROXYED_SET) {
                if (PROXYED_SET.contains(beanName)) {
                    return bean;
                }
                interceptor = null;
                //check TCC proxy
                if (TCCBeanParserUtils.isTccAutoProxy(bean, beanName, applicationContext)) {
                    //TCC interceptor, proxy bean of sofa:reference/dubbo:reference, and LocalTCC
                    interceptor = new TccActionInterceptor(TCCBeanParserUtils.getRemotingDesc(beanName));
                    ConfigurationCache.addConfigListener(ConfigurationKeys.DISABLE_GLOBAL_TRANSACTION,
                        (ConfigurationChangeListener)interceptor);
                } else {
                    Class serviceInterface = SpringProxyUtils.findTargetClass(bean);
                    Class[] interfacesIfJdk = SpringProxyUtils.findInterfaces(bean);

                    if (!existsAnnotation(new Class[]{serviceInterface})
                        && !existsAnnotation(interfacesIfJdk)) {
                        return bean;
                    }

                    if (globalTransactionalInterceptor == null) {
                        globalTransactionalInterceptor = new GlobalTransactionalInterceptor(failureHandlerHook);
                        // 监听配置参数service.disableGlobalTransaction的变化
                        ConfigurationCache.addConfigListener(
                            ConfigurationKeys.DISABLE_GLOBAL_TRANSACTION,
                            (ConfigurationChangeListener)globalTransactionalInterceptor);
                    }
                    interceptor = globalTransactionalInterceptor;
                }

                LOGGER.info("Bean[{}] with name [{}] would use interceptor [{}]", bean.getClass().getName(), beanName, interceptor.getClass().getName());
                if (!AopUtils.isAopProxy(bean)) {
                    bean = super.wrapIfNecessary(bean, beanName, cacheKey);
                } else {
                    AdvisedSupport advised = SpringProxyUtils.getAdvisedSupport(bean);
                    Advisor[] advisor = buildAdvisors(beanName, getAdvicesAndAdvisorsForBean(null, null, null));
                    for (Advisor avr : advisor) {
                        advised.addAdvisor(0, avr);
                    }
                }
                PROXYED_SET.add(beanName);
                return bean;
            }
        } catch (Exception exx) {
            throw new RuntimeException(exx);
        }
    }

4.3.2 判断事务注解

GlobalTransactional支持类和方法级,GlobalLock仅支持方法级。

    private boolean existsAnnotation(Class[] classes) {
        if (CollectionUtils.isNotEmpty(classes)) {
            for (Class clazz : classes) {
                if (clazz == null) {
                    continue;
                }
                // 类上的GlobalTransactional注解
                GlobalTransactional trxAnno = clazz.getAnnotation(GlobalTransactional.class);
                if (trxAnno != null) {
                    return true;
                }
                Method[] methods = clazz.getMethods();
                for (Method method : methods) {
                    // 方法上GlobalTransactional注解
                    trxAnno = method.getAnnotation(GlobalTransactional.class);
                    if (trxAnno != null) {
                        return true;
                    }

                    // 方法上GlobalLock注解
                    GlobalLock lockAnno = method.getAnnotation(GlobalLock.class);
                    if (lockAnno != null) {
                        return true;
                    }
                }
            }
        }
        return false;
    }

4.3.3 返回拦截器

实现AbstractAutoProxyCreator类的方法返回拦截器:

    @Override
    protected Object[] getAdvicesAndAdvisorsForBean(Class beanClass, String beanName, TargetSource customTargetSource)
            throws BeansException {
        return new Object[]{interceptor};
    }

4.4 添加destory操作

4.4.1 实现DisposableBean接口

实现DisposableBean接口,调用ShutdownHook做destory操作:

    @Override
    public void destroy() {
        // 统一委托给ShutdownHook做destory,虽然它自己也可以做
        ShutdownHook.getInstance().destroyAll();
    }

4.4.2 注册shutdownhook

在init时注册shutdownhook

    private void registerSpringShutdownHook() {
        if (applicationContext instanceof ConfigurableApplicationContext) {
            ((ConfigurableApplicationContext) applicationContext).registerShutdownHook();
            // 这里移除了ShutdownHook注册的java虚拟机的shutdownhook
            ShutdownHook.removeRuntimeShutdownHook();
        }
        // TM客户端和RM客户端的父类实现了Disposable的destory方法,ShutdownHook遍历调用所有Disposable实现类的destory方法
        ShutdownHook.getInstance().addDisposable(TmNettyRemotingClient.getInstance(applicationId, txServiceGroup));
        ShutdownHook.getInstance().addDisposable(RmNettyRemotingClient.getInstance(applicationId, txServiceGroup));
    }

end.

相关文章

JavaScript2024新功能:Object.groupBy、正则表达式v标志
PHP trim 函数对多字节字符的使用和限制
新函数 json_validate() 、randomizer 类扩展…20 个PHP 8.3 新特性全面解析
使用HTMX为WordPress增效:如何在不使用复杂框架的情况下增强平台功能
为React 19做准备:WordPress 6.6用户指南
如何删除WordPress中的所有评论

发布评论