百度360必应搜狗淘宝本站头条
当前位置:网站首页 > 技术流 > 正文

Springboot官方例子--用Spring Batch处理批量作业

citgpt 2024-06-27 19:59 9 浏览 0 评论

继续给大家带来Springboot官方例子中文翻译版:本例将指导您完成创建批处理程序解决各种批量作业的过程。Spring Batch是一个专门处理批处理的子项目,是一个轻量级的大数据量的并行处理(批处理)的框架。作用和Hadoop很相似,不过Hadoop是基于重量级的分布式环境(处理巨量数据),而SpringBatch是基于轻量的应用框架(处理中小数据)。如果你只是业务需求不是很大的JAVA应用,可以参考用Spring Batch。本例您将构建一个从csv表格导入数据、转换数据并将最终结果存储在数据库中的服务。

程序结构

Springboot官方例子--用Spring Batch处理批量作业

└── src
 └── main
 └── java
 └── hello

pom.xml文件

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
 xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
 <modelVersion>4.0.0</modelVersion>
 <groupId>org.springframework</groupId>
 <artifactId>gs-batch-processing</artifactId>
 <version>0.1.0</version>
 <parent>
 <groupId>org.springframework.boot</groupId>
 <artifactId>spring-boot-starter-parent</artifactId>
 <version>2.1.6.RELEASE</version>
 </parent>
 <properties>
 <java.version>1.8</java.version>
 </properties>
 <dependencies>
 <dependency>
 <groupId>org.springframework.boot</groupId>
 <artifactId>spring-boot-starter-batch</artifactId>
 </dependency>
 <dependency>
 <groupId>org.hsqldb</groupId>
 <artifactId>hsqldb</artifactId>
 </dependency>
 </dependencies>
 <build>
 <plugins>
 <plugin>
 <groupId>org.springframework.boot</groupId>
 <artifactId>spring-boot-maven-plugin</artifactId>
 </plugin>
 </plugins>
 </build>
</project>

Spring Boot将会你做如下的事:

  • 将 classpath 里面所有用到的jar包构建成一个可执行的 JAR 文件,方便执行你的程序
  • 搜索public static void main()方法并且将它当作可执行类
  • 根据springboot版本,去查找相应的依赖类版本,当然你可以定义其它版本。

业务数据

本次用到的业务数据如下:

src/main/resources/sample-data.csv

Jill,Doe
Joe,Doe
Justin,Doe
Jane,Doe
John,Doe

此电子表格在每一行中包含名字和姓氏,用逗号分隔

接下来,编写一个SQL脚本来创建一个表来存储数据。

src/main/resources/schema-all.sql

DROP TABLE people IF EXISTS;
CREATE TABLE people (
 person_id BIGINT IDENTITY NOT NULL PRIMARY KEY,
 first_name VARCHAR(20),
 last_name VARCHAR(20)
);

Springboot在启动期间自动运行schema-@@platform@@.sql。-all是所有平台的默认值。

创建业务类

现在您看到了数据输入和输出的格式,您可以编写代码来表示一行数据。

src/main/java/hello/Person.java

package hello;
public class Person {
 private String lastName;
 private String firstName;
 public Person() {
 }
 public Person(String firstName, String lastName) {
 this.firstName = firstName;
 this.lastName = lastName;
 }
 public void setFirstName(String firstName) {
 this.firstName = firstName;
 }
 public String getFirstName() {
 return firstName;
 }
 public String getLastName() {
 return lastName;
 }
 public void setLastName(String lastName) {
 this.lastName = lastName;
 }
 @Override
 public String toString() {
 return "firstName: " + firstName + ", lastName: " + lastName;
 }
}

创建中间处理类

在批处理中,一个常见的范例是接收数据,转换数据,然后将其导出到其他地方。在这里,您可以编写一个简单的转换程序,将名称转换为大写。

src/main/java/hello/PersonItemProcessor.java

package hello;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.batch.item.ItemProcessor;
public class PersonItemProcessor implements ItemProcessor<Person, Person> {
 private static final Logger log = LoggerFactory.getLogger(PersonItemProcessor.class);
 @Override
 public Person process(final Person person) throws Exception {
 final String firstName = person.getFirstName().toUpperCase();
 final String lastName = person.getLastName().toUpperCase();
 final Person transformedPerson = new Person(firstName, lastName);
 log.info("Converting (" + person + ") into (" + transformedPerson + ")");
 return transformedPerson;
 }
}

