- 浏览: 259935 次
- 性别:
- 来自: 北京
文章分类
- 全部博客 (119)
- spring (1)
- hibernate (1)
- struts (0)
- ibatis (0)
- memcache (4)
- mysql (1)
- ant (0)
- junit (0)
- protobuf (1)
- android (1)
- java (15)
- language (1)
- google (1)
- scala (0)
- ruby (0)
- python (0)
- 设计模式 (1)
- think in java (6)
- 服务器 (4)
- javascript (24)
- css (2)
- mongodb (1)
- eclipse (1)
- 并发 (1)
- test (1)
- jquery (3)
- vim (2)
- javaio (1)
- log4j (0)
- jdk (0)
- api (0)
- hadoop (1)
- HashMap (1)
- 数据库 (1)
- webservice (1)
- jvm (0)
- linux (4)
最新评论
-
weilingfeng98:
定制SSLContext
java安全SSL -
weixuanfeng:
楼主有没有用eclipse,Java调用Ant脚本的代码啊。 ...
ant调用步骤
(转)使用java.util.concurrent实现的线程池、消息队列功能
昨天开始研究java.util.concurrent,是出于线程安全的知识懂得不多,对自己写的线程池没有信心,所以就用了包里专家写好的线程池。这个包的功能很强大。有兴趣的朋友可以搜索了解更多的内容。
今天刚写好了一段200行左右的代码,拿出来跟大家分享我的学习经验。初次实践,不足之处,望能得到高手的指点。
功能说明:一个发送消息模块将消息发送到消息队列中,无需等待返回结果,发送模块继续执行其他任务。消息队列中的指令由线程池中的线程来处理。使用一个Queue来存放线程池溢出时的任务。
TestDriver.java是一个驱动测试,sendMsg方法不间断的向ThreadPoolManager发送数据。
01
public class TestDriver
02
{
03
ThreadPoolManager tpm = ThreadPoolManager.newInstance();
04
05
public void sendMsg( String msg )
06
{
07
tpm.addLogMsg( msg + "记录一条日志 " );
08
}
09
10
public static void main( String[] args )
11
{
12
for( int i = 0; i < 100; i++ )
13
{
14
new TestDriver().sendMsg( Integer.toString( i ) );
15
}
16
}
17
}
ThreadPoolManager类:是负责管理线程池的类。同时维护一个Queue和调度进程。
01
public class ThreadPoolManager
02
{
03
private static ThreadPoolManager tpm = new ThreadPoolManager();
04
05
// 线程池维护线程的最少数量
06
private final static int CORE_POOL_SIZE = 4;
07
08
// 线程池维护线程的最大数量
09
private final static int MAX_POOL_SIZE = 10;
10
11
// 线程池维护线程所允许的空闲时间
12
private final static int KEEP_ALIVE_TIME = 0;
13
14
// 线程池所使用的缓冲队列大小
15
private final static int WORK_QUEUE_SIZE = 10;
16
17
// 消息缓冲队列
18
Queue<String> msgQueue = new LinkedList<String>();
19
20
// 访问消息缓存的调度线程
21
final Runnable accessBufferThread = new Runnable()
22
{
23
public void run()
24
{
25
// 查看是否有待定请求,如果有,则创建一个新的AccessDBThread,并添加到线程池中
26
if( hasMoreAcquire() )
27
{
28
String msg = ( String ) msgQueue.poll();
29
Runnable task = new AccessDBThread( msg );
30
threadPool.execute( task );
31
}
32
}
33
};
34
35
final RejectedExecutionHandler handler = new RejectedExecutionHandler()
36
{
37
public void rejectedExecution( Runnable r, ThreadPoolExecutor executor )
38
{
39
System.out.println(((AccessDBThread )r).getMsg()+"消息放入队列中重新等待执行");
40
msgQueue.offer((( AccessDBThread ) r ).getMsg() );
41
}
42
};
43
44
// 管理数据库访问的线程池
45
final ThreadPoolExecutor threadPool = new ThreadPoolExecutor(
46
CORE_POOL_SIZE, MAX_POOL_SIZE, KEEP_ALIVE_TIME, TimeUnit.SECONDS,
47
new ArrayBlockingQueue( WORK_QUEUE_SIZE ), this.handler );
48
49
// 调度线程池
50
final ScheduledExecutorService scheduler = Executors
51
.newScheduledThreadPool( 1 );
52
53
final ScheduledFuture taskHandler = scheduler.scheduleAtFixedRate(
54
accessBufferThread, 0, 1, TimeUnit.SECONDS );
55
56
public static ThreadPoolManager newInstance()
57
{
58
return tpm;
59
}
60
61
private ThreadPoolManager(){}
62
63
private boolean hasMoreAcquire()
64
{
65
return !msgQueue.isEmpty();
66
}
67
68
public void addLogMsg( String msg )
69
{
70
Runnable task = new AccessDBThread( msg );
71
threadPool.execute( task );
72
}
73
}
AccessDBThread类:线程池中工作的线程。
01
public class AccessDBThread implements Runnable
02
{
03
private String msg;
04
05
public String getMsg()
06
{
07
return msg;
08
}
09
10
public void setMsg( String msg )
11
{
12
this.msg = msg;
13
}
14
15
public AccessDBThread(){
16
super();
17
}
18
19
public AccessDBThread(String msg){
20
this.msg = msg;
21
}
22
23
public void run()
24
{
25
// 向数据库中添加Msg变量值
26
System.out.println("Added the message: "+msg+" into the Database");
27
}
28
29
}
昨天开始研究java.util.concurrent,是出于线程安全的知识懂得不多,对自己写的线程池没有信心,所以就用了包里专家写好的线程池。这个包的功能很强大。有兴趣的朋友可以搜索了解更多的内容。
今天刚写好了一段200行左右的代码,拿出来跟大家分享我的学习经验。初次实践,不足之处,望能得到高手的指点。
功能说明:一个发送消息模块将消息发送到消息队列中,无需等待返回结果,发送模块继续执行其他任务。消息队列中的指令由线程池中的线程来处理。使用一个Queue来存放线程池溢出时的任务。
TestDriver.java是一个驱动测试,sendMsg方法不间断的向ThreadPoolManager发送数据。
01
public class TestDriver
02
{
03
ThreadPoolManager tpm = ThreadPoolManager.newInstance();
04
05
public void sendMsg( String msg )
06
{
07
tpm.addLogMsg( msg + "记录一条日志 " );
08
}
09
10
public static void main( String[] args )
11
{
12
for( int i = 0; i < 100; i++ )
13
{
14
new TestDriver().sendMsg( Integer.toString( i ) );
15
}
16
}
17
}
ThreadPoolManager类:是负责管理线程池的类。同时维护一个Queue和调度进程。
01
public class ThreadPoolManager
02
{
03
private static ThreadPoolManager tpm = new ThreadPoolManager();
04
05
// 线程池维护线程的最少数量
06
private final static int CORE_POOL_SIZE = 4;
07
08
// 线程池维护线程的最大数量
09
private final static int MAX_POOL_SIZE = 10;
10
11
// 线程池维护线程所允许的空闲时间
12
private final static int KEEP_ALIVE_TIME = 0;
13
14
// 线程池所使用的缓冲队列大小
15
private final static int WORK_QUEUE_SIZE = 10;
16
17
// 消息缓冲队列
18
Queue<String> msgQueue = new LinkedList<String>();
19
20
// 访问消息缓存的调度线程
21
final Runnable accessBufferThread = new Runnable()
22
{
23
public void run()
24
{
25
// 查看是否有待定请求,如果有,则创建一个新的AccessDBThread,并添加到线程池中
26
if( hasMoreAcquire() )
27
{
28
String msg = ( String ) msgQueue.poll();
29
Runnable task = new AccessDBThread( msg );
30
threadPool.execute( task );
31
}
32
}
33
};
34
35
final RejectedExecutionHandler handler = new RejectedExecutionHandler()
36
{
37
public void rejectedExecution( Runnable r, ThreadPoolExecutor executor )
38
{
39
System.out.println(((AccessDBThread )r).getMsg()+"消息放入队列中重新等待执行");
40
msgQueue.offer((( AccessDBThread ) r ).getMsg() );
41
}
42
};
43
44
// 管理数据库访问的线程池
45
final ThreadPoolExecutor threadPool = new ThreadPoolExecutor(
46
CORE_POOL_SIZE, MAX_POOL_SIZE, KEEP_ALIVE_TIME, TimeUnit.SECONDS,
47
new ArrayBlockingQueue( WORK_QUEUE_SIZE ), this.handler );
48
49
// 调度线程池
50
final ScheduledExecutorService scheduler = Executors
51
.newScheduledThreadPool( 1 );
52
53
final ScheduledFuture taskHandler = scheduler.scheduleAtFixedRate(
54
accessBufferThread, 0, 1, TimeUnit.SECONDS );
55
56
public static ThreadPoolManager newInstance()
57
{
58
return tpm;
59
}
60
61
private ThreadPoolManager(){}
62
63
private boolean hasMoreAcquire()
64
{
65
return !msgQueue.isEmpty();
66
}
67
68
public void addLogMsg( String msg )
69
{
70
Runnable task = new AccessDBThread( msg );
71
threadPool.execute( task );
72
}
73
}
AccessDBThread类:线程池中工作的线程。
01
public class AccessDBThread implements Runnable
02
{
03
private String msg;
04
05
public String getMsg()
06
{
07
return msg;
08
}
09
10
public void setMsg( String msg )
11
{
12
this.msg = msg;
13
}
14
15
public AccessDBThread(){
16
super();
17
}
18
19
public AccessDBThread(String msg){
20
this.msg = msg;
21
}
22
23
public void run()
24
{
25
// 向数据库中添加Msg变量值
26
System.out.println("Added the message: "+msg+" into the Database");
27
}
28
29
}
发表评论
-
hadoop安装步骤
2012-04-11 18:07 102731.安装jdk(自带了jre) ... -
javaIo总结
2011-12-11 01:19 808节点流: 1. -
java并发整理
2011-09-14 00:34 1091java并发库中几个同步工具类 CopyOnWriteArra ... -
eclipse快捷键
2011-05-27 12:38 687来一个大众版的eclipse快捷键使用技巧,请看链接http: ... -
URLRewrite配置(转)
2011-05-09 17:18 1345web.xml <!-- URL 伪静态过滤 --&g ... -
简单工厂设计模式
2011-01-05 23:52 870http://www.cnblogs.com/zzj-4600 ... -
代理模式和装饰器模式的区别与联系
2011-01-04 20:04 2438最近上javaeye,看到不少人讨论java设计模式,本人 ... -
【转】线程池(java.util.concurrent.ThreadPoolExecutor)使用简介
2010-12-16 16:03 954在多线程大师Doug Lea的贡献下,在JDK1.5中加入了许 ... -
Format的子类中SimpleDateFormat,NumberFormat,MessageFormat那些是线程安全的
2010-12-15 16:12 2816SimpleDateFormat,NumberFormat,M ... -
servlet线程安全
2010-12-14 20:01 2125JSP/Servlet的多线程原理: ... -
java中String、StringBuilder和StringBuffer,StringHelper 的区别
2010-12-09 18:00 1451String 字符串常量 StringBuffer 字符串变量 ... -
开始学习scala和ruby
2010-10-27 16:15 945开始学习scala和ruby -
缓存策略
2010-09-21 13:26 856http://hi.baidu.com/pepsichan/b ... -
StringTokenizer是一个很好的一个类
2010-09-20 14:58 1066StringTokenizer 是一个很好的类,一直没有怎么用 ...
相关推荐
java.util.concurrent 使用jdk内置的包实现一个线程池的操作
1. java.util.concurrent - Java 并发工具包 2. 阻塞队列 BlockingQueue 3. 数组阻塞队列 ArrayBlockingQueue 4. 延迟队列 DelayQueue 5. 链阻塞队列 LinkedBlockingQueue 6. 具有优先级的阻塞队列 ...
java.util.concurrent - Java 并发工具包 2. 阻塞队列 BlockingQueue 3. 数组阻塞队列 ArrayBlockingQueue 4. 延迟队列 DelayQueue 5. 链阻塞队列 LinkedBlockingQueue 6. 具有优先级的阻塞队列 ...
主要介绍了浅谈java.util.concurrent包中的线程池和消息队列,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随着小编来一起学习学习吧
本资源包含两个 pdf 文档,一本根据 Jakob Jenkov 最新博客 (http://tutorials.jenkov.com/java-util-concurrent/index.html) 整理的 java_util_concurrent_user_guide_en.pdf,一个中文翻译的 java_util_concurrent...
在客户/ 服务器通信模式中, 服务器端需要创建监听特定端口的 ServerSocket , ServerSocket 负责接收客户连接请求。...章还介绍了 java.util.concurrent 包中的线程池类的用法,在服务器程序中可以直接使用它们。
1. java.util.concurrent - Java 并发工具包 2. 阻塞队列 BlockingQueue 3. 数组阻塞队列 ArrayBlockingQueue 4. 延迟队列 DelayQueue 5. 链阻塞队列 LinkedBlockingQueue 6. 具有优先级的阻塞队列 ...
本资源包含两个 pdf 文档,一本根据 Jakob Jenkov 最新博客 (http://tutorials.jenkov.com/java-util-concurrent/index.html) 整理的 java_util_concurrent_user_guide_en.pdf,一个中文翻译的 java_util_concurrent...
ServerSocket用法详解 提供线程池的一种实现方式。线程池包括一个工作队列和若干工作线程。服务器程 序向工作队列中加入与客户...介绍了java.util.concurrent包中的线程池类的用法,在服务器程序中可以直接使用它们。
Executors: 是java.util.concurrent包下的一个类,提供了若干个静态方法,用于生成不同类型的线程池。Executors一共可以创建下面这四类线程池: 1.newFixedThreadPool创建一个可缓存线程池,如果线程池长度超过...
25 JAVA8 与元数据.................................................................................................................................25 2.4. 垃圾回收与算法 .................................
25 JAVA8 与元数据.................................................................................................................................25 2.4. 垃圾回收与算法 .................................
具有java.util.concurrent。*构造的自定义实现 CountDownLatch 循环屏障 换货商 重入锁 读锁 信号 线程池 BlockingQueue-生产者消费者 具有一些集合类的自定义实现 有以下示例代码 易挥发的 僵局 java.util....
JDK提供的这些容器⼤部分在java.util.concurrent包中。我们挑选出⼀些⽐较有代表性的并发容器 1 类,来感受⼀下JDK⾃带的并发集合带来的“快感”。 ConcurrentLinkedQueue是⼀个基于链接节点的⽆界线程安全队列,它...
【多线程】简述synchronized 和java.util.concurrent.locks.Lock的异同? 90 【线程】ThreadLocal的作用 90 【Spring】什么是IOC和DI?DI是如何实现的 91 【Spring】spring中的IOC(控制反转)的原理 92 【Spring】...
14.6 java.util.concurrent同步器类中的 AQS257 14.6.1 ReentrantLock257 14.6.2 Semaphore与CountDownLatch258 14.6.3 FutureTask259 14.6.4 ReentrantReadWriteLock259 第15章 原子变量与非阻塞同步机制261 ...