百度360必应搜狗淘宝本站头条
当前位置:网站首页 > 技术流 > 正文

Spring Batch 批处理框架优化实践,效率嘎嘎高!

citgpt 2024-06-27 19:59 14 浏览 0 评论

  • 一、Spring Batch简介
    • 1 框架概述
    • 2 核心概念和组件
  • 二、批处理优化实践
    • 4.1 使用Spring Boot Actuator进行监控
    • 4.2 使用管理控制台来监控
    • 3.1 批处理启动前校验
    • 3.2 读写校验
    • 2.1 多进程处理
    • 2.2 多线程处理
    • 1.1 分页处理数据
    • 1.2 使用读写缓存
    • 1.3 行级别写操作
    • 1 减少读写次数
    • 2 并发处理任务
    • 3 提高数据校验准确性
    • 4 监控批处理任务
  • 三、实践示例
    • 3.1 修改数据源配置
    • 3.2 使用分片批处理
    • 3.3 使用监控和异常处理策略
    • 1 案例简述
    • 2 问题分析
    • 3 批处理优化实践
    • 4 测试效果分析
  • 四、小结回顾

1Spring Batch简介

1 框架概述

Spring Batch是一种用于批处理的框架基于Spring Framework开发,通过读取大量的数据、处理数据和写入大量数据来满足各种类型的企业级批处理需求。

Spring Batch可以很好地处理大量数据,并且提供了丰富的可扩展组件,业务逻辑与框架层的一系列处理步骤的集成也比较简单。

Spring Batch 批处理框架优化实践,效率嘎嘎高!

Spring Batch可以很好地支持程序员针对大量的数据,编写代码来执行规范的操作序列,提高开发效率,降低了对于数据库等系统资源访问的影响。

2 核心概念和组件

Spring Batch主要包含以下核心概念和组件:

  • Job: 一个可以被执行的批业务逻辑。
  • Step: 一个Job中独立的一个小步骤。
  • ExecutionContext: 每次Job或者Step执行时,都会创建该对象保存这次执行的上下文状态。
  • ItemReader: 用于读取相应的数据。
  • ItemProcessor: 用于处理ItemReader读取出来的数据并进行相应的业务处理。
  • ItemWriter: 用于将ItemProcessor处理好后的数据写入到目标存储位置。

2批处理优化实践

1 减少读写次数

1.1 分页处理数据

在进行批处理时需要避免扫描所有的数据,而是应该分批读取并处理数据,这样可以避免对系统资源产生过大压力。对于大数据量的处理任务,建议采取分页处理技术,将大数据量拆分成多个小任务处理,并对每个任务进行分页读取和处理。

@Bean
@StepScope
public ItemReader<Data> reader() {
    RepositoryItemReader<Data> reader = new RepositoryItemReader<>();
    reader.setRepository(repository);
    reader.setMethodName(FIND_DATA_BY_NAME_AND_AGE);
    reader.setPageSize(1000);
    Map<String, Object> params = new HashMap<>();
    params.put("name", "test");
    params.put("age", 20);
    reader.setParameterValues(params);
    return reader;
}

以上例子展示了如何使用Spring Data JPA Repository对数据分页读取,在分页读取时,可以通过setPageSize()指定分页数量。

1.2 使用读写缓存

对于一些经常重复读写的数据可以使用读写缓存,减少读写操作的频率。使用读写缓存能够降低读写磁盘I/O的操作,大大提高批处理数据的处理效率。在Spring Batch中可以通过使用@EnableCaching来开启缓存。

@Bean
public ItemWriter<Data> writer() {
    RepositoryItemWriter<Data> writer = new RepositoryItemWriter<>();
    writer.setRepository(repository);
    writer.setMethodName(SAVE);
    writer.afterPropertiesSet();
    return writer;
}

@Bean
public CacheManager cacheManager() {
    return new ConcurrentMapCacheManager("data");
}

以上例子展示了如何使用Spring Cache对数据进行缓存,需要在配置类上添加@EnableCaching注解,并在CacheManager中指定相应的Cache名称。

1.3 行级别写操作

在写操作时应该尽量避免一次性提交大量的数据,可以采用行级别的写操作,即将数据分批次进行保存,批量提交,可有效避免内存溢出和减少I/O操作。