PersonItemProcessor实现了Spring Batch的ItemProcessor接口,这很方便处理批处理作业。根据该接口,您将接收一个传入的Person对象,然后将其转换为大写的Person。

将批处理作业合在一起

现在您将实际的批处理作业放在一起。Spring批处理提供了许多实用程序类,减少了编写自定义代码的需要。因此,您可以更关注业务逻辑。

src/main/java/hello/BatchConfiguration.java

package hello;

import javax.sql.DataSource;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.JobExecutionListener;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.core.launch.support.RunIdIncrementer;
import org.springframework.batch.item.database.BeanPropertyItemSqlParameterSourceProvider;
import org.springframework.batch.item.database.JdbcBatchItemWriter;
import org.springframework.batch.item.database.builder.JdbcBatchItemWriterBuilder;
import org.springframework.batch.item.file.FlatFileItemReader;
import org.springframework.batch.item.file.builder.FlatFileItemReaderBuilder;
import org.springframework.batch.item.file.mapping.BeanWrapperFieldSetMapper;
import org.springframework.batch.item.file.mapping.DefaultLineMapper;
import org.springframework.batch.item.file.transform.DelimitedLineTokenizer;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.io.ClassPathResource;
import org.springframework.jdbc.core.JdbcTemplate;
@Configuration
@EnableBatchProcessing
public class BatchConfiguration {
 @Autowired
 public JobBuilderFactory jobBuilderFactory;
 @Autowired
 public StepBuilderFactory stepBuilderFactory;
 // tag::readerwriterprocessor[]
 @Bean
 public FlatFileItemReader<Person> reader() {
 return new FlatFileItemReaderBuilder<Person>()
 .name("personItemReader")
 .resource(new ClassPathResource("sample-data.csv"))
 .delimited()
 .names(new String[]{"firstName", "lastName"})
 .fieldSetMapper(new BeanWrapperFieldSetMapper<Person>() {{
 setTargetType(Person.class);
 }})
 .build();
 }
 @Bean
 public PersonItemProcessor processor() {
 return new PersonItemProcessor();
 }
 @Bean
 public JdbcBatchItemWriter<Person> writer(DataSource dataSource) {
 return new JdbcBatchItemWriterBuilder<Person>()
 .itemSqlParameterSourceProvider(new BeanPropertyItemSqlParameterSourceProvider<>())
 .sql("INSERT INTO people (first_name, last_name) VALUES (:firstName, :lastName)")
 .dataSource(dataSource)
 .build();
 }
 // end::readerwriterprocessor[]
 // tag::jobstep[]
 @Bean
 public Job importUserJob(JobCompletionNotificationListener listener, Step step1) {
 return jobBuilderFactory.get("importUserJob")
 .incrementer(new RunIdIncrementer())
 .listener(listener)
 .flow(step1)
 .end()
 .build();
 }
 @Bean
 public Step step1(JdbcBatchItemWriter<Person> writer) {
 return stepBuilderFactory.get("step1")
 .<Person, Person> chunk(10)
 .reader(reader())
 .processor(processor())
 .writer(writer)
 .build();
 }
 // end::jobstep[]
}

对于初学者,@EnableBatchProcessing注释添加了许多支持批作业的关键bean,为您节省了大量的工作。此示例使用基于内存的数据库(由@EnableBatchProcessing提供),这意味着一旦运行结束,数据就消失了。

分解:

src/main/java/hello/BatchConfiguration.java

 @Bean
 public FlatFileItemReader<Person> reader() {
 return new FlatFileItemReaderBuilder<Person>()
 .name("personItemReader")
 .resource(new ClassPathResource("sample-data.csv"))
 .delimited()
 .names(new String[]{"firstName", "lastName"})
 .fieldSetMapper(new BeanWrapperFieldSetMapper<Person>() {{
 setTargetType(Person.class);
 }})
 .build();
 }
 @Bean
 public PersonItemProcessor processor() {
 return new PersonItemProcessor();
 }
 @Bean
 public JdbcBatchItemWriter<Person> writer(DataSource dataSource) {
 return new JdbcBatchItemWriterBuilder<Person>()
 .itemSqlParameterSourceProvider(new BeanPropertyItemSqlParameterSourceProvider<>())
 .sql("INSERT INTO people (first_name, last_name) VALUES (:firstName, :lastName)")
 .dataSource(dataSource)
 .build();
 }

