- 浏览: 954701 次
文章分类
- 全部博客 (428)
- Hadoop (2)
- HBase (1)
- ELK (1)
- ActiveMQ (13)
- Kafka (5)
- Redis (14)
- Dubbo (1)
- Memcached (5)
- Netty (56)
- Mina (34)
- NIO (51)
- JUC (53)
- Spring (13)
- Mybatis (17)
- MySQL (21)
- JDBC (12)
- C3P0 (5)
- Tomcat (13)
- SLF4J-log4j (9)
- P6Spy (4)
- Quartz (12)
- Zabbix (7)
- JAVA (9)
- Linux (15)
- HTML (9)
- Lucene (0)
- JS (2)
- WebService (1)
- Maven (4)
- Oracle&MSSQL (14)
- iText (11)
- Development Tools (8)
- UTILS (4)
- LIFE (8)
最新评论
-
Donald_Draper:
Donald_Draper 写道刘落落cici 写道能给我发一 ...
DatagramChannelImpl 解析三(多播) -
Donald_Draper:
刘落落cici 写道能给我发一份这个类的源码吗Datagram ...
DatagramChannelImpl 解析三(多播) -
lyfyouyun:
请问楼主,执行消息发送的时候,报错:Transport sch ...
ActiveMQ连接工厂、连接详解 -
ezlhq:
关于 PollArrayWrapper 状态含义猜测:参考 S ...
WindowsSelectorImpl解析一(FdMap,PollArrayWrapper) -
flyfeifei66:
打算使用xmemcache作为memcache的客户端,由于x ...
Memcached分布式客户端(Xmemcached)
netty 网络通信示例一 :http://donald-draper.iteye.com/blog/2383326
netty 网络通信示例二:http://donald-draper.iteye.com/blog/2383328
上一篇文章我们通过一个示例,来展示netty如何处理粘包问题,其中涉及到解码器,今天我们在通过一个实例,来看一个用到编码器与解码器的示例,这个示例作用为服务器提供客户端计算请求,并将结果返回给客户端。
通信协议:
这个协议我们在将Java socket编程的时候有实现过,在mina相关示例中,我们也有说过,不过协议有所不同;
这里我们用netty来实现。
协议常量:
计算请求协议编码器:
计算请求协议解码器:
计算结果协议编码器:
计算结果协议解码器:
服务端:
服务端通道处理器Initializer:
服务端处理器:
客户端:
客户端通道处理器Initializer:
客户端handler:
启动服务端与客户端,控制台输出:
服务端:
[INFO ] 2017-07-06 09:03:15 netty.main.math.MathServer =========Server is start=========
[INFO ] 2017-07-06 09:03:24 netty.codec.math.MathMessageDecoder =======解码计算请求协议成功:{"dataLenth":10,"endMark":"\r\n","firstNum":17,"protocolCode":"300100","secondNum":8}
[INFO ] 2017-07-06 09:03:24 netty.codec.math.AckMessageEncoder =======编码计算结果协议成功:{"dataLenth":6,"endMark":"\r\n","protocolCode":"300200","result":136}
[INFO ] 2017-07-06 09:03:24 netty.codec.math.MathMessageDecoder =======解码计算请求协议成功:{"dataLenth":10,"endMark":"\r\n","firstNum":17,"protocolCode":"300000","secondNum":8}
[INFO ] 2017-07-06 09:03:24 netty.codec.math.AckMessageEncoder =======编码计算结果协议成功:{"dataLenth":6,"endMark":"\r\n","protocolCode":"300200","result":25}
客户端:
[INFO ] 2017-07-06 09:03:23 netty.main.math.MathClient =========Client is start=========
[INFO ] 2017-07-06 09:03:24 netty.codec.math.MathMessageEncoder =======编码计算请求协议成功:{"dataLenth":10,"endMark":"\r\n","firstNum":17,"protocolCode":"300100","secondNum":8}
[INFO ] 2017-07-06 09:03:24 netty.codec.math.MathMessageEncoder =======编码计算请求协议成功:{"dataLenth":10,"endMark":"\r\n","firstNum":17,"protocolCode":"300000","secondNum":8}
[INFO ] 2017-07-06 09:03:24 netty.codec.math.AckMessageDecoder =======解码计算结果协议成功:{"dataLenth":6,"endMark":"\r\n","protocolCode":"300200","result":136}
[INFO ] 2017-07-06 09:03:24 netty.codec.math.AckMessageDecoder =======解码计算结果协议成功:{"dataLenth":6,"endMark":"\r\n","protocolCode":"300200","result":25}
[INFO ] 2017-07-06 09:03:24 netty.main.math.MathClient =======Calculat result:{"dataLenth":6,"endMark":"\r\n","protocolCode":"300200","result":25}
netty 网络通信示例二:http://donald-draper.iteye.com/blog/2383328
上一篇文章我们通过一个示例,来展示netty如何处理粘包问题,其中涉及到解码器,今天我们在通过一个实例,来看一个用到编码器与解码器的示例,这个示例作用为服务器提供客户端计算请求,并将结果返回给客户端。
通信协议:
这个协议我们在将Java socket编程的时候有实现过,在mina相关示例中,我们也有说过,不过协议有所不同;
这里我们用netty来实现。
协议常量:
package netty.constant.math; /** * 协议常量 * @author donald * 2017年6月22日 * 下午1:10:11 */ public class ProtocolConstants { /** * 加法协议编码 */ public static final String SUM_PROTOCOL_300000 = "300000"; /** * 乘法协议编码 */ public static final String MULTI_PROTOCOL_300100 = "300100"; /** * 计算成功协议 */ public static final String ACK_PROTOCOL_300200 = "300200"; /** * 服务器解析协议失败 */ public static final String ACK_PROTOCOL_300300 = "300300"; /** * 协议编码长度 */ public static final Integer PROTOCOL_CODE_LENGTH = 6; /** * 协议内容长度字段 */ public static final Integer PROTOCOL_DATA_LENGTH = 4; /** * 协议操作数长度 */ public static final Integer OPERATE_NUM_LENGTH = 4; /** * 协议计算结果长度 */ public static final Integer PROTOCOL_ACK_LENGTH = 4; /** * 协议结束符 */ public static final String PROTOCOL_END = "\r\n"; /** * 协议结束符长度 */ public static final Integer PROTOCOL_END_LENGTH = 2; /** * 字符集 */ public static final String CHARSET_UTF8 = "UTF-8"; }
计算请求协议编码器:
package netty.codec.math; import io.netty.buffer.ByteBuf; import io.netty.channel.ChannelHandlerContext; import io.netty.handler.codec.MessageToByteEncoder; import netty.constant.math.ProtocolConstants; import netty.message.MathMessage; import util.JsonUtil; import java.io.UnsupportedEncodingException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; /** * 计算协议编码器 * @author donald * 2017年6月22日 * 下午10:23:21 */ public class MathMessageEncoder extends MessageToByteEncoder<MathMessage> { private static final Logger log = LoggerFactory.getLogger(MathMessageEncoder.class); @Override protected void encode(ChannelHandlerContext ctx, MathMessage msg, ByteBuf out) { try { out.writeBytes(msg.getProtocolCode(). getBytes(ProtocolConstants.CHARSET_UTF8)); out.writeInt(msg.getDataLenth()); out.writeInt(msg.getFirstNum()); out.writeInt(msg.getSecondNum()); out.writeBytes(msg.getEndMark(). getBytes(ProtocolConstants.CHARSET_UTF8)); } catch (UnsupportedEncodingException e) { e.printStackTrace(); } log.info("=======编码计算请求协议成功:"+JsonUtil.toJson(msg)); } }
计算请求协议解码器:
package netty.codec.math; import io.netty.buffer.ByteBuf; import io.netty.channel.ChannelHandlerContext; import io.netty.handler.codec.ByteToMessageDecoder; import io.netty.handler.codec.CorruptedFrameException; import netty.constant.math.ProtocolConstants; import netty.message.MathMessage; import util.JsonUtil; import java.io.UnsupportedEncodingException; import java.util.List; import org.slf4j.Logger; import org.slf4j.LoggerFactory; /** * 计算协议解码器 * @author donald * 2017年6月22日 * 下午10:47:31 */ public class MathMessageDecoder extends ByteToMessageDecoder { private static final Logger log = LoggerFactory.getLogger(MathMessageDecoder.class); @Override protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) { // Wait until the length prefix is available. int protocolLenth = ProtocolConstants.PROTOCOL_CODE_LENGTH+ ProtocolConstants.PROTOCOL_DATA_LENGTH; if (in.readableBytes() < protocolLenth) { return; } in.markReaderIndex(); byte[] protocolCodeBytes = new byte[ProtocolConstants.PROTOCOL_CODE_LENGTH]; in.readBytes(protocolCodeBytes); String protocolCode = ""; try { protocolCode = new String(protocolCodeBytes,ProtocolConstants.CHARSET_UTF8); } catch (UnsupportedEncodingException e) { e.printStackTrace(); } if (!protocolCode.equals(ProtocolConstants.SUM_PROTOCOL_300000) && !protocolCode.equals(ProtocolConstants.MULTI_PROTOCOL_300100) ){ in.resetReaderIndex(); throw new CorruptedFrameException("Invalid protocol code: " + protocolCode); } int dataLength = in.readInt(); if (in.readableBytes() < dataLength) { in.resetReaderIndex(); return; } //转换接收的数据为MathMessage MathMessage mes = new MathMessage(); mes.setProtocolCode(protocolCode); mes.setDataLenth(dataLength); mes.setFirstNum(in.readInt()); mes.setSecondNum(in.readInt()); byte[] endMarkBytes = new byte[ProtocolConstants.PROTOCOL_END_LENGTH]; in.readBytes(endMarkBytes); String endMark = ""; try { endMark = new String(endMarkBytes,ProtocolConstants.CHARSET_UTF8); } catch (UnsupportedEncodingException e) { e.printStackTrace(); } mes.setEndMark(endMark); out.add(mes); log.info("=======解码计算请求协议成功:"+JsonUtil.toJson(mes)); } }
计算结果协议编码器:
package netty.codec.math; import io.netty.buffer.ByteBuf; import io.netty.channel.ChannelHandlerContext; import io.netty.handler.codec.MessageToByteEncoder; import netty.constant.math.ProtocolConstants; import netty.message.AckMessage; import util.JsonUtil; import java.io.UnsupportedEncodingException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; /** * 计算结果协议编码器 * @author donald * 2017年6月22日 * 下午10:23:21 */ public class AckMessageEncoder extends MessageToByteEncoder<AckMessage> { private static final Logger log = LoggerFactory.getLogger(AckMessageEncoder.class); @Override protected void encode(ChannelHandlerContext ctx, AckMessage msg, ByteBuf out) { try { out.writeBytes(msg.getProtocolCode(). getBytes(ProtocolConstants.CHARSET_UTF8)); out.writeInt(msg.getDataLenth()); out.writeInt(msg.getResult()); out.writeBytes(msg.getEndMark(). getBytes(ProtocolConstants.CHARSET_UTF8)); } catch (UnsupportedEncodingException e) { e.printStackTrace(); } log.info("=======编码计算结果协议成功:"+JsonUtil.toJson(msg)); } }
计算结果协议解码器:
package netty.codec.math; import io.netty.buffer.ByteBuf; import io.netty.channel.ChannelHandlerContext; import io.netty.handler.codec.ByteToMessageDecoder; import io.netty.handler.codec.CorruptedFrameException; import netty.constant.math.ProtocolConstants; import netty.message.AckMessage; import util.JsonUtil; import java.io.UnsupportedEncodingException; import java.util.List; import org.slf4j.Logger; import org.slf4j.LoggerFactory; /** * 计算结果协议解码器 * @author donald * 2017年6月22日 * 下午10:47:31 */ public class AckMessageDecoder extends ByteToMessageDecoder { private static final Logger log = LoggerFactory.getLogger(AckMessageDecoder.class); @Override protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) { //待数据可用时,解码消息 int protocolLenth = ProtocolConstants.PROTOCOL_CODE_LENGTH+ ProtocolConstants.PROTOCOL_DATA_LENGTH; if (in.readableBytes() < protocolLenth) { return; } in.markReaderIndex(); byte[] protocolCodeBytes = new byte[ProtocolConstants.PROTOCOL_CODE_LENGTH]; in.readBytes(protocolCodeBytes); String protocolCode = ""; try { protocolCode = new String(protocolCodeBytes,ProtocolConstants.CHARSET_UTF8); } catch (UnsupportedEncodingException e) { e.printStackTrace(); } if (!protocolCode.equals(ProtocolConstants.ACK_PROTOCOL_300200) && !protocolCode.equals(ProtocolConstants.ACK_PROTOCOL_300300) ){ in.resetReaderIndex(); throw new CorruptedFrameException("Invalid protocol code: " + protocolCode); } int dataLength = in.readInt(); if (in.readableBytes() < dataLength) { in.resetReaderIndex(); return; } //转换接收的数据为MathMessage AckMessage mes = new AckMessage(); mes.setProtocolCode(protocolCode); mes.setDataLenth(dataLength); mes.setResult(in.readInt()); byte[] endMarkBytes = new byte[ProtocolConstants.PROTOCOL_END_LENGTH]; in.readBytes(endMarkBytes); String endMark = ""; try { endMark = new String(endMarkBytes,ProtocolConstants.CHARSET_UTF8); } catch (UnsupportedEncodingException e) { e.printStackTrace(); } mes.setEndMark(endMark); out.add(mes); log.info("=======解码计算结果协议成功:"+JsonUtil.toJson(mes)); } }
服务端:
package netty.main.math; import java.net.InetSocketAddress; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import io.netty.bootstrap.ServerBootstrap; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelOption; import io.netty.channel.EventLoopGroup; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.nio.NioServerSocketChannel; import io.netty.handler.ssl.SslContext; import io.netty.handler.ssl.SslContextBuilder; import io.netty.handler.ssl.util.SelfSignedCertificate; import netty.initializer.math.MathServerInitializer; /** * * @author donald * 2017年6月21日 * 下午12:48:17 */ public class MathServer { private static final Logger log = LoggerFactory.getLogger(MathServer.class); static final boolean SSL = System.getProperty("ssl") != null; private static final String ip = "192.168.31.153"; private static final int port = 10010; public static void main(String[] args) throws Exception { run(); } public static void run() throws Exception { // Configure SSL. final SslContext sslCtx; if (SSL) { SelfSignedCertificate ssc = new SelfSignedCertificate(); sslCtx = SslContextBuilder.forServer(ssc.certificate(), ssc.privateKey()).build(); } else { sslCtx = null; } /* * EventLoopGroup(多线程事件loop),处理IO操作,这里我们用了两个事件loop * 第一个boss用于处理器监听连接请求,第二个worker用于数据的传输; * 具体线程是多少依赖于事件loop的具体实现 * */ EventLoopGroup bossGroup = new NioEventLoopGroup(); // (1) EventLoopGroup workerGroup = new NioEventLoopGroup(); try { //ServerBootstrap,用于配置服务端,一般为ServerSocket通道 ServerBootstrap serverBoot = new ServerBootstrap(); serverBoot.group(bossGroup, workerGroup) .channel(NioServerSocketChannel.class) .childHandler(new MathServerInitializer(sslCtx)) .option(ChannelOption.SO_BACKLOG, 128)//socket监听器连接队列大小、 .childOption(ChannelOption.SO_KEEPALIVE, true); //保活,此配置针对ServerSocket通道接收连接产生的Socket通道 InetSocketAddress inetSocketAddress = new InetSocketAddress(ip,port); // 绑定地址,开始监听 ChannelFuture f = serverBoot.bind(inetSocketAddress).sync(); log.info("=========Server is start========="); //等待,直到ServerSocket关闭 f.channel().closeFuture().sync(); } finally { workerGroup.shutdownGracefully(); bossGroup.shutdownGracefully(); } } }
服务端通道处理器Initializer:
package netty.initializer.math; import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelPipeline; import io.netty.channel.socket.SocketChannel; import io.netty.handler.codec.compression.ZlibCodecFactory; import io.netty.handler.codec.compression.ZlibWrapper; import io.netty.handler.ssl.SslContext; import netty.codec.math.AckMessageEncoder; import netty.codec.math.MathMessageDecoder; import netty.handler.math.MathServerHandler; /** * Creates a newly configured {@link ChannelPipeline} for a server-side channel. */ public class MathServerInitializer extends ChannelInitializer<SocketChannel> { private final SslContext sslCtx; public MathServerInitializer(SslContext sslCtx) { this.sslCtx = sslCtx; } @Override public void initChannel(SocketChannel ch) { ChannelPipeline pipeline = ch.pipeline(); if (sslCtx != null) { pipeline.addLast(sslCtx.newHandler(ch.alloc())); } // Enable stream compression (you can remove these two if unnecessary) pipeline.addLast(ZlibCodecFactory.newZlibEncoder(ZlibWrapper.GZIP)); pipeline.addLast(ZlibCodecFactory.newZlibDecoder(ZlibWrapper.GZIP)); // Add the number codec first, pipeline.addLast(new MathMessageDecoder()); pipeline.addLast(new AckMessageEncoder()); // and then business logic. // Please note we create a handler for every new channel // because it has stateful properties. pipeline.addLast(new MathServerHandler()); } }
服务端处理器:
package netty.handler.math; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.SimpleChannelInboundHandler; import netty.constant.math.ProtocolConstants; import netty.message.AckMessage; import netty.message.MathMessage; /** * Handler for a server-side channel. This handler maintains stateful * information which is specific to a certain channel using member variables. * Therefore, an instance of this handler can cover only one channel. You have * to create a new handler instance whenever you create a new channel and insert * this handler to avoid a race condition. * @author donald * 2017年6月23日 * 上午12:21:24 */ public class MathServerHandler extends SimpleChannelInboundHandler<MathMessage> { private static final Logger log = LoggerFactory.getLogger(MathServerHandler.class); private MathMessage mathMes = new MathMessage(); private AckMessage ackMes = new AckMessage(); @Override public void channelRead0(ChannelHandlerContext ctx, MathMessage msg) throws Exception { mathMes = msg; if(!mathMes.getEndMark().equals(ProtocolConstants.PROTOCOL_END)){ ackMes.setProtocolCode(ProtocolConstants.ACK_PROTOCOL_300300); } else{ ackMes.setProtocolCode(ProtocolConstants.ACK_PROTOCOL_300200); } String protocolCode = mathMes.getProtocolCode(); int result = 0; if(protocolCode.equals(ProtocolConstants.SUM_PROTOCOL_300000)){ result = mathMes.getFirstNum() + mathMes.getSecondNum(); } if(protocolCode.equals(ProtocolConstants.MULTI_PROTOCOL_300100)){ result = mathMes.getFirstNum() * mathMes.getSecondNum(); } ackMes.setResult(result); ackMes.setEndMark(ProtocolConstants.PROTOCOL_END); ackMes.setDataLenth(ProtocolConstants.OPERATE_NUM_LENGTH+ProtocolConstants.PROTOCOL_END_LENGTH); ctx.writeAndFlush(ackMes); } @Override public void channelInactive(ChannelHandlerContext ctx) throws Exception { ackMes = null; mathMes = null; } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { cause.printStackTrace(); ctx.close(); } }
客户端:
package netty.main.math; import java.net.InetSocketAddress; import javax.net.ssl.SSLException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import io.netty.bootstrap.Bootstrap; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelOption; import io.netty.channel.EventLoopGroup; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.nio.NioSocketChannel; import io.netty.handler.ssl.SslContext; import io.netty.handler.ssl.SslContextBuilder; import io.netty.handler.ssl.util.InsecureTrustManagerFactory; import netty.handler.math.MathClientHandler; import netty.initializer.math.MathClientInitializer; import util.JsonUtil; /** * * @author donald * 2017年6月21日 * 下午12:48:10 */ public final class MathClient { private static final Logger log = LoggerFactory.getLogger(MathClient.class); private static final boolean SSL = System.getProperty("ssl") != null; public static final String ip = System.getProperty("host", "192.168.31.153"); public static final int port = Integer.parseInt(System.getProperty("port", "10010")); public static final int count = Integer.parseInt(System.getProperty("count", "2")); public static void main(String[] args) throws Exception { run(); } private static void run() throws SSLException, InterruptedException{ //配置安全套接字上下文 final SslContext sslCtx; if (SSL) { sslCtx = SslContextBuilder.forClient() .trustManager(InsecureTrustManagerFactory.INSTANCE).build(); } else { sslCtx = null; } EventLoopGroup workerGroup = new NioEventLoopGroup(); try { //Bootstrap与 ServerBootstrap相似,不同的是Bootstrap用于配置客户端, //一般为Socket通道,或无连接通道 Bootstrap bootstrap = new Bootstrap(); //EventLoopGroup有 boss和worker两组,对于客户端只需要用worker bootstrap.group(workerGroup); bootstrap.channel(NioSocketChannel.class); bootstrap.option(ChannelOption.SO_KEEPALIVE, true); bootstrap.handler(new MathClientInitializer(sslCtx)); InetSocketAddress inetSocketAddress = new InetSocketAddress(ip,port); //连接socket地址 ChannelFuture f = bootstrap.connect(inetSocketAddress).sync(); log.info("=========Client is start========="); // Get the handler instance to retrieve the answer. MathClientHandler handler = (MathClientHandler) f.channel().pipeline().last(); // Print out the answer. log.info("=======Calculat result:"+JsonUtil.toJson(handler.getAckMessage())); //等待,直到连接关闭 f.channel().closeFuture().sync(); } finally { workerGroup.shutdownGracefully(); } } }
客户端通道处理器Initializer:
package netty.initializer.math; import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelPipeline; import io.netty.channel.socket.SocketChannel; import io.netty.handler.codec.compression.ZlibCodecFactory; import io.netty.handler.codec.compression.ZlibWrapper; import io.netty.handler.ssl.SslContext; import netty.codec.math.AckMessageDecoder; import netty.codec.math.MathMessageEncoder; import netty.handler.math.MathClientHandler; import netty.main.math.MathClient; /** * Creates a newly configured {@link ChannelPipeline} for a client-side channel. */ public class MathClientInitializer extends ChannelInitializer<SocketChannel> { private final SslContext sslCtx; public MathClientInitializer(SslContext sslCtx) { this.sslCtx = sslCtx; } @Override public void initChannel(SocketChannel ch) { ChannelPipeline pipeline = ch.pipeline(); if (sslCtx != null) { pipeline.addLast(sslCtx.newHandler(ch.alloc(), MathClient.ip, MathClient.port)); } // Enable stream compression (you can remove these two if unnecessary) pipeline.addLast(ZlibCodecFactory.newZlibEncoder(ZlibWrapper.GZIP)); pipeline.addLast(ZlibCodecFactory.newZlibDecoder(ZlibWrapper.GZIP)); // Add the number codec first, pipeline.addLast(new AckMessageDecoder()); pipeline.addLast(new MathMessageEncoder()); // and then business logic. pipeline.addLast(new MathClientHandler()); } }
客户端handler:
package netty.handler.math; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelFutureListener; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.SimpleChannelInboundHandler; import netty.constant.math.ProtocolConstants; import netty.main.math.MathClient; import netty.message.AckMessage; import netty.message.MathMessage; import java.util.concurrent.BlockingQueue; import java.util.concurrent.LinkedBlockingQueue; import org.slf4j.Logger; import org.slf4j.LoggerFactory; /** * * Handler for a client-side channel. This handler maintains stateful * information which is specific to a certain channel using member variables. * Therefore, an instance of this handler can cover only one channel. You have * to create a new handler instance whenever you create a new channel and insert * this handler to avoid a race condition. * @author donald * 2017年6月23日 * 上午12:29:00 */ public class MathClientHandler extends SimpleChannelInboundHandler<AckMessage> { private static final Logger log = LoggerFactory.getLogger(MathClientHandler.class); private ChannelHandlerContext ctxLocal; private int receivedMesCount; private int sendedMesCount = 1; final BlockingQueue<AckMessage> ackMessQueue = new LinkedBlockingQueue<AckMessage>(); public AckMessage getAckMessage() { boolean interrupted = false; try { for (;;) { try { return ackMessQueue.take(); } catch (InterruptedException ignore) { interrupted = true; } } } finally { if (interrupted) { Thread.currentThread().interrupt(); } } } @Override public void channelActive(ChannelHandlerContext ctx) { this.ctxLocal = ctx; sendMathMessages(); } @Override public void channelRead0(ChannelHandlerContext ctx, final AckMessage msg) { receivedMesCount ++; if (receivedMesCount == MathClient.count) { // Offer the answer after closing the connection. ctxLocal.channel().close().addListener(new ChannelFutureListener() { @Override public void operationComplete(ChannelFuture future) { boolean offered = ackMessQueue.offer(msg); assert offered; } }); } } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { cause.printStackTrace(); ctx.close(); } /** * */ private void sendMathMessages() { // Do not send more than 1024 message. ChannelFuture future = null; for (int i = 0; i < 1024 && sendedMesCount <= MathClient.count; i++) { MathMessage mes = new MathMessage(); if(i%2 != 0){ mes.setProtocolCode(ProtocolConstants.SUM_PROTOCOL_300000); } else{ mes.setProtocolCode(ProtocolConstants.MULTI_PROTOCOL_300100); } mes.setFirstNum(17); mes.setSecondNum(8); mes.setEndMark(ProtocolConstants.PROTOCOL_END); mes.setDataLenth(ProtocolConstants.OPERATE_NUM_LENGTH*2+ProtocolConstants.PROTOCOL_END_LENGTH); future = ctxLocal.write(mes); sendedMesCount++; } if (sendedMesCount <= MathClient.count) { assert future != null; future.addListener(mathMesSendListener); } ctxLocal.flush(); } private final ChannelFutureListener mathMesSendListener = new ChannelFutureListener() { @Override public void operationComplete(ChannelFuture future) throws Exception { if (future.isSuccess()) { sendMathMessages(); } else { future.cause().printStackTrace(); future.channel().close(); } } }; }
启动服务端与客户端,控制台输出:
服务端:
[INFO ] 2017-07-06 09:03:15 netty.main.math.MathServer =========Server is start=========
[INFO ] 2017-07-06 09:03:24 netty.codec.math.MathMessageDecoder =======解码计算请求协议成功:{"dataLenth":10,"endMark":"\r\n","firstNum":17,"protocolCode":"300100","secondNum":8}
[INFO ] 2017-07-06 09:03:24 netty.codec.math.AckMessageEncoder =======编码计算结果协议成功:{"dataLenth":6,"endMark":"\r\n","protocolCode":"300200","result":136}
[INFO ] 2017-07-06 09:03:24 netty.codec.math.MathMessageDecoder =======解码计算请求协议成功:{"dataLenth":10,"endMark":"\r\n","firstNum":17,"protocolCode":"300000","secondNum":8}
[INFO ] 2017-07-06 09:03:24 netty.codec.math.AckMessageEncoder =======编码计算结果协议成功:{"dataLenth":6,"endMark":"\r\n","protocolCode":"300200","result":25}
客户端:
[INFO ] 2017-07-06 09:03:23 netty.main.math.MathClient =========Client is start=========
[INFO ] 2017-07-06 09:03:24 netty.codec.math.MathMessageEncoder =======编码计算请求协议成功:{"dataLenth":10,"endMark":"\r\n","firstNum":17,"protocolCode":"300100","secondNum":8}
[INFO ] 2017-07-06 09:03:24 netty.codec.math.MathMessageEncoder =======编码计算请求协议成功:{"dataLenth":10,"endMark":"\r\n","firstNum":17,"protocolCode":"300000","secondNum":8}
[INFO ] 2017-07-06 09:03:24 netty.codec.math.AckMessageDecoder =======解码计算结果协议成功:{"dataLenth":6,"endMark":"\r\n","protocolCode":"300200","result":136}
[INFO ] 2017-07-06 09:03:24 netty.codec.math.AckMessageDecoder =======解码计算结果协议成功:{"dataLenth":6,"endMark":"\r\n","protocolCode":"300200","result":25}
[INFO ] 2017-07-06 09:03:24 netty.main.math.MathClient =======Calculat result:{"dataLenth":6,"endMark":"\r\n","protocolCode":"300200","result":25}
发表评论
-
netty NioSocketChannel解析
2017-09-29 12:50 1242netty 抽象BootStrap定义:http://dona ... -
netty Pooled字节buf分配器
2017-09-28 13:00 1982netty 字节buf定义:http://donald-dra ... -
netty Unpooled字节buf分配器
2017-09-26 22:01 2371netty 字节buf定义:http://donald-dra ... -
netty 抽象字节buf分配器
2017-09-26 08:43 1268netty 字节buf定义:http:// ... -
netty 复合buf概念
2017-09-25 22:31 1260netty 字节buf定义:http://donald-dra ... -
netty 抽象字节buf引用计数器
2017-09-22 12:48 1536netty 字节buf定义:http://donald-dra ... -
netty 抽象字节buf解析
2017-09-22 09:00 1770netty 通道接口定义:http://donald-drap ... -
netty 资源泄漏探测器
2017-09-21 09:37 1334netty 通道接口定义:http://donald-drap ... -
netty 字节buf定义
2017-09-20 08:31 2758netty 通道接口定义:http://donald-drap ... -
netty 默认通道配置后续
2017-09-18 08:36 2103netty 通道接口定义:http://donald-drap ... -
netty 默认通道配置初始化
2017-09-17 22:51 1955netty 通道接口定义:http://donald-drap ... -
netty 通道配置接口定义
2017-09-17 14:51 1019netty 通道接口定义:http://donald-drap ... -
netty NioServerSocketChannel解析
2017-09-16 13:01 1815netty ServerBootStrap解析:http:// ... -
netty 抽象nio消息通道
2017-09-15 15:30 1170netty 通道接口定义:http:/ ... -
netty 抽象nio字节通道
2017-09-14 22:39 1151netty 通道接口定义:http:/ ... -
netty 抽象nio通道解析
2017-09-14 17:23 896netty 通道接口定义:http://donald-drap ... -
netty 抽象通道后续
2017-09-13 22:40 1245netty Inboudn/Outbound通道Inv ... -
netty 通道Outbound缓冲区
2017-09-13 14:31 2129netty 通道接口定义:http:/ ... -
netty 抽象Unsafe定义
2017-09-12 21:24 998netty 通道接口定义:http:/ ... -
netty 抽象通道初始化
2017-09-11 12:56 1796netty 管道线定义-ChannelPipeline:htt ...
相关推荐
1、基于Spring Boot 实现的WebSocket实时数据通信Demo 2、结合Netty实现多客户端之间进行网络通信 3、在Web端建立多客户端之间的通信机制
项目中,你可以清晰地看到BIO与NIO模型在Netty中的灵活运用,如何通过Netty的高性能特性来优化网络通信。此外,demo还详细展示了如何通过自定义编解码器处理网络通信中的数据编解码问题,以及如何利用心跳机制确保...
主要给大家介绍了关于如何利用Java搭建个简单的Netty通信,文中通过示例代码介绍的非常详细,对大家学习或者使用Java具有一定的参考学习价值,需要的朋友们下面来一起学习学习吧
1. 异步和事件驱动:Netty 使用异步和事件驱动的方式来处理网络通信,这意味着它不会阻塞调用线程,从而提高了应用的响应性和吞吐量。 2. 高性能:Netty 的设计使得它成为了目前所有 NIO 框架中性能最好的框架之一。...
25_gRPC通信示例与JVM回调钩子 26_gRPC服务器流式调用实现 27_gRPC双向流式数据通信详解 28_gRPC与Gradle流畅整合及问题解决的完整过程与思考 29_Gradle插件问题解决方案与Nodejs环境搭建 30_通过gRPC实现Java与...
第25讲:gRPC通信示例与JVM回调钩子 第26讲:gRPC服务器流式调用实现 第27讲:gRPC双向流式数据通信详解 第28讲:gRPC与Gradle流畅整合及问题解决的完整过程与思考 第29讲:Gradle插件问题解决方案与Nodejs环境...
20_通过Apache Thrift实现Java与Python的RPC调用 21_gRPC深入详解 22_gRPC实践 23_Gradle Wrapper在Gradle项目构建中的最佳实践 24_gRPC整合Gradle与代码生成 25_gRPC通信示例与JVM回调钩子 26_gRPC服务器流式调用...
6、支持多种通信框架(Mina/Netty/Grizzly),支持多种序列化/反序列化(Java/Hessian/PB); 7、支持自定义通信协议,可完全替换NFS-RPC自带的协议。 淘宝开放平台JAVA版SDK top4java 设计原则 容易维护扩展(不...
DotNetty是微软团队参考Java上的Netty开发的网络通信框架,并且保留了Netty原来绝大部分的编程接口。但目前最大的问题是没有官方说明文档,官方示例也仅仅是控制台应用程序,参考价值较低。本项目展示了在WPF中...
基于 Java Web 项目的 SpringBoot 框架初始化模板,该模板整合了常用的框架,保证大家在此...19、基于 Netty 的 WebSocket 全双工通信设计示例 20、对象存储、消息队列、缓存、分布式锁、限流、国际化、网络等工具类
核心模块包含基于 netty 的网络代码,这些代码与 minecraft 没有特别关联。 我的世界 minecraft 模块包含一个特定于 minecraft 的网络设置,并且可以从 bukkit 和 bungee 插件轻松访问它们。 我的世界示例 该模块...
Java局域网通信——飞鸽传书源代码,大家都知道VB版、VC版还有Delphi版的飞鸽传书软件,但是Java版的确实不多,因此这个Java文件传输实例不可错过,Java网络编程技能的提升很有帮助。 Java聊天程序,包括服务端和...
Java局域网通信——飞鸽传书源代码 28个目标文件 内容索引:JAVA源码,媒体网络,飞鸽传书 Java局域网通信——飞鸽传书源代码,大家都知道VB版、VC版还有Delphi版的飞鸽传书软件,但是Java版的确实不多,因此这个Java...
《Linux多线程服务端编程:使用muduo C++网络库》主要讲述采用现代C++在x86-64 Linux上编写多线程TCP网络服务程序的主流常规技术,重点讲解一种适应性较强的多线程服务器的编程模型,即one loop per thread。...
6、支持多种通信框架(Mina/Netty/Grizzly),支持多种序列化/反序列化(Java/Hessian/PB); 7、支持自定义通信协议,可完全替换NFS-RPC自带的协议。 淘宝开放平台JAVA版SDK top4java 设计原则 容易维护扩展(不...
6、支持多种通信框架(Mina/Netty/Grizzly),支持多种序列化/反序列化(Java/Hessian/PB); 7、支持自定义通信协议,可完全替换NFS-RPC自带的协议。 淘宝开放平台JAVA版SDK top4java 设计原则 容易维护扩展(不...
6、支持多种通信框架(Mina/Netty/Grizzly),支持多种序列化/反序列化(Java/Hessian/PB); 7、支持自定义通信协议,可完全替换NFS-RPC自带的协议。 淘宝开放平台JAVA版SDK top4java 设计原则 容易维护扩展(不...
6、支持多种通信框架(Mina/Netty/Grizzly),支持多种序列化/反序列化(Java/Hessian/PB); 7、支持自定义通信协议,可完全替换NFS-RPC自带的协议。 淘宝开放平台JAVA版SDK top4java 设计原则 容易维护扩展(不...
6、支持多种通信框架(Mina/Netty/Grizzly),支持多种序列化/反序列化(Java/Hessian/PB); 7、支持自定义通信协议,可完全替换NFS-RPC自带的协议。 淘宝开放平台JAVA版SDK top4java 设计原则 容易维护扩展(不...
6、支持多种通信框架(Mina/Netty/Grizzly),支持多种序列化/反序列化(Java/Hessian/PB); 7、支持自定义通信协议,可完全替换NFS-RPC自带的协议。 淘宝开放平台JAVA版SDK top4java 设计原则 容易维护扩展(不...