`

merge sort collection, block non block algorithm

 
阅读更多

import java.util.Collection;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;

public class CollectionTest<T extends Comparable> {

	public Collection<T> mergeSortedCollection(Collection<T> a, Collection<T> b) {
		Collection<T> result = new LinkedList<T>();
		Iterator<T> iA = a.iterator();
		Iterator<T> iB = b.iterator();
		T oA = null;
		if (iA.hasNext()) {
			oA = iA.next();
		}
		T oB = null;
		if (iB.hasNext()) {
			oB = iB.next();
		}
		while (oA != null && oB != null) {
			// System.out.println(oA+" "+oB);
			if (oA.compareTo(oB) <= 0) {
				result.add(oA);
				if (iA.hasNext()) {
					oA = iA.next();
				} else {
					oA = null;
				}
			} else {
				result.add(oB);
				if (iB.hasNext()) {
					oB = iB.next();
				} else {
					oB = null;
				}
			}
		}
		if (oA == null) {
			result.add(oB);
			while (iB.hasNext()) {
				oB = iB.next();
				result.add(oB);
			}
		} else if (oB == null) {
			result.add(oA);
			while (iA.hasNext()) {
				oA = iA.next();
				result.add(oA);
			}
		}

		return result;
	}

	
	static ExecutorService pools = Executors.newFixedThreadPool(64);
	static List getDataList = new LinkedList();
	static List insertDataList = new LinkedList();
	public static void main(String[] args) throws Throwable {
		
		blockAlgorithm();
	}
	
	static void blockAlgorithm() throws Throwable{
		long start = System.currentTimeMillis();
		List<Item> datas = getData();
		
		while(getDataList.size()<100000){
			final CountDownLatch finishSignal = new CountDownLatch(datas.size());
			
			for (final Item item : datas) {
				pools.execute(new Runnable() {
					@Override
					public void run() {
						try {
							item.done = true;
							Thread.sleep(500);
						} catch (Exception e) {
							e.printStackTrace();
						}finally{
							finishSignal.countDown();
						}
					}
				});
			}
			finishSignal.await();
			
			insertDB(datas);
			
			datas.clear();
			datas = getData();
		}
		System.out.println("finish "+(System.currentTimeMillis() - start));
		pools.shutdown();
	}
	
	static void nonblockAlgorithm() throws Throwable{
		final long start = System.currentTimeMillis();
		final ConcurrentLinkedQueue result = new ConcurrentLinkedQueue();
		
		Runnable dataFetch = new Runnable() {
			
			@Override
			public void run() {
				try {
					List<Item> datas = getData();
					while(getDataList.size()<=100000){
						for (final Item item : datas) {
							pools.execute(new Runnable() {
								@Override
								public void run() {
									try {
										Thread.sleep(500);
										item.done = true;
										result.add(item);
//										System.out.println(item.seNum+" "+insertDataList.size());
									} catch (Exception e) {
										e.printStackTrace();
									}
								}
							});
						}
						datas.clear();
						datas = getData();
					}
				} catch (Exception e) {
					e.printStackTrace();
				}
				System.out.println("dataFetch end");
				
			}
		};
		
		Runnable inserter = new Runnable() {
			
			@Override
			public void run() {
				List a = new LinkedList();
				while (insertDataList.size()<100000) {
//					System.out.println(a.size());
					Object o = result.poll();
					if (o!=null) {
						a.add(o);
						if (a.size()==1000) {
							try {
								insertDB(a);
								a.clear();
							} catch (InterruptedException e) {
								e.printStackTrace();
							}
						}
					}
					
				}
				System.out.println("finish "+(System.currentTimeMillis() - start));
				System.out.println("inserter end");
				pools.shutdown();
			}
		}; 
		pools.execute(dataFetch);
		pools.execute(inserter);
	}



	private static void insertDB(List<Item> datas) throws InterruptedException {
		Thread.sleep(1000);
		insertDataList.addAll(datas);
		System.out.println("insertDB size="+insertDataList.size());
	}



	private static List getData() throws InterruptedException {
		List datas = new LinkedList();
		for (int i = 0; i < 1000; i++) {
			datas.add(new Item());
		}
		getDataList.addAll(datas);
		Thread.sleep(1000);
		System.out.println("getData size="+getDataList.size());
		return datas;
	}
}

class Item {
	boolean done;
	int seNum;
	
	static AtomicInteger ai= new AtomicInteger(0);
	
	public Item() {
		seNum = ai.getAndIncrement();
	}
}

0
1
分享到:
评论

相关推荐

Global site tag (gtag.js) - Google Analytics