@Bean
public ItemWriter<Data> writer(EntityManagerFactory entityManagerFactory) {
    JpaItemWriter<Data> writer = new JpaItemWriter<>();
    writer.setEntityManagerFactory(entityManagerFactory);
    writer.setPersistenceUnitName(PERSISTENCE_UNIT_NAME);
    writer.setTransactionManager(transactionManager);
    writer.setFlushBlockSize(5000);
    return writer;
}

以上例子展示了如何使用Spring Batch提供的JpaItemWriter对数据进行批量保存,可以通过调整setFlushBlockSize()方法中指定每批次提交的数据量。

2 并发处理任务

2.1 多进程处理

在对大量数据进行处理时可以采用多进程并发处理的方式来提高数据处理速度,主要思想是将大数据集拆分成多个任务,将这些任务分别交给不同的进程处理,利用多核计算机的特性,同时处理多个任务,提高数据处理效率。

@Bean 
public SimpleAsyncTaskExecutor taskExecutor() {
    return new SimpleAsyncTaskExecutor("async-writer");
}

@Bean
public SimpleJobLauncher jobLauncher() throws Exception {
    SimpleJobLauncher jobLauncher = new SimpleJobLauncher();
    jobLauncher.setTaskExecutor(taskExecutor());
    jobLauncher.setJobRepository(jobRepository);
    jobLauncher.afterPropertiesSet();
    return jobLauncher;
}

以上例子展示了如何使用Spring Batch提供的SimpleAsyncTaskExecutor对数据进行批处理任务的并发处理,进程会被自动分配到可用的CPU核心上执行任务。

2.2 多线程处理

在对大量数据进行处理时可以采用多线程并发处理的方式来提高数据处理速度,主要思想是将大数据集拆分成多个任务,利用Java多线程的特性,同时处理多个任务,提高数据处理效率。

@Bean
public TaskExecutor taskExecutor() {
    ThreadPoolTaskExecutor taskExecutor = new ThreadPoolTaskExecutor();
    taskExecutor.setCorePoolSize(10);
    taskExecutor.setMaxPoolSize(50);
    taskExecutor.setQueueCapacity(25);
    taskExecutor.setThreadNamePrefix("batch-thread-");
    taskExecutor.initialize();
    return taskExecutor;
}

@Bean
public SimpleAsyncTaskExecutor jobExecutor() {
    SimpleAsyncTaskExecutor executor = new SimpleAsyncTaskExecutor("job-thread");
    executor.setConcurrencyLimit(3);
    return executor;
}

@Bean
public SimpleJobLauncher jobLauncher() throws Exception {
    SimpleJobLauncher jobLauncher = new SimpleJobLauncher();
    jobLauncher.setTaskExecutor(jobExecutor());
    jobLauncher.setJobRepository(jobRepository);
    jobLauncher.afterPropertiesSet();
    return jobLauncher;
}

以上例子展示了如何使用Spring Batch提供的ThreadPoolTaskExecutor对数据进行批处理任务的并发处理,可以通过调整setCorePoolSize()setMaxPoolSize()setQueueCapacity()方法来设定线程池的大小和控制线程数在多大范围内,并使用SimpleAsyncTaskExecutor来设限同时执行的线程数量。

3 提高数据校验准确性

3.1 批处理启动前校验

在进行批处理任务时应该确保输入数据的正确性和读写操作的有效性,通过在批处理启动前进行校验,可以大大提高数据准确性。

@Configuration
public class JobValidateListener {

    @Autowired
    private Validator validator;

    @Autowired
    private Job job;

    @PostConstruct
    public void init() {
        JobValidationListener validationListener = new JobValidationListener();
        validationListener.setValidator(validator);
        job.registerJobExecutionListener(validationListener);
    }
}

public class JobValidationListener implements JobExecutionListener {

    private Validator validator;

    public void setValidator(Validator validator) {
        this.validator = validator;
    }

    @Override
    public void beforeJob(JobExecution jobExecution) {
        JobParameters parameters = jobExecution.getJobParameters();
        BatchJobParameterValidator validator = new BatchJobParameterValidator(parameters);
        validator.validate();
    }

    @Override
    public void afterJob(JobExecution jobExecution) {

    }
}

