百度360必应搜狗淘宝本站头条
当前位置:网站首页 > 技术流 > 正文

springboot-如何集成springbatch实现批处理

citgpt 2024-06-27 19:58 8 浏览 0 评论

一、基本介绍

1.springbatch 是什么?

Spring Batch 是一个用于批处理应用程序开发的开源框架。它提供了一套强大的工具和组件,用于处理大规模数据处理、ETL(抽取、转换和加载)操作、定时任务等。

Spring Batch 构建在 Spring Framework 的基础之上,利用了 Spring 的依赖注入、事务管理、异常处理等特性,使得批处理应用程序的开发更加简单和灵活。

springboot-如何集成springbatch实现批处理

2.springbatch 有哪些核心组件

①.JobLauncher(作业启动器)

用于启动和执行批处理作业的组件。通过 JobLauncher 可以调度和触发 Job 的执行,可以根据需要配置触发 Job 的方式,如定时触发、手动触发等。

②.Job(作业)

一个完整的批处理任务,包含多个 Step。Job 定义了整个批处理任务的执行顺序和依赖关系。可以通过 JobLauncher 启动和执行 Job。

③.Step(步骤)

批处理任务的一部分,包含了读取、处理和写入数据的逻辑。Step 由 ItemReader、ItemProcessor 和 ItemWriter 组成。Step 定义了读取器、处理器和写入器,并可以设置其他属性如事务管理、错误处理策略等。

④.ItemReader(数据读取器)

从数据源中读取数据的组件。常见的内置实现包括 JdbcCursorItemReader(从数据库读取数据)、FlatFileItemReader(从文件读取数据)、JmsItemReader(从消息队列读取数据)等。也可以自定义 ItemReader 来满足特定需求。

⑤.ItemProcessor(数据处理器)

对读取到的数据进行处理、转换或过滤的组件。可以进行任意的业务逻辑操作。可以根据需要添加一个或多个 ItemProcessor,并将它们链接在一起形成处理链。

⑤.ItemWriter(数据写入器)

将处理后的数据写入目标系统(比如数据库、文件等)的组件。常见的内置实现包括 JdbcBatchItemWriter(写入数据库)、FlatFileItemWriter(写入文件)、JmsItemWriter(写入消息队列)等。也可以自定义 ItemWriter 来实现特定的写入逻辑。

⑥.JobRepository(作业仓库)

用于管理批处理任务的元数据和状态信息。JobRepository 负责将作业执行的元数据存储到持久化存储介质(如数据库)中,并提供对这些元数据的操作接口。它还支持事务管理,保证批处理任务的一致性和可靠性。通过使用 JobRepository,可以实现断点续传、重试机制和错误处理等功能。

二、步骤概览

说明:这边实现的需求是,使用批处理组件,从文件中读取数据,经过批处理组件处理,最后保存至数据库中,步骤概览是基于该需求进行实现的。


三、步骤说明

1.引入依赖包

  • 批处理依赖包:spring-boot-starter-batch
<dependency>
  <groupId>org.springframework.boot</groupId>
  <artifactId>spring-boot-starter-batch</artifactId>
  <exclusions>
  <!-- 排除内置的数据库 -->
    <exclusion>
      <groupId>com.h2database</groupId>
     <artifactId>h2</artifactId>
    </exclusion>
 </exclusions>
</dependency>
  • mybatisplus 依赖包:mybatis-plus-boot-starter,引入它是由于我们数据处理后需要保存到业务库。
<dependency>
  <groupId>com.baomidou</groupId>
  <artifactId>mybatis-plus-boot-starter</artifactId>
  <version>3.5.5</version>
</dependency>
  • mysql 驱动包:mysql-connector-java
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<scope>runtime</scope>
</dependency>

2.配置集成参数

在 application.yml 配置文件中,定义集成spring batch 组件的配置参数。

spring:
  batch:
    job:
      #设置为 false -需要jobLaucher.run执行
      enabled: true
    jdbc:
      initialize-schema: always

3.启用批处理

在 springboot 启用类上添加注解 @EnableBatchProcessing 启用批处理。

4.配置作业步骤

①.数据读取

创建一个数据读取器,用于从文件中读取数据。

  • Student:学生信息实体
@Data
@TableName(value = "sys_student")
public class Student implements Serializable {
    @TableField(exist = false)
    private static final long serialVersionUID = 1L;
    @TableId(type = IdType.AUTO)
    private Long id;
    private String name;
    private Integer age;
    private Integer sex;
    @JsonFormat(pattern = "yyyy-MM-dd HH:mm:ss")
    private Date birthday;
}
  • /resources/static/student.txt:学生信息文件
