今天看了下netty代码,对代码做了个流程分析,netty的代码写的真是漂亮。
netty服务端启动代码如下
ServerBootstrap bootstrap = new ServerBootstrap(new NioServerSocketChannelFactory(Executors.newCachedThreadPool(),
Executors.newCachedThreadPool()));
bootstrap.setPipelineFactory(new ChannelPipelineFactory() {
@Override
public ChannelPipeline getPipeline() {
ChannelPipeline pipleline = pipeline();
//默认最大传输帧大小为16M
pipleline.addLast("encode", new ObjectEncoder(1048576 * 16));
pipleline.addLast("decode", new ObjectDecoder(1048576 * 16, ClassResolvers.weakCachingConcurrentResolver(null)));
pipleline.addLast("handler", handler);
return pipleline;
}
});
//设置缓冲区为64M
bootstrap.setOption("receiveBufferSize", 1048576 * 64);
bootstrap.setOption("child.tcpNoDelay", true); //关闭Nagle算法
//tcp定期发送心跳包 比如IM里边定期探测对方是否下线
//只有tcp长连接下才有意义
// bootstrap.setOption("child.keepAlive", true);
bootstrap.bind(new InetSocketAddress(port));
服务端事件处理顺序如下:
UpStream.ChannelState.OPEN—–>DownStream.ChannelState.BOUND(需要绑定)
——–>UpStream.ChannelState.BOUND(已经绑定)——>DownStream.CONNECTED(需要连接)——->UpStream.CONNECTED(连接成功)
在bind的时候做了如下处理
public Channel bind(final SocketAddress localAddress) {
if (localAddress == null) {
throw new NullPointerException("localAddress");
}
final BlockingQueue<ChannelFuture> futureQueue =
new LinkedBlockingQueue<ChannelFuture>();
ChannelHandler binder = new Binder(localAddress, futureQueue);
ChannelHandler parentHandler = getParentHandler();
这里创建了一个Binder,它继承了SimpleChannelUpstreamHandler。先说说UpStreamHandler和DownStreamHandler,一般来说,UpStream类型的事件主要是由网络底层反馈给Netty的,比如messageReceived,channelConnected等事件,而DownStream类型的事件是由框架自己发起的,比如bind,write,connect,close等事件。
接着
ChannelPipeline bossPipeline = pipeline();
bossPipeline.addLast("binder", binder);
if (parentHandler != null) {
bossPipeline.addLast("userHandler", parentHandler);
}
Channel channel = getFactory().newChannel(bossPipeline);
这里创建出一个channel,每一个channel都是由一个tcp四元组组成。channel由ChannelFactory创建而成。在创建完NioServerSocketChannel后,会调用
fireChannelOpen(this);这是发出一个ChannelState.OPEN事件,前面注册的BinderHandler会处理这个事件。我们来看看Binder的代码
@Override
public void channelOpen(
ChannelHandlerContext ctx,
ChannelStateEvent evt) {
try {
evt.getChannel().getConfig().setPipelineFactory(getPipelineFactory());
// Split options into two categories: parent and child.
Map<String, Object> allOptions = getOptions();
Map<String, Object> parentOptions = new HashMap<String, Object>();
for (Entry<String, Object> e: allOptions.entrySet()) {
if (e.getKey().startsWith("child.")) {
childOptions.put(
e.getKey().substring(6),
e.getValue());
} else if (!e.getKey().equals("pipelineFactory")) {
parentOptions.put(e.getKey(), e.getValue());
}
}
// Apply parent options.
evt.getChannel().getConfig().setOptions(parentOptions);
} finally {
ctx.sendUpstream(evt);
}
boolean finished = futureQueue.offer(evt.getChannel().bind(localAddress)); //这里发出bind事件,return Channels.bind(this, localAddress)
assert finished;
}
bind就触发了一个DownStream的ChannelState.BOUND事件。表明需要将该Channel绑定至指定的地址。
public void sendDownstream(ChannelEvent e) {
DefaultChannelHandlerContext tail = getActualDownstreamContext(this.tail);
if (tail == null) {
try {
getSink().eventSunk(this, e);
return;
} catch (Throwable t) {
notifyHandlerException(e, t);
return;
}
}
sendDownstream(tail, e);
}
接着就要看NioServerSocketPipelineSink了,这个主要关注于具体传输数据的处理,同时也包括其他方面的内容,比如异常处理等等。执行eventSunk方法。
public void eventSunk(
ChannelPipeline pipeline, ChannelEvent e) throws Exception {
Channel channel = e.getChannel();
if (channel instanceof NioServerSocketChannel) {
handleServerSocket(e);
} else if (channel instanceof NioSocketChannel) {
handleAcceptedSocket(e);
}
}
nio方式ChannelSink一般会有1个boss实例(implements Runnable),以及若干个worker实例(不设置默认为cpu cores*2),它将channel分为 ServerSocketChannel和SocketChannel分开处理。这主要原因是boss线程accept()一个新的连接生成一个 SocketChannel交给worker进行数据接收。
看下ServerSocketChannel的处理
private void handleServerSocket(ChannelEvent e) {
if (!(e instanceof ChannelStateEvent)) {
return;
}
ChannelStateEvent event = (ChannelStateEvent) e;
NioServerSocketChannel channel =
(NioServerSocketChannel) event.getChannel();
ChannelFuture future = event.getFuture();
ChannelState state = event.getState();
Object value = event.getValue();
switch (state) {
case OPEN:
if (Boolean.FALSE.equals(value)) {
close(channel, future);
}
break;
case BOUND:
if (value != null) {
bind(channel, future, (SocketAddress) value);
} else {
close(channel, future);
}
break;
}
}
主要是处理bind事件,
private void bind(
NioServerSocketChannel channel, ChannelFuture future,
SocketAddress localAddress) {
boolean bound = false;
boolean bossStarted = false;
try {
channel.socket.socket().bind(localAddress, channel.getConfig().getBacklog());
bound = true;
future.setSuccess();
fireChannelBound(channel, channel.getLocalAddress());
//取出一个boss线程,然后交给Boss类去处理。
Executor bossExecutor =
((NioServerSocketChannelFactory) channel.getFactory()).bossExecutor;
DeadLockProofWorker.start(
bossExecutor,
new ThreadRenamingRunnable(
new Boss(channel),
"New I/O server boss #" + id + " (" + channel + ')'));
bossStarted = true;
} catch (Throwable t) {
future.setFailure(t);
fireExceptionCaught(channel, t);
} finally {
if (!bossStarted && bound) {
close(channel, future);
}
}
}
看下Boss类,它实现了Runnable接口
private final Selector selector;
private final NioServerSocketChannel channel;
Boss(NioServerSocketChannel channel) throws IOException {
this.channel = channel;
selector = Selector.open();
boolean registered = false;
try {
channel.socket.register(selector, SelectionKey.OP_ACCEPT);
registered = true;
} finally {
if (!registered) {
closeSelector();
}
}
channel.selector = selector;
代码是不是有点熟悉,没错,是nio里的代码,需要注意的是,ServerSocketChannel只注册OP_ACCEPT事件。
再看下Boss类的run方法
public void run() {
final Thread currentThread = Thread.currentThread();
channel.shutdownLock.lock();
try {
for (;;) {
try {
if (selector.select(1000) > 0) {
selector.selectedKeys().clear();
}
SocketChannel acceptedSocket = channel.socket.accept();
if (acceptedSocket != null) {
registerAcceptedChannel(acceptedSocket, currentThread);
}
} catch (SocketTimeoutException e) {
// Thrown every second to get ClosedChannelException
// raised.
} catch (CancelledKeyException e) {
// Raised by accept() when the server socket was closed.
} catch (ClosedSelectorException e) {
// Raised by accept() when the server socket was closed.
} catch (ClosedChannelException e) {
// Closed as requested.
break;
} catch (Throwable e) {
logger.warn(
"Failed to accept a connection.", e);
try {
Thread.sleep(1000);
} catch (InterruptedException e1) {
// Ignore
}
}
}
} finally {
channel.shutdownLock.unlock();
closeSelector();
}
}
这里会调用registerAcceptedChannel(acceptedSocket, currentThread);方法
private void registerAcceptedChannel(SocketChannel acceptedSocket, Thread currentThread) {
try {
ChannelPipeline pipeline =
channel.getConfig().getPipelineFactory().getPipeline();
NioWorker worker = nextWorker(); //获取一个NioWorker
//将Channel注册到NioWorker上去
worker.register(new NioAcceptedSocketChannel(
channel.getFactory(), pipeline, channel,
NioServerSocketPipelineSink.this, acceptedSocket,
worker, currentThread), null);
} catch (Exception e) {
logger.warn(
"Failed to initialize an accepted socket.", e);
try {
acceptedSocket.close();
} catch (IOException e2) {
logger.warn(
"Failed to close a partially accepted socket.",
e2);
}
}
}
当有新的连接建立,会交给NioWorker的线程池去处理,boss只负责accept到新的连接,新的SocketChannel会被注册到一个work线程中去。
分享到:
相关推荐
Netty 框架学习 —— 第一个 Netty 应用(csdn)————程序
通过netty编写文件传输的客户端与服务端,以及协议说明, 通用的netty传输协议 通过该协议进行文件传输 文件传输客户端与服务端 可以根据文件的最后更新时间来增量传输文件 源码开放,通过eclipse或者idea导入代码...
Netty中使用WebSocket实现服务端与客户端的长连接通信发送消息示例代码;Netty中使用WebSocket实现服务端与客户端的长连接通信发送消息示例代码;Netty中使用WebSocket实现服务端与客户端的长连接通信发送消息示例代码
netty案例,netty4.1中级拓展篇九《Netty集群部署实现跨服务端通信的落地方案》源码 ...
详细的netty框架的简单案例,包括客户端服务端
实现Java服务端和C#客户端联通 Java使用Netty 开发环境为IDEA C#使用DotNetty 开发环境为VS2017 运行时先开启Java服务端 再开启客户端
基于netty框架编写的socket服务器
springboot整合netty,分客户端和服务端两个项目,springboot整合netty,分客户端和服务端两个项目,springboot整合netty,分客户端和服务端两个项目,springboot整合netty,分客户端和服务端两个项目
客户端发送16进制给服务端,并行实现socket通道活动状态和断开重新连接的功能, 监听接口是否存在数据,如果存在socket客户端发送给socket服务端的实现 随着物联网的发展,随之出现了各种传感器监测数据的实时发送,...
NULL 博文链接:https://jonenine.iteye.com/blog/2170673
Android Studio 开发Netty网络访问框架,实现了客户端、服务端两种访问方式,支持发送心跳数据,使用Handler实现外部数据交互,有调用Demo,在实际项目中使用暂时没有问题
基于 Netty 开发的 Java 游戏服务端框架,目前提供 CocosCreator 和 Unity 的客户端SDK.zip 基于 Netty 开发的 Java 游戏服务端框架,目前提供 CocosCreator 和 Unity 的客户端SDK.zip 基于 Netty 开发的 Java 游戏...
然后你就可以仔细JavaFX代码和Netty的代码,很简单的呢。声明一下,本人使用Mina开发游戏服务器,没有打算使用Netty开发应用。制作这个示例只是为给别人帮个忙而已,然后就可以带你入门,最后你自己飞^_^
Netty在IDEA中搭建HelloWorld服务端并对Netty执行流程与重要组件进行介绍示例代码;Netty在IDEA中搭建HelloWorld服务端并对Netty执行流程与重要组件进行介绍示例代码
netty服务器解析16进制数据
java应用netty服务端和客户端示例,客户端和服务端的model对象目录必须一致
Netty实现简单的客户端服务端通信示例,户端发送请求给服务端,并由服务端响应客户端请求,希望对初学Netty的同学有所帮助。
注:下载前请查看本人博客文章,看是否...里面包含模拟TCP客户端发送报文工具,硬件厂商提供的协议,服务端(springboot+netty)解析报文源码,源码里整合了redis,不需要可自行删除,如有需要客户端代码,可联系我。
解析1078部标终端推流,并转发至rtmp流服务器.代码有点乱但是本人保证可用. 1078分了2014和2016.该版本是否都支持我已近刚忘记了.但是大同小异对不对~~就是解析的时候注意下就行了
netty是一个异步非阻塞高并发通信框架,基于聚合多路复用技术,编解码,google的protobuf序列化等技术的高性能通信框架。