百度360必应搜狗淘宝本站头条
当前位置:网站首页 > 技术流 > 正文

SpringBoot项目中应用Spring Batch批处理框架,处理大数据新方案

citgpt 2024-06-27 19:58 8 浏览 0 评论

环境:Springboot2.3.12RELEASE + Spring Batch4.2.7


Spring Batch是一个轻量级的,完全面向Spring的批处理框架,可以应用于企业级大量的数据处理系统。Spring Batch以POJO和大家熟知的Spring框架为基础,使开发者更容易的访问和利用企业级服务。Spring Batch可以提供大量的,可重复的数据处理功能,包括日志记录/跟踪,事务管理,作业处理统计工作重新启动、跳过,和资源管理等重要功能。

SpringBoot项目中应用Spring Batch批处理框架,处理大数据新方案

业务场景:

  1. 定期提交批处理。
  2. 并行批处理:作业的并行处理
  3. 分阶段、企业消息驱动的处理
  4. 大规模并行批处理
  5. 故障后手动或计划重新启动
  6. 相关步骤的顺序处理(扩展到工作流驱动的批处理)
  7. 部分处理:跳过记录(例如,回滚时)
  8. 整批事务,适用于小批量或现有存储过程/脚本的情况

技术目标:

  1. 批处理开发人员使用Spring编程模型:专注于业务逻辑,让框架负责基础设施。
  2. 基础架构、批处理执行环境和批处理应用程序之间的关注点清晰分离。
  3. 提供通用的核心执行服务,作为所有项目都可以实现的接口。
  4. 提供可“开箱即用”的核心执行接口的简单和默认实现。
  5. 通过在所有层中利用spring框架,可以轻松配置、定制和扩展服务。
  6. 所有现有的核心服务都应该易于替换或扩展,而不会对基础架构层造成任何影响。
  7. 提供一个简单的部署模型,使用Maven构建的架构JAR与应用程序完全分离。

Spring Batch的结构:

此分层体系结构突出了三个主要的高级组件:应用程序、核心和基础架构。该应用程序包含开发人员使用SpringBatch编写的所有批处理作业和自定义代码。批处理核心包含启动和控制批处理作业所需的核心运行时类。它包括JobLauncher、Job和Step的实现。应用程序和核心都构建在公共基础架构之上。此基础结构包含公共读写器和服务(如RetryTemplate),应用程序开发人员(读写器,如ItemReader和ItemWriter)和核心框架本身(retry,它是自己的库)都使用这些服务。

下面介绍开发流程

本例完成 读取文件内容,经过处理后,将数据保存到数据库中

  • 引入依赖
<dependency>
  <groupId>org.springframework.boot</groupId>
  <artifactId>spring-boot-starter-batch</artifactId>
</dependency>
<dependency>
  <groupId>org.springframework.boot</groupId>
  <artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
  <groupId>org.springframework.boot</groupId>
  <artifactId>spring-boot-starter-data-jpa</artifactId>
</dependency>
<dependency>
  <groupId>mysql</groupId>
  <artifactId>mysql-connector-java</artifactId>
</dependency>
<dependency>
  <groupId>org.hibernate</groupId>
  <artifactId>hibernate-validator</artifactId>
  <version>6.0.7.Final</version>
</dependency>
  • 应用配置文件
spring:
  datasource:
    driverClassName: com.mysql.cj.jdbc.Driver
    url: jdbc:mysql://localhost:3306/batch?serverTimezone=GMT%2B8
    username: root
    password: *******
    type: com.zaxxer.hikari.HikariDataSource
    hikari:
      minimumIdle: 10
      maximumPoolSize: 200
      autoCommit: true
      idleTimeout: 30000
      poolName: MasterDatabookHikariCP
      maxLifetime: 1800000
      connectionTimeout: 30000
      connectionTestQuery: SELECT 1
---
spring:
  jpa:
    generateDdl: false
    hibernate:
      ddlAuto: update
    openInView: true
    show-sql: true
---
spring:
  batch:
    job:
      enabled: false #是否自动执行任务
    initialize-schema: always  #自动为我们创建数据库脚本
  • 开启批处理功能
@Configuration
@EnableBatchProcessing
public class BatchConfig extends DefaultBatchConfigurer{
}
  • 任务启动器

接着上一步的配置类BatchConfig重写对应方法

@Override
protected JobLauncher createJobLauncher() throws Exception {
  SimpleJobLauncher jobLauncher = new SimpleJobLauncher();
  jobLauncher.setJobRepository(createJobRepository());
  jobLauncher.afterPropertiesSet();
  return jobLauncher;
}
  • 任务存储