"id","name","age","sex","birthday"
"1","Jin Xiaoming","10","1","2015-03-08 10:59:21"
"2","Yau Ka Fai","11","1","2005-08-21 01:13:52"
"3","Xu Zhiyuan","12","1","2006-04-12 04:01:10"
"4","Wu Hok Yau","20","1","2018-04-29 00:16:52"
"5","Shi Ziyi","20","1","2023-02-20 05:20:14"
"6","Koo Wing Fat","20","0","2012-09-13 06:47:16"
"7","Takeda Tsubasa","17","0","2002-05-03 15:31:37"
"8","Keith Jones","21","0","2009-02-24 03:56:21"
"9","Lillian Vargas","18","0","2001-03-02 00:19:56"
"10","Lai Ka Keung","29","0","2009-10-04 03:03:30"
"1149","Sakai Momoka","14","1","2013-10-09 00:00:00"
"1150","Han Jiehong","14","0","2021-07-17 00:00:00"
"1151","Yuan Xiaoming","15","0","2015-02-05 00:00:00"
"1152","Don Gonzalez","18","0","2008-08-31 00:00:00"
"1153","Chin Chieh Lun","15","0","2011-02-04 00:00:00"
  • StudentFileItemReaderBuilder:学生信息文件读取器构造者,这边使用了组件内置的 FlatFileItemReader 文件读取器
@Component
public class StudentFileItemReaderBuilder {

    public ItemReader<Student> build() {
        FlatFileItemReader<Student> reader = new FlatFileItemReader<>();
        // 设置文件资源地址
        reader.setResource(new ClassPathResource("static/student.txt"));

        // 忽略第一行
        reader.setLinesToSkip(1);

        // AbstractLineTokenizer的三个实现类之一,以固定分隔符处理行数据读取,
        // 使用默认构造器的时候,使用逗号作为分隔符,也可以通过有参构造器来指定分隔符
        DelimitedLineTokenizer tokenizer = new DelimitedLineTokenizer();

        // 设置属性名,类似于表头
        tokenizer.setNames("id", "name", "age", "sex", "birthday");

        // 将每行数据转换为TestData对象
        DefaultLineMapper<Student> mapper = new DefaultLineMapper<>();

        // 设置LineTokenizer
        mapper.setLineTokenizer(tokenizer);

        // 设置映射方式,即读取到的文本怎么转换为对应实体
        mapper.setFieldSetMapper(fieldSet -> {
            Student student = new Student();
            student.setId(fieldSet.readLong("id"));
            student.setName(fieldSet.readString("name"));
            student.setAge(fieldSet.readInt("age"));
            student.setSex(fieldSet.readInt("sex"));
            student.setBirthday(fieldSet.readDate("birthday"));
            return student;
        });
        reader.setLineMapper(mapper);
        return reader;
    }
}

②.数据处理

实现数据处理器,需要实现 ItemProcessor接口。

  • StudentProcessor:学生信息处理器,用于对读取到的数据进行处理,案例中我们进行了简单的数据过滤处理。
@Component
public class StudentProcessor implements ItemProcessor<Student, Student> {
    @Override
    public Student process(Student student) throws Exception {
        // 将性别为1的都设置为0
        if (student.getSex() == 1) {
            student.setSex(0);
        }
        return student;
    }
}

③.数据写入

实现数据写入器,需要实现ItemWriter接口,案例中使用集成的mybatisplus 批量到mysql中。

@Component
public class StudentItemWriter implements ItemWriter<Student> {
    @Resource
    private StudentMapper mapper;
    @Override
    public void write(List<? extends Student> list) throws Exception {
        mapper.batchInsert(list);
    }
}

5.配置作业

配置作业即定义Job 和对应的 Step,定义批处理任务的执行流程。

①.作业监听器

定义作业监听器,我们可以通过它监听作业执行开始和执行结束,并打印作业执行耗时。

  • StudentReaderJobListener:作业监听器
@Slf4j
@Component
public class StudentReaderJobListener implements JobExecutionListener {
    private LocalDateTime start;
    private LocalDateTime end;

    @Override
    public void beforeJob(JobExecution jobExecution) {
        start = LocalDateTime.now();
        log.info("student reader job start...");
    }

    @Override
    public void afterJob(JobExecution jobExecution) {
        end = LocalDateTime.now();
        log.info("student reader job end,cost {} ms", ChronoUnit.MILLIS.between(start, end));

    }
}

②.定义作业

定义作业任务,设置其作业名称、监听器、作业步骤(由数据读取器、数据处理器、数据写入器组成)。

  • StudentReaderJobBatchConfig:作业任务配置
@Slf4j
@Configuration
public class StudentReaderJobBatchConfig {
    // 任务构建工厂
    @Resource
    private JobBuilderFactory jobFactory;
    // 步骤构建工厂
    @Resource
    private StepBuilderFactory stepFactory;
    // 读取构造器
    @Resource
    private StudentFileItemReaderBuilder readerBuilder;
    // 信息处理器
    @Resource
    private StudentProcessor studentProcessor;
    // 信息写入器
    @Resource
    private StudentItemWriter studentWriter;
    // 任务监听器
    @Resource
    private StudentReaderJobListener jobListener;

