`
flyPig
  • 浏览: 137044 次
  • 性别: Icon_minigender_1
  • 来自: 成都
社区版块
存档分类
最新评论

Mina框架剖析--静态篇

阅读更多
一、基础框架

IoService:IoService相当于是Mina的Socket层,负责所有SocketIO事件的注册,select,分发等。它位于org.apache.mina.core.service包内,它有两个子接口,表示Server端接收方的IoAcceptor和Client发起方的IoConnector,以及所有的实现类:
NioDatagramAcceptor/NioDatagramConnector:基于UDP的实现
NioSocketAcceptor/NioSocketConnector:基于TCP的实现
VmPipeAcceptor/VmPipeConnector:基于Pipe的实现

服务端初始化的方式一般是:
 
        SocketAcceptor acceptor = new NioSocketAcceptor();
        acceptor.setReuseAddress( true );
        acceptor.setHandler(new EchoProtocolHandler());
        acceptor.bind(new InetSocketAddress(PORT));

初始化NioSocketAcceptor的时候会做主要事情:
1.需要通过具体的实现提供的transportMetaData,来判断其中规定的session配置类(eg:DefaultSocketSessionConfig)是否来自接口SessionConfig.
public NioSocketAcceptor() {
        super(new DefaultSocketSessionConfig(), NioProcessor.class);
        ((DefaultSocketSessionConfig) getSessionConfig()).init(this);
    }

2.默认启动了一个SimpleIoProcessorPool来包装NioProcessor.
    protected AbstractPollingIoAcceptor(IoSessionConfig sessionConfig,
            Class<? extends IoProcessor<T>> processorClass) {
        this(sessionConfig, null, new SimpleIoProcessorPool<T>(processorClass),
                true);
    }

而SimpleIoProcessorPool默认是启动CPU个数 +1个 NioProcess,并以数组形式管理。
    private static final int DEFAULT_SIZE = Runtime.getRuntime()
            .availableProcessors() + 1;
    private final IoProcessor<T>[] pool;    
public SimpleIoProcessorPool(Class<? extends IoProcessor<T>> processorType) {
        this(processorType, null, DEFAULT_SIZE);
    }
public SimpleIoProcessorPool(Class<? extends IoProcessor<T>> processorType,
            Executor executor, int size) {
        if (processorType == null) {
            throw new NullPointerException("processorType");
        }
        if (size <= 0) {
            throw new IllegalArgumentException("size: " + size
                    + " (expected: positive integer)");
        }

        if (executor == null) {
            this.executor = executor = Executors.newCachedThreadPool();
            this.createdExecutor = true;
        } else {
            this.executor = executor;
            this.createdExecutor = false;
        }

        pool = new IoProcessor[size];

        boolean success = false;
        Constructor<? extends IoProcessor<T>> processorConstructor = null;
        boolean usesExecutorArg = true;
        ....
       }


在这里并没有指定Executor,因此用默认的Executors.newCachedThreadPool().这个ThreadPool管理着NioSocketAcceptor和所有的IoProcessor处理线程.

在这里,NioSocketAcceptor可以理解为一个Server,而NioProcessor就是其中的并行的处理程序。NioProcessor默认个数是CPU个数+1,这正好是属于Processor的selector的个数,它们专门处理OP_READ/OP_WRITE事件。在NioSocketAcceptor里面也有个Selector,它是专门用作处理OP_ACCEPT事件.这个分离设计使OP_ACCEPT不被读写事件所影响。因此,采用默认设置,MINA会启动 CPU个数+2 Selector.

每个NioSocketAcceptor内部有个Acceptor线程对象,NioSocketAcceptor会使用传入的或者生成的Executor来执行这个Acceptor。
每个NioProcessor内部也有个Processor线程对象,NioProcessor会使用传入的或者生成的Executor来执行这个Processor。

3.IoService初始化时,建立与监听器容器IoServiceListenerSupport的双向关联,注册匿名内部实现serviceActivationListener到监听器容器.

客户端的启动方式:
NioSocketConnector connector = new NioSocketConnector();
connector.getFilterChain().addLast( "logger", new LoggingFilter() );
connector.getFilterChain().addLast( "codec", new ProtocolCodecFilter( new TextLineCodecFactory( Charset.forName( "UTF-8" ))));
connector.setConnectTimeout(30);
connector.setHandler(new TimeClientHandler());
ConnectFuture cf = connector.connect(
new InetSocketAddress("127.0.0.1", 8833));

其基本过程跟NioSocketAcceptor差不多.

IoFilter
IoFilter也是一个接口,有点像ServletFilter,在事件被 IoHandler处理之前或之后进行一些特定的操作,比如记录日志(LoggingFilter)、压缩数据(CompressionFilter),SSL加密(SSLFilter)黑名单过滤(BlackListFilter),心跳检测(KeepAliveFilter)等等。这些都是MINA自带的。
实际项目中,有个东西肯定是存在的,就是ProtocolCodecFilter或者跟它类似的filter.我们不可能让IoHandler直接去操作字节流,所以在这一层,一定要把二进制流转成java对象或者文本,这样才能不枉费MINA的良好分层设计。

ProtocolCodecFilter
使用ProtocolCodecFilter很简单,我们只要把ProtocolCodecFilter加入到FilterChain就可以了,但是我们需要提供一个ProtocolCodecFactory。其实ProtocolCodecFilter仅仅是实现了过滤器部分的功能,它会将最终的转换工作,交给从ProtocolCodecFactory获得的Encode和Decode。如果我们需要编写自己的ProtocolCodec,就应该从 ProtocolCodecFactory入手。MINA内置了几个ProtocolCodecFactory,比较常用的就是 ObjectSerializationCodecFactory和TextLineCodecFactory。
ObjectSerializationCodecFactory是Java Object序列化之后的内容直接跟ByteBuffer互相转化,比较适合两端都是Java的情况使用。在笔者所做的自定义协议环境下,在这里就需要重写ObjectSerializationCodecFactory。
TextLineCodecFactory就是 String跟ByteBuffer的转化,比如HTTP,RTSP这类纯文本协议。

IoFilter的顺序问题:IoFilter是有加入顺序的,例如,先加入LoggingFilter再加入ProtocolCodecFilter,和先加入ProtocolCodecFilter再加入LoggingFilter的效果是不一样的,前者 LoggingFilter写入日志的内容是ByteBuffer,而后者写入日志的是转换后具体的类,例如String。

ExecutorFilter
有一个比较重要的过滤器就是ExecutorFilter。前面已经知道,每个NioProcess对应的是一个Process内部线程对象,在Process的run方法里面会循环的调用process方法。也就是第一个事件没有执行完,第二个事件不会执行,如果某次消息处理太耗时,就会导致其他消息等待,整体的吞吐量下降。ExecutorFilter的的作用就是将同一个类型的消息合并起来按顺序调用
public final void messageReceived(NextFilter nextFilter, IoSession session,
            Object message) {
        if (eventTypes.contains(IoEventType.MESSAGE_RECEIVED)) {
            IoFilterEvent event = new IoFilterEvent(nextFilter,
                IoEventType.MESSAGE_RECEIVED, session, message); 
            fireEvent(event);
        } else {
            nextFilter.messageReceived(session, message);
        }
    }

从上面的代码可以看到,如果满足事件集合条件,就组装成IoFilterEvent交给Executor异步执行。这样在filter上就不会因为某个事件filter执行时间过长而block后面的事件。
protected void fireEvent(IoFilterEvent event) {
        executor.execute(event);
    }

public void fire() {
        IoSession session = getSession();
        NextFilter nextFilter = getNextFilter();
        IoEventType type = getType();

        if ( LOGGER.isDebugEnabled()) {
            LOGGER.debug( "Firing a {} event for session {}",type, session.getId() );
        }

        switch (type) {
        case MESSAGE_RECEIVED:
            Object parameter = getParameter();
            nextFilter.messageReceived(session, parameter);
            break;
        case MESSAGE_SENT:
            WriteRequest writeRequest = (WriteRequest)getParameter();
            nextFilter.messageSent(session, writeRequest);
            break;
        case WRITE:
            writeRequest = (WriteRequest)getParameter();
            nextFilter.filterWrite(session, writeRequest);
            break;
        case CLOSE:
            nextFilter.filterClose(session);
            break;
        case EXCEPTION_CAUGHT:
            Throwable throwable = (Throwable)getParameter();
            nextFilter.exceptionCaught(session, throwable);
            break;
        case SESSION_IDLE:
            nextFilter.sessionIdle(session, (IdleStatus) getParameter());
            break;
        case SESSION_OPENED:
            nextFilter.sessionOpened(session);
            break;
        case SESSION_CREATED:
            nextFilter.sessionCreated(session);
            break;
        case SESSION_CLOSED:
            nextFilter.sessionClosed(session);
            break;
        default:
            throw new IllegalArgumentException("Unknown event type: " + type);
        }
        
        if ( LOGGER.isDebugEnabled()) {
            LOGGER.debug( "Event {} has been fired for session {}", type, session.getId() );
        }
    }


实际上MINA是用filterChain的方式顺序调用所有注册的filter.默认的DefaultIoFilterChain
if (readBytes > 0) {
                IoFilterChain filterChain = session.getFilterChain();
                filterChain.fireMessageReceived(buf);

然后就会一个filter接一个的传下去,直到最后的尾调用IoHandler.

IoHandler
IoHanlder则是所有事件最终产生响应的位置,一般用来处理业务比如分析数据,数据库操作等。
void sessionCreated(IoSession session) throws Exception;
    void sessionOpened(IoSession session) throws Exception;
    void sessionClosed(IoSession session) throws Exception;
    void sessionIdle(IoSession session, IdleStatus status) throws Exception;
    void exceptionCaught(IoSession session, Throwable cause) throws Exception;
    void messageReceived(IoSession session, Object message) throws Exception;
    void messageSent(IoSession session, Object message) throws Exception;
messageReceived是接收客户端消息的事件,我们应该在这里实现业务逻辑。
messageSent是服务器发送消息的事件,一般情况下不会使用它。
sessionClosed是客户端断开连接的事件,可以在这里进行一些资源回收等操作。sessionCreated和sessionOpened,两者稍有不同,sessionCreated是由I/O processor线程触发的,而sessionOpened在其后,由业务线程触发的,由于MINA的I/O processor线程非常少,因此如果我们真的需要使用sessionCreated,也必须是耗时短的操作,一般情况下,我们应该把业务初始化的功能放在sessionOpened事件中。

IoSession
IoSession是一个接口,它提供了对当前连接的操作功能,还有用户定义属性的存储功能,就像HttpSession。一个IoSession就代表一个Client与Server的连接。IoSession是线程安全的,第一层子类AbstractIoSession内部用到了大量的lock机制,因此可以放心的使用而不用担心并发问题。常用的方法:
//
WriteFuture write(Object message)
CloseFuture close();

//属性相关操作 
Object getAttribute(String key);
Object setAttribute(String key, Object value);
Object removeAttribute(String key);
Set getAttributeKeys();
//连接状态操作
boolean isConnected();
boolean isClosing();
SocketAddress getRemoteAddress();
boolean isIdle(IdleStatus status);

主要方法可以分为三类:
连接操作
最主要的方法有两个,向客户端发送消息和断开连接。可以看的出,write接受的变量是一个Object,实际传入的类型应该是最后一个filter处理后的结果。最初始的状态下,message是一个IoBuffer.
IoBuffer buf = IoBuffer.allocate(config.getReadBufferSize());IoFilterChain filterChain = session.getFilterChain();
filterChain.fireMessageReceived(buf);


另外注意的是,write返回的WriteFuture类,这样调用IoSession.write方法是不会阻塞的。调用了write方法之后,消息内容会发到底层等候发送,至于什么时候发出,就看线程的调度处理了。非常典型的Future模式的应用。
如果需要确认消息是否成功的发送出去了,只需要wait一下,然后检查下future的状态。
WriteFuture future = session.write(xx);
future.awaitUninterruptibly();
boolean isCompleted = future.isWritten()

当调用write的时候,会通过跟read相反的次序依次传递,直至由IoService负责把数据发送给客户端.
如果在很短的时间里,对同一个IoSession进行了两次write操作,客户端有可能只收到一条消息,而这条消息就是服务器发出的两条消息前后接起来。这样的设计可以在高并发的时候节省网络开销。

属性存储操作
一般用于状态维护。参考HttpSession的作用。跟Attribute相关的4个方法都是跟这个相关的。

连接状态
最后面的4个方法全是连接属性的查询。没什么好说的。




  • 大小: 29 KB
分享到:
评论

相关推荐

Global site tag (gtag.js) - Google Analytics