`
dannyhz
  • 浏览: 368841 次
  • 性别: Icon_minigender_1
  • 来自: 杭州
文章分类
社区版块
存档分类
最新评论

自己写的取 sequence 的例子：使用 BlockingQueue 和 ScheduledExecutorService

 
阅读更多
引用


在取 sequence 的 DAO 里用了一个 mock 实现，并把 queue 的容量设置为 3，但 mock 每次会返回 4 个值。
这样第一次填充时只能放进去 3 个，第 4 个元素要等队列里有元素被取走、腾出空位之后才能放进去。




package com.ssc.dbcttool.seqgenerator;

import java.util.List;

public interface KeyDao {

	/**
	 * Fetches a batch of values for the named sequence.
	 *
	 * @param seqName   name of the sequence to read values for
	 * @param capbility number of values the caller is asking for
	 *                  (NOTE(review): the interface does not state whether an
	 *                  implementation may return more or fewer values - confirm)
	 * @return the fetched sequence values
	 */
	List<Long> getSequences(String seqName, Long capbility);
}


package com.ssc.dbcttool.seqgenerator;

import java.util.ArrayList;
import java.util.List;

/**
 * Stand-in {@link KeyDao} that needs no database: every call returns the same
 * four hard-coded values, whatever name or amount is requested.
 */
public class MockKeyDao implements KeyDao {

	/**
	 * Returns a new list holding the four canned values, in a fixed order.
	 *
	 * @param seqName   ignored by this mock
	 * @param capbility ignored by this mock; four values are always returned
	 * @return a fresh mutable list containing 1000, 1001, 1010 and 1100
	 */
	@Override
	public List<Long> getSequences(String seqName, Long capbility) {
		long[] cannedValues = { 1000L, 1001L, 1010L, 1100L };

		List<Long> result = new ArrayList<Long>();
		for (long value : cannedValues) {
			result.add(value);
		}
		return result;
	}
}






package com.ssc.dbcttool.seqgenerator;

/**
 * Small command-line demo: wires a {@link SeqGeneratorImpl} to a
 * {@link MockKeyDao} and prints nine keys of the sequence named "aaa".
 */
public class KeyGeneratorClient {

	/**
	 * Requests nine keys one after another and prints each one.
	 *
	 * @param args unused
	 */
	public static void main(String[] args) {
		SeqGeneratorImpl generator = new SeqGeneratorImpl();
		generator.setGenerateKeysDao(new MockKeyDao());

		final int requestCount = 9;
		for (int i = 0; i < requestCount; i++) {
			System.out.println("key = " + generator.getSeqKey("aaa"));
		}
	}

}




package com.ssc.dbcttool.seqgenerator;

import java.lang.Thread.UncaughtExceptionHandler;
import java.util.List;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;

import com.statestr.gcth.core.concurrency.DefaultThreadFactory;

/**
 * Hands out sequence keys from per-name bounded buffers that are refilled in
 * the background.
 *
 * <p>Each sequence name gets its own {@link BlockingQueue} of at most
 * {@link #capacity} keys. The first request for a name creates the queue and
 * schedules a {@link FetchKeyTask} that periodically tops it up through the
 * configured {@link KeyDao}. Callers obtain keys with
 * {@link #getSeqKey(String)}, which blocks while the buffer is empty.
 */
public class SeqGeneratorImpl {

	/**
	 * A refill is triggered only when more than this fraction of the buffer
	 * is free.
	 */
	private static final double REFILL_FREE_RATIO = 0.5;

	/** Maximum number of keys buffered per sequence name. */
	final int capacity = 3;

	/** One bounded key buffer per sequence name. */
	ConcurrentMap<String, BlockingQueue<Long>> keyQueues = new ConcurrentHashMap<String, BlockingQueue<Long>>();

	/** Executes the periodic refill tasks. */
	ScheduledExecutorService scheduler;

	/**
	 * Creates the generator with a two-thread scheduler.
	 *
	 * <p>NOTE(review): the thread factory is a project class; it presumably
	 * produces daemon threads and installs the handler below - confirm.
	 */
	public SeqGeneratorImpl() {
		scheduler = new ScheduledThreadPoolExecutor(2, new DefaultThreadFactory().daemon().eh(
				new UncaughtExceptionHandler() {
					@Override
					public void uncaughtException(Thread t, Throwable e) {
						// Report the cause as well; the original message dropped it.
						System.out.println("Thread " + t.getName()
								+ " for Key Generator throw a UncaughtException."
								+ " Cause: " + e);
					}
				}));
	}

	/**
	 * Returns the next key of the given sequence, waiting until one is
	 * available.
	 *
	 * @param keyName name of the sequence; must not be {@code null} because it
	 *                is used as a {@link ConcurrentHashMap} key
	 * @return the next key, or {@code null} if the calling thread was
	 *         interrupted while waiting (the interrupt flag is restored)
	 */
	public Long getSeqKey(String keyName) {
		BlockingQueue<Long> keyQueue = keyQueues.get(keyName);
		if (keyQueue == null) {
			// Slow path: create the queue and start its refill task.
			keyQueue = getQueue(keyName);
		}

		try {
			return keyQueue.take();
		} catch (InterruptedException e) {
			// Only an interruption is expected here; restore the flag for callers.
			Thread.currentThread().interrupt();
			System.out.println("Failed to get sequence for unknown reason, the operation is interrupted.");
			return null;
		}
	}

	/**
	 * Returns the queue for {@code keyName}, creating it and scheduling its
	 * refill task on first use.
	 *
	 * <p>The method is {@code synchronized} and re-checks the map, so the
	 * queue is created and the task is scheduled once per name.
	 *
	 * @param keyName name of the sequence
	 * @return the existing or newly created queue for that name
	 */
	synchronized BlockingQueue<Long> getQueue(String keyName) {
		BlockingQueue<Long> keyQueue = keyQueues.get(keyName);
		if (keyQueue == null) {
			keyQueue = new LinkedBlockingQueue<Long>(capacity);
			keyQueues.putIfAbsent(keyName, keyQueue);
			triggerTheTask(keyName, keyQueue);
		}
		return keyQueue;
	}

	/**
	 * Schedules the refill task for one queue: first run immediately, then
	 * again 3000 ms after each run finishes.
	 *
	 * @param name     name of the sequence the task fetches keys for
	 * @param keyQueue queue the task fills
	 */
	void triggerTheTask(String name, BlockingQueue<Long> keyQueue) {
		scheduler.scheduleWithFixedDelay(new FetchKeyTask(name, keyQueue), 0,
				3000, TimeUnit.MILLISECONDS);
	}

	/**
	 * Periodic job that tops up one key queue when more than half of it is
	 * free.
	 */
	protected class FetchKeyTask implements Runnable {

		private String keyName;
		private BlockingQueue<Long> keyQueue = null;

		/**
		 * Creates a task without a sequence name ({@code null} is passed on
		 * to the key lookup).
		 *
		 * @param keyQueue queue to fill
		 */
		public FetchKeyTask(BlockingQueue<Long> keyQueue) {
			this(null, keyQueue);
		}

		/**
		 * @param keyName  name of the sequence to fetch keys for
		 * @param keyQueue queue to fill
		 */
		public FetchKeyTask(String keyName, BlockingQueue<Long> keyQueue) {
			this.keyName = keyName;
			this.keyQueue = keyQueue;
		}

		/**
		 * Fetches as many keys as the queue currently has room for and
		 * enqueues them.
		 *
		 * <p>If the source returns more keys than requested, {@code put}
		 * blocks this scheduler thread until consumers make room, so no key
		 * is dropped. Every exception is caught and reported, because a
		 * periodic task that throws is not run again by the scheduler.
		 */
		@Override
		public void run() {
			int remainingCapacity = keyQueue.remainingCapacity();
			if (capacity * (1 - REFILL_FREE_RATIO) >= remainingCapacity) {
				// Buffer is still at least half full; nothing to do this round.
				return;
			}

			try {
				List<Long> keys = getKeys(keyName, remainingCapacity);
				for (Long key : keys) {
					keyQueue.put(key);
				}
				// Report the number actually retrieved, and only on success.
				System.out.println("retrieve keys from database, the keys' amount is "
						+ keys.size());
			} catch (InterruptedException e) {
				Thread.currentThread().interrupt();
				System.out.println("key fetch task is interrupted.");
			} catch (Exception e) {
				// Include the cause; the original message ended at the colon.
				System.out.println("key fetch task happens exception:" + e);
			}
		}
	}

	/**
	 * Asks the configured DAO for keys of the given sequence.
	 *
	 * @param name     name of the sequence
	 * @param capacity number of keys requested
	 * @return the keys returned by the DAO
	 * @throws NullPointerException if no DAO has been set
	 */
	protected List<Long> getKeys(String name, long capacity) {
		return generateKeysDao.getSequences(name, capacity);
	}

	/** Source of new keys; must be set before the first key is requested. */
	protected KeyDao generateKeysDao;

	/**
	 * Sets the DAO used to fetch new keys.
	 *
	 * @param generateKeysDao the key source
	 */
	public void setGenerateKeysDao(KeyDao generateKeysDao) {
		this.generateKeysDao = generateKeysDao;
	}

}

分享到:
评论

相关推荐

Global site tag (gtag.js) - Google Analytics