Spring batch是spring提供的批处理的框架,但不包括日程管理(schedule)的部分,最近项目用到,初试牛刀。
Spring Batch的核心概念
如下图,JobLancher启动job,一个job包含若干step,每个step又包含一个ItemReader(读数据),ItemProcessor(处理数据),和ItemWriter(输出数据),job的元数据和运行状态则存储在JobRepository中。
Job运行时概念
Job的一次完整运行称为一个JobInstance,由JobParameter区分(Spring认为相同的Job不应该多次运行),即如果JobParameter相同则为同一个Job,而一次运行如果中途失败或者抛异常,再次运行仍为一个JobInstance,而其中的每次运行称为一个JobExecution。
执行一个step称为StepExecution
关系如下:
Job 1->n JobInstance 1->n JobExecution 1->n StepExecution
JobExecution和StepExecution各包含一个ExecutionContext,其中存储了key-value对,可以用来存储运行状态。
实例
看spring自带的一个例子吧,billing,先看配置(有人说,spring是在用xml写代码,谁说不是呢?!并且xml没有语法检查的哦),
job配置
<job id="billingJob" restartable="true"> <step id="billingStep" next="payStep"> <tasklet start-limit="java.lang.Integer.MAX_VALUE"> <chunk reader="userDbReader" processor="billingProcessor" writer="billDbWriter" commit-interval="5" chunk-completion-policy=""> </chunk> </tasklet> </step> <step id="payStep"> <tasklet> <chunk reader="billDbReader" processor="payProcessor" writer="payDbWriter" commit-interval="5" chunk-completion-policy="" skip-limit="100"> <skippable-exception-classes> <include class="org.springframework.batch.sample.MoneyNotEnoughException" /> </skippable-exception-classes> </chunk> </tasklet> <next on="COMPLETED WITH SKIPS" to="messageStep"/> <listeners> <listener ref="payStepCheckingListener"></listener> </listeners> </step> <step id="messageStep"> <tasklet> <chunk reader="billArrearsDbReader" processor="messageProcessor" writer="messageDbWriter" commit-interval="5" chunk-completion-policy=""> </chunk> </tasklet> </step> </job><job>标签定义了一个job,其中有若干<step>,每个<step>又包含一个<tasklet>。
spring batch的write是一批批写入的,因此<tasklet>里面是<chunk>标签,commit-interval指定了批写入的大小,chunk-completion-policy指定什么情况下处理完成,默认是返回null。
该job的step有一个分支:
<next on="COMPLETED WITH SKIPS" to="messageStep"/>
当payStep,返回状态为:COMPLETED WITH SKIPS,则跳到messageStep执行。
监听,可以实现不同的接口监听item,task,job级别的处理这里payStepCheckingListener监听payStep的执行状态。
<skippable-exception-classes>,表示如果改step抛该异常时忽略,继续执行
由于spring batch已经实现了众多ItemReader和ItemWritter(从文件读写,从数据库读写等),因此基本上不用写实现他们的代码,配置好bean就好了。ItemProcessor是你需要实现的。
Job运行的参数
JobInstance是根据不同的参数来区分的,同一个JobInstance不能被多次启动。
另外可以在运行Job时,指定参数:
<bean id="cachingListener" class="org.jamee.demo.springbatch.CachingListener" scope="step"> <property name="logCount" value="#{jobParameters[logCount]}" /> </bean>
该logCount参数会在实例化时注入到cachingListener中,注意这里的scop="step",必须指定,以便延迟绑定,这个bean启动前才会实例化。
配置JobRepository
由于spring batch要把运行时信息写入到JobRepository,以便出错时可以重新从上次运行的地方启动,因此需要配置一个transaction-manager和data-source,job启动后,会在data-source创建一系列已BATCH_为前缀的表,记录Job运行状态和参数等。
<beans:bean id="jobRepository" class="org.springframework.batch.core.repository.support.JobRepositoryFactoryBean"> <beans:property name="dataSource" ref="dataSource" /> <beans:property name="transactionManager" ref="transactionManager" /> </beans:bean>
如果是测试环境,则可以配置为内存:
<beans:bean id="jobRepository" class="org.springframework.batch.core.repository.support.MapJobRepositoryFactoryBean"> <beans:property name="transactionManager" ref="transactionManager" /> </beans:bean> <beans:bean id="transactionManager" class="org.springframework.batch.support.transaction.ResourcelessTransactionManager" />
启动Job
启动Job是通过JobLancher启动的,JobLancher是一个接口:
public interface JobLauncher { public JobExecution run(Job job, JobParameters jobParameters) throws JobExecutionAlreadyRunningException, JobRestartException; }Spring batch提供了一个SimpleJobLauncher启动job,注意setTaskExecutor方法设置了一个同步执行的executor,即可以设置异步或者并行的executor。
ClassPathXmlApplicationContext c = new ClassPathXmlApplicationContext( "billing_job.xml"); SimpleJobLauncher launcher = new SimpleJobLauncher(); launcher.setJobRepository((JobRepository) c.getBean("jobRepository")); launcher.setTaskExecutor(new SyncTaskExecutor()); try { Map<String, JobParameter> parameters = new HashMap<String, JobParameter>(); parameters.put(RUN_MONTH_KEY, new JobParameter("2011-2")); JobExecution je = launcher.run((Job) c.getBean("billingJob"), new JobParameters(parameters)); System.out.println(je); System.out.println(je.getJobInstance()); System.out.println(je.getStepExecutions()); } catch (Exception e) { e.printStackTrace(); }
相关推荐
Spring Batch是一个轻量级的,完全面向Spring的批处理框架,可以应用于企业级大量的数据处理系统。Spring Batch以POJO和大家熟知的Spring框架为基础,使开发者更容易的访问和利用企业级服务。Spring Batch可以提供...
Work with all aspects of batch processing in a modern Java environment using a selection of Spring frameworks. This book provides up-to-date examples using the latest configuration techniques based on...
Spring Batch是Spring的一个子项目,使用Java语言并基于Spring框架为基础开发,使得已经使用 Spring 框架的开发者或者企业更容易访问和利用企业服务。 Spring Batch 提供了大量可重用的组件,包括了日志、追踪、事务、...
Spring Batch API(Spring Batch 开发文档).CHM。 官网 Spring Batch API,Spring Batch 开发文档
Spring Boot整合Spring Batch的一个小例子,在网上发现这方面的资源比较少,特此将其上传供大家学习。
Spring Batch in Action英文pdf版,最新Spring教科书
spring batch批处理框架和对应的源码资源 rar 可以直接运行的
spring-batch4.0.0 batch spring-batch集成 spring-batch.jar
基于Spring Batch的大数据量并行处理 基于Spring Batch的大数据量并行处理
最近在研究springBoot+springbatch ,按照官网的实例做了一个实例。 最近在研究springBoot+springbatch ,按照官网的实例做了一个实例。
Spring Batch批处理框架Spring Batch批处理框架Spring Batch批处理框架
主要给大家介绍了Spring Batch读取txt文件并写入数据库的方法,SpringBatch 是一个轻量级、全面的批处理框架。这里我们用它来实现文件的读取并将读取的结果作处理,处理之后再写入数据库中的功能。需要的朋友可以...
Spring Batch in Action is a comprehensive, in-depth guide to writing batch applications using Spring Batch. Written for developers who have basic knowledge of Java and the Spring lightweight ...
SpringBatch数据库建表语句,存储springBatch批处理过程中需要保存的数据和步骤信息
mybatis、springBatch、mysql、quartz、spring、springMVC 部署说明: 本项目为两个数据库,由一个数据库的表向另外一个数据库的表做数据迁移,其中数据库脚本在:/src/main/resources/sql/下面(其中data_rep中的表...
难得的详细spring batch资料 难得的详细spring batch资料
使用spring batch需要在数据库建立的几个表——建表语句(BATCH_JOB_INSTANCE、BATCH_JOB_EXECUTION、BATCH_JOB_EXECUTION_CONTEXT、`BATCH_JOB_EXECUTION_PARAMS` 、`BATCH_JOB_EXECUTION_SEQ` 、`BATCH_JOB_SEQ` ...
《Spring Batch 批处理框架》全面、系统地介绍了批处理框架Spring Batch,通过详尽的实战示例向读者展示了Spring Batch框架对大数据批处理的基本开发能力,并对框架的架构设计、源码做了特定的剖析;在帮助读者掌握...
资源名称:Spring Batch 批处理框架内容简介:《Spring Batch 批处理框架》全面、系统地介绍了批处理框架Spring Batch,通过详尽的实战示例向读者展示了Spring Batch框架对大数据批处理的基本开发能力,并对框架的...
SpringBatch+SpringBoot构建海量数据企业批处理系统和性能优化,Spring Batch是一个基于Spring的企业级批处理框架,所谓企业批处理就是指在企业级应用中,不需要人工干预,定期读取数据,进行相应的业务处理之后,再...