`

java批量插入队列笔记

阅读更多

往数据库里插入大量的数据,当然是批量插入最高效,我们设定一个题目,每次把数据放入队列,当数据大于1000条或者时间大于5分钟后把数据批量入库

队列处理代码:

package bathQueue;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;

/**
 * <p>Title: BatchQueue.java</p>
 * <p>Description: </p>
 * <p>Copyright: Copyright (c) 2014</p>
 * @author 雪含心
 * @date 2014年3月1日
 */
public class BatchQueue<T> {
	
	// 默认间隔处理队列时间
	private static int DEFAULT_TIME = 5000;
	// 默认队列处理长度
	private static int DEFAULT_COUNT = 2000;
	// 设置队列处理时间
	private long handleTime;
	// 设置队列处理长度
	private int handleLength;
	// 阻塞队列
	ArrayBlockingQueue<T> queue = new ArrayBlockingQueue<T>(20000);
	// 回调接口
	private QueueProcess<T> process;
	
	// 用来存放从队列拿出的数据
	private List<T> dataList;
	
	// 往队列添加数据
	public void add(T t){
		queue.add(t);
	}
	// 清理生成的list
	public void clearList(){
		dataList = null;
		dataList = new ArrayList<T>();
	}
	
	/**
	 * 最原始的构造方法,使用这个构造方法设置默认的队列处理时间和数量
	 * @param process
	 */
	public BatchQueue(QueueProcess<T> process){
		 this(DEFAULT_TIME, DEFAULT_COUNT, process);
	}
	/**
	 * 可以设置队列的处理时间和处理长度
	 * @param handleTime
	 * @param handleQueueLength
	 * @param process
	 */
	public BatchQueue(int handleTime, int handleQueueLength, QueueProcess<T> process){
		this.process = process;
		this.handleTime = handleTime;
		this.handleLength = handleQueueLength;
		start();
	}
	private void  start(){
		
		dataList = new ArrayList<T>(handleLength);
		DataListener listener = new  DataListener();
		new Thread(listener).start();
		
	}
	// 队列监听,当队列达到一定数量和时间后处理队列
	class DataListener implements Runnable{
		
		@Override
		public void run() {
			
			long startTime = System.currentTimeMillis();
			T t = null;
			while(true){
				try {
					// 从队列拿出队列头部的元素,如果没有就阻塞
					t = queue.take();
					if(null != t){
						 dataList.add(t);
					}
					if(dataList.size() >= DEFAULT_COUNT){
						startTime = callBack(dataList);
						continue;
  					}
					long currentTime = System.currentTimeMillis();
					System.out.println("currentTime - startTime" + (currentTime - startTime) + "handleTime==>" + handleTime);
					if(currentTime - startTime > handleTime){
						startTime = callBack(dataList);
						continue;
					}
				} catch (Exception e) {
					e.printStackTrace();
				}
			
			}
		}

		private long callBack(List<T> dataList) {
			
			// 处理队列
			try{
				System.out.println(dataList);
				process.processData(dataList);
			}catch(Exception e){
				e.printStackTrace();
			}finally{
				// 清理掉dataList中的元素
				clearList();
			}
			
			
			return System.currentTimeMillis();
		}
		
	}
}
 /**
  * add        增加一个元索                     如果队列已满,则抛出一个IIIegaISlabEepeplian异常 
remove   移除并返回队列头部的元素    如果队列为空,则抛出一个NoSuchElementException异常 
element  返回队列头部的元素             如果队列为空,则抛出一个NoSuchElementException异常 
offer       添加一个元素并返回true       如果队列已满,则返回false 
poll         移除并返问队列头部的元素    如果队列为空,则返回null 
peek       返回队列头部的元素             如果队列为空,则返回null 
put         添加一个元素                      如果队列满,则阻塞 
take        移除并返回队列头部的元素     如果队列为空,则阻塞 
  */

 数据处理抽象类

package com.zh.utils;

import java.util.List;

/**
 * 批量数据回调接口
 * @author zhanghua
 *
 * @param <T>
 */
public interface  BatchQueueCallback<T> {

	/**
	 * 用于接收批量数据
	 * @param list 批量数据
	 */
	public abstract  void batch(List<T> list);
}

 数据库处理示例

package bathQueue;

import java.util.List;

/**
 * <p>Title: DataInsert.java</p>
 * <p>Description: </p>
 * <p>Copyright: Copyright (c) 2014</p>
 * @author 雪含心
 * @param <T>
 * @date 2014年3月1日
 */
public class DataInsert<T> extends QueueProcess<T> {

	@Override
	public void processData(List<T> list) {
		
 	}
	public static void main(String[] args) throws Exception{
		DataInsert back = new DataInsert<>();
		BatchQueue<String> queue = new BatchQueue<String>(back);
		
		for(int i = 0; i < 20000; i ++){
			queue.add("a" +i);
			Thread.sleep(2000);
		}
	}

}

 

 

分享到:
评论

相关推荐

Global site tag (gtag.js) - Google Analytics