virusfu

Reading and Writing Large Volumes of Database Data with Spring Batch


1. data-source-context.xml

 

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
	xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:context="http://www.springframework.org/schema/context"
	xmlns:tx="http://www.springframework.org/schema/tx"
	xsi:schemaLocation="http://www.springframework.org/schema/beans	http://www.springframework.org/schema/beans/spring-beans-3.0.xsd
	http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-3.0.xsd
	http://www.springframework.org/schema/tx http://www.springframework.org/schema/tx/spring-tx-3.0.xsd">

	<!-- 1) USE ANNOTATIONS TO IDENTIFY AND WIRE SPRING BEANS. -->
	<context:component-scan base-package="net.etongbao.vasp.ac" />
	
	<!-- 2) DATASOURCE, TRANSACTION MANAGER AND JDBC TEMPLATE -->
 
	<bean id="dataSource" class="com.mchange.v2.c3p0.ComboPooledDataSource"
		destroy-method="close" abstract="false" scope="singleton">
		<!-- oracle.jdbc.driver.oracleDriver -->
		<property name="driverClass" value="oracle.jdbc.OracleDriver" />
		<property name="jdbcUrl" value="jdbc:oracle:thin:@192.168.1.23:1521:orcl01" />
		<property name="user" value="USR_DEV01" />
		<property name="password" value="2AF0829C" />
		<property name="checkoutTimeout" value="30000" />
		<property name="maxIdleTime" value="120" />
		<property name="maxPoolSize" value="100" />
		<property name="minPoolSize" value="2" />
		<property name="initialPoolSize" value="2" />
		<property name="maxStatements" value="0" />
		<property name="maxStatementsPerConnection" value="0" />
		<property name="idleConnectionTestPeriod" value="30" />	
	</bean>
	<bean id="jdbcTemplate" class="org.springframework.jdbc.core.JdbcTemplate">
		<property name="dataSource" ref="dataSource" />
	</bean>

	<bean id="transactionManager"
		class="org.springframework.jdbc.datasource.DataSourceTransactionManager">
		<property name="dataSource" ref="dataSource" />
	</bean>
	
	<tx:annotation-driven transaction-manager="transactionManager" />
</beans>

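 2. quartz-context.xml

commit-interval="10000" sets the number of items in each chunk (batch). A larger value generally means fewer commits and higher throughput, at the cost of more memory per chunk. Transaction handling can be configured here: each chunk is written in its own transaction, so a rollback undoes at most commit-interval items.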
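
To make the effect of commit-interval concrete, here is a minimal plain-Java sketch of chunk-oriented processing. It is a model only, not Spring Batch code: the class name ChunkSketch and the method process are hypothetical, the "writer" is just a list, and a "commit" is just a counter.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

/** Plain-Java model of chunk-oriented processing; not Spring Batch code. */
final class ChunkSketch {

    /**
     * Reads item by item, buffers up to commitInterval items, then hands the
     * whole buffer to the "writer" (here: sink.addAll) and counts one commit.
     * Returns the number of commits.
     */
    static int process(Iterator<Integer> reader, int commitInterval, List<Integer> sink) {
        int commits = 0;
        List<Integer> chunk = new ArrayList<Integer>();
        while (reader.hasNext()) {
            chunk.add(reader.next());
            if (chunk.size() == commitInterval) {
                sink.addAll(chunk); // the writer receives the full chunk
                commits++;          // one transaction per chunk
                chunk.clear();
            }
        }
        if (!chunk.isEmpty()) {     // final, partially filled chunk
            sink.addAll(chunk);
            commits++;
        }
        return commits;
    }

    public static void main(String[] args) {
        List<Integer> sink = new ArrayList<Integer>();
        int commits = process(Arrays.asList(1, 2, 3, 4, 5, 6, 7).iterator(), 3, sink);
        // prints "7 items written in 3 commits"
        System.out.println(sink.size() + " items written in " + commits + " commits");
    }
}
```

With a commit interval of 3 and 7 items, the writer is called three times (3, 3 and 1 items). By the same logic, a failure while writing a chunk would discard only the items buffered for that chunk.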

 

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
	xmlns:batch="http://www.springframework.org/schema/batch"
	xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:context="http://www.springframework.org/schema/context"
	xmlns:tx="http://www.springframework.org/schema/tx"
	xsi:schemaLocation="http://www.springframework.org/schema/beans	http://www.springframework.org/schema/beans/spring-beans-3.0.xsd
	http://www.springframework.org/schema/batch http://www.springframework.org/schema/batch/spring-batch-2.1.xsd
	http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-3.0.xsd
	http://www.springframework.org/schema/tx http://www.springframework.org/schema/tx/spring-tx-3.0.xsd">

	<import resource="data-source-context.xml"/>
	
	<!--   JOB REPOSITORY - WE USE IN-MEMORY REPOSITORY FOR OUR EXAMPLE -->
	<bean id="jobRepository"
		class="org.springframework.batch.core.repository.support.MapJobRepositoryFactoryBean">
		<property name="transactionManager" ref="transactionManager" />
	</bean>
	
	<!-- batch config -->
	<bean id="jobLauncher"
		class="org.springframework.batch.core.launch.support.SimpleJobLauncher">
		<property name="jobRepository" ref="jobRepository" />
	</bean>

	<!--  FINALLY OUR JOB DEFINITION. THIS IS A 1 STEP JOB -->
	<batch:job id="ledgerJob">
		<batch:listeners>
			<batch:listener ref="appJobExecutionListener" />
		</batch:listeners>
		<batch:step id="step1">
			<batch:tasklet transaction-manager="transactionManager">
				<batch:chunk reader="ledgerReader" writer="ledgerWriter"
					commit-interval="10000" /> <!-- commit once every 10,000 items -->
				<batch:listeners>
					<batch:listener ref="itemFailureLoggerListener" />
				</batch:listeners>
			</batch:tasklet>
		</batch:step>
	</batch:job>
	 
	<!--  READER -->
	<bean id="ledgerReader"
		class="org.springframework.batch.item.database.JdbcCursorItemReader">
		<property name="dataSource" ref="dataSource" />
		<property name="sql" value="select * from ledger" /> 
		<property name="rowMapper" ref="ledgerRowMapper" />
	</bean>
	 
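	<!-- Spring Batch does not allow the same job instance to run again after it has completed successfully.
		(Whether a failed instance may be re-run is controlled by the job's restartable attribute, which defaults to true.)
		As a workaround for re-running, add a JobParametersBuilder bean and pass the current time as a parameter,
		so that each launch is a different job instance even when all other parameters are the same. -->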
	<bean id="jobParameterBulider" class="org.springframework.batch.core.JobParametersBuilder" />
	
	<!-- Scheduled task: start -->
	<bean id="ledgerJobDetail" class="org.springframework.scheduling.quartz.MethodInvokingJobDetailFactoryBean">
		<property name="targetObject">
			<!-- the bean to invoke on schedule -->
			<ref bean="quartzLedgerJob" />
		</property>
		<property name="targetMethod">
			<!-- the method to invoke on that bean -->
			<value>execute</value>
		</property>
	</bean>

	<bean id="ledgerCronTrigger" class="org.springframework.scheduling.quartz.CronTriggerBean" >
		<!-- The jobDetail property cannot reference the task bean directly because it requires a JobDetail object, so the bean is wrapped with MethodInvokingJobDetailFactoryBean -->
		<property name="jobDetail" >
			<ref bean="ledgerJobDetail" />
		</property>
		<!-- Commented-out alternative: fires every minute from 15:00 to 15:59 each day -->
		<!-- Active expression: fires at 10:45:00 every day -->
		<property name="cronExpression" >
			<!-- <value>0 * 15 * * ?</value> -->
			<value>0 45 10 * * ? *</value>
		</property>

	</bean>

	<!-- Scheduler factory: all triggers are registered here -->
	<bean class="org.springframework.scheduling.quartz.SchedulerFactoryBean">
		<!-- register triggers -->
		<property name="triggers">
			<list>
				<!-- the trigger defined above (several triggers can be defined and registered together) -->
				<ref local="ledgerCronTrigger" />
			</list>
		</property>
	</bean>
	<!-- Scheduled task: end -->
</beans>  
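
A note on the ledgerReader bean: a cursor-based reader is stateful (it holds an open cursor between open and close), so one instance cannot serve two executions at the same time. The exceptions reported in comment #1 ("Stream is already initialized" followed by "Reader must be open before it can be read") are consistent with that, and the reply quoted in the comments suggests a step-scoped reader (scope="step" on the bean) as the remedy. The following is a plain-Java model of the life cycle, not the Spring Batch class; StatefulReaderSketch, CursorReader and secondOpenSucceeds are hypothetical names chosen for illustration.

```java
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

/** Plain-Java model of a stateful cursor reader; not the Spring Batch class itself. */
final class StatefulReaderSketch {

    /** Mimics the open/read/close life cycle of a cursor-based reader. */
    static final class CursorReader {
        private final List<String> rows;
        private Iterator<String> cursor; // non-null while the reader is open

        CursorReader(List<String> rows) {
            this.rows = rows;
        }

        void open() {
            if (cursor != null) {
                throw new IllegalStateException("Stream is already initialized.  Close before re-opening.");
            }
            cursor = rows.iterator();
        }

        String read() {
            if (cursor == null) {
                throw new IllegalStateException("Reader must be open before it can be read.");
            }
            return cursor.hasNext() ? cursor.next() : null; // null signals end of data
        }

        void close() {
            cursor = null;
        }
    }

    /** Returns true if a second execution can open its reader while the first is still open. */
    static boolean secondOpenSucceeds(CursorReader first, CursorReader second) {
        first.open();
        try {
            second.open();
            second.close();
            return true;
        } catch (IllegalStateException e) {
            return false;
        } finally {
            first.close();
        }
    }

    public static void main(String[] args) {
        List<String> rows = Arrays.asList("a", "b");
        CursorReader shared = new CursorReader(rows);
        // one shared (singleton-like) instance used by two overlapping executions
        System.out.println("shared instance: " + secondOpenSucceeds(shared, shared));
        // one instance per execution, which is what step scope provides
        System.out.println("separate instances: "
                + secondOpenSucceeds(new CursorReader(rows), new CursorReader(rows)));
    }
}
```

In this model the shared instance fails on the second open, while separate instances do not interfere with each other.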

 

3. Scheduled job class: QuartzLedgerJob.java

 

 package net.etongbao.vasp.ac.quartz;

import java.util.Date;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.JobParametersBuilder;
import org.springframework.batch.core.launch.JobLauncher;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import org.springframework.util.StopWatch;

/**
 * Scheduled job class, invoked by Quartz.
 * @author Fu Wei
 *
 */

@Component("quartzLedgerJob")
public class QuartzLedgerJob {

	private static final Logger LOG = LoggerFactory.getLogger(QuartzLedgerJob.class);

	@Autowired
	private JobLauncher jobLauncher;

	@Autowired
	private Job ledgerJob;

	@Autowired
	JobParametersBuilder jobParameterBulider;

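	private static long counter = 0L;
	
	/**
	 * Runs the batch job.
	 * @throws Exception if the job cannot be launched
	 */
	public void execute() throws Exception {
		LOG.debug("start...");
		StopWatch sw = new StopWatch();
		sw.start();
		/*
		 * Spring Batch does not allow the same job instance to run again after it has
		 * completed successfully. (Whether a failed instance may be re-run is controlled
		 * by the job's restartable attribute, which defaults to true.) As a workaround,
		 * use a JobParametersBuilder and pass the current time as a parameter, so that
		 * each launch is a different job instance even when all other parameters are the same.
		 */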
		jobParameterBulider.addDate("date", new Date());
		jobLauncher.run(ledgerJob, jobParameterBulider.toJobParameters());
		sw.stop();
		LOG.debug("Time elapsed:{},Execute quartz ledgerJob:{}", sw.prettyPrint(), ++counter);
	}
}
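
The identity rule described in the comment inside execute() can be modelled in a few lines of plain Java. This is a sketch under stated assumptions, not the Spring Batch JobRepository: JobInstanceSketch, launch and withDate are hypothetical names, and where this model returns false, Spring Batch would reject the launch with an exception instead.

```java
import java.util.Arrays;
import java.util.Date;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

/** Plain-Java model of job-instance identity; not the Spring Batch JobRepository. */
final class JobInstanceSketch {

    /** Identities (job name + parameters) of instances that have already completed. */
    private final Set<List<Object>> completed = new HashSet<List<Object>>();

    /**
     * "Runs" the job once per distinct identity.
     * Returns false if an instance with the same name and parameters already completed.
     */
    boolean launch(String jobName, Map<String, Object> parameters) {
        List<Object> identity = Arrays.<Object>asList(jobName, new HashMap<String, Object>(parameters));
        return completed.add(identity);
    }

    /** Builds parameters the way execute() does: a single "date" entry. */
    static Map<String, Object> withDate(Date date) {
        Map<String, Object> params = new HashMap<String, Object>();
        params.put("date", date);
        return params;
    }

    public static void main(String[] args) {
        JobInstanceSketch repository = new JobInstanceSketch();
        Date first = new Date(1000L);
        Date later = new Date(2000L);
        System.out.println(repository.launch("ledgerJob", withDate(first))); // true: new instance
        System.out.println(repository.launch("ledgerJob", withDate(first))); // false: same parameters
        System.out.println(repository.launch("ledgerJob", withDate(later))); // true: different date
    }
}
```

Because every scheduled run passes a fresh timestamp, each run maps to a new identity and is never rejected as a repeat of a completed instance.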
 

 4. Application entry point: StartQuartz.java

 package net.etongbao.vasp.ac.quartz;

import java.io.FileNotFoundException;

import org.springframework.context.support.ClassPathXmlApplicationContext;

/**
 * Starts the scheduler by loading the Spring context.
 * @author Fu Wei
 *
 */
public class StartQuartz {

	public static void main(String[] args) throws FileNotFoundException {

		new ClassPathXmlApplicationContext("/net/etongbao/vasp/ac/resources/quartz-context.xml");
	}
}

 

5. POJO class: Ledger.java

 

package net.etongbao.vasp.ac.pojo;

import java.io.Serializable;
import java.util.Date;

public class Ledger implements Serializable {

	private int id;
	private Date receiptDate;
	private String memberName;
	private String checkNumber;
	private Date checkDate;
	private String paymentType;
	private double depositAmount;
	private double paymentAmount;
	private String comments;

	public Ledger() {
		super();
	}

	public Ledger(int id, Date receiptDate, String memberName, String checkNumber, Date checkDate, String paymentType,
	        double depositAmount, double paymentAmount, String comments) {
		super();
		this.id = id;
		this.receiptDate = receiptDate;
		this.memberName = memberName;
		this.checkNumber = checkNumber;
		this.checkDate = checkDate;
		this.paymentType = paymentType;
		this.depositAmount = depositAmount;
		this.paymentAmount = paymentAmount;
		this.comments = comments;
	}

	public int getId() {
		return id;
	}

	public void setId(int id) {
		this.id = id;
	}

	public Date getReceiptDate() {
		return receiptDate;
	}

	public void setReceiptDate(Date receiptDate) {
		this.receiptDate = receiptDate;
	}

	public String getMemberName() {
		return memberName;
	}

	public void setMemberName(String memberName) {
		this.memberName = memberName;
	}

	public String getCheckNumber() {
		return checkNumber;
	}

	public void setCheckNumber(String checkNumber) {
		this.checkNumber = checkNumber;
	}

	public Date getCheckDate() {
		return checkDate;
	}

	public void setCheckDate(Date checkDate) {
		this.checkDate = checkDate;
	}

	public String getPaymentType() {
		return paymentType;
	}

	public void setPaymentType(String paymentType) {
		this.paymentType = paymentType;
	}

	public double getDepositAmount() {
		return depositAmount;
	}

	public void setDepositAmount(double depositAmount) {
		this.depositAmount = depositAmount;
	}

	public double getPaymentAmount() {
		return paymentAmount;
	}

	public void setPaymentAmount(double paymentAmount) {
		this.paymentAmount = paymentAmount;
	}

	public String getComments() {
		return comments;
	}

	public void setComments(String comments) {
		this.comments = comments;
	}
}

 

 6. LedgerDaoImpl.java

 package net.etongbao.vasp.ac.dao.impl;

import java.sql.PreparedStatement;
import java.sql.SQLException;
import net.etongbao.vasp.ac.dao.LedgerDao;
import net.etongbao.vasp.ac.pojo.Ledger;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.jdbc.core.PreparedStatementSetter;
import org.springframework.stereotype.Repository;

/**
 * Data access class for ledger records.
 * 
 * @author Fu Wei
 * 
 */

@Repository
public class LedgerDaoImpl implements LedgerDao {

	private static final String SAVE_SQL = "insert into ledger_temp (rcv_dt, mbr_nm, chk_nbr, chk_dt, pymt_typ, dpst_amt, pymt_amt, comments) values(?,?,?,?,?,?,?,?)";

	@Autowired
	private JdbcTemplate jdbcTemplate;

	@Override
	public void save(final Ledger item) {
		jdbcTemplate.update(SAVE_SQL, new PreparedStatementSetter() {
			public void setValues(PreparedStatement stmt) throws SQLException {
				stmt.setDate(1, new java.sql.Date(item.getReceiptDate().getTime()));
				stmt.setString(2, item.getMemberName());
				stmt.setString(3, item.getCheckNumber());
				stmt.setDate(4, new java.sql.Date(item.getCheckDate().getTime()));
				stmt.setString(5, item.getPaymentType());
				stmt.setDouble(6, item.getDepositAmount());
				stmt.setDouble(7, item.getPaymentAmount());
				stmt.setString(8, item.getComments());
			}
		});
	}

}
 

7. Interface: LedgerDao.java

 

 

package net.etongbao.vasp.ac.dao;

import net.etongbao.vasp.ac.pojo.Ledger;

public interface LedgerDao {
	public void save(final Ledger item) ;
}

 

 8. LedgerRowMapper.java, the RowMapper (part of the JdbcTemplate API) that the reader uses to map each row

  package net.etongbao.vasp.ac.batch.writer;

import java.sql.ResultSet;
import java.sql.SQLException;

import net.etongbao.vasp.ac.pojo.Ledger;

import org.springframework.jdbc.core.RowMapper;
import org.springframework.stereotype.Component;
 
/**
 * Maps one row of the ledger table to a Ledger object.
 * @author Administrator
 *
 */
@Component("ledgerRowMapper")
public class LedgerRowMapper implements RowMapper<Ledger> {
	public Ledger mapRow(ResultSet rs, int rowNum) throws SQLException {
		Ledger ledger = new Ledger();
		ledger.setId(rs.getInt("id"));
		ledger.setReceiptDate(rs.getDate("rcv_dt"));
		ledger.setMemberName(rs.getString("mbr_nm"));
		ledger.setCheckNumber(rs.getString("chk_nbr"));
		ledger.setCheckDate(rs.getDate("chk_dt"));
		ledger.setPaymentType(rs.getString("pymt_typ"));
		ledger.setDepositAmount(rs.getDouble("dpst_amt"));
		ledger.setPaymentAmount(rs.getDouble("pymt_amt"));
		ledger.setComments(rs.getString("comments"));
		return ledger;
	}
}

 

9. The key class, LedgerWriter.java, which writes (inserts) the data

 

package net.etongbao.vasp.ac.batch.writer;

import java.util.List;

import net.etongbao.vasp.ac.dao.LedgerDao;
import net.etongbao.vasp.ac.pojo.Ledger;

import org.springframework.batch.item.ItemWriter;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

/**
 * Writes ledger items.
 * 
 * @author Fu Wei
 * 
 */
@Component("ledgerWriter")
public class LedgerWriter implements ItemWriter<Ledger> {

	@Autowired
	private LedgerDao ledgerDao;

	/**
	 * Writes one chunk of items.
	 * 
	 * @param ledgers the items in the current chunk
	 */
	public void write(List<? extends Ledger> ledgers) throws Exception {
		for (Ledger ledger : ledgers) {
			ledgerDao.save(ledger);
		}
	}

}

 

 

 .classpath (Eclipse project file):

 <?xml version="1.0" encoding="UTF-8"?>

<classpath>
	<classpathentry kind="src" path="src"/>
	<classpathentry kind="con" path="org.eclipse.jdt.launching.JRE_CONTAINER/org.eclipse.jdt.internal.debug.ui.launcher.StandardVMType/jrockit-jdk1.6.0_24-R28.1.3-4.0.1"/>
	<classpathentry kind="lib" path="lib/aopalliance-1.0.jar"/>
	<classpathentry kind="lib" path="lib/c3p0-0.9.1.2.jar"/>
	<classpathentry kind="lib" path="lib/commons-collections-3.2.1.jar"/>
	<classpathentry kind="lib" path="lib/commons-lang-2.3.jar"/>
	<classpathentry kind="lib" path="lib/commons-logging-1.1.1.jar"/>
	<classpathentry kind="lib" path="lib/etb-log4j-1.2.16.jar"/>
	<classpathentry kind="lib" path="lib/etb-slf4j-api-1.5.8.jar"/>
	<classpathentry kind="lib" path="lib/etb-slf4j-log4j12-1.5.8.jar"/>
	<classpathentry kind="lib" path="lib/ojdbc6.jar"/>
	<classpathentry kind="lib" path="lib/org.springframework.aop-3.0.5.RELEASE.jar"/>
	<classpathentry kind="lib" path="lib/org.springframework.asm-3.0.5.RELEASE.jar"/>
	<classpathentry kind="lib" path="lib/org.springframework.aspects-3.0.5.RELEASE.jar"/>
	<classpathentry kind="lib" path="lib/org.springframework.beans-3.0.5.RELEASE.jar"/>
	<classpathentry kind="lib" path="lib/org.springframework.context-3.0.5.RELEASE.jar"/>
	<classpathentry kind="lib" path="lib/org.springframework.context.support-3.0.5.RELEASE.jar"/>
	<classpathentry kind="lib" path="lib/org.springframework.core-3.0.5.RELEASE.jar"/>
	<classpathentry kind="lib" path="lib/org.springframework.expression-3.0.5.RELEASE.jar"/>
	<classpathentry kind="lib" path="lib/org.springframework.instrument-3.0.5.RELEASE.jar"/>
	<classpathentry kind="lib" path="lib/org.springframework.instrument.tomcat-3.0.5.RELEASE.jar"/>
	<classpathentry kind="lib" path="lib/org.springframework.jdbc-3.0.5.RELEASE.jar"/>
	<classpathentry kind="lib" path="lib/org.springframework.jms-3.0.5.RELEASE.jar"/>
	<classpathentry kind="lib" path="lib/org.springframework.orm-3.0.5.RELEASE.jar"/>
	<classpathentry kind="lib" path="lib/org.springframework.oxm-3.0.5.RELEASE.jar"/>
	<classpathentry kind="lib" path="lib/org.springframework.test-3.0.5.RELEASE.jar"/>
	<classpathentry kind="lib" path="lib/org.springframework.transaction-3.0.5.RELEASE.jar"/>
	<classpathentry kind="lib" path="lib/quartz-all-1.6.5.jar"/>
	<classpathentry kind="lib" path="lib/spring-batch-core-2.1.6.RELEASE.jar"/>
	<classpathentry kind="lib" path="lib/spring-batch-infrastructure-2.1.6.RELEASE.jar"/>
	<classpathentry kind="lib" path="lib/spring-batch-test-2.1.6.RELEASE.jar"/>
	<classpathentry kind="con" path="org.eclipse.jdt.junit.JUNIT_CONTAINER/4"/>
	<classpathentry kind="output" path="bin"/>
</classpath>
 

 

  Summary: with a little over 80,000 rows of test data, the job took a little over 3 minutes.

 

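The key piece is the ledgerReader bean in quartz-context.xml:

<bean id="ledgerReader"
	class="org.springframework.batch.item.database.JdbcCursorItemReader">
	<property name="dataSource" ref="dataSource" />
	<property name="sql" value="select * from ledger" />
	<property name="rowMapper" ref="ledgerRowMapper" />
</bean>

It is responsible for reading the data. When the step starts, JdbcCursorItemReader runs the query once and then streams rows from the open cursor; Spring Batch collects the rows into chunks and hands each chunk to LedgerWriter for writing. You can also read page by page with JdbcPagingItemReader, in which case the page size should be roughly the same as the number of items written per chunk; the rows of each page can additionally be locked pessimistically.

LedgerWriter.java is responsible for writing the data. Each call receives one chunk, that is, up to commit-interval items (10000 in the configuration above).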
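
To illustrate why the page size is usually kept close to the chunk size, here is a plain-Java model of paged reading. It is a sketch only and does not reproduce JdbcPagingItemReader: PagingSketch, fetchPage and the in-memory "table" are hypothetical stand-ins, and each call to fetchPage represents one page query against the database.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Plain-Java model of paged reading; not JdbcPagingItemReader itself. */
final class PagingSketch {

    private final List<Integer> table; // stand-in for the ledger table
    private final int pageSize;
    private List<Integer> page = new ArrayList<Integer>();
    private int indexInPage = 0;
    private int offset = 0;
    int queries = 0; // number of page queries issued so far

    PagingSketch(List<Integer> table, int pageSize) {
        this.table = table;
        this.pageSize = pageSize;
    }

    /** Stand-in for one query that returns the next pageSize rows. */
    private List<Integer> fetchPage() {
        queries++;
        int end = Math.min(offset + pageSize, table.size());
        List<Integer> rows = new ArrayList<Integer>(table.subList(offset, end));
        offset = end;
        return rows;
    }

    /** Returns the next item, or null when the table is exhausted. */
    Integer read() {
        if (indexInPage >= page.size()) {
            if (queries > 0 && page.size() < pageSize) {
                return null; // a short page was the last page
            }
            page = fetchPage();
            indexInPage = 0;
            if (page.isEmpty()) {
                return null;
            }
        }
        return page.get(indexInPage++);
    }

    public static void main(String[] args) {
        PagingSketch reader = new PagingSketch(Arrays.asList(1, 2, 3, 4, 5, 6, 7), 3);
        int items = 0;
        while (reader.read() != null) {
            items++;
        }
        // prints "7 items read with 3 page queries"
        System.out.println(items + " items read with " + reader.queries + " page queries");
    }
}
```

When the page size equals the chunk size, each chunk is filled by exactly one page query in this model; a much smaller page size means several queries per chunk, and a much larger one means rows are fetched well before they are written.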

 

 

 

 

 

Comments
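#9 yjc2020 2015-01-26  
A question: can Spring Batch produce log output while it is writing data in batches?
#8 JimmyLincole 2014-08-25  
yjc2020 wrote:
JimmyLincole wrote:
For the problem in comment #1, take a look at this solution: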


The issue here is that your reader is singleton scoped. This means that when the job runs the first time, the ItemReader is opened successfully and the job runs. However when the job attempts to run a second time, it's using the same instance it did the first time which is already initialized (hence the exception). I'd recommend changing the readyReqPoolReader to be a step scoped bean and see if that helps.

You can read more about step scope here: http://docs.spring.io/spring-batch/trunk/apidocs/org/springframework/batch/core/scope/StepScope.html


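What are the differences between 3.0 and 2.2.7? After I upgraded the JARs, my configuration file started failing.

I'm not sure what the differences between 3.0 and 2.2.7 are either, sorry.
#7 yjc2020 2014-08-19  
JimmyLincole wrote:
For the problem in comment #1, take a look at this solution: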


The issue here is that your reader is singleton scoped. This means that when the job runs the first time, the ItemReader is opened successfully and the job runs. However when the job attempts to run a second time, it's using the same instance it did the first time which is already initialized (hence the exception). I'd recommend changing the readyReqPoolReader to be a step scoped bean and see if that helps.

You can read more about step scope here: http://docs.spring.io/spring-batch/trunk/apidocs/org/springframework/batch/core/scope/StepScope.html


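What are the differences between 3.0 and 2.2.7? After I upgraded the JARs, my configuration file started failing.
#6 JimmyLincole 2014-08-14  
For the problem in comment #1, take a look at this solution: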


The issue here is that your reader is singleton scoped. This means that when the job runs the first time, the ItemReader is opened successfully and the job runs. However when the job attempts to run a second time, it's using the same instance it did the first time which is already initialized (hence the exception). I'd recommend changing the readyReqPoolReader to be a step scoped bean and see if that helps.

You can read more about step scope here: http://docs.spring.io/spring-batch/trunk/apidocs/org/springframework/batch/core/scope/StepScope.html
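#5 yjc2020 2014-04-26  
I ran into the same error as comment #1: Reader must be open before it can be read.

The job first ran successfully twice, then started failing and failed twice.
#4 dotame 2014-03-25  
To comment #1: the file path you pass as a parameter is probably wrong.
#3 dotame 2014-03-25  
The file path you pass as a parameter is probably wrong.
#2 mbsky 2014-03-07  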
Very nice article.
#1 wei_jiyong 2013-03-01  
Hello, are you there? I'd like to ask about an error I keep getting:
org.springframework.batch.item.ItemStreamException: Failed to initialize the reader
at org.springframework.batch.item.support.AbstractItemCountingItemStreamItemReader.open(AbstractItemCountingItemStreamItemReader.java:137)
at org.springframework.batch.item.support.CompositeItemStream.open(CompositeItemStream.java:93)
at org.springframework.batch.core.step.item.ChunkMonitor.open(ChunkMonitor.java:105)
at org.springframework.batch.item.support.CompositeItemStream.open(CompositeItemStream.java:93)
at org.springframework.batch.core.step.tasklet.TaskletStep.open(TaskletStep.java:301)
at org.springframework.batch.core.step.AbstractStep.execute(AbstractStep.java:192)
at org.springframework.batch.core.job.SimpleStepHandler.handleStep(SimpleStepHandler.java:135)
at org.springframework.batch.core.job.flow.JobFlowExecutor.executeStep(JobFlowExecutor.java:61)
at org.springframework.batch.core.job.flow.support.state.StepState.handle(StepState.java:60)
at org.springframework.batch.core.job.flow.support.SimpleFlow.resume(SimpleFlow.java:144)
at org.springframework.batch.core.job.flow.support.SimpleFlow.start(SimpleFlow.java:124)
at org.springframework.batch.core.job.flow.FlowJob.doExecute(FlowJob.java:135)
at org.springframework.batch.core.job.AbstractJob.execute(AbstractJob.java:281)
at org.springframework.batch.core.launch.support.SimpleJobLauncher$1.run(SimpleJobLauncher.java:120)
at org.springframework.core.task.SyncTaskExecutor.execute(SyncTaskExecutor.java:48)
at org.springframework.batch.core.launch.support.SimpleJobLauncher.run(SimpleJobLauncher.java:114)
at net.etongbao.vasp.ac.quartz.QuartzLedgerJob.execute(QuartzLedgerJob.java:49)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(Unknown Source)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source)
at java.lang.reflect.Method.invoke(Unknown Source)
at org.springframework.util.MethodInvoker.invoke(MethodInvoker.java:273)
at org.springframework.scheduling.quartz.MethodInvokingJobDetailFactoryBean$MethodInvokingJob.executeInternal(MethodInvokingJobDetailFactoryBean.java:311)
at org.springframework.scheduling.quartz.QuartzJobBean.execute(QuartzJobBean.java:113)
at org.quartz.core.JobRunShell.run(JobRunShell.java:216)
at org.quartz.simpl.SimpleThreadPool$WorkerThread.run(SimpleThreadPool.java:549)
Caused by: java.lang.IllegalStateException: Stream is already initialized.  Close before re-opening.
at org.springframework.util.Assert.state(Assert.java:384)
at org.springframework.batch.item.database.AbstractCursorItemReader.doOpen(AbstractCursorItemReader.java:397)
at org.springframework.batch.item.support.AbstractItemCountingItemStreamItemReader.open(AbstractItemCountingItemStreamItemReader.java:134)
... 25 more
10:02:34,484  INFO org.springframework.scheduling.quartz.SchedulerFactoryBean#0_Worker-2 listener.AppJobExecutionListener:18 - Job failed: 1
10:02:34,484  INFO org.springframework.scheduling.quartz.SchedulerFactoryBean#0_Worker-2 support.SimpleJobLauncher:121 - Job: [FlowJob: [name=ledgerJob]] completed with the following parameters: [{date=1362103354109}] and the following status: [FAILED]
10:02:35,875 ERROR org.springframework.scheduling.quartz.SchedulerFactoryBean#0_Worker-1 listener.ItemFailureLoggerListener:12 - Encountered error on read
org.springframework.batch.item.ReaderNotOpenException: Reader must be open before it can be read.
at org.springframework.batch.item.database.AbstractCursorItemReader.doRead(AbstractCursorItemReader.java:437)
at org.springframework.batch.item.support.AbstractItemCountingItemStreamItemReader.read(AbstractItemCountingItemStreamItemReader.java:85)
at org.springframework.batch.core.step.item.SimpleChunkProvider.doRead(SimpleChunkProvider.java:90)
at org.springframework.batch.core.step.item.FaultTolerantChunkProvider.read(FaultTolerantChunkProvider.java:87)
at org.springframework.batch.core.step.item.SimpleChunkProvider$1.doInIteration(SimpleChunkProvider.java:108)
at org.springframework.batch.repeat.support.RepeatTemplate.getNextResult(RepeatTemplate.java:367)
at org.springframework.batch.repeat.support.RepeatTemplate.executeInternal(RepeatTemplate.java:214)
at org.springframework.batch.repeat.support.RepeatTemplate.iterate(RepeatTemplate.java:143)
at org.springframework.batch.core.step.item.SimpleChunkProvider.provide(SimpleChunkProvider.java:103)
at org.springframework.batch.core.step.item.ChunkOrientedTasklet.execute(ChunkOrientedTasklet.java:68)
at org.springframework.batch.core.step.tasklet.TaskletStep$ChunkTransactionCallback.doInTransaction(TaskletStep.java:386)
at org.springframework.transaction.support.TransactionTemplate.execute(TransactionTemplate.java:130)
at org.springframework.batch.core.step.tasklet.TaskletStep$2.doInChunkContext(TaskletStep.java:264)
at org.springframework.batch.core.scope.context.StepContextRepeatCallback.doInIteration(StepContextRepeatCallback.java:76)
at org.springframework.batch.repeat.support.RepeatTemplate.getNextResult(RepeatTemplate.java:367)
at org.springframework.batch.repeat.support.RepeatTemplate.executeInternal(RepeatTemplate.java:214)
at org.springframework.batch.repeat.support.RepeatTemplate.iterate(RepeatTemplate.java:143)
at org.springframework.batch.core.step.tasklet.TaskletStep.doExecute(TaskletStep.java:250)
at org.springframework.batch.core.step.AbstractStep.execute(AbstractStep.java:195)
at org.springframework.batch.core.job.SimpleStepHandler.handleStep(SimpleStepHandler.java:135)
at org.springframework.batch.core.job.flow.JobFlowExecutor.executeStep(JobFlowExecutor.java:61)
at org.springframework.batch.core.job.flow.support.state.StepState.handle(StepState.java:60)
at org.springframework.batch.core.job.flow.support.SimpleFlow.resume(SimpleFlow.java:144)
at org.springframework.batch.core.job.flow.support.SimpleFlow.start(SimpleFlow.java:124)
at org.springframework.batch.core.job.flow.FlowJob.doExecute(FlowJob.java:135)
at org.springframework.batch.core.job.AbstractJob.execute(AbstractJob.java:281)
at org.springframework.batch.core.launch.support.SimpleJobLauncher$1.run(SimpleJobLauncher.java:120)
at org.springframework.core.task.SyncTaskExecutor.execute(SyncTaskExecutor.java:48)
at org.springframework.batch.core.launch.support.SimpleJobLauncher.run(SimpleJobLauncher.java:114)
at net.etongbao.vasp.ac.quartz.QuartzLedgerJob.execute(QuartzLedgerJob.java:49)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(Unknown Source)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source)
at java.lang.reflect.Method.invoke(Unknown Source)
at org.springframework.util.MethodInvoker.invoke(MethodInvoker.java:273)
at org.springframework.scheduling.quartz.MethodInvokingJobDetailFactoryBean$MethodInvokingJob.executeInternal(MethodInvokingJobDetailFactoryBean.java:311)
at org.springframework.scheduling.quartz.QuartzJobBean.execute(QuartzJobBean.java:113)
at org.quartz.core.JobRunShell.run(JobRunShell.java:216)
at org.quartz.simpl.SimpleThreadPool$WorkerThread.run(SimpleThreadPool.java:549)
10:02:35,875 ERROR org.springframework.scheduling.quartz.SchedulerFactoryBean#0_Worker-1 step.AbstractStep:212 - Encountered an error executing the step
org.springframework.batch.core.step.skip.NonSkippableReadException: Non-skippable exception during read
at org.springframework.batch.core.step.item.FaultTolerantChunkProvider.read(FaultTolerantChunkProvider.java:104)
at org.springframework.batch.core.step.item.SimpleChunkProvider$1.doInIteration(SimpleChunkProvider.java:108)
at org.springframework.batch.repeat.support.RepeatTemplate.getNextResult(RepeatTemplate.java:367)
at org.springframework.batch.repeat.support.RepeatTemplate.executeInternal(RepeatTemplate.java:214)
at org.springframework.batch.repeat.support.RepeatTemplate.iterate(RepeatTemplate.java:143)
at org.springframework.batch.core.step.item.SimpleChunkProvider.provide(SimpleChunkProvider.java:103)
at org.springframework.batch.core.step.item.ChunkOrientedTasklet.execute(ChunkOrientedTasklet.java:68)
at org.springframework.batch.core.step.tasklet.TaskletStep$ChunkTransactionCallback.doInTransaction(TaskletStep.java:386)
at org.springframework.transaction.support.TransactionTemplate.execute(TransactionTemplate.java:130)
at org.springframework.batch.core.step.tasklet.TaskletStep$2.doInChunkContext(TaskletStep.java:264)
at org.springframework.batch.core.scope.context.StepContextRepeatCallback.doInIteration(StepContextRepeatCallback.java:76)
at org.springframework.batch.repeat.support.RepeatTemplate.getNextResult(RepeatTemplate.java:367)
at org.springframework.batch.repeat.support.RepeatTemplate.executeInternal(RepeatTemplate.java:214)
at org.springframework.batch.repeat.support.RepeatTemplate.iterate(RepeatTemplate.java:143)
at org.springframework.batch.core.step.tasklet.TaskletStep.doExecute(TaskletStep.java:250)
at org.springframework.batch.core.step.AbstractStep.execute(AbstractStep.java:195)
at org.springframework.batch.core.job.SimpleStepHandler.handleStep(SimpleStepHandler.java:135)
at org.springframework.batch.core.job.flow.JobFlowExecutor.executeStep(JobFlowExecutor.java:61)
at org.springframework.batch.core.job.flow.support.state.StepState.handle(StepState.java:60)
at org.springframework.batch.core.job.flow.support.SimpleFlow.resume(SimpleFlow.java:144)
at org.springframework.batch.core.job.flow.support.SimpleFlow.start(SimpleFlow.java:124)
at org.springframework.batch.core.job.flow.FlowJob.doExecute(FlowJob.java:135)
at org.springframework.batch.core.job.AbstractJob.execute(AbstractJob.java:281)
at org.springframework.batch.core.launch.support.SimpleJobLauncher$1.run(SimpleJobLauncher.java:120)
at org.springframework.core.task.SyncTaskExecutor.execute(SyncTaskExecutor.java:48)
at org.springframework.batch.core.launch.support.SimpleJobLauncher.run(SimpleJobLauncher.java:114)
at net.etongbao.vasp.ac.quartz.QuartzLedgerJob.execute(QuartzLedgerJob.java:49)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(Unknown Source)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source)
at java.lang.reflect.Method.invoke(Unknown Source)
at org.springframework.util.MethodInvoker.invoke(MethodInvoker.java:273)
at org.springframework.scheduling.quartz.MethodInvokingJobDetailFactoryBean$MethodInvokingJob.executeInternal(MethodInvokingJobDetailFactoryBean.java:311)
at org.springframework.scheduling.quartz.QuartzJobBean.execute(QuartzJobBean.java:113)
at org.quartz.core.JobRunShell.run(JobRunShell.java:216)
at org.quartz.simpl.SimpleThreadPool$WorkerThread.run(SimpleThreadPool.java:549)
Caused by: org.springframework.batch.item.ReaderNotOpenException: Reader must be open before it can be read.
at org.springframework.batch.item.database.AbstractCursorItemReader.doRead(AbstractCursorItemReader.java:437)
at org.springframework.batch.item.support.AbstractItemCountingItemStreamItemReader.read(AbstractItemCountingItemStreamItemReader.java:85)
at org.springframework.batch.core.step.item.SimpleChunkProvider.doRead(SimpleChunkProvider.java:90)
at org.springframework.batch.core.step.item.FaultTolerantChunkProvider.read(FaultTolerantChunkProvider.java:87)
... 35 more

What is going on here? Have you run into this problem?
