`
yufeng0471
  • 浏览: 98592 次
  • 性别: Icon_minigender_1
  • 来自: 上海
社区版块
存档分类
最新评论

线程池饱和策略之阻塞式处理

    博客分类:
  • java
 
阅读更多

背景:

 

A系统向activemq发送消息,B系统以监听的方式从activemq接收消息,因为这些消息都是转换文件,是CPU消耗型的服务,而服务器都是多CPU,为了充分利用CPU资源,B系统以多线程方式处理消息,这里用到了线程池,假设线程池最大线程数量是8(和CPU数量相等),但是在接收消息的时候,发现B系统把所有的消息都接收下来,放在了线程池的队列中,这样就产生问题了,如果B系统down掉的话,所有的消息都会丢失。

 

其实我想要的效果是,如果线程池中工作队列里的任务数量大于一定值的时候,B系统的消息接收监听器就阻塞,不要再从activemq中接收消息,通过查看线程池ThreadPoolExecutor源码

 

public void execute(Runnable command) {
        if (command == null)
            throw new NullPointerException();
        if (poolSize >= corePoolSize || !addIfUnderCorePoolSize(command)) {
            if (runState == RUNNING && workQueue.offer(command)) {
                if (runState != RUNNING || poolSize == 0)
                    ensureQueuedTaskHandled(command);
            }
            else if (!addIfUnderMaximumPoolSize(command))
                reject(command); // is shutdown or saturated
        }
    }

 发现工作队列BlockingQueue在插入任务的时候,执行的是workQueue.offer(),该函数并不会阻塞插入任务的操作,其实应该用workQueue.put()函数,这个函数会产生等待,直到工作队列BlockingQueue中的任务数量下降。

 

解决方案:

 

继承ArrayBlockingQueue,重写offer函数,代码如下

package cn.sh.ideal.pool;

import java.util.concurrent.ArrayBlockingQueue;

public class ArrayBlockingQueueReplaceOffer<E>  extends ArrayBlockingQueue<E> {

    public ArrayBlockingQueueReplaceOffer(int capacity) {
        super(capacity);
    }

    @Override
    public boolean offer(E e){
        try {
            super.put(e);
            return true;
        } catch (InterruptedException e1) {
            e1.printStackTrace();
            return false;
        }
    }
}
 

在初始化线程池的时候,使用ArrayBlockingQueueReplaceOffer,这样就达到了阻塞的目的

new ThreadPoolExecutor(8, 8,
                0L, TimeUnit.MILLISECONDS,
                new ArrayBlockingQueueReplaceOffer <Runnable>(8))

 

已经有人把阻塞式的处理请求提交到了 Java Bug 数据库(Bug Id 6648211,“ThreadPoolExecutor 特性需求”)

 

分享到:
评论

相关推荐

Global site tag (gtag.js) - Google Analytics