百度360必应搜狗淘宝本站头条
当前位置:网站首页 > 技术流 > 正文

全网最详细SpringBatch读取分区文件讲解

citgpt 2024-06-27 19:58 9 浏览 0 评论


写在前面: 我是「境里婆娑」。我还是从前那个少年,没有一丝丝改变,时间只不过是考验,种在心中信念丝毫未减,眼前这个少年,还是最初那张脸,面前再多艰险不退却。

全网最详细SpringBatch读取分区文件讲解

写博客的目的就是分享给大家一起学习交流,如果您对 Java感兴趣,可以关注我,我们一起学习。

前言:为什么要写这篇文章,在网上很难找到一篇关于SpringBatch批处理读取分区文件基于JavaBean配置的文章,因此我决定写一篇关于SpringBatch读取分区文件基于javaBean配置的文章,希望这篇文章可以帮助新手的你或者你有一定经验的可以加深印象。

一、分区Step

何为分区Step:

通过将任务进行分区,不同的Step处理不同任务数据达到提高Job效率功能。

分区作业可以分区两个处理阶段,数据分区、分区处理;

1、数据分区

数据分区:根据特殊的规则,将数据进行合理分片,为不同的数据切片生成数据执行上下文Execution Context、作业执行器Step Execution。可以通过接口Partitioner生成自定义分区逻辑,SpringBatch批处理框架默认对多文件实现MultiResourcePartititoner;也可以自行扩展接口Partitioner实现自定义分区逻辑。

2、分区处理

分区处理:通过数据分区后,不同的数据已经被分配到不同的作业执行器中,接下来需要交给分区处理器进行作业,分区处理器可以在本地或远程执行被划分的作业。接口PartitionHandler定义了分区处理逻辑,SpringBatch批处理框架默认实现了本地分区处理TaskExecutorPartitionHandler;也可以自行扩展接口PartitionHandler来实现自定义分区逻辑。

分区作业逻辑结构图:



二、实现分区关键接口

实现分区关键接口有如下:PartitionHandler、StepExecutionSplitter、Partitioner。

1、Partitioner

Partitoner接口定义了如何根据给定的分区规则进行创建作业执行分区的上下文。

Partitioner接口定义如下:

public interface Partitioner {
	Map<String, ExecutionContext> partition(int gridSize);
}


gridSize含义:根据给定的gridSize大小进行执行上下文划分。

2、StepExecutionSplitter

StepExecutionSplitter接口定义了如何根据给定的分区规则进行创建作业执行分区的执行器。

StepExecutionSplitter接口定义如下

public interface StepExecutionSplitter {
	String getStepName();
	Set<StepExecution> split(StepExecution stepExecution, int 	gridSize) throws JobExecutionException;
}


getStepName:获取当前定义的分区作业的名称。

split:根据给定的分区规则为每个分区生成对应的分区执行器。

3、PartitionHandler

PartitionHandler接口定义了分区处理的逻辑,根据给定的StepExecutionSplitter进行分区并执行,最后将执行的结果进行收集,反馈给前端。

PartitionHandler接口定义如下

public interface PartitionHandler {
	Collection<StepExecution> handle(StepExecutionSplitter stepSplitter, StepExecution stepExecution) throws Exception;
}

三、基本配置和属性说明

上面两节基本知识已经介绍完毕,下面我们将讲一个例子来巩固之前知识。

1、基本配置

一个典型分区Job配置

 @Bean
    public Step partitionMasterMultiFileStep() {
        return stepBuilderFactory.get("partitionMasterMultiFileStep")
                .partitioner(partitionSlaveMultiFileStep().getName(),multiResourcePartitioner())
                .partitionHandler(multiFilePartitionHandler())
                .build();
    }

2、属性说明

在配置分区Step之前,我们先看下分区Step的主要属性定义和元素定义



四、文件分区

SpringBatch框架提供了对文件分区的支持,实现类:MultiResourcePartitioner提供了对文件分区的默认支持,根据文件名将不同文件处理进行分区,提升处理速度和效率。本文将按照此例子给出如何配置多文件分区实现。

读取文件如下:



本节实例由于文件多,我们对文件进行分区,然后将文件的内容写入DB,逻辑示意图如下:



1、定义分区文件Partitioner

定义文件分区,将不同的文件分配到不同的作业中,使用自定义MyMultiResourcePartitioner分区。

自定义分区MyMultiResourcePartitioner如下:

/**
 * @author shuliangzhao
 * @date 2020/12/4 23:14
 */
public class MyMultiResourcePartitioner implements Partitioner {
    private static final String DEFAULT_KEY_NAME = "fileName";
    private static final String PARTITION_KEY = "partition";
    private Resource[] resources = new Resource[0];
    private String keyName = DEFAULT_KEY_NAME;
    public void setResources(Resource[] resources) {
        this.resources = resources;
    }

    public void setKeyName(String keyName) {
        this.keyName = keyName;
    }
    @Override
    public Map<String, ExecutionContext> partition(int gridSize) {
        Map<String, ExecutionContext> map = new HashMap<String, ExecutionContext>(gridSize);
        int i = 0;
        for (Resource resource : resources) {
            ExecutionContext context = new ExecutionContext();
            Assert.state(resource.exists(), "Resource does not exist: "+resource);
            try {
                context.putString(keyName, resource.getURI().getPath());
            }
            catch (IOException e) {
                throw new IllegalArgumentException("File could not be located for: "+resource, e);
            }
            map.put(PARTITION_KEY + i, context);
            i++;
        }
        return map;
    }
}


属性keyName:用于指定作业上文中属性名字,作用是在不同的作业上下文中可以获取设置的对于属性值。可以在读写阶段通过@Value("#{stepExecutionContext[fileName]}"方式获取。

2、定义文件读

配置好分区实现,需要在每个分区作业中读入不同文件,进而提供文件处理效率。

PartitionMultiFileReader 实现

public class PartitionMultiFileReader extends FlatFileItemReader {
    public PartitionMultiFileReader(Class clz,String fileName) {
        setResource(new FileSystemResource(fileName.substring(1)));
        Field[] declaredFields = clz.getDeclaredFields();
        List<String> list = new ArrayList<>();
        for (Field field:declaredFields) {
            list.add(field.getName());
        }
        String[] names = new String[list.size()];
        DelimitedLineTokenizer delimitedLineTokenizer = new DelimitedLineTokenizer();
        delimitedLineTokenizer.setDelimiter(",");
        delimitedLineTokenizer.setNames(list.toArray(names));
        DefaultLineMapper defaultLineMapper = new DefaultLineMapper();
        defaultLineMapper.setLineTokenizer(delimitedLineTokenizer);
        CommonFieldSetMapper commonFieldSetMapper = new CommonFieldSetMapper();
        commonFieldSetMapper.setTargetType(clz);
        defaultLineMapper.setFieldSetMapper(commonFieldSetMapper);
        setLineMapper(defaultLineMapper);
        setName(clz.getSimpleName());
    }
}

3、定义分区job配置

基于javabean方式实现job配置


package com.sl.config;
//包导入省略
/**
 * @author shuliangzhao
 * @Title: PartitionFileConfiguration
 * @ProjectName spring-boot-learn
 * @Description: TODO
 * @date 2020/12/4 21:09
 */
@Configuration
@EnableBatchProcessing
public class PartitionMultiFileConfiguration {
    @Autowired
    private JobBuilderFactory jobBuilderFactory;
    @Autowired
    private StepBuilderFactory stepBuilderFactory;
    @Autowired
    private PartitonMultiFileProcessor partitonMultiFileProcessor;
    @Autowired
    private PartitionMultiFileWriter partitionMultiFileWriter;
    @Bean
    public Job partitionMultiFileJob() {
         return jobBuilderFactory.get("partitionMultiFileJob")
                 .start(partitionMasterMultiFileStep())
                 .build();
    }
    @Bean
    public Step partitionMasterMultiFileStep() {
        return stepBuilderFactory.get("partitionMasterMultiFileStep")
                .partitioner(partitionSlaveMultiFileStep().getName(),multiResourcePartitioner())
                .partitionHandler(multiFilePartitionHandler())
                .build();
    }
    @Bean
    public PartitionHandler multiFilePartitionHandler() {
        TaskExecutorPartitionHandler handler = new TaskExecutorPartitionHandler();
        handler.setGridSize(2);
        handler.setStep(partitionSlaveMultiFileStep());
        handler.setTaskExecutor(new SimpleAsyncTaskExecutor());
        return handler;
    }
    @Bean
    public Step partitionSlaveMultiFileStep() {
        return stepBuilderFactory.get("partitionSlaveMultiFileStep")
                .<CreditBill,CreditBill>chunk(1)
                .reader(partitionMultiFileReader(null))
                .processor(partitonMultiFileProcessor)
                .writer(partitionMultiFileWriter)
                .build();
    }
    @Bean
    @StepScope
    public PartitionMultiFileReader partitionMultiFileReader(@Value("#{stepExecutionContext[fileName]}")String fileName) {
        return new PartitionMultiFileReader(CreditBill.class,fileName);
    }
    @Bean
    public MyMultiResourcePartitioner multiResourcePartitioner() {
        MyMultiResourcePartitioner multiResourcePartitioner = new MyMultiResourcePartitioner();
         multiResourcePartitioner.setKeyName("fileName");
         multiResourcePartitioner.setResources(getResource());
         return multiResourcePartitioner;
    }
    private Resource[] getResource() {
        String filePath = "D:\\aplus\\bill\\";
        File file = new File(filePath);
        List<Resource> resourceList = new ArrayList<>();
        if (file.isDirectory()) {
            String[] list = file.list();
            if (list != null) {
                for (String str : list) {
                    String resource = file.getPath() + "\\" + str;
                    FileSystemResource fileSystemResource = new FileSystemResource(resource);
                    resourceList.add(fileSystemResource);
                }
            }
        }
        Resource[] resources = new Resource[resourceList.size()];
        return resourceList.toArray(resources);
    }
}


4、定义processor

定义processor

/**
 * @author shuliangzhao
 * @date 2020/12/4 22:11
 */
@Component
@StepScope
public class PartitonMultiFileProcessor implements ItemProcessor<CreditBill,CreditBill> {
    @Override
    public CreditBill process(CreditBill item) throws Exception {
        CreditBill creditBill = new CreditBill();
        creditBill.setAcctid(item.getAcctid());
        creditBill.setAddress(item.getAddress());
        creditBill.setAmout(item.getAmout());
        creditBill.setDate(item.getDate());
        creditBill.setName(item.getName());
        return creditBill;
    }
}

4、定义writer

/**
 * @author shuliangzhao
 * @date 2020/12/4 22:29
 */
@Component
@StepScope
public class PartitionMultiFileWriter implements ItemWriter<CreditBill> {
    @Autowired
    private CreditBillMapper creditBillMapper;
    @Override
    public void write(List<? extends CreditBill> items) throws Exception {
        if (items != null && items.size() > 0) {
            items.stream().forEach(item -> {
                creditBillMapper.insert(item);
            });
        }
    }
}

4、定义step监听器

定义step监听器目的是在处理作业之前打印线程名字和读取文件名字

@Component
public class PartitionStepListener implements StepExecutionListener {
    private static final Logger logger = LoggerFactory.getLogger(PartitionStepListener.class);
    @Override
    public void beforeStep(StepExecution stepExecution) {
       logger.info("ThreadName={},steName={},FileName={}",Thread.currentThread().getName(),
               stepExecution.getStepName(),stepExecution.getExecutionContext().getString("fileName"));
    }
    @Override
    public ExitStatus afterStep(StepExecution stepExecution) {
        return null;
    }
}

6、运行job

执行job查看结果,可以看出不同的文件有不同的线程来处理,并且被分配到不同的分区作业步中执行
2020-12-05 15:58:34.100  INFO 13208 --- [cTaskExecutor-1] com.sl.listener.PartitionStepListener    : ThreadName=SimpleAsyncTaskExecutor-1,steName=partitionSlaveMultiFileStep:partition1,FileName=/D:/aplus/bill/bill2.csv
2020-12-05 15:58:34.114  INFO 13208 --- [cTaskExecutor-3] com.sl.listener.PartitionStepListener    : ThreadName=SimpleAsyncTaskExecutor-3,steName=partitionSlaveMultiFileStep:partition0,FileName=/D:/aplus/bill/bill1.csv
2020-12-05 15:58:34.122  INFO 13208 --- [cTaskExecutor-2] com.sl.listener.PartitionStepListener    : ThreadName=SimpleAsyncTaskExecutor-2,steName=partitionSlaveMultiFileStep:partition2,FileName=/D:/aplus/bill/bill3.csv


至此,我们完成了对文件分区的处理。

如果向更详细查看以上所有代码请移步到github:[文件分区详细代码](https://github.com/FadeHub/spring-boot-learn/blob/master/spring-boot-springbatch/src/main/java/com/sl/config/PartitionMultiFileConfiguration.java)

相关推荐

js中arguments详解

一、简介了解arguments这个对象之前先来认识一下javascript的一些功能:其实Javascript并没有重载函数的功能,但是Arguments对象能够模拟重载。Javascrip中每个函数...

firewall-cmd 常用命令

目录firewalldzone说明firewallzone内容说明firewall-cmd常用参数firewall-cmd常用命令常用命令 回到顶部firewalldzone...

epel-release 是什么

EPEL-release(ExtraPackagesforEnterpriseLinux)是一个软件仓库,它为企业级Linux发行版(如CentOS、RHEL等)提供额外的软件包。以下是关于E...

FullGC详解  什么是 JVM 的 GC
FullGC详解 什么是 JVM 的 GC

前言:背景:一、什么是JVM的GC?JVM(JavaVirtualMachine)。JVM是Java程序的虚拟机,是一种实现Java语言的解...

2024-10-26 08:50 citgpt

使用Spire.Doc组件利用模板导出Word文档
  • 使用Spire.Doc组件利用模板导出Word文档
  • 使用Spire.Doc组件利用模板导出Word文档
  • 使用Spire.Doc组件利用模板导出Word文档
  • 使用Spire.Doc组件利用模板导出Word文档
跨域(CrossOrigin)

1.介绍  1)跨域问题:跨域问题是在网络中,当一个网络的运行脚本(通常时JavaScript)试图访问另一个网络的资源时,如果这两个网络的端口、协议和域名不一致时就会出现跨域问题。    通俗讲...

微服务架构和分布式架构的区别

1、含义不同微服务架构:微服务架构风格是一种将一个单一应用程序开发为一组小型服务的方法,每个服务运行在自己的进程中,服务间通信采用轻量级通信机制(通常用HTTP资源API)。这些服务围绕业务能力构建并...

深入理解与应用CSS clip-path 属性
深入理解与应用CSS clip-path 属性

clip-pathclip-path是什么clip-path 是一个CSS属性,允许开发者创建一个剪切区域,从而决定元素的哪些部分可见,哪些部分会被隐...

2024-10-25 11:51 citgpt

HCNP Routing&Switching之OSPF LSA类型(二)
  • HCNP Routing&Switching之OSPF LSA类型(二)
  • HCNP Routing&Switching之OSPF LSA类型(二)
  • HCNP Routing&Switching之OSPF LSA类型(二)
  • HCNP Routing&Switching之OSPF LSA类型(二)
Redis和Memcached的区别详解
  • Redis和Memcached的区别详解
  • Redis和Memcached的区别详解
  • Redis和Memcached的区别详解
  • Redis和Memcached的区别详解
Request.ServerVariables 大全

Request.ServerVariables("Url")返回服务器地址Request.ServerVariables("Path_Info")客户端提供的路...

python操作Kafka

目录一、python操作kafka1.python使用kafka生产者2.python使用kafka消费者3.使用docker中的kafka二、python操作kafka细...

Runtime.getRuntime().exec详解

Runtime.getRuntime().exec详解概述Runtime.getRuntime().exec用于调用外部可执行程序或系统命令,并重定向外部程序的标准输入、标准输出和标准错误到缓冲池。...

promise.all详解 promise.all是干什么的
promise.all详解 promise.all是干什么的

promise.all详解promise.all中所有的请求成功了,走.then(),在.then()中能得到一个数组,数组中是每个请求resolve抛出的结果...

2024-10-24 16:21 citgpt

Content-Length和Transfer-Encoding详解
  • Content-Length和Transfer-Encoding详解
  • Content-Length和Transfer-Encoding详解
  • Content-Length和Transfer-Encoding详解
  • Content-Length和Transfer-Encoding详解

取消回复欢迎 发表评论: