`
ftj20003
  • 浏览: 130482 次
  • 性别: Icon_minigender_1
  • 来自: ...
社区版块
存档分类
最新评论

Mina的线程池实现分析(1)

    博客分类:
  • Java
阅读更多
    线程池是并发应用中,为了减少每个任务调用的开销增强性能而经常使用的技术。在mina中大量的使用这一技术,除了Executors的工厂方法构建线程池之外,它还继承自ThreadPoolExecutor提供自己的线程池的实现OrderedThreadPoolExecutor和UnorderedThreadPoolExecutor。这两者主要应用于ExecutorFilter过滤器。这个过滤器是mina内部实现的众多过滤器之一,其主要作用是把I/O events提交给线程池同时处理同一个IOSession的事件,其默认的线程池的构造是前者。这两个线程池的区别就在于同时处理I/O事件时,前者能够保证同一个Session的事件的处理顺序,而后者则不能保证,所以有可能出现sessionClosed事件在messageReceived事件之前被处理。下面试着从代码解密其怎么保证事件处理顺序的。

    分析Mina的源码最大的感受就是其多线程应用的精细,每次从源码解决自己的疑问都有一种难以言喻的喜悦感。如果一般的框架的源码主要看设计结构的话,Mina的源码的精妙更在于具体的实现,虽然乍一看是复杂又混乱,呵呵。先看看部分源码吧:
public class OrderedThreadPoolExecutor extends ThreadPoolExecutor {
    ...
    private static final IoSession EXIT_SIGNAL = new DummySession();

    private final AttributeKey TASKS_QUEUE = new AttributeKey(getClass(), "tasksQueue");
    
    private final BlockingQueue<IoSession> waitingSessions = new LinkedBlockingQueue<IoSession>();

    private final Set<Worker> workers = new HashSet<Worker>();

    private final AtomicInteger idleWorkers = new AtomicInteger();

    private long completedTaskCount;
    private volatile boolean shutdown;
    ...
    private SessionTasksQueue getSessionTasksQueue(IoSession session) {
        SessionTasksQueue queue = (SessionTasksQueue) session.getAttribute(TASKS_QUEUE);

        if (queue == null) {
            queue = new SessionTasksQueue();
            SessionTasksQueue oldQueue = 
                (SessionTasksQueue) session.setAttributeIfAbsent(TASKS_QUEUE, queue);
            
            if (oldQueue != null) {
                queue = oldQueue;
            }
        }
        
        return queue;
    }

    private void addWorker() {
        synchronized (workers) {
            if (workers.size() >= super.getMaximumPoolSize()) {
                return;
            }

            // Create a new worker, and add it to the thread pool
            Worker worker = new Worker();
            Thread thread = getThreadFactory().newThread(worker);
            
            // As we have added a new thread, it's considered as idle.
            idleWorkers.incrementAndGet();
            
            // Now, we can start it.
            thread.start();
            workers.add(worker);

            if (workers.size() > largestPoolSize) {
                largestPoolSize = workers.size();
            }
        }
    }

    private class SessionTasksQueue {
        private final Queue<Runnable> tasksQueue = new ConcurrentLinkedQueue<Runnable>();
        
        private boolean processingCompleted = true;
    }
    ...
}

    首先看到的是OrderedThreadPoolExecutor的部分实现,其除了含有ThreadPoolExecutor的一些静态常量之外,这里列出了自身特有的几个变量。EXIT_SIGNAL是代表空的IOSession,如果此线程池得到的全部都是EXIT_SIGNAL,那么处理也就结束了。waitingSessions是存储可用I/O会话的队列,正是这个队列在后面多线程处理会话的I/O事件起到了有序的作用,这个数据机构在UnorderedThreadPoolExecutor内是没有的,后面可以看到其对待事件的粒度要比前者大。workers是一个Worker的集合,每一个Worker都实现了Runnable接口,线程池管理这些Worker线程执行并发的事件处理。很形象的命名,这些Worker说白了就是线程池内的打工仔。idleWorkers则负责及时的统计空闲的工人以便进一步的剥削,由于仅仅是数字在多线程下的增减所以使用atomic包的实现无疑是上佳的选择。最后就是SessionTasksQueue这个内部类了,这个内部的数据结构其实就是一个队列,负责每一个IOSession所对应的I/O事件的处理。

    这样通过waitingSessions区分IOSession,通过SessionTasksQueue区分每个IOSession的I/O事件这个两层结构就可以为有序处理提供了数据结构的保证。对比UnorderedThreadPoolExecutor,其仅仅提供I/O事件的存取队列LinkedBlockingQueue的实例,而不对IOSession进行区分。从上述的源码可以看到getSessionTasksQueue()方法会试图取特定IOSession相关联的事件队列,如果没有则为IOSession添加事件队列的属性。addWorker()则是启动新的Worker线程加入线程池的管理,这些操作不管是有序还是无序的线程池实现都基本一致。最能反映两者差别的就是线程池的execute()方法和其内部的Worker的run()的实现,通过使用不同的数据结构进行I/O事件的存取处理也就体现了两者的差别。先来看看有序的实现:
    public void execute(Runnable task) {
        ...
        IoEvent event = (IoEvent) task;
        
        IoSession session = event.getSession();
        
        SessionTasksQueue sessionTasksQueue = getSessionTasksQueue(session);
        Queue<Runnable> tasksQueue = sessionTasksQueue.tasksQueue;
        
        boolean offerSession;

        boolean offerEvent = eventQueueHandler.accept(this, event);
        
        if (offerEvent) {
            synchronized (tasksQueue) {
                //*********************************1**********************************
                tasksQueue.offer(event);
                
                if (sessionTasksQueue.processingCompleted) {
                    sessionTasksQueue.processingCompleted = false;
                    offerSession = true;
                } else {
                    offerSession = false;
                }
                ...
                //*********************************end********************************
            }
        } else {
            offerSession = false;
        }

        if (offerSession) {
            waitingSessions.offer(session);
        }

        addWorkerIfNecessary();
        ...
    }

    这个方法可以简单的理解为提交I/O事件给线程池处理,也就是I/O事件的存储。前面说到有序的线程池的实现是采用了两层结构,所以代码很清晰,首先是找I/O事件对应的IOSession,然后找IOSession的事件队列属性(没有就创建)把事件添加到队列里面去。由于要保证代码段1的操作的原子性,所以使用了synchronized的锁机制。offerSession在这里的作用就是保证waitingSessions内的非EXIT_SIGNAL的IOSession是唯一的。addWorkerIfNecessary()则是在没有空闲工人处理事件的情况下添加新的人手(线程),最终还是调用addWorker()。再来看看无序线程池的实现:
    public void execute(Runnable task) {
        ...
        IoEvent e = (IoEvent) task;
        boolean offeredEvent = queueHandler.accept(this, e);
        if (offeredEvent) {
            getQueue().offer(e);
        }

        addWorkerIfNecessary();
        ...
    }

    这个就简单明了了,getQueue()获得是类的构造器提供的LinkedBlockingQueue的实例。offer()方法我之前分析过,属于非阻塞的入队实现,所以不管队列满不满都不会阻塞当前的线程。所以这个I/O事件的存储实际上不牵涉到IOSession的划分:同一个IOSession的多个I/O事件存于同一个队列里,不同的IOSession的I/O事件也都在这个队列里;而有序线程池的实现则是:同一个IOSession的多个I/O事件只存于同一个队列里,每一个IOSession都对应有自己的事件队列
分享到:
评论
发表评论

文章已被作者锁定,不允许评论。

相关推荐

    Mina的线程池实现分析

    线程池是并发应用中,为了减少...在mina中大量的使用这一技术,除了Executors的工厂方法构建线程池之外,它还继承自ThreadPoolExecutor提供自己的线程池的实现OrderedThreadPoolExecutor和UnorderedThreadPoolExecutor

    Mina2源码分析

    Mina2源码分析,学习mina不可多得的文档资料

    使用mina框架实现cmpp2.0服务端

    我自己写的使用mina框架实现cmpp2.0服务端,经过一段使用解决了几个bug现在比较稳定。

    MINA长连接框架实现通讯

    mina 通讯 实现server端与基于Android系统的client端通讯

    mina带心跳长链接+socket长链接+线程池

    mina带心跳长链接,可实现服务间通信。socket长连接实现客户端与服务端的通信。对于通信技术学习是非常好的资料。改造后可实现企业应用

    mina 实现简单通讯

    实现了mina 的简单通信通信,内部配置了累积协议编解码器、工具类和客户端与服务端的端口配置。能够实现基本功能,下载完成需要四个基本jar包才能实现功能。我的博客上有相应资源支持下载。

    基于MINA2实现的UDP双向通信源码

    本源码是《NIO框架入门(三):iOS与MINA2、Netty4的跨平台UDP双向通信实战》一文的服务端实现(MINA2版),详见:http://www.52im.net/thread-378-1-1.html

    Mina2源码分析.doc

    Mina2源码分析,mina2框架。mina nio框架。mina用于webgame游戏开发很多。

    Mina2源码分析.docx

    java Mina2源码分析

    Mina实现长连接和短连接实例

    MINA入门实例,实现长连接,短连接通讯。

    通信层使用Mina框架实现双机通讯

    通讯层使用Mina实现一服务器多客户端的通信,可以修改成一个群体聊天室。Mina是手游开发常用的nio通讯框架,长连接优先使用Mina。希望对你有所帮助!

    mina实现登录功能

    mina实现简单的登录功能,详细见博客:http://blog.csdn.net/guozeming122/article/details/18605937

    Mina消息发送简单实现

    基于Mina的网络通讯,分为服务端和客户端。 研究selector NIO实现时,发现了这个架构。...Mina的底层实现实际就是selector和SocketChannel。所以如果对Mina源码感兴趣的可以先去看下selector相关的例子。

    使用MINA实现长连接

    使用MINA实现长连接

    在Java中运用mina来实现TCP通信

    这是一个有关Mina在Java通信中运用的简单的入门实例,MIna自带一种触发机制,无需再开线程等待收发数据。这个实例中有客户端和服务端,与软件TCPUDPDbg进行文字通话测试。用的时候注意添加Mina包,此实例中用到的是...

    Mina实现RPC的例子

    使用mina实现rpc调用.使用参考http://blog.csdn.net/stevexk/archive/2008/07/23/2697907.aspx

    基于Apache Mina实现的TCP长连接和短连接实例

    基于Apache Mina实现的TCP长连接和短连接实例.doc

    Android MiNa 通讯实现

    MiNA在Android应用中的实现,在网上找了许多例子,很少真正能够完成MiNa在Android中应用的,这是我做的例子实现了简单的文本传输

    spring+mina实现http接口服务端通信客户端

    此demo利用springmvc整合mina,实现客户端主动发送消息到服务端,并且以http接口的方式实现,亲测可用。

    Mina+Socket通信

    通过Mina与Socket实现通信,其包含客户端与服务端的实现代码

Global site tag (gtag.js) - Google Analytics