基于SpringBoot使用Spring Batch批处理框架,处理大数据新方案

2023年 8月 22日 46.5k 0

环境:Springboot2.4.12 + Spring Batch4.2.7

Spring Batch是一个轻量级的,完全面向Spring的批处理框架,可以应用于企业级大量的数据处理系统。Spring Batch以POJO和大家熟知的Spring框架为基础,使开发者更容易的访问和利用企业级服务。Spring Batch可以提供大量的,可重复的数据处理功能,包括日志记录/跟踪,事务管理,作业处理统计工作重新启动、跳过,和资源管理等重要功能。

业务场景:

  • 定期提交批处理。
  • 并行批处理:作业的并行处理
  • 分阶段、企业消息驱动的处理
  • 大规模并行批处理
  • 故障后手动或计划重新启动
  • 相关步骤的顺序处理(扩展到工作流驱动的批处理)
  • 部分处理:跳过记录(例如,回滚时)
  • 整批事务,适用于小批量或现有存储过程/脚本的情况
  • 技术目标:

    • 批处理开发人员使用Spring编程模型:专注于业务逻辑,让框架负责基础设施。
    • 基础架构、批处理执行环境和批处理应用程序之间的关注点清晰分离。
    • 提供通用的核心执行服务,作为所有项目都可以实现的接口。
    • 提供可“开箱即用”的核心执行接口的简单和默认实现。
    • 通过在所有层中利用spring框架,可以轻松配置、定制和扩展服务。
    • 所有现有的核心服务都应该易于替换或扩展,而不会对基础架构层造成任何影响。
    • 提供一个简单的部署模型,使用Maven构建的架构JAR与应用程序完全分离。

    Spring Batch的结构:

    图片图片

    此分层体系结构突出了三个主要的高级组件:应用程序、核心和基础架构。该应用程序包含开发人员使用SpringBatch编写的所有批处理作业和自定义代码。批处理核心包含启动和控制批处理作业所需的核心运行时类。它包括JobLauncher、Job和Step的实现。应用程序和核心都构建在公共基础架构之上。此基础结构包含公共读写器和服务(如RetryTemplate),应用程序开发人员(读写器,如ItemReader和ItemWriter)和核心框架本身(retry,它是自己的库)都使用这些服务。

    下面介绍开发流程

    本例完成 读取文件内容,经过处理后,将数据保存到数据库中

    引入依赖

    
      org.springframework.boot
      spring-boot-starter-batch
    
    
      org.springframework.boot
      spring-boot-starter-web
    
    
      org.springframework.boot
      spring-boot-starter-data-jpa
    
    
      mysql
      mysql-connector-java
    
    
      org.hibernate
      hibernate-validator
      6.0.7.Final
    

    应用配置文件

    spring:
      datasource:
        driverClassName: com.mysql.cj.jdbc.Driver
        url: jdbc:mysql://localhost:3306/batch?serverTimeznotallow=GMT%2B8
        username: root
        password: *******
        type: com.zaxxer.hikari.HikariDataSource
        hikari:
          minimumIdle: 10
          maximumPoolSize: 200
          autoCommit: true
          idleTimeout: 30000
          poolName: MasterDatabookHikariCP
          maxLifetime: 1800000
          connectionTimeout: 30000
          connectionTestQuery: SELECT 1
    ---
    spring:
      jpa:
        generateDdl: false
        hibernate:
          ddlAuto: update
        openInView: true
        show-sql: true
    ---
    spring:
      batch:
        job:
          enabled: false #是否自动执行任务
        initialize-schema: always  #自动为我们创建数据库脚本

    开启批处理功能

    @Configuration
    @EnableBatchProcessing
    public class BatchConfig extends DefaultBatchConfigurer{
    }

    任务启动器

    接着上一步的配置类BatchConfig重写对应方法

    @Override
    protected JobLauncher createJobLauncher() throws Exception {
      SimpleJobLauncher jobLauncher = new SimpleJobLauncher();
      jobLauncher.setJobRepository(createJobRepository());
      jobLauncher.afterPropertiesSet();
      return jobLauncher;
    }

    任务存储

    接着上一步的配置类BatchConfig重写对应方法

    @Resource
    private PlatformTransactionManager transactionManager ;
    @Override
    protected JobRepository createJobRepository() throws Exception {
      JobRepositoryFactoryBean factory = new JobRepositoryFactoryBean();
      factory.setDatabaseType("mysql");
      factory.setTransactionManager(transactionManager);
      factory.setDataSource(dataSource);
      factory.afterPropertiesSet();
      return factory.getObject();
    }

    定义JOB

    @Bean
    public Job myJob(JobBuilderFactory builder, @Qualifier("myStep")Step step){
      return builder.get("myJob")
          .incrementer(new RunIdIncrementer())
          .flow(step)
          .end()
          .listener(jobExecutionListener)
          .build();
    }

    定义ItemReader读取器

    @Bean
    public ItemReader reader(){
      FlatFileItemReader reader = new FlatFileItemReader();
      reader.setResource(new ClassPathResource("cvs/persons.cvs"));
      reader.setLineMapper(new DefaultLineMapper() {
        // 代码块
        {
          setLineTokenizer(new DelimitedLineTokenizer(",") {
            {
              setNames("id", "name");
            }
          }) ;
          setFieldSetMapper(new BeanWrapperFieldSetMapper() {
            {
              setTargetType(Person.class) ;
            }
          });
        }
      });
      return reader;
    }

    定义ItemProcessor处理器

    @Bean
    public ItemProcessor processorPerson(){
      return new ItemProcessor() {
          @Override
          public Person2 process(Person item) throws Exception {
            Person2 p = new Person2() ;
            p.setId(item.getId()) ;
            p.setName(item.getName() + ", pk");
            return p ;
          }
      } ;
    }

    定义ItemWriter写数据

    @Resource
    private Validator validator ;
    @Resource
    private EntityManagerFactory entityManagerFactory ;
    @Bean
    public ItemWriter writerPerson(){
      JpaItemWriter writer = null ;
      JpaItemWriterBuilder builder = new JpaItemWriterBuilder() ;
      builder.entityManagerFactory(entityManagerFactory) ;
      writer = builder.build() ;
      return writer;
    }

    定义Step

    @Bean
    public Step myStep(StepBuilderFactory stepBuilderFactory, ItemReader reader, ItemWriter writer, ItemProcessor processor){
      return stepBuilderFactory
              .get("myStep")
              .chunk(2) // Chunk的机制(即每次读取一条数据,再处理一条数据,累积到一定数量后再一次性交给writer进行写入操作)
              .reader(reader).faultTolerant().retryLimit(3).retry(Exception.class).skip(Exception.class).skipLimit(2)
              .listener(new MyReadListener())
              .processor(processor)
              .writer(writer).faultTolerant().skip(Exception.class).skipLimit(2)
              .listener(new MyWriteListener())
              .build();
    }

    定义相应的监听器

    public class MyReadListener implements ItemReadListener {
    
    
      private Logger logger = LoggerFactory.getLogger(MyReadListener.class);
    
    
      @Override
      public void beforeRead() {
      }
    
    
      @Override
      public void afterRead(Person item) {
        System.out.println("reader after: " + Thread.currentThread().getName()) ;
      }
    
    
      @Override
      public void onReadError(Exception ex) {
        logger.info("读取数据错误:{}", ex);
      }
    }
    @Component
    public class MyWriteListener implements ItemWriteListener {

    private Logger logger = LoggerFactory.getLogger(MyWriteListener.class);

    @Override
    public void beforeWrite(List

    相关文章

    JavaScript2024新功能:Object.groupBy、正则表达式v标志
    PHP trim 函数对多字节字符的使用和限制
    新函数 json_validate() 、randomizer 类扩展…20 个PHP 8.3 新特性全面解析
    使用HTMX为WordPress增效:如何在不使用复杂框架的情况下增强平台功能
    为React 19做准备:WordPress 6.6用户指南
    如何删除WordPress中的所有评论

    发布评论