以上例子展示了如何使用Bean Validation校验批处理任务的输入参数,在beforeJob()方法中调用自定义的BatchJobParameterValidator进行输入参数的校验。

3.2 读写校验

在进行批处理任务时应该对每次读取和写入的数据进行校验,以避免不合法的数据写入目标数据存储。

@Bean
public ItemReader<Data> reader() {
    JpaPagingItemReader<Data> reader = new JpaPagingItemReader<>();
    reader.setEntityManagerFactory(entityManagerFactory);
    reader.setPageSize(1000);
    reader.setQueryString(FIND_DATA_BY_NAME_AND_AGE);
    Map<String, Object> parameters = new HashMap<>();
    parameters.put("name", "test");
    parameters.put("age", 20);
    reader.setParameterValues(parameters);
    reader.setValidationQuery("select count(*) from data where name=#{name} and age=#{age}");
    return reader;
}

以上例子展示了如何使用JpaPagingItemReader来读取数据,并在Reader中进行数据校验,通过设置setValidationQuery()方法指定校验SQL语句。

4 监控批处理任务

4.1 使用Spring Boot Actuator进行监控

在进行批处理任务时应该及时了解任务的执行情况和运行状态,可以使用Spring Boot Actuator进行监控。Spring Boot Actuator提供了丰富的监控指标和API,可以帮助开发人员实时监控批处理任务的运行状况。

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-actuator</artifactId>
</dependency>

以上例子展示了如何在pom.xml文件中添加spring-boot-starter-actuator依赖来启用Actuator功能。

4.2 使用管理控制台来监控

在进行批处理任务时可以使用管理控制台来监控任务的执行情况和运行状态,通过在控制台上显示监控指标和任务日志,可以及时发现和处理任务中的异常情况。

@Configuration
public class BatchLoggingConfiguration {

    @Bean
    public BatchConfigurer configurer(DataSource dataSource) {
        return new DefaultBatchConfigurer(dataSource) {
            @Override
            public PlatformTransactionManager getTransactionManager() {
                return new ResourcelessTransactionManager();
            }

            @Override
            public JobLauncher getJobLauncher() throws Exception {
                SimpleJobLauncher jobLauncher = new SimpleJobLauncher();
                jobLauncher.setJobRepository(getJobRepository());
                jobLauncher.afterPropertiesSet();
                return jobLauncher;
            }

            @Override
            public JobRepository getJobRepository() throws Exception {
                JobRepositoryFactoryBean factory = new JobRepositoryFactoryBean();
                factory.setDataSource(getDataSource());
                factory.setTransactionManager(getTransactionManager());
                factory.setIsolationLevelForCreate("ISOLATION_DEFAULT");
                factory.afterPropertiesSet();
                return factory.getObject();
            }
        };
    }
}

以上例子展示了如何使用BatchConfigurer来记录批处理任务的日志和监控信息,并在管理控制台上显示。可以在程序启动时,使用@EnableBatchProcessing注解启用批处理功能。同时,可以使用@EnableScheduling注解来自动启动定时任务。

3实践示例

1 案例简述

在我们的项目中需要对用户的购物行为进行数据分析,并将结果存入数据库。由于数据规模较大,并且需要及时更新,因此我们决定采用批处理技术来处理这个问题。

2 问题分析

在使用批处理框架进行数据处理时遇到了以下问题:

  • 数据读取效率较低,导致批处理速度较慢;
  • 处理过程中,遇到异常时无法及时发现和处理。

3 批处理优化实践

3.1 修改数据源配置

首先修改了数据源的配置使用连接池提高数据读取效率。

<bean id="dataSource"
      class="com.alibaba.druid.pool.DruidDataSource"
      init-method="init"
      destroy-method="close">
    <property name="driverClassName" value="${jdbc.driverClassName}" />
    <property name="url" value="${jdbc.url}" />
    <property name="username" value="${jdbc.username}" />
    <property name="password" value="${jdbc.password}" />
    <property name="initialSize" value="${druid.initialSize}" />
    <property name="minIdle" value="${druid.minIdle}" />
    <property name="maxActive" value="${druid.maxActive}" />
    <property name="maxWait" value="${druid.maxWait}" />
    <property name="timeBetweenEvictionRunsMillis" value="${druid.timeBetweenEvictionRunsMillis}" />
    <property name="minEvictableIdleTimeMillis" value="${druid.minEvictableIdleTimeMillis}" />
    <property name="validationQuery" value="${druid.validationQuery}" />
    <property name="testWhileIdle" value="${druid.testWhileIdle}" />
    <property name="testOnBorrow" value="${druid.testOnBorrow}" />
    <property name="testOnReturn" value="${druid.testOnReturn}" />
    <property name="poolPreparedStatements" value="${druid.poolPreparedStatements}" />
    <property name="maxPoolPreparedStatementPerConnectionSize" value="${druid.maxPoolPreparedStatementPerConnectionSize}" />
    <property name="filters" value="${druid.filters}" />
</bean>

以上代码展示了我们如何使用阿里巴巴的Druid连接池来优化数据读取效率。

3.2 使用分片批处理

决定采用分片策略来处理大批量数据将批处理任务拆分成多个小任务并发执行,提高处理效率。

@Configuration
public class BatchConfiguration {

    @Autowired
    private JobBuilderFactory jobBuilderFactory;

    @Autowired
    private StepBuilderFactory stepBuilderFactory;

    @Autowired
    private DataSource dataSource;

    @Bean
    public Job job() {
        return jobBuilderFactory.get("job")
                .incrementer(new RunIdIncrementer())
                .start(step1())
                .next(step2())
                .build();
    }

    @Bean
    public Step step1() {
        return stepBuilderFactory.get("step1")
                .<User, User>chunk(10000)
                .reader(reader(null))
                .processor(processor())
                .writer(writer(null))
                .taskExecutor(taskExecutor())
                .build();
    }

    @Bean
    public Step step2() {
        return stepBuilderFactory.get("step2")
                .<User, User>chunk(10000)
                .reader(reader2(null))
                .processor(processor())
                .writer(writer2(null))
                .taskExecutor(taskExecutor())
                .build();
    }

    @SuppressWarnings({ "unchecked", "rawtypes" })
    @Bean
    @StepScope
    public JdbcCursorItemReader<User> reader(@Value("#{stepExecutionContext['fromId']}")Long fromId) {
        JdbcCursorItemReader<User> reader = new JdbcCursorItemReader<>();
        reader.setDataSource(dataSource);
        reader.setSql("SELECT * FROM user WHERE id > ? AND id <= ?");
        reader.setPreparedStatementSetter(new PreparedStatementSetter() {
            @Override
            public void setValues(PreparedStatement ps) throws SQLException {
                ps.setLong(1, fromId);
                ps.setLong(2, fromId + 10000);
            }
        });
        reader.setRowMapper(new BeanPropertyRowMapper<>(User.class));
        return reader;
    }

    @SuppressWarnings({ "rawtypes", "unchecked" })
    @Bean
    @StepScope
    public JdbcCursorItemReader<User> reader2(@Value("#{stepExecutionContext['fromId']}")Long fromId) {
        JdbcCursorItemReader<User> reader = new JdbcCursorItemReader<>();
        reader.setDataSource(dataSource);
        reader.setSql("SELECT * FROM user WHERE id > ?");
        reader.setPreparedStatementSetter(new PreparedStatementSetter() {
            @Override
            public void setValues(PreparedStatement ps) throws SQLException {
                ps.setLong(1, fromId + 10000);
            }
        });
        reader.setRowMapper(new BeanPropertyRowMapper<>(User.class));
        return reader;
    }

    @Bean
    public ItemProcessor<User, User> processor() {
        return new UserItemProcessor();
    }

    @Bean
    public ItemWriter<User> writer(DataSource dataSource) {
        JdbcBatchItemWriter<User> writer = new JdbcBatchItemWriter<>();
        writer.setDataSource(dataSource);
        writer.setSql("INSERT INTO user(name, age) VALUES(?, ?)");
        writer.setItemPreparedStatementSetter(new UserPreparedStatementSetter());
        return writer;
    }

    @Bean
    public ItemWriter<User> writer2(DataSource dataSource) {
        JdbcBatchItemWriter<User> writer = new JdbcBatchItemWriter<>();
        writer.setDataSource(dataSource);
        writer.setSql("UPDATE user SET age = ? WHERE name = ?");
        writer.setItemPreparedStatementSetter(new UserUpdatePreparedStatementSetter());
        return writer;
    }