    @Bean
    public Job studentReaderJob() {
        return jobFactory.get("studentReaderJob")
                .incrementer(new RunIdIncrementer())
                .listener(jobListener)
                .start(studentReaderJobStep())
                .build();
    }

  //任务步骤
    private Step studentReaderJobStep() {
        return stepFactory.get("studentReaderJobStep")
                .<Student, Student>chunk(10)
                .reader(readerBuilder.build())
                .processor(studentProcessor)
                .writer(studentWriter)
                .build();
    }
}

四、代码测试

1. 测试代码

@SpringBootTest
public class ApiTest {
	  // 作业启动器
    @Resource
    public JobLauncher jobLauncher;
		// 作业任务
    @Resource
    private Job studentReaderJob;

    @Test
    public void test_studentJob_execute() throws Exception {
        // 启动参数,可按需设置
        JobParameters jobParameters = new JobParametersBuilder()
                //.addString("time", String.valueOf(System.currentTimeMillis()))
                .toJobParameters();

        jobLauncher.run(studentReaderJob, jobParameters);
    }
}

2.测试结果

  • 后台日志
  • 数据入库结果
  • 批处理执行记录表,集成组件后会在业务库中生成并记录作业信息。

相关推荐

js中arguments详解

一、简介了解arguments这个对象之前先来认识一下javascript的一些功能:其实Javascript并没有重载函数的功能,但是Arguments对象能够模拟重载。Javascrip中每个函数...

firewall-cmd 常用命令

目录firewalldzone说明firewallzone内容说明firewall-cmd常用参数firewall-cmd常用命令常用命令 回到顶部firewalldzone...

epel-release 是什么

EPEL-release(ExtraPackagesforEnterpriseLinux)是一个软件仓库,它为企业级Linux发行版(如CentOS、RHEL等)提供额外的软件包。以下是关于E...

FullGC详解  什么是 JVM 的 GC
FullGC详解 什么是 JVM 的 GC

前言:背景:一、什么是JVM的GC?JVM(JavaVirtualMachine)。JVM是Java程序的虚拟机,是一种实现Java语言的解...

2024-10-26 08:50 citgpt

使用Spire.Doc组件利用模板导出Word文档
  • 使用Spire.Doc组件利用模板导出Word文档
  • 使用Spire.Doc组件利用模板导出Word文档
  • 使用Spire.Doc组件利用模板导出Word文档
  • 使用Spire.Doc组件利用模板导出Word文档
跨域(CrossOrigin)

1.介绍  1)跨域问题:跨域问题是在网络中,当一个网络的运行脚本(通常时JavaScript)试图访问另一个网络的资源时,如果这两个网络的端口、协议和域名不一致时就会出现跨域问题。    通俗讲...

微服务架构和分布式架构的区别

1、含义不同微服务架构:微服务架构风格是一种将一个单一应用程序开发为一组小型服务的方法,每个服务运行在自己的进程中,服务间通信采用轻量级通信机制(通常用HTTP资源API)。这些服务围绕业务能力构建并...

深入理解与应用CSS clip-path 属性
深入理解与应用CSS clip-path 属性

clip-pathclip-path是什么clip-path 是一个CSS属性,允许开发者创建一个剪切区域,从而决定元素的哪些部分可见,哪些部分会被隐...

2024-10-25 11:51 citgpt

HCNP Routing&Switching之OSPF LSA类型(二)
  • HCNP Routing&Switching之OSPF LSA类型(二)
  • HCNP Routing&Switching之OSPF LSA类型(二)
  • HCNP Routing&Switching之OSPF LSA类型(二)
  • HCNP Routing&Switching之OSPF LSA类型(二)
Redis和Memcached的区别详解
  • Redis和Memcached的区别详解
  • Redis和Memcached的区别详解
  • Redis和Memcached的区别详解
  • Redis和Memcached的区别详解
Request.ServerVariables 大全

Request.ServerVariables("Url")返回服务器地址Request.ServerVariables("Path_Info")客户端提供的路...

python操作Kafka

目录一、python操作kafka1.python使用kafka生产者2.python使用kafka消费者3.使用docker中的kafka二、python操作kafka细...

Runtime.getRuntime().exec详解

Runtime.getRuntime().exec详解概述Runtime.getRuntime().exec用于调用外部可执行程序或系统命令,并重定向外部程序的标准输入、标准输出和标准错误到缓冲池。...

promise.all详解 promise.all是干什么的
promise.all详解 promise.all是干什么的

promise.all详解promise.all中所有的请求成功了,走.then(),在.then()中能得到一个数组,数组中是每个请求resolve抛出的结果...

2024-10-24 16:21 citgpt

Content-Length和Transfer-Encoding详解
  • Content-Length和Transfer-Encoding详解
  • Content-Length和Transfer-Encoding详解
  • Content-Length和Transfer-Encoding详解
  • Content-Length和Transfer-Encoding详解

取消回复欢迎 发表评论: