NioProcessor是mina中的另一个核心部分,与NioSocketAcceptor类似,NioProcessor三个主要功能是:
1、接受一个NioSession
2、出来NioSession上的read、write等事件
3、关闭一个NioSession
与NioSocketAcceptor类似,NioProcessor的实现采用了template模式,以上功能整体流程在NioProcessor的父类AbstractPollingIoProcessor中基本完成了,NioSocketAcceptor只是针对Nio的情况完成实现。
创建NioProcessor
如上图,NioSocketAcceptor创建了SimpleIoProcessorPool,SimpleIoProcessorPool中默认存在cpu数+1个NioProcessor,并且这些NioProcessor的工作者线程共享一个线程池。
接受一个NioSession
与NioSocketAcceptor新增一个端口绑定类似,NioProcessor.addSession只是将NioSocketAcceptor新建的NioSession放入一个消息队列中,由工作者线程负责初始化该NioSession,在selector为该session注册OP_READ事件。
关闭一个NioSession
与接受一个NioSession类似,不再描述
NioSession的数据处理
为NioProcessor继承自AbstractPollingIoProcessor的工作者线程中完成主要功能
1. 在循环中,selector监听所有端口,注意在NioProcessor中的select超时时间为1秒,这意味着最多一秒钟的时候,NioProcessor.Worker线程唤醒一次。而在NioSocketAcceptor.Work.run中select是没有超时时间的。下面Worker线程两次唤醒之间简称为一个周期,易知一个周期的长度小于等于一秒。
public void run() {
int nSessions = 0;
lastIdleCheckTime = System.currentTimeMillis();
for (;;) {
try {
boolean selected = select(1000);
nSessions += add();
updateTrafficMask();
if (selected) {
process();
}
long currentTime = System.currentTimeMillis();
flush(currentTime);
nSessions -= remove();
notifyIdleSessions(currentTime);
if (nSessions == 0) {
synchronized (lock) {
if (newSessions.isEmpty() && isSelectorEmpty()) {
worker = null;
break;
}
}
}
// Disconnect all sessions immediately if disposal has been
// requested so that we exit this loop eventually.
if (isDisposing()) {
for (Iterator<T> i = allSessions(); i.hasNext(); ) {
scheduleRemove(i.next());
}
wakeup();
}
} catch (Throwable t) {
ExceptionMonitor.getInstance().exceptionCaught(t);
try {
Thread.sleep(1000);
} catch (InterruptedException e1) {
ExceptionMonitor.getInstance().exceptionCaught(e1);
}
}
}
try {
synchronized (disposalLock) {
if (isDisposing()) {
dispose0();
}
}
} catch (Throwable t) {
ExceptionMonitor.getInstance().exceptionCaught(t);
} finally {
disposalFuture.setValue(true);
}
}
2. 在addSession中从新增session队列newSeesion获取一个新增NioSession,并开始监控之。
private int add() {
int addedSessions = 0;
// Loop on the new sessions blocking queue, to count
// the number of sessions who has been created
for (;;) {
T session = newSessions.poll();
if (session == null) {
// We don't have anymore new sessions
break;
}
if (addNow(session)) {
// The new session has been added to the
addedSessions ++;
}
}
return addedSessions;
}
2.1 在addNow(T session)中完成了单个NioSession的初始化
private boolean addNow(T session) {
boolean registered = false;
boolean notified = false;
try {
init(session);
registered = true;
// Build the filter chain of this session.
session.getService().getFilterChainBuilder().buildFilterChain(
session.getFilterChain());
// DefaultIoFilterChain.CONNECT_FUTURE is cleared inside here
// in AbstractIoFilterChain.fireSessionOpened().
((AbstractIoService) session.getService()).getListeners().fireSessionCreated(session);
notified = true;
} catch (Throwable e) {
if (notified) {
// Clear the DefaultIoFilterChain.CONNECT_FUTURE attribute
// and call ConnectFuture.setException().
scheduleRemove(session);
session.getFilterChain().fireExceptionCaught(e);
wakeup();
} else {
ExceptionMonitor.getInstance().exceptionCaught(e);
try {
destroy(session);
} catch (Exception e1) {
ExceptionMonitor.getInstance().exceptionCaught(e1);
} finally {
registered = false;
}
}
}
return registered;
}
2.1.1 初始化session,这里是Template Method的又一个体现,因为不同的类型的session初始化实现不同, 在NioProcessor中包括:设置非堵塞模式.为该session注册OP_READ事件。
@Override
protected void init(NioSession session) throws Exception {
SelectableChannel ch = (SelectableChannel) session.getChannel();
ch.configureBlocking(false);
session.setSelectionKey(ch.register(selector, SelectionKey.OP_READ, session));
}
2.1.2 构建IoFilterChain,具体请参考
http://uniseraph.iteye.com/blog/228194
2.1.3 触发NioSession上的相关事件,依次为sessionCreated->sessionOpened ->
IoServiceListener的sessionCreated
3 修改sessio的traffic参数
从trafficControllingSessions获取需要修改session的traffic参数,具体与新增NioSession类似,不再详细描述。
4 接受处理socket数据并应答
关键内容来了,如果有NioSession的channel处理于OP_READ状态,则处理之
if (selected) {
process();
}
process方法对于所有发生了OP_READ或OP_WRITE的NioSession依次进行处理,注意虽然在NioSession初始化的时候只注册了OP_READ事件,但是在上一周期调用session.write方法的时候,上一周期的flush方法将会注册OP_WRITE方法。本周期发送的数据都是上一周期确定的。
private void process(T session) {
if (isReadable(session) && session.getTrafficMask().isReadable()) {
read(session);
}
if (isWritable(session) && session.getTrafficMask().isWritable()) {
scheduleFlush(session);
}
}
在read方法中读取socket上的数据,调用发IoFilter,逐层传递到IoHander(具体参考
http://uniseraph.iteye.com/blog/228194);如果读到-1,则增加一个关闭连接消息到队列中;如果发生异常,则异常调用IoFilter和IoHandler的fireExceptionCaught方法
private void read(T session) {
IoSessionConfig config = session.getConfig();
IoBuffer buf = IoBuffer.allocate(config.getReadBufferSize());
final boolean hasFragmentation =
session.getTransportMetadata().hasFragmentation();
try {
int readBytes = 0;
int ret;
try {
if (hasFragmentation) {
while ((ret = read(session, buf)) > 0) {
readBytes += ret;
if (!buf.hasRemaining()) {
break;
}
}
} else {
ret = read(session, buf);
if (ret > 0) {
readBytes = ret;
}
}
} finally {
buf.flip();
}
if (readBytes > 0) {
session.getFilterChain().fireMessageReceived(buf);
buf = null;
if (hasFragmentation) {
if (readBytes << 1 < config.getReadBufferSize()) {
session.decreaseReadBufferSize();
} else if (readBytes == config.getReadBufferSize()) {
session.increaseReadBufferSize();
}
}
}
if (ret < 0) {
scheduleRemove(session);
}
} catch (Throwable e) {
if (e instanceof IOException) {
scheduleRemove(session);
}
session.getFilterChain().fireExceptionCaught(e);
}
}
发送数据
6 关闭连接及其他
如果没有消息积累,也没有新创建的连接,则关闭线程池
![点击查看原始大小图片](http://dl2.iteye.com/upload/attachment/0003/5382/8c0a72a2-7fc7-349e-84fd-1e3ee638b572-thumb.gif)
- 大小: 9 KB
分享到:
相关推荐
中英文版的 pdf 均带有书签,方便读者朋友查阅。 mina_2.0_user_guide_cn.pdf 内容预览: 第一章:入门 第二章:基础知识 第三章:IO 服务 第四章:会话 第五章:过滤器 第六章:传输 第七章:事件处理器 第八章:...
* NIO 是 Java 语言中的一种 I/O 模式,提供了高性能的 I/O 操作。 * MINA 是基于 NIO 的一个网络应用框架,提供了一个灵活的架构和高性能的 I/O 操作。 Chapter 2 - 基础应用架构 本章节介绍了 MINA 的基础应用...
Apache Mina Server 2.0中文参考手册V1.0,Apache Mina2.0学习笔记(修订版)Apache Mina Server 2.0中文参考手册V1.0,Apache Mina2.0学习笔记(修订版)
Apache mina中文开发手册,socket开源通信框架
MINA 2.0 User Guide Part I - Basics Chapter 1 - Getting Started Chapter 2 - Basics Chapter 3 - Service Chapter 4 - Session Chapter 5 - Filters Chapter 6 - Transports Chapter 7 - Handler Part II - ...
Apache MINA 2.0 用户指南
mina连接,mina心跳连接,mina断线重连。其中客户端可直接用在android上。根据各方参考资料,经过自己的理解弄出来的。CSDN的资源分太难得了。
apache-mina-2.0.7 用户手册中文随笔翻译
mina的使用初步入门mina的使用初步入门mina的使用初步入门
mina-core-2.0.0-M6.jar mina-example-2.0.0-M6.jar mina-filter-codec-netty-2.0.0-M6.jar mina-filter-compression-2.0.0-M6.jar mina-integration-beans-2.0.0-M6.jar mina-integration-jmx-2.0.0-M6.jar mina-...
里面包含mina2.0的api(英文)和mina自学手册,还有mina的开发指导
mina的高级使用,mina文件图片传送,
mina2.0用户指南,mina_2.0_user_guide_cn.pdf
Apache MINA是 Apache 组织一个较新的项目,它为开发高性能和高可用性的网络应用程序提供了非常便利...当前发行的 MINA 版本支持基于 Java NIO 技术的 TCP/UDP 应用程序开发、串口通讯程序(只在最新的预览版中提供)。
深入理解Apache_Mina_(1)----_Mina的几个类 深入理解Apache_Mina_(2)----_与IoFilter相关的几个类 深入理解Apache_Mina_(3)----_与IoHandler相关的几个类 深入理解Apache_Mina_(4)----_IoFilter和IoHandler的区别和...
mina内部源码,可以深入的研究下,重构修改后获得的效率更加突出
二、Mina2.0框架的核心组件 Mina2.0框架的核心组件包括: * IoAcceptor:负责监听和处理incoming连接的组件。 * IoSession:代表了一个网络连接的session对象。 * IoHandler:负责处理incoming数据的Handler接口。...
许多刚接触mina的朋友,对于mina的编解码器的编写很迷惑.希望这个文档可以帮助朋友们少走弯路。 资源中是一个比较典型的编解码器写法。生成了可执行文件。并对编解码器的代码有详细注释。
Mina2的中文用户手册,仅供互相学习参考,如有侵权立删
mina新手案例,mina新手教程源码 mina+springboot最简单的案例。用的IDEA * mina服务端 * 1、添加@Controller注解和 @PostConstruct注解,代表启动springboot项目时也调用该类下的该方法, * 启动springboot项目...