环境:Springboot2.4.12 + Spring Batch4.2.7
Spring Batch是一个轻量级的,完全面向Spring的批处理框架,可以应用于企业级大量的数据处理系统。Spring Batch以POJO和大家熟知的Spring框架为基础,使开发者更容易的访问和利用企业级服务。Spring Batch可以提供大量的,可重复的数据处理功能,包括日志记录/跟踪,事务管理,作业处理统计工作重新启动、跳过,和资源管理等重要功能。
业务场景:
定期提交批处理。
并行批处理:作业的并行处理
分阶段、企业消息驱动的处理
大规模并行批处理
故障后手动或计划重新启动
相关步骤的顺序处理(扩展到工作流驱动的批处理)
部分处理:跳过记录(例如,回滚时)
整批事务,适用于小批量或现有存储过程/脚本的情况
技术目标:
- 批处理开发人员使用Spring编程模型:专注于业务逻辑,让框架负责基础设施。
- 基础架构、批处理执行环境和批处理应用程序之间的关注点清晰分离。
- 提供通用的核心执行服务,作为所有项目都可以实现的接口。
- 提供可“开箱即用”的核心执行接口的简单和默认实现。
- 通过在所有层中利用spring框架,可以轻松配置、定制和扩展服务。
- 所有现有的核心服务都应该易于替换或扩展,而不会对基础架构层造成任何影响。
- 提供一个简单的部署模型,使用Maven构建的架构JAR与应用程序完全分离。
Spring Batch的结构:
图片
此分层体系结构突出了三个主要的高级组件:应用程序、核心和基础架构。该应用程序包含开发人员使用SpringBatch编写的所有批处理作业和自定义代码。批处理核心包含启动和控制批处理作业所需的核心运行时类。它包括JobLauncher、Job和Step的实现。应用程序和核心都构建在公共基础架构之上。此基础结构包含公共读写器和服务(如RetryTemplate),应用程序开发人员(读写器,如ItemReader和ItemWriter)和核心框架本身(retry,它是自己的库)都使用这些服务。
下面介绍开发流程
本例完成 读取文件内容,经过处理后,将数据保存到数据库中
引入依赖
| |
| org.springframework.boot |
| spring-boot-starter-batch |
| |
| |
| org.springframework.boot |
| spring-boot-starter-web |
| |
| |
| org.springframework.boot |
| spring-boot-starter-data-jpa |
| |
| |
| mysql |
| mysql-connector-java |
| |
| |
| org.hibernate |
| hibernate-validator |
| 6.0.7.Final |
应用配置文件
| spring: |
| datasource: |
| driverClassName: com.mysql.cj.jdbc.Driver |
| url: jdbc:mysql://localhost:3306/batch?serverTimeznotallow=GMT%2B8 |
| username: root |
| password: ******* |
| type: com.zaxxer.hikari.HikariDataSource |
| hikari: |
| minimumIdle: 10 |
| maximumPoolSize: 200 |
| autoCommit: true |
| idleTimeout: 30000 |
| poolName: MasterDatabookHikariCP |
| maxLifetime: 1800000 |
| connectionTimeout: 30000 |
| connectionTestQuery: SELECT 1 |
| |
| spring: |
| jpa: |
| generateDdl: false |
| hibernate: |
| ddlAuto: update |
| openInView: true |
| show-sql: true |
| |
| spring: |
| batch: |
| job: |
| enabled: false #是否自动执行任务 |
| initialize-schema: always #自动为我们创建数据库脚本 |
开启批处理功能
| @Configuration |
| @EnableBatchProcessing |
| public class BatchConfig extends DefaultBatchConfigurer{ |
| } |
任务启动器
接着上一步的配置类BatchConfig重写对应方法
| @Override |
| protected JobLauncher createJobLauncher() throws Exception { |
| SimpleJobLauncher jobLauncher = new SimpleJobLauncher(); |
| jobLauncher.setJobRepository(createJobRepository()); |
| jobLauncher.afterPropertiesSet(); |
| return jobLauncher; |
| } |
任务存储
接着上一步的配置类BatchConfig重写对应方法
| @Resource |
| private PlatformTransactionManager transactionManager ; |
| @Override |
| protected JobRepository createJobRepository() throws Exception { |
| JobRepositoryFactoryBean factory = new JobRepositoryFactoryBean(); |
| factory.setDatabaseType("mysql"); |
| factory.setTransactionManager(transactionManager); |
| factory.setDataSource(dataSource); |
| factory.afterPropertiesSet(); |
| return factory.getObject(); |
| } |
定义JOB
| @Bean |
| public Job myJob(JobBuilderFactory builder, @Qualifier("myStep")Step step){ |
| return builder.get("myJob") |
| .incrementer(new RunIdIncrementer()) |
| .flow(step) |
| .end() |
| .listener(jobExecutionListener) |
| .build(); |
| } |
定义ItemReader读取器
| @Bean |
| public ItemReader reader(){ |
| FlatFileItemReader reader = new FlatFileItemReader(); |
| reader.setResource(new ClassPathResource("cvs/persons.cvs")); |
| reader.setLineMapper(new DefaultLineMapper() { |
| |
| { |
| setLineTokenizer(new DelimitedLineTokenizer(",") { |
| { |
| setNames("id", "name"); |
| } |
| }) ; |
| setFieldSetMapper(new BeanWrapperFieldSetMapper() { |
| { |
| setTargetType(Person.class) ; |
| } |
| }); |
| } |
| }); |
| return reader; |
| } |
定义ItemProcessor处理器
| @Bean |
| public ItemProcessor processorPerson(){ |
| return new ItemProcessor() { |
| @Override |
| public Person2 process(Person item) throws Exception { |
| Person2 p = new Person2() ; |
| p.setId(item.getId()) ; |
| p.setName(item.getName() + ", pk"); |
| return p ; |
| } |
| } ; |
| } |
定义ItemWriter写数据
| @Resource |
| private Validator validator ; |
| @Resource |
| private EntityManagerFactory entityManagerFactory ; |
| @Bean |
| public ItemWriter writerPerson(){ |
| JpaItemWriter writer = null ; |
| JpaItemWriterBuilder builder = new JpaItemWriterBuilder() ; |
| builder.entityManagerFactory(entityManagerFactory) ; |
| writer = builder.build() ; |
| return writer; |
| } |
定义Step
| @Bean |
| public Step myStep(StepBuilderFactory stepBuilderFactory, ItemReader reader, ItemWriter writer, ItemProcessor processor){ |
| return stepBuilderFactory |
| .get("myStep") |
| .chunk(2) |
| .reader(reader).faultTolerant().retryLimit(3).retry(Exception.class).skip(Exception.class).skipLimit(2) |
| .listener(new MyReadListener()) |
| .processor(processor) |
| .writer(writer).faultTolerant().skip(Exception.class).skipLimit(2) |
| .listener(new MyWriteListener()) |
| .build(); |
| } |
定义相应的监听器
| public class MyReadListener implements ItemReadListener { |
| |
| |
| private Logger logger = LoggerFactory.getLogger(MyReadListener.class); |
| |
| |
| @Override |
| public void beforeRead() { |
| } |
| |
| |
| @Override |
| public void afterRead(Person item) { |
| System.out.println("reader after: " + Thread.currentThread().getName()) ; |
| } |
| |
| |
| @Override |
| public void onReadError(Exception ex) { |
| logger.info("读取数据错误:{}", ex); |
| } |
| } |
| @Component |
| public class MyWriteListener implements ItemWriteListener { |
| private Logger logger = LoggerFactory.getLogger(MyWriteListener.class); |
| @Override |
| public void beforeWrite(List |
| |