继续给大家带来Springboot官方例子中文翻译版:本例将指导您完成创建批处理程序解决各种批量作业的过程。Spring Batch是一个专门处理批处理的子项目,是一个轻量级的大数据量的并行处理(批处理)的框架。作用和Hadoop很相似,不过Hadoop是基于重量级的分布式环境(处理巨量数据),而SpringBatch是基于轻量的应用框架(处理中小数据)。如果你只是业务需求不是很大的JAVA应用,可以参考用Spring Batch。本例您将构建一个从csv表格导入数据、转换数据并将最终结果存储在数据库中的服务。
程序结构
└── src └── main └── java └── hello
pom.xml文件
<?xml version="1.0" encoding="UTF-8"?> <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <groupId>org.springframework</groupId> <artifactId>gs-batch-processing</artifactId> <version>0.1.0</version> <parent> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-parent</artifactId> <version>2.1.6.RELEASE</version> </parent> <properties> <java.version>1.8</java.version> </properties> <dependencies> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-batch</artifactId> </dependency> <dependency> <groupId>org.hsqldb</groupId> <artifactId>hsqldb</artifactId> </dependency> </dependencies> <build> <plugins> <plugin> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-maven-plugin</artifactId> </plugin> </plugins> </build> </project>
Spring Boot将会你做如下的事:
- 将 classpath 里面所有用到的jar包构建成一个可执行的 JAR 文件,方便执行你的程序
- 搜索public static void main()方法并且将它当作可执行类
- 根据springboot版本,去查找相应的依赖类版本,当然你可以定义其它版本。
业务数据
本次用到的业务数据如下:
src/main/resources/sample-data.csv
Jill,Doe Joe,Doe Justin,Doe Jane,Doe John,Doe
此电子表格在每一行中包含名字和姓氏,用逗号分隔
接下来,编写一个SQL脚本来创建一个表来存储数据。
src/main/resources/schema-all.sql
DROP TABLE people IF EXISTS; CREATE TABLE people ( person_id BIGINT IDENTITY NOT NULL PRIMARY KEY, first_name VARCHAR(20), last_name VARCHAR(20) );
Springboot在启动期间自动运行schema-@@platform@@.sql。-all是所有平台的默认值。
创建业务类
现在您看到了数据输入和输出的格式,您可以编写代码来表示一行数据。
src/main/java/hello/Person.java
package hello; public class Person { private String lastName; private String firstName; public Person() { } public Person(String firstName, String lastName) { this.firstName = firstName; this.lastName = lastName; } public void setFirstName(String firstName) { this.firstName = firstName; } public String getFirstName() { return firstName; } public String getLastName() { return lastName; } public void setLastName(String lastName) { this.lastName = lastName; } @Override public String toString() { return "firstName: " + firstName + ", lastName: " + lastName; } }
创建中间处理类
在批处理中,一个常见的范例是接收数据,转换数据,然后将其导出到其他地方。在这里,您可以编写一个简单的转换程序,将名称转换为大写。
src/main/java/hello/PersonItemProcessor.java
package hello; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.batch.item.ItemProcessor; public class PersonItemProcessor implements ItemProcessor<Person, Person> { private static final Logger log = LoggerFactory.getLogger(PersonItemProcessor.class); @Override public Person process(final Person person) throws Exception { final String firstName = person.getFirstName().toUpperCase(); final String lastName = person.getLastName().toUpperCase(); final Person transformedPerson = new Person(firstName, lastName); log.info("Converting (" + person + ") into (" + transformedPerson + ")"); return transformedPerson; } }
PersonItemProcessor实现了Spring Batch的ItemProcessor接口,这很方便处理批处理作业。根据该接口,您将接收一个传入的Person对象,然后将其转换为大写的Person。
将批处理作业合在一起
现在您将实际的批处理作业放在一起。Spring批处理提供了许多实用程序类,减少了编写自定义代码的需要。因此,您可以更关注业务逻辑。
src/main/java/hello/BatchConfiguration.java
package hello;
import javax.sql.DataSource; import org.springframework.batch.core.Job; import org.springframework.batch.core.JobExecutionListener; import org.springframework.batch.core.Step; import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing; import org.springframework.batch.core.configuration.annotation.JobBuilderFactory; import org.springframework.batch.core.configuration.annotation.StepBuilderFactory; import org.springframework.batch.core.launch.support.RunIdIncrementer; import org.springframework.batch.item.database.BeanPropertyItemSqlParameterSourceProvider; import org.springframework.batch.item.database.JdbcBatchItemWriter; import org.springframework.batch.item.database.builder.JdbcBatchItemWriterBuilder; import org.springframework.batch.item.file.FlatFileItemReader; import org.springframework.batch.item.file.builder.FlatFileItemReaderBuilder; import org.springframework.batch.item.file.mapping.BeanWrapperFieldSetMapper; import org.springframework.batch.item.file.mapping.DefaultLineMapper; import org.springframework.batch.item.file.transform.DelimitedLineTokenizer; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.core.io.ClassPathResource; import org.springframework.jdbc.core.JdbcTemplate; @Configuration @EnableBatchProcessing public class BatchConfiguration { @Autowired public JobBuilderFactory jobBuilderFactory; @Autowired public StepBuilderFactory stepBuilderFactory; // tag::readerwriterprocessor[] @Bean public FlatFileItemReader<Person> reader() { return new FlatFileItemReaderBuilder<Person>() .name("personItemReader") .resource(new ClassPathResource("sample-data.csv")) .delimited() .names(new String[]{"firstName", "lastName"}) .fieldSetMapper(new BeanWrapperFieldSetMapper<Person>() {{ setTargetType(Person.class); }}) .build(); } @Bean public PersonItemProcessor processor() { return new PersonItemProcessor(); } @Bean public JdbcBatchItemWriter<Person> writer(DataSource dataSource) { return new JdbcBatchItemWriterBuilder<Person>() .itemSqlParameterSourceProvider(new BeanPropertyItemSqlParameterSourceProvider<>()) .sql("INSERT INTO people (first_name, last_name) VALUES (:firstName, :lastName)") .dataSource(dataSource) .build(); } // end::readerwriterprocessor[] // tag::jobstep[] @Bean public Job importUserJob(JobCompletionNotificationListener listener, Step step1) { return jobBuilderFactory.get("importUserJob") .incrementer(new RunIdIncrementer()) .listener(listener) .flow(step1) .end() .build(); } @Bean public Step step1(JdbcBatchItemWriter<Person> writer) { return stepBuilderFactory.get("step1") .<Person, Person> chunk(10) .reader(reader()) .processor(processor()) .writer(writer) .build(); } // end::jobstep[] }
对于初学者,@EnableBatchProcessing注释添加了许多支持批作业的关键bean,为您节省了大量的工作。此示例使用基于内存的数据库(由@EnableBatchProcessing提供),这意味着一旦运行结束,数据就消失了。
分解:
src/main/java/hello/BatchConfiguration.java
@Bean public FlatFileItemReader<Person> reader() { return new FlatFileItemReaderBuilder<Person>() .name("personItemReader") .resource(new ClassPathResource("sample-data.csv")) .delimited() .names(new String[]{"firstName", "lastName"}) .fieldSetMapper(new BeanWrapperFieldSetMapper<Person>() {{ setTargetType(Person.class); }}) .build(); } @Bean public PersonItemProcessor processor() { return new PersonItemProcessor(); } @Bean public JdbcBatchItemWriter<Person> writer(DataSource dataSource) { return new JdbcBatchItemWriterBuilder<Person>() .itemSqlParameterSourceProvider(new BeanPropertyItemSqlParameterSourceProvider<>()) .sql("INSERT INTO people (first_name, last_name) VALUES (:firstName, :lastName)") .dataSource(dataSource) .build(); }
第一块代码定义输入、处理器和输出。
reader()创建一个 ItemReader。它查找一个名为sample-data.csv的文件,并用解析每个行项目,以将其转换为一个人。
processor()创建前面定义的PersonItemProcessor的实例,把数据转成。
write(datasource)创建一个 ItemWriter。这一个目标是JDBC目标,并自动获取由@EnableBatchProcessing创建的数据源的副本。它包括插入Person所需的SQL语句。
下一个块将重点放在实际的作业配置上。
src/main/java/hello/BatchConfiguration.java
@Bean public Job importUserJob(JobCompletionNotificationListener listener, Step step1) { return jobBuilderFactory.get("importUserJob") .incrementer(new RunIdIncrementer()) .listener(listener) .flow(step1) .end() .build(); } @Bean public Step step1(JdbcBatchItemWriter<Person> writer) { return stepBuilderFactory.get("step1") .<Person, Person> chunk(10) .reader(reader()) .processor(processor()) .writer(writer) .build(); }
第一个方法定义作业,第二个方法定义单个步骤。作业是从步骤构建的,每个步骤都可以涉及到读取、处理和写入。
在这个作业定义中,您需要一个增量器,因为作业使用数据库来维护执行状态。然后列出每个步骤,其中此作业只有一个步骤。作业结束,Java API构建一个完全配置的作业。
在步骤定义中,定义一次要写入的数据量。在这种情况下,它一次最多写入10条记录。接下来,使用前面的注入读取、处理和写入方法。
chunk()前缀为<person,person>,因为它是一个通用方法。这表示每个处理“ chunk”的输入和输出类型,并与ItemReader<Person>和ItemWriter<Person>对应。
src/main/java/hello/JobCompletionNotificationListener.java
package hello; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.batch.core.BatchStatus; import org.springframework.batch.core.JobExecution; import org.springframework.batch.core.listener.JobExecutionListenerSupport; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.jdbc.core.JdbcTemplate; import org.springframework.stereotype.Component; @Component public class JobCompletionNotificationListener extends JobExecutionListenerSupport { private static final Logger log = LoggerFactory.getLogger(JobCompletionNotificationListener.class); private final JdbcTemplate jdbcTemplate; @Autowired public JobCompletionNotificationListener(JdbcTemplate jdbcTemplate) { this.jdbcTemplate = jdbcTemplate; } @Override public void afterJob(JobExecution jobExecution) { if(jobExecution.getStatus() == BatchStatus.COMPLETED) { log.info("!!! JOB FINISHED! Time to verify the results"); jdbcTemplate.query("SELECT first_name, last_name FROM people", (rs, row) -> new Person( rs.getString(1), rs.getString(2)) ).forEach(person -> log.info("Found <" + person + "> in the database.")); } } }
此代码侦听作业何时为BatchStatus.COMPLETED,然后使用JdbcTemplate 检查结果。
创建可执行Application
虽然批处理可以嵌入到Web应用程序和war文件中,但是下面演示的更简单的方法创建了一个独立的应用程序。在一个单一的、可执行的JAR文件中打包所有的东西,由一个main()方法驱动。
src/main/java/hello/Application.java
package hello; import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; @SpringBootApplication public class Application { public static void main(String[] args) throws Exception { SpringApplication.run(Application.class, args); } }
出于演示目的,有一些代码用于创建JDBCTemplate、查询数据库以及打印批处理作业插入的人员的姓名。
运行你的程序(STS下,Maven可参考前面文章)
这个程序中,每一个人都打印 一行,并且你可以从数据库中查到,输出结果如下;
、、、
Converting (firstName: Jill, lastName: Doe) into (firstName: JILL, lastName: DOE)
Converting (firstName: Joe, lastName: Doe) into (firstName: JOE, lastName: DOE)
Converting (firstName: Justin, lastName: Doe) into (firstName: JUSTIN, lastName: DOE)
Converting (firstName: Jane, lastName: Doe) into (firstName: JANE, lastName: DOE)
Converting (firstName: John, lastName: Doe) into (firstName: JOHN, lastName: DOE)
Found in the database.
Found in the database.
Found in the database.
Found in the database.
Found in the database.
```
祝贺你!您构建了一个批处理作业,它从电子表格中接收数据,对其进行处理,并将其写入数据库。