基于SpringBoot使用Spring Batch批处理框架,处理大数据新方案

环境:Springboot2.4.12 + Spring Batch4.2.7

Spring Batch是一个轻量级的,完全面向Spring的批处理框架,可以应用于企业级大量的数据处理系统。Spring Batch以POJO和大家熟知的Spring框架为基础,使开发者更容易的访问和利用企业级服务。Spring Batch可以提供大量的,可重复的数据处理功能,包括日志记录/跟踪,事务管理,作业处理统计工作重新启动、跳过,和资源管理等重要功能。

业务场景:

  • 定期提交批处理。
  • 并行批处理:作业的并行处理
  • 分阶段、企业消息驱动的处理
  • 大规模并行批处理
  • 故障后手动或计划重新启动
  • 相关步骤的顺序处理(扩展到工作流驱动的批处理)
  • 部分处理:跳过记录(例如,回滚时)
  • 整批事务,适用于小批量或现有存储过程/脚本的情况
  • 技术目标:

    • 批处理开发人员使用Spring编程模型:专注于业务逻辑,让框架负责基础设施。
    • 基础架构、批处理执行环境和批处理应用程序之间的关注点清晰分离。
    • 提供通用的核心执行服务,作为所有项目都可以实现的接口。
    • 提供可“开箱即用”的核心执行接口的简单和默认实现。
    • 通过在所有层中利用spring框架,可以轻松配置、定制和扩展服务。
    • 所有现有的核心服务都应该易于替换或扩展,而不会对基础架构层造成任何影响。
    • 提供一个简单的部署模型,使用Maven构建的架构JAR与应用程序完全分离。

    Spring Batch的结构:

    图片图片

    此分层体系结构突出了三个主要的高级组件:应用程序、核心和基础架构。该应用程序包含开发人员使用SpringBatch编写的所有批处理作业和自定义代码。批处理核心包含启动和控制批处理作业所需的核心运行时类。它包括JobLauncher、Job和Step的实现。应用程序和核心都构建在公共基础架构之上。此基础结构包含公共读写器和服务(如RetryTemplate),应用程序开发人员(读写器,如ItemReader和ItemWriter)和核心框架本身(retry,它是自己的库)都使用这些服务。

    下面介绍开发流程

    本例完成 读取文件内容,经过处理后,将数据保存到数据库中

    引入依赖

    org.springframework.boot
    spring-boot-starter-batch
    org.springframework.boot
    spring-boot-starter-web
    org.springframework.boot
    spring-boot-starter-data-jpa
    mysql
    mysql-connector-java
    org.hibernate
    hibernate-validator
    6.0.7.Final

    应用配置文件

    spring:
    datasource:
    driverClassName: com.mysql.cj.jdbc.Driver
    url: jdbc:mysql://localhost:3306/batch?serverTimeznotallow=GMT%2B8
    username: root
    password: *******
    type: com.zaxxer.hikari.HikariDataSource
    hikari:
    minimumIdle: 10
    maximumPoolSize: 200
    autoCommit: true
    idleTimeout: 30000
    poolName: MasterDatabookHikariCP
    maxLifetime: 1800000
    connectionTimeout: 30000
    connectionTestQuery: SELECT 1
    ---
    spring:
    jpa:
    generateDdl: false
    hibernate:
    ddlAuto: update
    openInView: true
    show-sql: true
    ---
    spring:
    batch:
    job:
    enabled: false #是否自动执行任务
    initialize-schema: always #自动为我们创建数据库脚本

    开启批处理功能

    @Configuration
    @EnableBatchProcessing
    public class BatchConfig extends DefaultBatchConfigurer{
    }

    任务启动器

    接着上一步的配置类BatchConfig重写对应方法

    @Override
    protected JobLauncher createJobLauncher() throws Exception {
    SimpleJobLauncher jobLauncher = new SimpleJobLauncher();
    jobLauncher.setJobRepository(createJobRepository());
    jobLauncher.afterPropertiesSet();
    return jobLauncher;
    }

    任务存储

    接着上一步的配置类BatchConfig重写对应方法

    @Resource
    private PlatformTransactionManager transactionManager ;
    @Override
    protected JobRepository createJobRepository() throws Exception {
    JobRepositoryFactoryBean factory = new JobRepositoryFactoryBean();
    factory.setDatabaseType("mysql");
    factory.setTransactionManager(transactionManager);
    factory.setDataSource(dataSource);
    factory.afterPropertiesSet();
    return factory.getObject();
    }

    定义JOB

    @Bean
    public Job myJob(JobBuilderFactory builder, @Qualifier("myStep")Step step){
    return builder.get("myJob")
    .incrementer(new RunIdIncrementer())
    .flow(step)
    .end()
    .listener(jobExecutionListener)
    .build();
    }

    定义ItemReader读取器

    @Bean
    public ItemReader reader(){
    FlatFileItemReader reader = new FlatFileItemReader();
    reader.setResource(new ClassPathResource("cvs/persons.cvs"));
    reader.setLineMapper(new DefaultLineMapper() {
    // 代码块
    {
    setLineTokenizer(new DelimitedLineTokenizer(",") {
    {
    setNames("id", "name");
    }
    }) ;
    setFieldSetMapper(new BeanWrapperFieldSetMapper() {
    {
    setTargetType(Person.class) ;
    }
    });
    }
    });
    return reader;
    }

    定义ItemProcessor处理器

    @Bean
    public ItemProcessor processorPerson(){
    return new ItemProcessor() {
    @Override
    public Person2 process(Person item) throws Exception {
    Person2 p = new Person2() ;
    p.setId(item.getId()) ;
    p.setName(item.getName() + ", pk");
    return p ;
    }
    } ;
    }

    定义ItemWriter写数据

    @Resource
    private Validator validator ;
    @Resource
    private EntityManagerFactory entityManagerFactory ;
    @Bean
    public ItemWriter writerPerson(){
    JpaItemWriter writer = null ;
    JpaItemWriterBuilder builder = new JpaItemWriterBuilder() ;
    builder.entityManagerFactory(entityManagerFactory) ;
    writer = builder.build() ;
    return writer;
    }

    定义Step

    @Bean
    public Step myStep(StepBuilderFactory stepBuilderFactory, ItemReader reader, ItemWriter writer, ItemProcessor processor){
    return stepBuilderFactory
    .get("myStep")
    .chunk(2) // Chunk的机制(即每次读取一条数据,再处理一条数据,累积到一定数量后再一次性交给writer进行写入操作)
    .reader(reader).faultTolerant().retryLimit(3).retry(Exception.class).skip(Exception.class).skipLimit(2)
    .listener(new MyReadListener())
    .processor(processor)
    .writer(writer).faultTolerant().skip(Exception.class).skipLimit(2)
    .listener(new MyWriteListener())
    .build();
    }

    定义相应的监听器

    public class MyReadListener implements ItemReadListener {
    private Logger logger = LoggerFactory.getLogger(MyReadListener.class);
    @Override
    public void beforeRead() {
    }
    @Override
    public void afterRead(Person item) {
    System.out.println("reader after: " + Thread.currentThread().getName()) ;
    }
    @Override
    public void onReadError(Exception ex) {
    logger.info("读取数据错误:{}", ex);
    }
    }
    @Component
    public class MyWriteListener implements ItemWriteListener {
    private Logger logger = LoggerFactory.getLogger(MyWriteListener.class);
    @Override
    public void beforeWrite(List