接着上一步的配置类BatchConfig重写对应方法

@Resource
private PlatformTransactionManager transactionManager ;
@Override
protected JobRepository createJobRepository() throws Exception {
  JobRepositoryFactoryBean factory = new JobRepositoryFactoryBean();
  factory.setDatabaseType("mysql");
  factory.setTransactionManager(transactionManager);
  factory.setDataSource(dataSource);
  factory.afterPropertiesSet();
  return factory.getObject();
}
  • 定义JOB
@Bean
public Job myJob(JobBuilderFactory builder, @Qualifier("myStep")Step step){
  return builder.get("myJob")
             .incrementer(new RunIdIncrementer())
             .flow(step)
             .end()
             .listener(jobExecutionListener)
             .build();
}
  • 定义ItemReader读取器
@Bean
public ItemReader<Person> reader(){
  FlatFileItemReader<Person> reader = new FlatFileItemReader<>();
  reader.setResource(new ClassPathResource("cvs/persons.cvs"));
  reader.setLineMapper(new DefaultLineMapper<Person>() {
    // 代码块
    {
      setLineTokenizer(new DelimitedLineTokenizer(",") {
        {
          setNames("id", "name");
        }
    }) ;
  setFieldSetMapper(new BeanWrapperFieldSetMapper<Person>() {
    {
      setTargetType(Person.class) ;
    }
    });
  }
  });
  return reader;
}
  • 定义ItemProcessor处理器
@Bean
public ItemProcessor<Person, Person2> processorPerson(){
  return new ItemProcessor<Person, Person2>() {
      @Override
      public Person2 process(Person item) throws Exception {
        Person2 p = new Person2() ;
        p.setId(item.getId()) ;
        p.setName(item.getName() + ", pk");
        return p ;
      }
  } ;
}
  • 定义ItemWriter写数据
@Resource
private Validator<Person> validator ;
@Resource
private EntityManagerFactory entityManagerFactory ;
@Bean
public ItemWriter<Person2> writerPerson(){
  JpaItemWriter<Person2> writer = null ;
  JpaItemWriterBuilder<Person2> builder = new JpaItemWriterBuilder<>() ;
  builder.entityManagerFactory(entityManagerFactory) ;
  writer = builder.build() ;
  return writer;
}
  • 定义Step
@Bean
public Step myStep(StepBuilderFactory stepBuilderFactory, ItemReader<Person> reader, ItemWriter<Person> writer, ItemProcessor<Person, Person> processor){
  return stepBuilderFactory
             .get("myStep")
             .<Person, Person>chunk(2) // Chunk的机制(即每次读取一条数据,再处理一条数据,累积到一定数量后再一次性交给writer进行写入操作)
             .reader(reader).faultTolerant().retryLimit(3).retry(Exception.class).skip(Exception.class).skipLimit(2)
             .listener(new MyReadListener())
             .processor(processor)
             .writer(writer).faultTolerant().skip(Exception.class).skipLimit(2)
             .listener(new MyWriteListener())
             .build();
}
  • 定义相应的监听器
public class MyReadListener implements ItemReadListener<Person> {

	private Logger logger = LoggerFactory.getLogger(MyReadListener.class);

	@Override
	public void beforeRead() {
	}

	@Override
	public void afterRead(Person item) {
		System.out.println("reader after: " + Thread.currentThread().getName()) ;
	}

	@Override
	public void onReadError(Exception ex) {
		logger.info("读取数据错误:{}", ex);
	}
}
@Component
public class MyWriteListener implements ItemWriteListener<Person> {
	
    private Logger logger = LoggerFactory.getLogger(MyWriteListener.class);
 
    @Override
    public void beforeWrite(List<? extends Person> items) {
    }
    
    @Override
    public void afterWrite(List<? extends Person> items) {
    	System.out.println("writer after: " + Thread.currentThread().getName()) ;
    }
    
    @Override
    public void onWriteError(Exception exception, List<? extends Person> items) {
        try {
            logger.info(format("%s%n", exception.getMessage()));
            for (Person item : items) {
                logger.info(format("Failed writing BlogInfo : %s", item.toString()));
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

person.cvs文件内容

实体类:

@Entity
@Table(name = "t_person")
public class Person {
	
	@Id
	@GeneratedValue(strategy = GenerationType.IDENTITY)
	private Integer id ;
	private String name ;
}

启动任务执行

@RestController
@RequestMapping("/demo")
public class DemoController {
  @Resource
	@Qualifier("myJob")
	private Job job ;
  @Resource
	private JobLauncher launcher ;
  @GetMapping("/index")
	public Object index() {
		JobParameters jobParameters = new JobParametersBuilder().toJobParameters() ;
		try {
			launcher.run(job, jobParameters) ;
		} catch (JobExecutionAlreadyRunningException | JobRestartException | JobInstanceAlreadyCompleteException
				| JobParametersInvalidException e) {
			e.printStackTrace();
		}
		return "success" ;
	}
}

启动服务,自动为我们创建了表

执行任务

查看表情况


完毕!!!

公众:Springboot实战案例锦集

相关推荐

js中arguments详解

一、简介了解arguments这个对象之前先来认识一下javascript的一些功能:其实Javascript并没有重载函数的功能,但是Arguments对象能够模拟重载。Javascrip中每个函数...

firewall-cmd 常用命令

目录firewalldzone说明firewallzone内容说明firewall-cmd常用参数firewall-cmd常用命令常用命令 回到顶部firewalldzone...

epel-release 是什么

EPEL-release(ExtraPackagesforEnterpriseLinux)是一个软件仓库,它为企业级Linux发行版(如CentOS、RHEL等)提供额外的软件包。以下是关于E...

FullGC详解  什么是 JVM 的 GC
FullGC详解 什么是 JVM 的 GC

前言:背景:一、什么是JVM的GC?JVM(JavaVirtualMachine)。JVM是Java程序的虚拟机,是一种实现Java语言的解...

2024-10-26 08:50 citgpt

使用Spire.Doc组件利用模板导出Word文档
  • 使用Spire.Doc组件利用模板导出Word文档
  • 使用Spire.Doc组件利用模板导出Word文档
  • 使用Spire.Doc组件利用模板导出Word文档
  • 使用Spire.Doc组件利用模板导出Word文档
跨域(CrossOrigin)

1.介绍  1)跨域问题:跨域问题是在网络中,当一个网络的运行脚本(通常时JavaScript)试图访问另一个网络的资源时,如果这两个网络的端口、协议和域名不一致时就会出现跨域问题。    通俗讲...

微服务架构和分布式架构的区别

1、含义不同微服务架构:微服务架构风格是一种将一个单一应用程序开发为一组小型服务的方法,每个服务运行在自己的进程中,服务间通信采用轻量级通信机制(通常用HTTP资源API)。这些服务围绕业务能力构建并...

深入理解与应用CSS clip-path 属性
深入理解与应用CSS clip-path 属性

clip-pathclip-path是什么clip-path 是一个CSS属性,允许开发者创建一个剪切区域,从而决定元素的哪些部分可见,哪些部分会被隐...

2024-10-25 11:51 citgpt

HCNP Routing&Switching之OSPF LSA类型(二)
  • HCNP Routing&Switching之OSPF LSA类型(二)
  • HCNP Routing&Switching之OSPF LSA类型(二)
  • HCNP Routing&Switching之OSPF LSA类型(二)
  • HCNP Routing&Switching之OSPF LSA类型(二)
Redis和Memcached的区别详解
  • Redis和Memcached的区别详解
  • Redis和Memcached的区别详解
  • Redis和Memcached的区别详解
  • Redis和Memcached的区别详解
Request.ServerVariables 大全

Request.ServerVariables("Url")返回服务器地址Request.ServerVariables("Path_Info")客户端提供的路...

python操作Kafka

目录一、python操作kafka1.python使用kafka生产者2.python使用kafka消费者3.使用docker中的kafka二、python操作kafka细...

Runtime.getRuntime().exec详解

Runtime.getRuntime().exec详解概述Runtime.getRuntime().exec用于调用外部可执行程序或系统命令,并重定向外部程序的标准输入、标准输出和标准错误到缓冲池。...

promise.all详解 promise.all是干什么的
promise.all详解 promise.all是干什么的

promise.all详解promise.all中所有的请求成功了,走.then(),在.then()中能得到一个数组,数组中是每个请求resolve抛出的结果...

2024-10-24 16:21 citgpt

Content-Length和Transfer-Encoding详解
  • Content-Length和Transfer-Encoding详解
  • Content-Length和Transfer-Encoding详解
  • Content-Length和Transfer-Encoding详解
  • Content-Length和Transfer-Encoding详解

取消回复欢迎 发表评论: