`

底层架构-远程通讯-Mina

 
阅读更多

转:http://www.cnblogs.com/java-zone/archive/2012/04/08/2404164.html

 

一:Mina概要   
 Apache Mina是一个能够帮助用户开发高性能和高伸缩性网络应用程序的框架。它通过Java nio技术基于TCP/IP和UDP/IP协议提供了抽象的、事件驱动的、异步的API。
如下的特性:
1、  基于Java nio的TCP/IP和UDP/IP实现
基于RXTX的串口通信(RS232)
VM 通道通信
2、通过filter接口实现扩展,类似于Servlet filters
3、low-level(底层)和high-level(高级封装)的api:
       low-level:使用ByteBuffers
       High-level:使用自定义的消息对象和解码器
4、Highly customizable(易用的)线程模式(MINA2.0 已经禁用线程模型了):
       单线程
       线程池
       多个线程池
5、基于java5 SSLEngine的SSL、TLS、StartTLS支持
6、负载平衡
7、使用mock进行单元测试
8、jmx整合
9、基于StreamIoHandler的流式I/O支持
10、IOC容器的整合:Spring、PicoContainer
11、平滑迁移到Netty平台
二:实践    
首先讲一下客户端的通信过程:    
客户端通信过程
1.通过SocketConnector同服务器端建立连接
2.链接建立之后I/O的读写交给了I/O Processor线程,I/O Processor是多线程的
3.通过I/O Processor读取的数据经过IoFilterChain里所有配置的IoFilter,IoFilter进行消息的过滤,格式的转换,在这个层面可以制定一些自定义的协议
4.最后IoFilter将数据交给Handler进行业务处理,完成了整个读取的过程
5.写入过程也是类似,只是刚好倒过来,通过IoSession.write写出数据,然后Handler进行写入的业务处理,处理完成后交给IoFilterChain,进行消息过滤和协议的转换,最后通过I/O Processor将数据写出到socket通道
IoFilterChain作为消息过滤链
1.读取的时候是从低级协议到高级协议的过程,一般来说从byte字节逐渐转换成业务对象的过程
2.写入的时候一般是从业务对象到字节byte的过程
IoSession贯穿整个通信过程的始终   
1.创建服务器    
 package com.gewara.web.module.base;

import java.io.IOException;

import java.net.InetSocketAddress;

import java.nio.charset.Charset;
import org.apache.mina.core.service.IoAcceptor;

import org.apache.mina.filter.codec.ProtocolCodecFilter;

import org.apache.mina.filter.codec.textline.TextLineCodecFactory;

import org.apache.mina.filter.logging.LoggingFilter;

import org.apache.mina.transport.socket.nio.NioSocketAcceptor;


/** * Mina服务器 * * @author mike * * @since 2012-3-15 */

public class HelloServer

{
        private static final int PORT = 8901;

         // 定义监听端口        

        public static void main(String[] args) throws IOException

       {            

          // 创建服务端监控线程           

            IoAcceptor acceptor = new NioSocketAcceptor();                        

         // 设置日志记录器           

           acceptor.getFilterChain().addLast("logger", new LoggingFilter());                        

        // 设置编码过滤器           

           acceptor.getFilterChain().addLast(

                "codec",

                          new ProtocolCodecFilter(

                                  new TextLineCodecFactory(Charset.forName("UTF-8"))

                       )

             );                       

         // 指定业务逻辑处理器           

           acceptor.setHandler(new HelloServerHandler());                        

         // 设置端口号           

           acceptor.setDefaultLocalAddress(new InetSocketAddress(PORT));                        

         // 启动监听线程           

           acceptor.bind();        

       }

}

2.创建服务器端业务逻辑    

<!--<br /><br />Code highlighting produced by Actipro CodeHighlighter (freeware)<br />http://www.CodeHighlighter.com/<br /><br />-->package com.gewara.web.module.base;
import org.apache.mina.core.service.IoHandlerAdapter;

import org.apache.mina.core.session.IoSession;
/** * 服务器端业务逻辑 * * @author mike * * @since 2012-3-15 */

public class HelloServerHandler extends IoHandlerAdapter

{         

     @Override        

     /**          * 连接创建事件          */        

      public void sessionCreated(IoSession session)

      {            

      // 显示客户端的ip和端口           

                System.out.println(session.getRemoteAddress().toString());        

      }
       

     @Override        

      /**          * 消息接收事件          */        

      public void messageReceived(IoSession session, Object message) throws Exception

     {            

              String str = message.toString();            

              if (str.trim().equalsIgnoreCase("quit"))

              {                

                     // 结束会话               

                     session.close(true);                

                     return;            

              }                        

              // 返回消息字符串           

              session.write("Hi Client!");            

              // 打印客户端传来的消息内容           

             System.out.println("Message written" + str);        

     }

}

3.创建客户端     
package com.gewara.web.module.base;

import java.net.InetSocketAddress;

import java.nio.charset.Charset;
import org.apache.mina.core.future.ConnectFuture;

import org.apache.mina.filter.codec.ProtocolCodecFilter;

import org.apache.mina.filter.codec.textline.TextLineCodecFactory;

import org.apache.mina.filter.logging.LoggingFilter;

import org.apache.mina.transport.socket.nio.NioSocketConnector;


/** * Mina客户端 * * @author mike * * @since 2012-3-15 */

public class HelloClient

{       

            public static void main(String[] args)

            {            

                  // 创建客户端连接器.           

                   NioSocketConnector connector = new NioSocketConnector();                        

                 // 设置日志记录器           

                   connector.getFilterChain().addLast("logger", new LoggingFilter());        

                 // 设置编码过滤器           

                   connector.getFilterChain().addLast(

                          "codec",                    

                                new ProtocolCodecFilter(

                                       new TextLineCodecFactory(Charset.forName("UTF-8")

                                     )

                       )

                 );                        

                 

                // 设置连接超时检查时间           

                  connector.setConnectTimeoutCheckInterval(30);                        

                // 设置事件处理器           

                  connector.setHandler(new HelloClientHandler());                        

                // 建立连接           

                  ConnectFuture cf = connector.connect(

                                     new InetSocketAddress("192.168.2.89", 8901)

                                );                        

                // 等待连接创建完成           

                  cf.awaitUninterruptibly();                        

                // 发送消息             

                  cf.getSession().write("Hi Server!");                       

                // 发送消息           

                  cf.getSession().write("quit");                        

                // 等待连接断开           

                  cf.getSession().getCloseFuture().awaitUninterruptibly();


              // 释放连接           

                  connector.dispose();        

    }

}

4.客户端业务逻辑

<!--<br /><br />Code highlighting produced by Actipro CodeHighlighter (freeware)<br />http://www.CodeHighlighter.com/<br /><br />-->package com.gewara.web.module.base;
import org.apache.mina.core.service.IoHandlerAdapter;

import org.apache.mina.core.session.IoSession;
public class HelloClientHandler extends IoHandlerAdapter

{        

  @Override        

    /**          * 消息接收事件          */        

    public void messageReceived(IoSession session, Object message) throws Exception

    {            

    //显示接收到的消息           

     System.out.println("server message:"+message.toString());        

    }

}

5.先启动服务器端,然后启动客户端

<!--<br /><br />Code highlighting produced by Actipro CodeHighlighter (freeware)<br />http://www.CodeHighlighter.com/<br /><br />-->2012-03-15 14:45:41,456 

INFO  logging.LoggingFilter - CREATED /192.168.2.89:2691 2012-03-15 14:45:41,456 

INFO  logging.LoggingFilter - OPENED 2012-03-15 14:45:41,487 

INFO  logging.LoggingFilter -

RECEIVED: HeapBuffer[pos=0 lim=11 cap=2048: 48 69 20 53 65 72 76 65 72 21 0A] 2012-03-15 14:45:41,487 

DEBUG codec.ProtocolCodecFilter - Processing a MESSAGE_RECEIVED for session 1 Message writenHi Server! 2012-03-15 14:45:41,487 

INFO  logging.LoggingFilter - SENT: HeapBuffer[pos=0 lim=0 cap=0: empty] 2012-03-15 14:45:41,487 

INFO  logging.LoggingFilter - RECEIVED: HeapBuffer[pos=0 lim=5 cap=2048: 71 75 69 74 0A] 2012-03-15 14:45:41,487 

DEBUG codec.ProtocolCodecFilter - Processing a MESSAGE_RECEIVED for session 1 2012-03-15 14:45:41,487  INFO  logging.LoggingFilter - CLOSED

三:分析源码
1.首先看服务器

<!--<br /><br />Code highlighting produced by Actipro CodeHighlighter (freeware)<br />http://www.CodeHighlighter.com/<br /><br />-->// 创建服务端监控线程           

IoAcceptor acceptor = new NioSocketAcceptor();                        

// 设置日志记录器           

acceptor.getFilterChain().addLast("logger", new LoggingFilter());                        

// 设置编码过滤器           

acceptor.getFilterChain().addLast(

     "codec",

           new ProtocolCodecFilter(new TextLineCodecFactory(Charset.forName("UTF-8"))

      )

);

 

// 指定业务逻辑处理器           

acceptor.setHandler(new HelloServerHandler());                        

// 设置端口号           

acceptor.setDefaultLocalAddress(new InetSocketAddress(PORT));                        

// 启动监听线程           

acceptor.bind();

1)先创建NioSocketAcceptor nio的接收器,谈到Socket就要说到Reactor模式    
 当前分布式计算 Web Services盛行天下,这些网络服务的底层都离不开对socket的操作。他们都有一个共同的结构:
1. Read request
2. Decode request
3. Process service                                    
4. Encode reply
5. Send reply

但这种模式在用户负载增加时,性能将下降非常的快。我们需要重新寻找一个新的方案,保持数据处理的流畅,很显然,事件触发机制是最好的解决办法,当有事件发生时,会触动handler,然后开始数据的处理。 Reactor模式类似于AWT中的Event处理。

Reactor模式参与者

1.Reactor 负责响应IO事件,一旦发生,广播发送给相应的Handler去处理,这类似于AWT的thread 2.Handler 是负责非堵塞行为,类似于AWT ActionListeners;同时负责将handlers与event事件绑定,类似于AWT addActionListener

 

人有两条路要走, 一条是必须走的,一条是想走的,你必须把必须走的路走漂亮,才可以走想走的路。
分享到:
评论

相关推荐

    分布式服务架构之java远程调用技术浅析

    在分布式服务框架中,一个最基础的问题就是远程服务是怎么通讯的,在Java领域中有很多可实现远程通讯的技术,例如:RMI、MINA、ESB、Burlap、Hessian、SOAP、EJB和JMS等,这些名词之间到底是些什么关系呢,它们背后...

    Hprose 全名是高性能远程对象服务引擎.rar

    目前,主流的平台中都支持各种远程调用技术,以满足分布式系统架构中不同的系统之间的远程通信和相互调用。远程调用的应用场景极其广泛,实现的方式也各式各样。 2. 从通信协议的层面 基于 HTTP 协议的(例如基于...

    JAVA上百实例源码以及开源项目源代码

     当用户发送第一次请求的时候,验证用户登录,创建一个该qq号和服务器端保持通讯连接得线程,启动该通讯线程,通讯完毕,关闭Scoket。  QQ客户端登录界面,中部有三个JPanel,有一个叫选项卡窗口管理。还可以更新...

    java开源包1

    R-OSGi 是一套适用于任意满足 OSGi 架构的分布式通讯组件。它以 jar 的形式发布,部署容易,使用也较为便捷。 Java邮箱地址验证 jaev jaev 是一个用来验证电子邮箱地址是否有效的 Java 项目。 Java的FastCGI网关 ...

    java开源包10

    R-OSGi 是一套适用于任意满足 OSGi 架构的分布式通讯组件。它以 jar 的形式发布,部署容易,使用也较为便捷。 Java邮箱地址验证 jaev jaev 是一个用来验证电子邮箱地址是否有效的 Java 项目。 Java的FastCGI网关 ...

    JAVA上百实例源码以及开源项目

     当用户发送第一次请求的时候,验证用户登录,创建一个该qq号和服务器端保持通讯连接得线程,启动该通讯线程,通讯完毕,关闭Scoket。  QQ客户端登录界面,中部有三个JPanel,有一个叫选项卡窗口管理。还可以更新...

    java开源包11

    R-OSGi 是一套适用于任意满足 OSGi 架构的分布式通讯组件。它以 jar 的形式发布,部署容易,使用也较为便捷。 Java邮箱地址验证 jaev jaev 是一个用来验证电子邮箱地址是否有效的 Java 项目。 Java的FastCGI网关 ...

    java开源包2

    R-OSGi 是一套适用于任意满足 OSGi 架构的分布式通讯组件。它以 jar 的形式发布,部署容易,使用也较为便捷。 Java邮箱地址验证 jaev jaev 是一个用来验证电子邮箱地址是否有效的 Java 项目。 Java的FastCGI网关 ...

    java开源包3

    R-OSGi 是一套适用于任意满足 OSGi 架构的分布式通讯组件。它以 jar 的形式发布,部署容易,使用也较为便捷。 Java邮箱地址验证 jaev jaev 是一个用来验证电子邮箱地址是否有效的 Java 项目。 Java的FastCGI网关 ...

    java开源包6

    R-OSGi 是一套适用于任意满足 OSGi 架构的分布式通讯组件。它以 jar 的形式发布,部署容易,使用也较为便捷。 Java邮箱地址验证 jaev jaev 是一个用来验证电子邮箱地址是否有效的 Java 项目。 Java的FastCGI网关 ...

    java开源包5

    R-OSGi 是一套适用于任意满足 OSGi 架构的分布式通讯组件。它以 jar 的形式发布,部署容易,使用也较为便捷。 Java邮箱地址验证 jaev jaev 是一个用来验证电子邮箱地址是否有效的 Java 项目。 Java的FastCGI网关 ...

    java开源包4

    R-OSGi 是一套适用于任意满足 OSGi 架构的分布式通讯组件。它以 jar 的形式发布,部署容易,使用也较为便捷。 Java邮箱地址验证 jaev jaev 是一个用来验证电子邮箱地址是否有效的 Java 项目。 Java的FastCGI网关 ...

    java开源包8

    R-OSGi 是一套适用于任意满足 OSGi 架构的分布式通讯组件。它以 jar 的形式发布,部署容易,使用也较为便捷。 Java邮箱地址验证 jaev jaev 是一个用来验证电子邮箱地址是否有效的 Java 项目。 Java的FastCGI网关 ...

    java开源包7

    R-OSGi 是一套适用于任意满足 OSGi 架构的分布式通讯组件。它以 jar 的形式发布,部署容易,使用也较为便捷。 Java邮箱地址验证 jaev jaev 是一个用来验证电子邮箱地址是否有效的 Java 项目。 Java的FastCGI网关 ...

    java开源包9

    R-OSGi 是一套适用于任意满足 OSGi 架构的分布式通讯组件。它以 jar 的形式发布,部署容易,使用也较为便捷。 Java邮箱地址验证 jaev jaev 是一个用来验证电子邮箱地址是否有效的 Java 项目。 Java的FastCGI网关 ...

    java开源包101

    R-OSGi 是一套适用于任意满足 OSGi 架构的分布式通讯组件。它以 jar 的形式发布,部署容易,使用也较为便捷。 Java邮箱地址验证 jaev jaev 是一个用来验证电子邮箱地址是否有效的 Java 项目。 Java的FastCGI网关 ...

    Java资源包01

    R-OSGi 是一套适用于任意满足 OSGi 架构的分布式通讯组件。它以 jar 的形式发布,部署容易,使用也较为便捷。 Java邮箱地址验证 jaev jaev 是一个用来验证电子邮箱地址是否有效的 Java 项目。 Java的FastCGI网关 ...

Global site tag (gtag.js) - Google Analytics