一、基本介绍
1.springbatch 是什么?
Spring Batch 是一个用于批处理应用程序开发的开源框架。它提供了一套强大的工具和组件,用于处理大规模数据处理、ETL(抽取、转换和加载)操作、定时任务等。
Spring Batch 构建在 Spring Framework 的基础之上,利用了 Spring 的依赖注入、事务管理、异常处理等特性,使得批处理应用程序的开发更加简单和灵活。
2.springbatch 有哪些核心组件
①.JobLauncher(作业启动器)
用于启动和执行批处理作业的组件。通过 JobLauncher 可以调度和触发 Job 的执行,可以根据需要配置触发 Job 的方式,如定时触发、手动触发等。
②.Job(作业)
一个完整的批处理任务,包含多个 Step。Job 定义了整个批处理任务的执行顺序和依赖关系。可以通过 JobLauncher 启动和执行 Job。
③.Step(步骤)
批处理任务的一部分,包含了读取、处理和写入数据的逻辑。Step 由 ItemReader、ItemProcessor 和 ItemWriter 组成。Step 定义了读取器、处理器和写入器,并可以设置其他属性如事务管理、错误处理策略等。
④.ItemReader(数据读取器)
从数据源中读取数据的组件。常见的内置实现包括 JdbcCursorItemReader(从数据库读取数据)、FlatFileItemReader(从文件读取数据)、JmsItemReader(从消息队列读取数据)等。也可以自定义 ItemReader 来满足特定需求。
⑤.ItemProcessor(数据处理器)
对读取到的数据进行处理、转换或过滤的组件。可以进行任意的业务逻辑操作。可以根据需要添加一个或多个 ItemProcessor,并将它们链接在一起形成处理链。
⑤.ItemWriter(数据写入器):
将处理后的数据写入目标系统(比如数据库、文件等)的组件。常见的内置实现包括 JdbcBatchItemWriter(写入数据库)、FlatFileItemWriter(写入文件)、JmsItemWriter(写入消息队列)等。也可以自定义 ItemWriter 来实现特定的写入逻辑。
⑥.JobRepository(作业仓库)
用于管理批处理任务的元数据和状态信息。JobRepository 负责将作业执行的元数据存储到持久化存储介质(如数据库)中,并提供对这些元数据的操作接口。它还支持事务管理,保证批处理任务的一致性和可靠性。通过使用 JobRepository,可以实现断点续传、重试机制和错误处理等功能。
二、步骤概览
说明:这边实现的需求是,使用批处理组件,从文件中读取数据,经过批处理组件处理,最后保存至数据库中,步骤概览是基于该需求进行实现的。
三、步骤说明
1.引入依赖包
- 批处理依赖包:spring-boot-starter-batch
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-batch</artifactId>
<exclusions>
<!-- 排除内置的数据库 -->
<exclusion>
<groupId>com.h2database</groupId>
<artifactId>h2</artifactId>
</exclusion>
</exclusions>
</dependency>
- mybatisplus 依赖包:mybatis-plus-boot-starter,引入它是由于我们数据处理后需要保存到业务库。
<dependency>
<groupId>com.baomidou</groupId>
<artifactId>mybatis-plus-boot-starter</artifactId>
<version>3.5.5</version>
</dependency>
- mysql 驱动包:mysql-connector-java
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<scope>runtime</scope>
</dependency>
2.配置集成参数
在 application.yml 配置文件中,定义集成spring batch 组件的配置参数。
spring:
batch:
job:
#设置为 false -需要jobLaucher.run执行
enabled: true
jdbc:
initialize-schema: always
3.启用批处理
在 springboot 启用类上添加注解 @EnableBatchProcessing 启用批处理。
4.配置作业步骤
①.数据读取
创建一个数据读取器,用于从文件中读取数据。
- Student:学生信息实体
@Data
@TableName(value = "sys_student")
public class Student implements Serializable {
@TableField(exist = false)
private static final long serialVersionUID = 1L;
@TableId(type = IdType.AUTO)
private Long id;
private String name;
private Integer age;
private Integer sex;
@JsonFormat(pattern = "yyyy-MM-dd HH:mm:ss")
private Date birthday;
}
- /resources/static/student.txt:学生信息文件
"id","name","age","sex","birthday"
"1","Jin Xiaoming","10","1","2015-03-08 10:59:21"
"2","Yau Ka Fai","11","1","2005-08-21 01:13:52"
"3","Xu Zhiyuan","12","1","2006-04-12 04:01:10"
"4","Wu Hok Yau","20","1","2018-04-29 00:16:52"
"5","Shi Ziyi","20","1","2023-02-20 05:20:14"
"6","Koo Wing Fat","20","0","2012-09-13 06:47:16"
"7","Takeda Tsubasa","17","0","2002-05-03 15:31:37"
"8","Keith Jones","21","0","2009-02-24 03:56:21"
"9","Lillian Vargas","18","0","2001-03-02 00:19:56"
"10","Lai Ka Keung","29","0","2009-10-04 03:03:30"
"1149","Sakai Momoka","14","1","2013-10-09 00:00:00"
"1150","Han Jiehong","14","0","2021-07-17 00:00:00"
"1151","Yuan Xiaoming","15","0","2015-02-05 00:00:00"
"1152","Don Gonzalez","18","0","2008-08-31 00:00:00"
"1153","Chin Chieh Lun","15","0","2011-02-04 00:00:00"
- StudentFileItemReaderBuilder:学生信息文件读取器构造者,这边使用了组件内置的 FlatFileItemReader 文件读取器
@Component
public class StudentFileItemReaderBuilder {
public ItemReader<Student> build() {
FlatFileItemReader<Student> reader = new FlatFileItemReader<>();
// 设置文件资源地址
reader.setResource(new ClassPathResource("static/student.txt"));
// 忽略第一行
reader.setLinesToSkip(1);
// AbstractLineTokenizer的三个实现类之一,以固定分隔符处理行数据读取,
// 使用默认构造器的时候,使用逗号作为分隔符,也可以通过有参构造器来指定分隔符
DelimitedLineTokenizer tokenizer = new DelimitedLineTokenizer();
// 设置属性名,类似于表头
tokenizer.setNames("id", "name", "age", "sex", "birthday");
// 将每行数据转换为TestData对象
DefaultLineMapper<Student> mapper = new DefaultLineMapper<>();
// 设置LineTokenizer
mapper.setLineTokenizer(tokenizer);
// 设置映射方式,即读取到的文本怎么转换为对应实体
mapper.setFieldSetMapper(fieldSet -> {
Student student = new Student();
student.setId(fieldSet.readLong("id"));
student.setName(fieldSet.readString("name"));
student.setAge(fieldSet.readInt("age"));
student.setSex(fieldSet.readInt("sex"));
student.setBirthday(fieldSet.readDate("birthday"));
return student;
});
reader.setLineMapper(mapper);
return reader;
}
}
②.数据处理
实现数据处理器,需要实现 ItemProcessor接口。
- StudentProcessor:学生信息处理器,用于对读取到的数据进行处理,案例中我们进行了简单的数据过滤处理。
@Component
public class StudentProcessor implements ItemProcessor<Student, Student> {
@Override
public Student process(Student student) throws Exception {
// 将性别为1的都设置为0
if (student.getSex() == 1) {
student.setSex(0);
}
return student;
}
}
③.数据写入
实现数据写入器,需要实现ItemWriter接口,案例中使用集成的mybatisplus 批量到mysql中。
@Component
public class StudentItemWriter implements ItemWriter<Student> {
@Resource
private StudentMapper mapper;
@Override
public void write(List<? extends Student> list) throws Exception {
mapper.batchInsert(list);
}
}
5.配置作业
配置作业即定义Job 和对应的 Step,定义批处理任务的执行流程。
①.作业监听器
定义作业监听器,我们可以通过它监听作业执行开始和执行结束,并打印作业执行耗时。
- StudentReaderJobListener:作业监听器
@Slf4j
@Component
public class StudentReaderJobListener implements JobExecutionListener {
private LocalDateTime start;
private LocalDateTime end;
@Override
public void beforeJob(JobExecution jobExecution) {
start = LocalDateTime.now();
log.info("student reader job start...");
}
@Override
public void afterJob(JobExecution jobExecution) {
end = LocalDateTime.now();
log.info("student reader job end,cost {} ms", ChronoUnit.MILLIS.between(start, end));
}
}
②.定义作业
定义作业任务,设置其作业名称、监听器、作业步骤(由数据读取器、数据处理器、数据写入器组成)。
- StudentReaderJobBatchConfig:作业任务配置
@Slf4j
@Configuration
public class StudentReaderJobBatchConfig {
// 任务构建工厂
@Resource
private JobBuilderFactory jobFactory;
// 步骤构建工厂
@Resource
private StepBuilderFactory stepFactory;
// 读取构造器
@Resource
private StudentFileItemReaderBuilder readerBuilder;
// 信息处理器
@Resource
private StudentProcessor studentProcessor;
// 信息写入器
@Resource
private StudentItemWriter studentWriter;
// 任务监听器
@Resource
private StudentReaderJobListener jobListener;
@Bean
public Job studentReaderJob() {
return jobFactory.get("studentReaderJob")
.incrementer(new RunIdIncrementer())
.listener(jobListener)
.start(studentReaderJobStep())
.build();
}
//任务步骤
private Step studentReaderJobStep() {
return stepFactory.get("studentReaderJobStep")
.<Student, Student>chunk(10)
.reader(readerBuilder.build())
.processor(studentProcessor)
.writer(studentWriter)
.build();
}
}
四、代码测试
1. 测试代码
@SpringBootTest
public class ApiTest {
// 作业启动器
@Resource
public JobLauncher jobLauncher;
// 作业任务
@Resource
private Job studentReaderJob;
@Test
public void test_studentJob_execute() throws Exception {
// 启动参数,可按需设置
JobParameters jobParameters = new JobParametersBuilder()
//.addString("time", String.valueOf(System.currentTimeMillis()))
.toJobParameters();
jobLauncher.run(studentReaderJob, jobParameters);
}
}
2.测试结果
- 后台日志
- 数据入库结果
- 批处理执行记录表,集成组件后会在业务库中生成并记录作业信息。