`
XzMarine
  • 浏览: 3278 次
  • 性别: Icon_minigender_1
  • 来自: 上海
社区版块
存档分类
最新评论

jetty中的QueuedThreadPool

阅读更多

QueuedThreadPool

doStart()方法负责初始化并启动_minThreads个空线程,等待任务的到来。其中,

_theads用来存放当前池中所有线程;

_idle中存放当前空闲线程;

_jobs为存放任务的队列,先来先服务。

protected void doStart() throws Exception
{
	if (_maxThreads<_minThreads || _minThreads<=0)
		throw new IllegalArgumentException("!0<minThreads<maxThreads");
	
	_threads=new HashSet();
	_idle=new ArrayList();
	_jobs=new Runnable[_maxThreads];
	
	for (int i=0;i<_minThreads;i++)
	{
		newThread();
	}   
}

protected void newThread()
{
	synchronized (_threadsLock)
	{
		if (_threads.size()<_maxThreads)
		{
			PoolThread thread =new PoolThread();
			_threads.add(thread);
			thread.setName(thread.hashCode()+"@"+_name+"-"+_id++);
			thread.start(); 
		}
		else if (!_warned)    
		{
			_warned=true;
			Log.debug("Max threads for {}",this);
		}
	}
}

 

初始化完毕,可调用dispatch(Runnable job)方法给线程池分配任务。

如果_idle.size() > 0,即有空闲线程,直接把job分给最后一个空闲线程,并从_idle中删除之。至于为什么是给最后一个?没有为什么,只要你愿意,随便哪一个都行。

如果_idle.size() = 0,即没有空闲线程,那就只好到_jobs中排队啦。

_jobs的示意图如下图,这是一初始大小为5的队列。



_nextJob指向_jobs队列中当前可取任务地址,每取一次任务,_nextJob自增1;而_nextJobSlot指向当前可以放置任务的地址,也就是下一个任务进队列,要放在哪一个位置,每进一个任务,_nextJobSlot自增1。它们初始位置当然都是0。

一般来说,当_nextJobSlot总是处在_nextJob前面的,即_nextJobSlot >_nextJob。
_nextJob == _nextJobSlot时,就说明_jobs队列排满了,没位置了,怎么办?增加队列长度呗。QueuedThreadPool中,每调整一次,增加_maxThreads个空位置。 调整的过程中还涉及到数据的复制和两个指针位置的调整。

一旦发现,队列中的任务数_queued大于_spawnOrShrinkAt这个阀值,说明任务太多,当前线程数偏少,我们需要增加更多的工作线程来执行任务,最多_maxThreads个。

public boolean dispatch(Runnable job) 
{  
	if (!isRunning() || job==null)
		return false;

	PoolThread thread=null;
	boolean spawn=false;
		
	synchronized(_lock)
	{
		// Look for an idle thread
		int idle=_idle.size();
		if (idle>0) //有空闲线程,直接分配
			thread=(PoolThread)_idle.remove(idle-1);
		else
		{
			// queue the job
			_queued++;
			if (_queued>_maxQueued)
				_maxQueued=_queued;
			_jobs[_nextJobSlot++]=job;
			if (_nextJobSlot==_jobs.length)
				_nextJobSlot=0;
			if (_nextJobSlot==_nextJob) //环形队列_jobs已满,需要调整大小
			{
				//在原有基础上增加_maxThreads
				Runnable[] jobs= new Runnable[_jobs.length+_maxThreads];
				int split=_jobs.length-_nextJob;
				if (split>0)
					System.arraycopy(_jobs,_nextJob,jobs,0,split);
				if (_nextJob!=0)
					System.arraycopy(_jobs,0,jobs,split,_nextJobSlot);
				
				_jobs=jobs;
				_nextJob=0;
				_nextJobSlot=_queued;
			}
			// 队列任务数超过阀值,在可能的情况下,增加工作线程数目  
			spawn=_queued>_spawnOrShrinkAt;
		}
	}
	
	if (thread!=null)
	{
		thread.dispatch(job);
	}
	else if (spawn)
	{
		newThread();
	}
	return true;
}

 

PoolThread

 

上面说了,一旦没有空闲线程,任务就仅仅是加入到_jobs队列中,那么负责从队列中取任务就是PoolThread的事了。

PoolThread在自己的任务完成之后会自觉的到_jobs环形队列中领取任务。有任务,执行之;没有,把自己加入_idle中,等待_maxIdleTimeMs的时间,或QueuedThreadPool的唤醒(调用PoolThread的dispatch(Runnable job)方法)。

当线程处于空闲状态,并发现当前空闲线程数多于_spawnOrShrinkAt阀值时,它就得考虑要不要自我了断了。条件就是距离上次缩减(shrink)空闲线程时间超过_maxIdleTimeMs。

 

public void run()
{
	boolean idle=false;
	Runnable job=null;
	try
	{
		while (isRunning())
		{   
			// Run any job that we have.
			if (job!=null)
			{
				final Runnable todo=job;
				job=null;
				idle=false;
				todo.run();
			}
			
			synchronized(_lock)
			{
				// is there a queued job?
				if (_queued>0)
				{
					_queued--;
					job=_jobs[_nextJob];
					_jobs[_nextJob++]=null;
					if (_nextJob==_jobs.length)
						_nextJob=0;
					continue;
				}

				// Should we shrink?
				final int threads=_threads.size();
				
				// 我不知道为什么_threads.size()会有可能大于_maxThreads
				// 知道的兄弟请告诉我!!
				if (threads>_minThreads && 
					(threads>_maxThreads || 
					 _idle.size()>_spawnOrShrinkAt))   
				{
					long now = System.currentTimeMillis();
					if ((now-_lastShrink)>getMaxIdleTimeMs())
					{
						_lastShrink=now;
						_idle.remove(this);
						return;
					}
				}

				if (!idle)
				{   
					// Add ourselves to the idle set.
					_idle.add(this);
					idle=true;
				}
			}

			// We are idle
			// wait for a dispatched job
			synchronized (this)
			{
				if (_job==null)
					this.wait(getMaxIdleTimeMs());
				job=_job;
				_job=null;
			}
		}
	}
	catch (InterruptedException e)
	{
		Log.ignore(e);
	}
	finally
	{
		synchronized (_lock)
		{
			_idle.remove(this);
		}
		synchronized (_threadsLock)
		{
			_threads.remove(this);
		}
		synchronized (this)
		{
			job=_job;
		}
		
		// we died with a job! reschedule it
		if (job!=null)
		{
			QueuedThreadPool.this.dispatch(job);
		}
	}
}

 

 

参考资料:

http://suo.iteye.com/blog/1390134

  • 大小: 13 KB
分享到:
评论

相关推荐

    Jetty中文手册

    如何编写Jetty中的Handlers 使用构建工具 如何在Maven中使用Jetty 如何在Ant中使用Jetty Maven和Ant的更多支持 Jetty Maven插件(Plugin) Jetty Jspc Maven插件(Plugin) Maven web应用工程原型 Ant Jetty插件...

    jetty相关的全部jar包

    jetty-security-9.4.8.v20171121.jar,jetty-io-9.4.8.v20171121.jar,jetty-continuation-9.4.8.v20171121.jar,jetty-client-9.4.8.v20171121.jar,jetty-jmx-9.4.8.v20171121.jar,jetty-plus-9.4.8.v20171121....

    jetty-io-9.4.43.v20210629-API文档-中文版.zip

    赠送jar包:jetty-io-9.4.43.v20210629.jar; 赠送原API文档:jetty-io-9.4.43.v20210629-javadoc.jar; 赠送源代码:jetty-io-9.4.43.v...人性化翻译,文档中的代码和结构保持不变,注释和说明精准翻译,请放心使用。

    Jetty嵌入项目代码中示例

    Jetty嵌入项目代码中示例,现我有一示例项目 e:/workspace/web-demo(称为project_home),里面的Web根目录是WebContent。 在project_home建一个jetty目录,子目录如:contexts、etc、lib。 把${jetty_home}/etc...

    jetty在eclipse中配置

    自己写的jetty6在eclipse启动中配置说明

    jetty-server-9.4.8.v20171121-API文档-中文版.zip

    赠送jar包:jetty-server-9.4.8.v20171121.jar; 赠送原API文档:jetty-server-9.4.8.v20171121-javadoc.jar; 赠送源代码:jetty-server...人性化翻译,文档中的代码和结构保持不变,注释和说明精准翻译,请放心使用。

    eclipse jetty插件run-jetty-run-1.3.3

    eclipse jetty插件,从...下载run-jetty-run.zip文件,解压后再编写个links文件丢到eclipse的dropins目录下即可,省去了使用eclipse update方式安装的麻烦。 link文件样例如: path=d:\\eclipse_plugins\\run-jetty-run

    Jetty多版本软件包

    Jetty软件包内容: jetty-distribution-9.4.51.v20230217.tar.gz jetty-distribution-9.4.51.v20230217.zip jetty-home-10.0.15.tar.gz jetty-home-10.0.15.zip jetty-home-11.0.15.tar.gz jetty-home-11.0.15.zip ...

    Jetty中文手册打包下载

    Jetty中文手册打包下载

    jetty嵌入式服务器必须的jar包

    jetty嵌入式服务器开发所必须的jar包,本人使用jetty版本为6.1.3,jar包总数为9个,来自jetty:commons-el-1.0.jar,jasper-compiler-5.5.15,jasper-compiler-jdt-5.5.15.jar,jasper-runtime-5.5.15.jar,jetty-...

    maven集成jetty所需jar包maven-jetty-plugin,多版本

    maven集成jetty必须jar包maven-jetty-plugin,内含多个版本

    jetty6 指南书

    jetty是什么 jetty配置 jetty使用 jetty嵌入 jetty启动 jetty部署 jetty教程 jetty嵌入式 jetty

    jetty相关所有jar包

    jetty相关所有jar包,包含jar包: jetty-continuation-8.1.15.v20140411,jetty-http-8.1.15.v20140411,jetty-io-8.1.15.v20140411,jetty-security-8.1.15.v20140411,jetty-server-8.1.15.v20140411,jetty-util-8.1.15...

    Java Eclipse ee集合jetty和配置

    1. 服务器配置:在 Eclipse EE 中,可以在 Server 视图中配置 Jetty 服务器的启动参数,如端口号、主机名、context root 等。 2. JVM 配置:在 Eclipse EE 中,可以在 JVM 视图中配置 JVM 的参数,如heap size、...

    jetty-io-9.4.43.v20210629-API文档-中英对照版.zip

    赠送jar包:jetty-io-9.4.43.v20210629.jar; 赠送原API文档:jetty-io-9.4.43.v20210629-javadoc.jar;...人性化翻译,文档中的代码和结构保持不变,注释和说明精准翻译,请放心使用。 双语对照,边学技术、边学英语。

    jetty 9.2.24

    jetty服务器,9.2版本适合java7+开发环境。... 尽管网页服务器通常用来为人们呈现文档,但是Jetty通常在较大的软件框架中用于计算机与计算机之间的通信。 Jetty作为Eclipse基金会的一部分,是一个自由和开源项目。

    Jetty配置支持https

    Jetty配置支持HTTPS以及受信网站证书生成方式

    jetty各个版本下载

    jetty各个版本下载

    jetty-client-9.4.43.v20210629-API文档-中文版.zip

    赠送jar包:jetty-client-9.4.43.v20210629.jar; 赠送原API文档:jetty-client-9.4.43.v20210629-javadoc.jar; 赠送源代码:jetty-...人性化翻译,文档中的代码和结构保持不变,注释和说明精准翻译,请放心使用。

    jetty-webapp-9.3.19.v20170502-API文档-中文版.zip

    赠送jar包:jetty-webapp-9.3.19.v20170502.jar; 赠送原API文档:jetty-webapp-9.3.19.v20170502-javadoc.jar; 赠送源代码:jetty-...人性化翻译,文档中的代码和结构保持不变,注释和说明精准翻译,请放心使用。

Global site tag (gtag.js) - Google Analytics