第一块代码定义输入、处理器和输出。

reader()创建一个 ItemReader。它查找一个名为sample-data.csv的文件,并用解析每个行项目,以将其转换为一个人。

processor()创建前面定义的PersonItemProcessor的实例,把数据转成。

write(datasource)创建一个 ItemWriter。这一个目标是JDBC目标,并自动获取由@EnableBatchProcessing创建的数据源的副本。它包括插入Person所需的SQL语句。

下一个块将重点放在实际的作业配置上。

src/main/java/hello/BatchConfiguration.java

 @Bean
 public Job importUserJob(JobCompletionNotificationListener listener, Step step1) {
 return jobBuilderFactory.get("importUserJob")
 .incrementer(new RunIdIncrementer())
 .listener(listener)
 .flow(step1)
 .end()
 .build();
 }
 @Bean
 public Step step1(JdbcBatchItemWriter<Person> writer) {
 return stepBuilderFactory.get("step1")
 .<Person, Person> chunk(10)
 .reader(reader())
 .processor(processor())
 .writer(writer)
 .build();
 }

第一个方法定义作业,第二个方法定义单个步骤。作业是从步骤构建的,每个步骤都可以涉及到读取、处理和写入。

在这个作业定义中,您需要一个增量器,因为作业使用数据库来维护执行状态。然后列出每个步骤,其中此作业只有一个步骤。作业结束,Java API构建一个完全配置的作业。

在步骤定义中,定义一次要写入的数据量。在这种情况下,它一次最多写入10条记录。接下来,使用前面的注入读取、处理和写入方法。

chunk()前缀为<person,person>,因为它是一个通用方法。这表示每个处理“ chunk”的输入和输出类型,并与ItemReader<Person>和ItemWriter<Person>对应。

src/main/java/hello/JobCompletionNotificationListener.java

package hello;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.batch.core.BatchStatus;
import org.springframework.batch.core.JobExecution;
import org.springframework.batch.core.listener.JobExecutionListenerSupport;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.stereotype.Component;
@Component
public class JobCompletionNotificationListener extends JobExecutionListenerSupport {
 private static final Logger log = LoggerFactory.getLogger(JobCompletionNotificationListener.class);
 private final JdbcTemplate jdbcTemplate;
 @Autowired
 public JobCompletionNotificationListener(JdbcTemplate jdbcTemplate) {
 this.jdbcTemplate = jdbcTemplate;
 }
 @Override
 public void afterJob(JobExecution jobExecution) {
 if(jobExecution.getStatus() == BatchStatus.COMPLETED) {
 log.info("!!! JOB FINISHED! Time to verify the results");
 jdbcTemplate.query("SELECT first_name, last_name FROM people",
 (rs, row) -> new Person(
 rs.getString(1),
 rs.getString(2))
 ).forEach(person -> log.info("Found <" + person + "> in the database."));
 }
 }
}

此代码侦听作业何时为BatchStatus.COMPLETED,然后使用JdbcTemplate 检查结果。

创建可执行Application

虽然批处理可以嵌入到Web应用程序和war文件中,但是下面演示的更简单的方法创建了一个独立的应用程序。在一个单一的、可执行的JAR文件中打包所有的东西,由一个main()方法驱动。

src/main/java/hello/Application.java

package hello;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
@SpringBootApplication
public class Application {
 public static void main(String[] args) throws Exception {
 SpringApplication.run(Application.class, args);
 }
}

出于演示目的,有一些代码用于创建JDBCTemplate、查询数据库以及打印批处理作业插入的人员的姓名。

运行你的程序(STS下,Maven可参考前面文章)

这个程序中,每一个人都打印 一行,并且你可以从数据库中查到,输出结果如下;

、、、

Converting (firstName: Jill, lastName: Doe) into (firstName: JILL, lastName: DOE)

Converting (firstName: Joe, lastName: Doe) into (firstName: JOE, lastName: DOE)

Converting (firstName: Justin, lastName: Doe) into (firstName: JUSTIN, lastName: DOE)

Converting (firstName: Jane, lastName: Doe) into (firstName: JANE, lastName: DOE)

Converting (firstName: John, lastName: Doe) into (firstName: JOHN, lastName: DOE)

Found in the database.

Found in the database.

Found in the database.

Found in the database.

Found in the database.

```

祝贺你!您构建了一个批处理作业,它从电子表格中接收数据,对其进行处理,并将其写入数据库。

相关推荐

js中arguments详解

一、简介了解arguments这个对象之前先来认识一下javascript的一些功能:其实Javascript并没有重载函数的功能,但是Arguments对象能够模拟重载。Javascrip中每个函数...

firewall-cmd 常用命令

目录firewalldzone说明firewallzone内容说明firewall-cmd常用参数firewall-cmd常用命令常用命令 回到顶部firewalldzone...

epel-release 是什么

EPEL-release(ExtraPackagesforEnterpriseLinux)是一个软件仓库,它为企业级Linux发行版(如CentOS、RHEL等)提供额外的软件包。以下是关于E...

FullGC详解  什么是 JVM 的 GC
FullGC详解 什么是 JVM 的 GC

前言:背景:一、什么是JVM的GC?JVM(JavaVirtualMachine)。JVM是Java程序的虚拟机,是一种实现Java语言的解...

2024-10-26 08:50 citgpt

使用Spire.Doc组件利用模板导出Word文档
  • 使用Spire.Doc组件利用模板导出Word文档
  • 使用Spire.Doc组件利用模板导出Word文档
  • 使用Spire.Doc组件利用模板导出Word文档
  • 使用Spire.Doc组件利用模板导出Word文档
跨域(CrossOrigin)

1.介绍  1)跨域问题:跨域问题是在网络中,当一个网络的运行脚本(通常时JavaScript)试图访问另一个网络的资源时,如果这两个网络的端口、协议和域名不一致时就会出现跨域问题。    通俗讲...

微服务架构和分布式架构的区别

1、含义不同微服务架构:微服务架构风格是一种将一个单一应用程序开发为一组小型服务的方法,每个服务运行在自己的进程中,服务间通信采用轻量级通信机制(通常用HTTP资源API)。这些服务围绕业务能力构建并...

深入理解与应用CSS clip-path 属性
深入理解与应用CSS clip-path 属性

clip-pathclip-path是什么clip-path 是一个CSS属性,允许开发者创建一个剪切区域,从而决定元素的哪些部分可见,哪些部分会被隐...

2024-10-25 11:51 citgpt

HCNP Routing&Switching之OSPF LSA类型(二)
  • HCNP Routing&Switching之OSPF LSA类型(二)
  • HCNP Routing&Switching之OSPF LSA类型(二)
  • HCNP Routing&Switching之OSPF LSA类型(二)
  • HCNP Routing&Switching之OSPF LSA类型(二)
Redis和Memcached的区别详解
  • Redis和Memcached的区别详解
  • Redis和Memcached的区别详解
  • Redis和Memcached的区别详解
  • Redis和Memcached的区别详解
Request.ServerVariables 大全

Request.ServerVariables("Url")返回服务器地址Request.ServerVariables("Path_Info")客户端提供的路...

python操作Kafka

目录一、python操作kafka1.python使用kafka生产者2.python使用kafka消费者3.使用docker中的kafka二、python操作kafka细...

Runtime.getRuntime().exec详解

Runtime.getRuntime().exec详解概述Runtime.getRuntime().exec用于调用外部可执行程序或系统命令,并重定向外部程序的标准输入、标准输出和标准错误到缓冲池。...

promise.all详解 promise.all是干什么的
promise.all详解 promise.all是干什么的

promise.all详解promise.all中所有的请求成功了,走.then(),在.then()中能得到一个数组,数组中是每个请求resolve抛出的结果...

2024-10-24 16:21 citgpt

Content-Length和Transfer-Encoding详解
  • Content-Length和Transfer-Encoding详解
  • Content-Length和Transfer-Encoding详解
  • Content-Length和Transfer-Encoding详解
  • Content-Length和Transfer-Encoding详解

取消回复欢迎 发表评论: