百度360必应搜狗淘宝本站头条
当前位置:网站首页 > 技术流 > 正文

Spring Batch批处理实战

citgpt 2024-06-27 19:59 11 浏览 0 评论

学习使用 Spring batch从 CSV 文件中读取记录并使用JdbcBatchItemWriter插入数据库.我正在使用嵌入式数据库H2来演示此示例。

项目概况

在此应用程序中,我们将执行以下任务:

Spring Batch批处理实战

  1. 使用FlatFileItemReader读取 CSV 文件的员工记录
  2. 配置H2数据库并在其中创建表EMPLOYEE
  3. 使用JdbcBatchItemWriter将员工记录写入表EMPLOYEE
  4. 使用ItemProcessor将日志插入到数据库
  5. 使用控制台验证插入的记录H2

包结构

Maven Dependencies

快速浏览构建此示例所需的 maven 依赖项。 需要spring-boot-starter-web从浏览器窗口验证H2控制台中的数据。

<project xmlns="http://maven.apache.org/POM/4.0.0"
  xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd;
  <modelVersion>4.0.0</modelVersion>
 
  <groupId>com.wisdom</groupId>
  <artifactId>App</artifactId>
  <version>0.0.1-SNAPSHOT</version>
  <packaging>jar</packaging>
 
  <name>App</name>
  <url>http://maven.apache.org</url>
 
  <parent>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-parent</artifactId>
    <version>2.0.3.RELEASE</version>
  </parent>
 
  <properties>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
  </properties>
 
  <dependencies>
    <dependency>
      <groupId>org.springframework.boot</groupId>
      <artifactId>spring-boot-starter-batch</artifactId>
    </dependency>
    <dependency>
      <groupId>org.springframework.boot</groupId>
      <artifactId>spring-boot-starter-web</artifactId>
    </dependency>
    <dependency>
      <groupId>com.h2database</groupId>
      <artifactId>h2</artifactId>
      <scope>runtime</scope>
    </dependency>
  </dependencies>
 
  <build>
    <plugins>
      <plugin>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-maven-plugin</artifactId>
      </plugin>
    </plugins>
  </build>
 
  <repositories>
    <repository>
      <id>repository.spring.release</id>
      <name>Spring GA Repository</name>
      <url>http://repo.spring.io/release</url>
    </repository>
  </repositories>
</project>

CSV 读取器和数据库编写器配置

  1. 我们将FlatFileItemReader用于读取 CSV 文件。我们将使用它的标准配置,包括DefaultLineMapper,DelimitedLineTokenizer和BeanWrapperFieldSetMapper类。
  2. 为了在数据库中写入记录,我们将使用JdbcBatchItemWriter标准编写器在数据库中执行 Spring batch作业的批处理查询。
import javax.sql.DataSource;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.core.launch.support.RunIdIncrementer;
import org.springframework.batch.item.ItemProcessor;
import org.springframework.batch.item.database.BeanPropertyItemSqlParameterSourceProvider;
import org.springframework.batch.item.database.JdbcBatchItemWriter;
import org.springframework.batch.item.file.FlatFileItemReader;
import org.springframework.batch.item.file.LineMapper;
import org.springframework.batch.item.file.mapping.BeanWrapperFieldSetMapper;
import org.springframework.batch.item.file.mapping.DefaultLineMapper;
import org.springframework.batch.item.file.transform.DelimitedLineTokenizer;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.io.Resource;
import org.springframework.jdbc.datasource.embedded.EmbeddedDatabaseBuilder;
import org.springframework.jdbc.datasource.embedded.EmbeddedDatabaseType;
import com.wisdom.model.Employee;
 
@Configuration
@EnableBatchProcessing
public class BatchConfig {
   
  @Autowired
  private JobBuilderFactory jobBuilderFactory;
 
  @Autowired
  private StepBuilderFactory stepBuilderFactory;
 
  @Value("classPath:/input/inputData.csv")
  private Resource inputResource;
 
  @Bean
  public Job readCSVFileJob() {
    return jobBuilderFactory
        .get("readCSVFileJob")
        .incrementer(new RunIdIncrementer())
        .start(step())
        .build();
  }
 
  @Bean
  public Step step() {
    return stepBuilderFactory
        .get("step")
        .<Employee, Employee>chunk(5)
        .reader(reader())
        .processor(processor())
        .writer(writer())
        .build();
  }
   
  @Bean
    public ItemProcessor<Employee, Employee> processor() {
        return new DBLogProcessor();
    }
   
  @Bean
  public FlatFileItemReader<Employee> reader() {
    FlatFileItemReader<Employee> itemReader = new FlatFileItemReader<Employee>();
    itemReader.setLineMapper(lineMapper());
    itemReader.setLinesToSkip(1);
    itemReader.setResource(inputResource);
    return itemReader;
  }
 
  @Bean
  public LineMapper<Employee> lineMapper() {
    DefaultLineMapper<Employee> lineMapper = new DefaultLineMapper<Employee>();
    DelimitedLineTokenizer lineTokenizer = new DelimitedLineTokenizer();
    lineTokenizer.setNames(new String[] { "id", "firstName", "lastName" });
    lineTokenizer.setIncludedFields(new int[] { 0, 1, 2 });
    BeanWrapperFieldSetMapper<Employee> fieldSetMapper = new BeanWrapperFieldSetMapper<Employee>();
    fieldSetMapper.setTargetType(Employee.class);
    lineMapper.setLineTokenizer(lineTokenizer);
    lineMapper.setFieldSetMapper(fieldSetMapper);
    return lineMapper;
  }
 
  @Bean
  public JdbcBatchItemWriter<Employee> writer() {
    JdbcBatchItemWriter<Employee> itemWriter = new JdbcBatchItemWriter<Employee>();
    itemWriter.setDataSource(dataSource());
    itemWriter.setSql("INSERT INTO EMPLOYEE (ID, FIRSTNAME, LASTNAME) VALUES (:id, :firstName, :lastName)");
    itemWriter.setItemSqlParameterSourceProvider(new BeanPropertyItemSqlParameterSourceProvider<Employee>());
    return itemWriter;
  }
   
  @Bean
  public DataSource dataSource(){
    EmbeddedDatabaseBuilder embeddedDatabaseBuilder = new EmbeddedDatabaseBuilder();
    return embeddedDatabaseBuilder.addScript("classpath:org/springframework/batch/core/schema-drop-h2.sql")
        .addScript("classpath:org/springframework/batch/core/schema-h2.sql")
        .addScript("classpath:employee.sql")
        .setType(EmbeddedDatabaseType.H2)
        .build();
  }
}

还要创建DBLogProcessor将在写入数据库之前记录员工记录,这是可选的。

import org.springframework.batch.item.ItemProcessor;
import com.howtodoinjava.demo.model.Employee;
 
public class DBLogProcessor implements ItemProcessor<Employee, Employee>
{
  public Employee process(Employee employee) throws Exception
  {
    System.out.println("Inserting employee : " + employee);
    return employee;
  }
}

Employee类

public class Employee {
 
  String id;
  String firstName;
  String lastName;
 
  //Setter and getter 方法
}

Application配置文件

#批处理job是否自动执行
spring.batch.job.enabled=false
spring.main.banner-mode=off
 
#批处理导入文件地址
input.dir=c:/temp/input

日志配置

<?xml version="1.0" encoding="UTF-8"?>
<configuration scan="true">
 
  <appender name="consoleAppender" class="ch.qos.logback.core.ConsoleAppender">
    <encoder>
      <charset>UTF-8</charset>
      <Pattern>%d{yyyy-MM-dd HH:mm:ss} %p %X{TXNID} - %m%n</Pattern>
    </encoder>
  </appender>
 
  <root level="INFO">
    <appender-ref ref="consoleAppender" />
  </root>
</configuration>

配置 H2 数据库

已经在BatchConfig.java中配置了数据源。

@Bean
public DataSource dataSource(){
  EmbeddedDatabaseBuilder embeddedDatabaseBuilder = new EmbeddedDatabaseBuilder();
  return embeddedDatabaseBuilder.addScript("classpath:org/springframework/batch/core/schema-drop-h2.sql")
      .addScript("classpath:org/springframework/batch/core/schema-h2.sql")
      .addScript("classpath:employee.sql")
      .setType(EmbeddedDatabaseType.H2)
      .build();
}

创建员工表

上述配置将自动生成默认表。要生成EMPLOYEE表,请创建建表语句文件employee.sql并放入resources文件夹。

DROP TABLE EMPLOYEE IF EXISTS;
 
CREATE TABLE EMPLOYEE  (
  ID VARCHAR(10),  
  FIRSTNAME VARCHAR(100),  
  LASTNAME VARCHAR(100) 
) ;

启用 H2 控制台

要启用 H2 控制台,请在 Spring web 注册org.h2.server.web.WebServlet。

import org.h2.server.web.WebServlet;
import org.springframework.boot.web.servlet.ServletRegistrationBean;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
 
@SuppressWarnings({"rawtypes","unchecked"})
@Configuration
public class WebConfig {
   
  @Bean
    ServletRegistrationBean h2servletRegistration(){
    ServletRegistrationBean registrationBean = new ServletRegistrationBean( new WebServlet());
        registrationBean.addUrlMappings("/console/*");
        return registrationBean;
    }
}

演示

我们的应用程序配置已完成,作业已准备好执行。让我们创建输入 CSV 文件。

id,firstName,lastName
1,Lokesh,Gupta
2,Amit,Mishra
3,Pankaj,Kumar
4,David,Miller
5,David,Walsh

运行演示

要运行演示和批处理作业,请创建 Spring boot应用程序类并启动应用程序。

import org.springframework.batch.core.Job;
import org.springframework.batch.core.JobParameters;
import org.springframework.batch.core.JobParametersBuilder;
import org.springframework.batch.core.launch.JobLauncher;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.scheduling.annotation.EnableScheduling;
import org.springframework.scheduling.annotation.Scheduled;
 
@SpringBootApplication
@EnableScheduling
public class App {
    @Autowired
    JobLauncher jobLauncher;
      
    @Autowired
    Job job;
      
    public static void main(String[] args)    {
        SpringApplication.run(App.class, args);
    }
      
    @Scheduled(cron = "0 */1 * * * ?")
    public void perform() throws Exception    {
        JobParameters params = new JobParametersBuilder()
                .addString("JobID", String.valueOf(System.currentTimeMillis()))
                .toJobParameters();
        jobLauncher.run(job, params);
    }
}

验证批处理作业结果

要验证批处理作业是否成功执行,请检查日志和 H2 控制台。

2023-06-29 23:11:00 INFO  - Job: [SimpleJob: [name=readCSVFileJob]] launched with the following parameters: [{JobID=1531316460004}]
 
2023-07-29 23:11:00 INFO  - Executing step: [step]
 
Inserting employee : Employee [id=1, firstName=Lokesh, lastName=Gupta]
Inserting employee : Employee [id=2, firstName=Amit, lastName=Mishra]
Inserting employee : Employee [id=3, firstName=Pankaj, lastName=Kumar]
Inserting employee : Employee [id=4, firstName=David, lastName=Miller]
Inserting employee : Employee [id=5, firstName=David, lastName=Walsh]
 
2023-06-29 23:11:00 INFO  - Job: [SimpleJob: [name=readCSVFileJob]] completed with the following parameters: [{JobID=1531316460004}] and the following status: [COMPLETED]

H2 Console



相关推荐

js中arguments详解

一、简介了解arguments这个对象之前先来认识一下javascript的一些功能:其实Javascript并没有重载函数的功能,但是Arguments对象能够模拟重载。Javascrip中每个函数...

firewall-cmd 常用命令

目录firewalldzone说明firewallzone内容说明firewall-cmd常用参数firewall-cmd常用命令常用命令 回到顶部firewalldzone...

epel-release 是什么

EPEL-release(ExtraPackagesforEnterpriseLinux)是一个软件仓库,它为企业级Linux发行版(如CentOS、RHEL等)提供额外的软件包。以下是关于E...

FullGC详解  什么是 JVM 的 GC
FullGC详解 什么是 JVM 的 GC

前言:背景:一、什么是JVM的GC?JVM(JavaVirtualMachine)。JVM是Java程序的虚拟机,是一种实现Java语言的解...

2024-10-26 08:50 citgpt

使用Spire.Doc组件利用模板导出Word文档
  • 使用Spire.Doc组件利用模板导出Word文档
  • 使用Spire.Doc组件利用模板导出Word文档
  • 使用Spire.Doc组件利用模板导出Word文档
  • 使用Spire.Doc组件利用模板导出Word文档
跨域(CrossOrigin)

1.介绍  1)跨域问题:跨域问题是在网络中,当一个网络的运行脚本(通常时JavaScript)试图访问另一个网络的资源时,如果这两个网络的端口、协议和域名不一致时就会出现跨域问题。    通俗讲...

微服务架构和分布式架构的区别

1、含义不同微服务架构:微服务架构风格是一种将一个单一应用程序开发为一组小型服务的方法,每个服务运行在自己的进程中,服务间通信采用轻量级通信机制(通常用HTTP资源API)。这些服务围绕业务能力构建并...

深入理解与应用CSS clip-path 属性
深入理解与应用CSS clip-path 属性

clip-pathclip-path是什么clip-path 是一个CSS属性,允许开发者创建一个剪切区域,从而决定元素的哪些部分可见,哪些部分会被隐...

2024-10-25 11:51 citgpt

HCNP Routing&Switching之OSPF LSA类型(二)
  • HCNP Routing&Switching之OSPF LSA类型(二)
  • HCNP Routing&Switching之OSPF LSA类型(二)
  • HCNP Routing&Switching之OSPF LSA类型(二)
  • HCNP Routing&Switching之OSPF LSA类型(二)
Redis和Memcached的区别详解
  • Redis和Memcached的区别详解
  • Redis和Memcached的区别详解
  • Redis和Memcached的区别详解
  • Redis和Memcached的区别详解
Request.ServerVariables 大全

Request.ServerVariables("Url")返回服务器地址Request.ServerVariables("Path_Info")客户端提供的路...

python操作Kafka

目录一、python操作kafka1.python使用kafka生产者2.python使用kafka消费者3.使用docker中的kafka二、python操作kafka细...

Runtime.getRuntime().exec详解

Runtime.getRuntime().exec详解概述Runtime.getRuntime().exec用于调用外部可执行程序或系统命令,并重定向外部程序的标准输入、标准输出和标准错误到缓冲池。...

promise.all详解 promise.all是干什么的
promise.all详解 promise.all是干什么的

promise.all详解promise.all中所有的请求成功了,走.then(),在.then()中能得到一个数组,数组中是每个请求resolve抛出的结果...

2024-10-24 16:21 citgpt

Content-Length和Transfer-Encoding详解
  • Content-Length和Transfer-Encoding详解
  • Content-Length和Transfer-Encoding详解
  • Content-Length和Transfer-Encoding详解
  • Content-Length和Transfer-Encoding详解

取消回复欢迎 发表评论: