`

(转)使用java.util.concurrent实现的线程池、消息队列功能

    博客分类:
  • java
阅读更多
(转)使用java.util.concurrent实现的线程池、消息队列功能

昨天开始研究java.util.concurrent,是出于线程安全的知识懂得不多,对自己写的线程池没有信心,所以就用了包里专家写好的线程池。这个包的功能很强大。有兴趣的朋友可以搜索了解更多的内容。
     今天刚写好了一段200行左右的代码,拿出来跟大家分享我的学习经验。初次实践,不足之处,望能得到高手的指点。
功能说明:一个发送消息模块将消息发送到消息队列中,无需等待返回结果,发送模块继续执行其他任务。消息队列中的指令由线程池中的线程来处理。使用一个Queue来存放线程池溢出时的任务。
TestDriver.java是一个驱动测试,sendMsg方法不间断的向ThreadPoolManager发送数据。
01
public class TestDriver
02
{
03
    ThreadPoolManager tpm = ThreadPoolManager.newInstance();
04

05
    public void sendMsg( String msg )
06
    {
07
        tpm.addLogMsg( msg + "记录一条日志 " );
08
    }
09

10
    public static void main( String[] args )
11
    {
12
        for( int i = 0; i < 100; i++ )
13
        {
14
            new TestDriver().sendMsg( Integer.toString( i ) );
15
        }
16
    }
17
}


ThreadPoolManager类:是负责管理线程池的类。同时维护一个Queue和调度进程。

01
public class ThreadPoolManager
02
{
03
private static ThreadPoolManager tpm = new ThreadPoolManager();
04

05
// 线程池维护线程的最少数量
06
private final static int CORE_POOL_SIZE = 4;
07

08
// 线程池维护线程的最大数量
09
private final static int MAX_POOL_SIZE = 10;
10

11
// 线程池维护线程所允许的空闲时间
12
private final static int KEEP_ALIVE_TIME = 0;
13

14
// 线程池所使用的缓冲队列大小
15
private final static int WORK_QUEUE_SIZE = 10;
16

17
// 消息缓冲队列
18
Queue<String> msgQueue = new LinkedList<String>();
19

20
// 访问消息缓存的调度线程
21
final Runnable accessBufferThread = new Runnable()
22
{
23
  public void run()
24
  {
25
   // 查看是否有待定请求,如果有,则创建一个新的AccessDBThread,并添加到线程池中
26
   if( hasMoreAcquire() )
27
   {
28
    String msg = ( String ) msgQueue.poll();
29
    Runnable task = new AccessDBThread( msg );
30
    threadPool.execute( task );
31
   }
32
  }
33
};
34

35
final RejectedExecutionHandler handler = new RejectedExecutionHandler()
36
{
37
  public void rejectedExecution( Runnable r, ThreadPoolExecutor executor )
38
  {
39
   System.out.println(((AccessDBThread )r).getMsg()+"消息放入队列中重新等待执行");
40
   msgQueue.offer((( AccessDBThread ) r ).getMsg() );
41
  }
42
};
43

44
// 管理数据库访问的线程池
45
final ThreadPoolExecutor threadPool = new ThreadPoolExecutor(
46
   CORE_POOL_SIZE, MAX_POOL_SIZE, KEEP_ALIVE_TIME, TimeUnit.SECONDS,
47
   new ArrayBlockingQueue( WORK_QUEUE_SIZE ), this.handler );
48

49
// 调度线程池
50
final ScheduledExecutorService scheduler = Executors
51
   .newScheduledThreadPool( 1 );
52

53
final ScheduledFuture taskHandler = scheduler.scheduleAtFixedRate(
54
   accessBufferThread, 0, 1, TimeUnit.SECONDS );
55

56
public static ThreadPoolManager newInstance()
57
{
58
  return tpm;
59
}
60

61
private ThreadPoolManager(){}
62

63
private boolean hasMoreAcquire()
64
{
65
  return !msgQueue.isEmpty();
66
}
67

68
public void addLogMsg( String msg )
69
{
70
  Runnable task = new AccessDBThread( msg );
71
  threadPool.execute( task );
72
}
73
}


AccessDBThread类:线程池中工作的线程。

01
public class AccessDBThread implements Runnable
02
{
03
private String msg;
04
 
05
public String getMsg()
06
{
07
  return msg;
08
}
09

10
public void setMsg( String msg )
11
{
12
  this.msg = msg;
13
}
14
 
15
public AccessDBThread(){
16
  super();
17
}
18
 
19
public AccessDBThread(String msg){
20
  this.msg = msg;
21
}
22

23
public void run()
24
{
25
  // 向数据库中添加Msg变量值
26
  System.out.println("Added the message: "+msg+" into the Database");
27
}
28

29
}
分享到:
评论

相关推荐

    java.util.concurrent 实现线程池队列

    java.util.concurrent 使用jdk内置的包实现一个线程池的操作

    java并发工具包 java.util.concurrent中文版用户指南pdf

    1. java.util.concurrent - Java 并发工具包 2. 阻塞队列 BlockingQueue 3. 数组阻塞队列 ArrayBlockingQueue 4. 延迟队列 DelayQueue 5. 链阻塞队列 LinkedBlockingQueue 6. 具有优先级的阻塞队列 ...

    Java并发工具包java.util.concurrent用户指南中英文对照阅读版.pdf

    java.util.concurrent - Java 并发工具包 2. 阻塞队列 BlockingQueue 3. 数组阻塞队列 ArrayBlockingQueue 4. 延迟队列 DelayQueue 5. 链阻塞队列 LinkedBlockingQueue 6. 具有优先级的阻塞队列 ...

    浅谈java.util.concurrent包中的线程池和消息队列

    主要介绍了浅谈java.util.concurrent包中的线程池和消息队列,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随着小编来一起学习学习吧

    Java并发工具包java.util.concurrent用户指南中英文对照阅读版

    本资源包含两个 pdf 文档,一本根据 Jakob Jenkov 最新博客 (http://tutorials.jenkov.com/java-util-concurrent/index.html) 整理的 java_util_concurrent_user_guide_en.pdf,一个中文翻译的 java_util_concurrent...

    Java网络编程精解之ServerSocket用法详解

    在客户/ 服务器通信模式中, 服务器端需要创建监听特定端口的 ServerSocket , ServerSocket 负责接收客户连接请求。...章还介绍了 java.util.concurrent 包中的线程池类的用法,在服务器程序中可以直接使用它们。

    java并发工具包详解

    1. java.util.concurrent - Java 并发工具包 2. 阻塞队列 BlockingQueue 3. 数组阻塞队列 ArrayBlockingQueue 4. 延迟队列 DelayQueue 5. 链阻塞队列 LinkedBlockingQueue 6. 具有优先级的阻塞队列 ...

    java并发包资源

    本资源包含两个 pdf 文档,一本根据 Jakob Jenkov 最新博客 (http://tutorials.jenkov.com/java-util-concurrent/index.html) 整理的 java_util_concurrent_user_guide_en.pdf,一个中文翻译的 java_util_concurrent...

    ServerSocket用法详解

    ServerSocket用法详解 提供线程池的一种实现方式。线程池包括一个工作队列和若干工作线程。服务器程 序向工作队列中加入与客户...介绍了java.util.concurrent包中的线程池类的用法,在服务器程序中可以直接使用它们。

    Executor,Executors,ExecutorService比较.docx

    Executors: 是java.util.concurrent包下的一个类,提供了若干个静态方法,用于生成不同类型的线程池。Executors一共可以创建下面这四类线程池: 1.newFixedThreadPool创建一个可缓存线程池,如果线程池长度超过...

    java核心知识点整理.pdf

    25 JAVA8 与元数据.................................................................................................................................25 2.4. 垃圾回收与算法 .................................

    JAVA核心知识点整理(有效)

    25 JAVA8 与元数据.................................................................................................................................25 2.4. 垃圾回收与算法 .................................

    customcode:至

    具有java.util.concurrent。*构造的自定义实现 CountDownLatch 循环屏障 换货商 重入锁 读锁 信号 线程池 BlockingQueue-生产者消费者 具有一些集合类的自定义实现 有以下示例代码 易挥发的 僵局 java.util....

    并发容器和线程池,java并发编程3

    JDK提供的这些容器⼤部分在java.util.concurrent包中。我们挑选出⼀些⽐较有代表性的并发容器 1 类,来感受⼀下JDK⾃带的并发集合带来的“快感”。 ConcurrentLinkedQueue是⼀个基于链接节点的⽆界线程安全队列,它...

    java面试题,180多页,绝对良心制作,欢迎点评,涵盖各种知识点,排版优美,阅读舒心

    【多线程】简述synchronized 和java.util.concurrent.locks.Lock的异同? 90 【线程】ThreadLocal的作用 90 【Spring】什么是IOC和DI?DI是如何实现的 91 【Spring】spring中的IOC(控制反转)的原理 92 【Spring】...

    Java并发编程实战

    14.6 java.util.concurrent同步器类中的 AQS257 14.6.1 ReentrantLock257 14.6.2 Semaphore与CountDownLatch258 14.6.3 FutureTask259 14.6.4 ReentrantReadWriteLock259 第15章 原子变量与非阻塞同步机制261 ...

Global site tag (gtag.js) - Google Analytics