`
chenzehe
  • 浏览: 532404 次
  • 性别: Icon_minigender_1
  • 来自: 杭州
社区版块
存档分类
最新评论

BoneCP源码——BoneCP中使用的多线程

 
阅读更多
1、asyncExecutor 可缓存线程池,用于异步的创建一个Connection对象,返回Future类型对象
	/** Executor service for obtaining a connection in an asynchronous fashion. */
	private ExecutorService asyncExecutor;
	/**
	 * Constructor.
	 * @param config Configuration for pool
	 * @throws SQLException on error
	 */
	public BoneCP(BoneCPConfig config) throws SQLException {
		......
		//在构造函数中初始化
		this.asyncExecutor = Executors.newCachedThreadPool();
		......
        }

	/** Obtain a connection asynchronously by queueing a request to obtain a connection in a separate thread. 
	 * 
	 *  Use as follows:<p>
	 *      Future&lt;Connection&gt; result = pool.getAsyncConnection();<p>
	 *       ... do something else in your application here ...<p>
	 *      Connection connection = result.get(); // get the connection<p>
	 *      
	 * @return A Future task returning a connection. 
	 */
	public Future<Connection> getAsyncConnection(){

		return this.asyncExecutor.submit(new Callable<Connection>() {

			public Connection call() throws Exception {
				return getConnection();
			}});
	}

 

2、releaseHelper 用于关闭Connection对象的线程池,该池程池中的线程为守护线程(Daemon Thread)

/** pointer to the thread containing the release helper threads. */
	private ExecutorService releaseHelper;

	/**
	 * Constructor.
	 * @param config Configuration for pool
	 * @throws SQLException on error
	 */
	public BoneCP(BoneCPConfig config) throws SQLException {
                ......
                //Gets number of release-connection helper threads to create per partition
    		int helperThreads = config.getReleaseHelperThreads();
		this.releaseHelperThreadsConfigured = helperThreads > 0;
                //If set to true, config has specified the use of statement release helper threads.		
		this.statementReleaseHelperThreadsConfigured = config.getStatementReleaseHelperThreads() > 0;
		this.config = config;
		String suffix = "";
		if (config.getPoolName()!=null) {
			suffix="-"+config.getPoolName();
		}			
		if (this.releaseHelperThreadsConfigured){
			this.releaseHelper = Executors.newFixedThreadPool(helperThreads*config.getPartitionCount(), new CustomThreadFactory("BoneCP-release-thread-helper-thread"+suffix, true));
		}
                ......
          }

 如果用户设置了以独立线程来关闭Connection对象,才创建该线程池。

ThreadFactory用于设置创建线程的工厂,可以通过线程工厂给每个创建出来的线程设置更有意义的名字。

public interface ThreadFactory {
    /**
     * Constructs a new {@code Thread}.  Implementations may also initialize
     * priority, name, daemon status, {@code ThreadGroup}, etc.
     *
     * @param r a runnable to be executed by new thread instance
     * @return constructed thread, or {@code null} if the request to
     *         create a thread is rejected
     */
    Thread newThread(Runnable r);
}
 CustomThreadFactory为BoneCP里对ThreadFactory的一个实现,并可设置线程是否为守护线程(Daemon Thread):
package com.jolbox.bonecp;
......
public class CustomThreadFactory
        implements ThreadFactory, UncaughtExceptionHandler {
    public CustomThreadFactory(String threadName, boolean daemon){
        this.threadName = threadName;
        this.daemon = daemon;
    }
    public Thread newThread(Runnable r) {
        Thread t = new Thread(r, this.threadName);
        t.setDaemon(this.daemon);
        t.setUncaughtExceptionHandler(this);
        return t;
    }
    ......
}
 
3、keepAliveScheduler 该线程池用于定期地测试connection的活性,即用它发送一条简单的SQL,并关闭故障的Connection,每个分区一个线程,此线程池中的线程也为守护线程。
	/** Handle to factory that creates 1 thread per partition that periodically wakes up and performs some
	 * activity on the connection.
	 */
	private ScheduledExecutorService keepAliveScheduler;
        this.keepAliveScheduler =  Executors.newScheduledThreadPool(config.getPartitionCount(), new CustomThreadFactory("BoneCP-keep-alive-scheduler"+suffix, true));
 newScheduledThreadPool创建一个大小无限的线程池,此线程池支持定时以及周期性执行任务的需求,相当于Timer。
			if (config.getIdleConnectionTestPeriodInMinutes() > 0 || config.getIdleMaxAgeInMinutes() > 0){
				
				final Runnable connectionTester = new ConnectionTesterThread(connectionPartition, this.keepAliveScheduler, this, config.getIdleMaxAge(TimeUnit.MILLISECONDS), config.getIdleConnectionTestPeriod(TimeUnit.MILLISECONDS), queueLIFO);
				long delayInMinutes = config.getIdleConnectionTestPeriodInMinutes();
				if (delayInMinutes == 0L){
					delayInMinutes = config.getIdleMaxAgeInMinutes();
				}
				if (config.getIdleMaxAgeInMinutes() != 0 && config.getIdleConnectionTestPeriodInMinutes() != 0 && config.getIdleMaxAgeInMinutes() < delayInMinutes){
					delayInMinutes = config.getIdleMaxAgeInMinutes();
				}
				this.keepAliveScheduler.schedule(connectionTester, delayInMinutes, TimeUnit.MINUTES);
			}

 如果用户没有设置了下面两个属性小于1就启动该线程池的任务:

	/** Connections older than this are sent a keep-alive statement. */
	private long idleConnectionTestPeriodInSeconds = 240*60; 
	/** Maximum age of an unused connection before it is closed off. */ 
	private long idleMaxAgeInSeconds =  60*60; 

 实例化一个ConnectionTesterThread类型的Runnable对象,该对象中也持有此线程池的引用,用于在run方法中启动下次任务,此对象的run方法负责对异常的Connection对象和超出闲置时间的对象进行close并定期给Connection对象发送简单SQL语句:

// send a keep-alive, close off connection if we fail.
if (!this.pool.isConnectionHandleAlive(connection)){
    closeConnection(connection);
    continue; 
}

 

4、maxAliveScheduler  该线程池用于给每个分区创建一个线程定期的检查Connection对象是否过期,此线程池中的线程也是守护线程

	/** Handle to factory that creates 1 thread per partition that periodically wakes up and performs some
	 * activity on the connection.
	 */
	private ScheduledExecutorService maxAliveScheduler;
        this.maxAliveScheduler =  Executors.newScheduledThreadPool(config.getPartitionCount(), new CustomThreadFactory("BoneCP-max-alive-scheduler"+suffix, true));

 如果用户设置了下面属性大于0则使用该线程池:

	/** A connection older than maxConnectionAge will be destroyed and purged from the pool. */
	private long maxConnectionAgeInSeconds = 0;
 
			if (config.getMaxConnectionAgeInSeconds() > 0){
				final Runnable connectionMaxAgeTester = new ConnectionMaxAgeThread(connectionPartition, this.maxAliveScheduler, this, config.getMaxConnectionAge(TimeUnit.MILLISECONDS), queueLIFO);
				this.maxAliveScheduler.schedule(connectionMaxAgeTester, config.getMaxConnectionAgeInSeconds(), TimeUnit.SECONDS);
			}
 先实例化一个ConnectionMaxAgeThread类型的Runnable对象,该对象定期的对超过maxConnectionAge类型的对象进行关闭:
if (connection.isExpired(currentTime)){
    // kill off this connection
    closeConnection(connection);
    continue;
}
 ConnectionMaxAgeThread对象中也对该线程池持有引用来启动下次全任务。

 

5、connectionsScheduler  该线程池用于观察每个分区,根据需要动态的创建新的Connection对象或者清理过剩的,也为守护线程

/** Executor for threads watching each partition to dynamically create new threads/kill off excess ones.
	 */
	private ExecutorService connectionsScheduler;
	this.connectionsScheduler =  Executors.newFixedThreadPool(config.getPartitionCount(), new CustomThreadFactory("BoneCP-pool-watch-thread"+suffix, true));

        // watch this partition for low no of threads
	this.connectionsScheduler.execute(new PoolWatchThread(connectionPartition, this));

 

 6、closeConnectionExecutor  该线程池用于监控那些失败的close操作

	/** Threads monitoring for bad connection requests. */
	private ExecutorService closeConnectionExecutor;
        this.closeConnectionWatch = config.isCloseConnectionWatch();
	if (this.closeConnectionWatch){
		logger.warn(THREAD_CLOSE_CONNECTION_WARNING);
		this.closeConnectionExecutor =  Executors.newCachedThreadPool(new CustomThreadFactory("BoneCP-connection-watch-thread"+suffix, true));
	}

 在getConection()方法中如果用户设置了下面属性则启用该线程:

	/** If set to true, create a new thread that monitors a connection and displays warnings if application failed to 
	 * close the connection.
	 */
	protected boolean closeConnectionWatch = false;

 

 

		if (this.closeConnectionWatch){ // a debugging tool
			watchConnection(result);
		}
 
	/** Starts off a new thread to monitor this connection attempt.
	 * @param connectionHandle to monitor 
	 */
	private void watchConnection(ConnectionHandle connectionHandle) {
		String message = captureStackTrace(UNCLOSED_EXCEPTION_MESSAGE);
		this.closeConnectionExecutor.submit(new CloseThreadMonitor(Thread.currentThread(), connectionHandle, message, this.closeConnectionWatchTimeoutInMs));
	}
 
//	@Override
	public void run() {
		try {
			this.connectionHandle.setThreadWatch(Thread.currentThread());
			// wait for the thread we're monitoring to die off.
			this.threadToMonitor.join(this.closeConnectionWatchTimeout);
			if (!this.connectionHandle.isClosed() 
					&& this.threadToMonitor.equals(this.connectionHandle.getThreadUsingConnection())
				){
				logger.error(this.stackTrace);
			}
		} catch (Exception e) {
			// just kill off this thread
			if (this.connectionHandle != null){ // safety
				this.connectionHandle.setThreadWatch(null);
			}
		} 
	}
 

 

分享到:
评论

相关推荐

    JAVA上百实例源码以及开源项目源代码

     Java非对称加密源程序代码实例,本例中使用RSA加密技术,定义加密算法可用 DES,DESede,Blowfish等。  设定字符串为“张三,你好,我是李四”  产生张三的密钥对(keyPairZhang)  张三生成公钥(publicKeyZhang...

    JAVA上百实例源码以及开源项目

     Java非对称加密源程序代码实例,本例中使用RSA加密技术,定义加密算法可用 DES,DESede,Blowfish等。  设定字符串为“张三,你好,我是李四”  产生张三的密钥对(keyPairZhang)  张三生成公钥(publicKeyZhang...

    基于SSM架构实现的大型分布式购物网站-B2C项目源码+项目说明.zip

    方案2:在负载均衡服务器中运行一个精灵线程,预测服务器压力过大时会自动把session转移压力过小的服务器中。 3、做专门的图片服务器。使用一个http服务器,Apache.或者Nginx。使用ftp服务上传图片,vsftpd ##...

    java开源包8

    MyBatchFramework 是一个开源的轻量级的用以创建可靠的易管理的批量作业的Java包,主要特点是多线程、调度、JMX管理和批量执行报表,执行历史等。 SIP协议包 jSIP.tar jSIP这个Java包目标是用Java实现SIP(SIP:...

    java开源包1

    MyBatchFramework 是一个开源的轻量级的用以创建可靠的易管理的批量作业的Java包,主要特点是多线程、调度、JMX管理和批量执行报表,执行历史等。 SIP协议包 jSIP.tar jSIP这个Java包目标是用Java实现SIP(SIP:...

    java开源包11

    MyBatchFramework 是一个开源的轻量级的用以创建可靠的易管理的批量作业的Java包,主要特点是多线程、调度、JMX管理和批量执行报表,执行历史等。 SIP协议包 jSIP.tar jSIP这个Java包目标是用Java实现SIP(SIP:...

    java开源包2

    MyBatchFramework 是一个开源的轻量级的用以创建可靠的易管理的批量作业的Java包,主要特点是多线程、调度、JMX管理和批量执行报表,执行历史等。 SIP协议包 jSIP.tar jSIP这个Java包目标是用Java实现SIP(SIP:...

    java开源包3

    MyBatchFramework 是一个开源的轻量级的用以创建可靠的易管理的批量作业的Java包,主要特点是多线程、调度、JMX管理和批量执行报表,执行历史等。 SIP协议包 jSIP.tar jSIP这个Java包目标是用Java实现SIP(SIP:...

    java开源包6

    MyBatchFramework 是一个开源的轻量级的用以创建可靠的易管理的批量作业的Java包,主要特点是多线程、调度、JMX管理和批量执行报表,执行历史等。 SIP协议包 jSIP.tar jSIP这个Java包目标是用Java实现SIP(SIP:...

    java开源包5

    MyBatchFramework 是一个开源的轻量级的用以创建可靠的易管理的批量作业的Java包,主要特点是多线程、调度、JMX管理和批量执行报表,执行历史等。 SIP协议包 jSIP.tar jSIP这个Java包目标是用Java实现SIP(SIP:...

    java开源包10

    MyBatchFramework 是一个开源的轻量级的用以创建可靠的易管理的批量作业的Java包,主要特点是多线程、调度、JMX管理和批量执行报表,执行历史等。 SIP协议包 jSIP.tar jSIP这个Java包目标是用Java实现SIP(SIP:...

    java开源包4

    MyBatchFramework 是一个开源的轻量级的用以创建可靠的易管理的批量作业的Java包,主要特点是多线程、调度、JMX管理和批量执行报表,执行历史等。 SIP协议包 jSIP.tar jSIP这个Java包目标是用Java实现SIP(SIP:...

    java开源包7

    MyBatchFramework 是一个开源的轻量级的用以创建可靠的易管理的批量作业的Java包,主要特点是多线程、调度、JMX管理和批量执行报表,执行历史等。 SIP协议包 jSIP.tar jSIP这个Java包目标是用Java实现SIP(SIP:...

    java开源包9

    MyBatchFramework 是一个开源的轻量级的用以创建可靠的易管理的批量作业的Java包,主要特点是多线程、调度、JMX管理和批量执行报表,执行历史等。 SIP协议包 jSIP.tar jSIP这个Java包目标是用Java实现SIP(SIP:...

    java开源包101

    MyBatchFramework 是一个开源的轻量级的用以创建可靠的易管理的批量作业的Java包,主要特点是多线程、调度、JMX管理和批量执行报表,执行历史等。 SIP协议包 jSIP.tar jSIP这个Java包目标是用Java实现SIP(SIP:...

    Java资源包01

    MyBatchFramework 是一个开源的轻量级的用以创建可靠的易管理的批量作业的Java包,主要特点是多线程、调度、JMX管理和批量执行报表,执行历史等。 SIP协议包 jSIP.tar jSIP这个Java包目标是用Java实现SIP(SIP:...

Global site tag (gtag.js) - Google Analytics