    @Bean(destroyMethod="shutdown")
    public ThreadPoolTaskExecutor taskExecutor() {
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        executor.setCorePoolSize(10);
        executor.setMaxPoolSize(20);
        executor.setQueueCapacity(30);
        executor.initialize();
        return executor;
    }

    @Bean
    public StepExecutionListener stepExecutionListener() {
        return new StepExecutionListenerSupport() {
            @Override
            public ExitStatus afterStep(StepExecution stepExecution) {
                if(stepExecution.getSkipCount() > 0) {
                    return new ExitStatus("COMPLETED_WITH_SKIPS");
                } else {
                    return ExitStatus.COMPLETED;
                }
            }
        };
    }
}

以上代码展示了如何使用分片批处理来处理大批量数据。通过将批处理任务拆分成多个小任务并发执行,提高了批处理效率。

3.3 使用监控和异常处理策略

使用监控和异常处理策略来发现并处理批处理任务中出现的异常情况。

@Configuration
public class BatchConfiguration {
    ...
    @Bean
    public Step step1() {
        return stepBuilderFactory.get("step1")
                .<User, User>chunk(10000)
                .reader(reader(null))
                .processor(processor())
                .writer(writer(null))
                .taskExecutor(taskExecutor())
                .faultTolerant()
                .skipPolicy(new UserSkipPolicy())
                .retryPolicy(new SimpleRetryPolicy())
                .retryLimit(3)
                .noRollback(NullPointerException.class)
                .listener(stepExecutionListener())
                .build();
    }

    @Bean
    public StepExecutionListener stepExecutionListener() {
        return new StepExecutionListenerSupport() {
            @Override
            public ExitStatus afterStep(StepExecution stepExecution) {
                if(stepExecution.getSkipCount() > 0) {
                    return new ExitStatus("COMPLETED_WITH_SKIPS");
                } else {
                    return ExitStatus.COMPLETED;
                }
            }
        };
    }

    @Bean
    public SkipPolicy userSkipPolicy() {
        return (Throwable t, int skipCount) -> {
            if(t instanceof NullPointerException) {
                return false;
            } else {
                return true;
            }
        };
    }

    @Bean
    public RetryPolicy simpleRetryPolicy() {
        SimpleRetryPolicy retryPolicy = new SimpleRetryPolicy();
        retryPolicy.setMaxAttempts(3);
        return retryPolicy;
    }

    @Bean
    public ItemWriter<User> writer(DataSource dataSource) {
        CompositeItemWriter<User> writer = new CompositeItemWriter<>();
        List<ItemWriter<? super User>> writers = new ArrayList<>();
        writers.add(new UserItemWriter());
        writers.add(new LogUserItemWriter());
        writer.setDelegates(writers);
        writer.afterPropertiesSet();
        return writer;
    }

    public class UserItemWriter implements ItemWriter<User> {
        @Override
        public void write(List<? extends User> items) throws Exception {
            for(User item : items) {
                ...
            }
        }
    }

    public class LogUserItemWriter implements ItemWriter<User> {
        @Override
        public void write(List<? extends User> items) throws Exception {
            for(User item : items) {
                ...
            }
        }

        @Override
        public void onWriteError(Exception exception, List<? extends User> items) {
            ...
        }
    }

    @Bean
    public BatchLoggingConfiguration batchLoggingConfiguration() {
        return new BatchLoggingConfiguration();
    }

}

以上代码展示了如何使用监控和异常处理策略来发现并处理批处理任务中出现的异常情况。可以使用faultTolerant()方法来配置容错处理策略,使用skipPolicy()方法来配置跳过错误记录的策略,使用retryPolicy()方法来配置重试策略。

使用noRollback()方法来避免回滚操作。使用CompositeItemWriter来编写异常处理策略,同时也可以结合实际业务需求来进行异常处理。在进行批处理任务时也可以使用Spring Boot Actuator进行监控。

4 测试效果分析

我们使用以上优化措施进行测试后获得了如下测试结果:

  • 数据读取效率提高了约50%,批处理速度提高了约40%;
  • 异常发生率降低了30%,同时异常处理速度提高了400%。

4小结回顾

通过本文的分析和实践发现在处理大批量数据时,使用批处理框架是非常有效的。但是在实际应用中还需要考虑如何优化批处理的效率和稳定性可以采用连接池、分片批处理、容错处理、异常处理等方法来优化批处理效率和稳定性。希望本文的内容能够对大家有所帮助。

来源:blog.csdn.net/u010349629/

article/details/130673379

相关推荐

js中arguments详解

一、简介了解arguments这个对象之前先来认识一下javascript的一些功能:其实Javascript并没有重载函数的功能,但是Arguments对象能够模拟重载。Javascrip中每个函数...

firewall-cmd 常用命令

目录firewalldzone说明firewallzone内容说明firewall-cmd常用参数firewall-cmd常用命令常用命令 回到顶部firewalldzone...

epel-release 是什么

EPEL-release(ExtraPackagesforEnterpriseLinux)是一个软件仓库,它为企业级Linux发行版(如CentOS、RHEL等)提供额外的软件包。以下是关于E...

FullGC详解  什么是 JVM 的 GC
FullGC详解 什么是 JVM 的 GC

前言:背景:一、什么是JVM的GC?JVM(JavaVirtualMachine)。JVM是Java程序的虚拟机,是一种实现Java语言的解...

2024-10-26 08:50 citgpt

使用Spire.Doc组件利用模板导出Word文档
  • 使用Spire.Doc组件利用模板导出Word文档
  • 使用Spire.Doc组件利用模板导出Word文档
  • 使用Spire.Doc组件利用模板导出Word文档
  • 使用Spire.Doc组件利用模板导出Word文档
跨域(CrossOrigin)

1.介绍  1)跨域问题:跨域问题是在网络中,当一个网络的运行脚本(通常时JavaScript)试图访问另一个网络的资源时,如果这两个网络的端口、协议和域名不一致时就会出现跨域问题。    通俗讲...

微服务架构和分布式架构的区别

1、含义不同微服务架构:微服务架构风格是一种将一个单一应用程序开发为一组小型服务的方法,每个服务运行在自己的进程中,服务间通信采用轻量级通信机制(通常用HTTP资源API)。这些服务围绕业务能力构建并...

深入理解与应用CSS clip-path 属性
深入理解与应用CSS clip-path 属性

clip-pathclip-path是什么clip-path 是一个CSS属性,允许开发者创建一个剪切区域,从而决定元素的哪些部分可见,哪些部分会被隐...

2024-10-25 11:51 citgpt

HCNP Routing&Switching之OSPF LSA类型(二)
  • HCNP Routing&Switching之OSPF LSA类型(二)
  • HCNP Routing&Switching之OSPF LSA类型(二)
  • HCNP Routing&Switching之OSPF LSA类型(二)
  • HCNP Routing&Switching之OSPF LSA类型(二)
Redis和Memcached的区别详解
  • Redis和Memcached的区别详解
  • Redis和Memcached的区别详解
  • Redis和Memcached的区别详解
  • Redis和Memcached的区别详解
Request.ServerVariables 大全

Request.ServerVariables("Url")返回服务器地址Request.ServerVariables("Path_Info")客户端提供的路...

python操作Kafka

目录一、python操作kafka1.python使用kafka生产者2.python使用kafka消费者3.使用docker中的kafka二、python操作kafka细...

Runtime.getRuntime().exec详解

Runtime.getRuntime().exec详解概述Runtime.getRuntime().exec用于调用外部可执行程序或系统命令,并重定向外部程序的标准输入、标准输出和标准错误到缓冲池。...

promise.all详解 promise.all是干什么的
promise.all详解 promise.all是干什么的

promise.all详解promise.all中所有的请求成功了,走.then(),在.then()中能得到一个数组,数组中是每个请求resolve抛出的结果...

2024-10-24 16:21 citgpt

Content-Length和Transfer-Encoding详解
  • Content-Length和Transfer-Encoding详解
  • Content-Length和Transfer-Encoding详解
  • Content-Length和Transfer-Encoding详解
  • Content-Length和Transfer-Encoding详解

取消回复欢迎 发表评论: