经常要用到网络编程。目前常用的服务器端有的Socket编程,还有NIO编程。
针对NIO编程,这里选择了一个成熟的框架Netty来编写服务器端。
而Socket的Server端,采用线程池来构造,能满足日常的一般需求。
========== netty
EchoServer.java
import java.net.InetSocketAddress; import java.util.concurrent.Executors; import org.apache.log4j.Logger; import org.jboss.netty.bootstrap.ServerBootstrap; import org.jboss.netty.channel.ChannelFactory; import org.jboss.netty.channel.ChannelPipeline; import org.jboss.netty.channel.ChannelPipelineFactory; import org.jboss.netty.channel.Channels; import org.jboss.netty.channel.socket.nio.NioServerSocketChannelFactory; import org.jboss.netty.handler.codec.frame.DelimiterBasedFrameDecoder; import org.jboss.netty.handler.codec.frame.Delimiters; import org.jboss.netty.handler.codec.string.StringDecoder; import org.jboss.netty.handler.codec.string.StringEncoder; public class EchoServer { private static final Logger LOGGER = Logger.getLogger(EchoServer.class); private static final int PORT = 10007; private static final String FRAMER = "framer"; private static final String DECODER = "decoder"; private static final String ENCODER = "encoder"; private static final String HANDLER = "handler"; /** * @param args */ public static void main(String[] args) { LOGGER.info("start netty echo server."); ChannelFactory factory = new NioServerSocketChannelFactory( Executors.newCachedThreadPool(), Executors.newCachedThreadPool()); ServerBootstrap bootstrap = new ServerBootstrap(factory); bootstrap.setPipelineFactory(new ChannelPipelineFactory() { public ChannelPipeline getPipeline() { ChannelPipeline pipeline = Channels.pipeline(); pipeline.addLast(FRAMER, new DelimiterBasedFrameDecoder( Integer.MAX_VALUE, Delimiters.lineDelimiter())); pipeline.addLast(DECODER, new StringDecoder()); pipeline.addLast(ENCODER, new StringEncoder()); pipeline.addLast(HANDLER, new EchoServerHandler()); return pipeline; } }); bootstrap.setOption("child.tcpNoDelay", true); bootstrap.setOption("child.keepAlive", true); bootstrap.bind(new InetSocketAddress(PORT)); } }
EchoServerHandler.java
import java.util.concurrent.atomic.AtomicInteger; import org.apache.log4j.Logger; import org.jboss.netty.buffer.ChannelBuffer; import org.jboss.netty.buffer.ChannelBuffers; import org.jboss.netty.channel.Channel; import org.jboss.netty.channel.ChannelHandlerContext; import org.jboss.netty.channel.ChannelStateEvent; import org.jboss.netty.channel.ExceptionEvent; import org.jboss.netty.channel.MessageEvent; import org.jboss.netty.channel.SimpleChannelHandler; import com.alibaba.fastjson.JSON; public class EchoServerHandler extends SimpleChannelHandler { private static final Logger LOGGER = Logger.getLogger(EchoServer.class); private static final AtomicInteger COUNT = new AtomicInteger(); @Override public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) { String inputString = (String) e.getMessage(); try { TransportMessage request = JSON.parseObject(inputString, TransportMessage.class); String response = null; if (request.getCode() % 2 == 0) { response = JSON.toJSONString(new TransportMessage(7100, "request.getState()%2 == 0" + request.getContent())); } else { response = JSON.toJSONString(new TransportMessage(8100, request .getContent())); } Channel channel = e.getChannel(); byte[] arr = new StringBuilder(response).append("\tcount:") .append(COUNT.getAndIncrement()).append("\n").toString() .getBytes(); ChannelBuffer word = ChannelBuffers.buffer(arr.length); word.writeBytes(arr); channel.write(word); if (LOGGER.isDebugEnabled()) { LOGGER.debug(Thread.currentThread().getName() + ":" + e.getRemoteAddress() + " receives message : " + inputString + " -- send message : " + response); } System.out.println(Thread.currentThread().getName() + ":" + e.getRemoteAddress() + " receives message : " + inputString + " -- send message : " + response); } catch (Exception e1) { LOGGER.error("Error:" + e.getMessage(), e1); } } @Override public void channelConnected(ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception { ctx.sendUpstream(e); } @Override public void channelClosed(ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception { ctx.sendUpstream(e); } @Override public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e) { LOGGER.error("Error:" + e.getCause().getMessage(), e.getCause()); Channel ch = e.getChannel(); ch.close(); } }
TransportMessage.java
public class TransportMessage { private int code; private String content; public TransportMessage(int code, String content) { super(); this.code = code; this.content = content; } public TransportMessage() { super(); } public int getCode() { return code; } public void setCode(int code) { this.code = code; } public String getContent() { return content; } public void setContent(String content) { this.content = content; } @Override public int hashCode() { final int prime = 37; int result = 17; result = prime * result + code; result = prime * result + ((content == null) ? 0 : content.hashCode()); return result; } @Override public boolean equals(Object obj) { if (this == obj) return true; if (obj == null) return false; if (getClass() != obj.getClass()) return false; TransportMessage other = (TransportMessage) obj; if (code != other.code) return false; if (content == null) { if (other.content != null) return false; } else if (!content.equals(other.content)) return false; return true; } @Override public String toString() { return "TransportMessage [code=" + code + ", content=" + content + "]"; } }
======== socket server (with thread pool)
SocketServer.java
import java.io.IOException; import java.net.ServerSocket; import java.net.Socket; import java.util.concurrent.BlockingQueue; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.LinkedBlockingQueue; public class SocketServer implements Runnable { private final String name; private final ServerSocket server; private final BlockingQueue<Socket> queue; public SocketServer(String name, BlockingQueue<Socket> queue, int port) throws IOException { super(); this.name = name; this.server = new ServerSocket(port); this.server.setReuseAddress(true); this.queue = queue; System.out.println(server + " start to run."); } @Override public void run() { Thread.currentThread().setName(name); while (true) { try { queue.put(server.accept()); } catch (IOException e) { e.printStackTrace(); } catch (InterruptedException e) { e.printStackTrace(); } } } /** * @param args * @throws IOException */ public static void main(String[] args) throws IOException { int port = 7777; int threadPoolSize = Runtime.getRuntime().availableProcessors() * 6; BlockingQueue<Socket> queue = new LinkedBlockingQueue<Socket>(); ExecutorService exec = Executors.newCachedThreadPool(); exec.execute(new SocketServer("producer", queue, port)); for (int i = 0; i < threadPoolSize; i++) { exec.execute(new Consumer(queue, "consumer-" + i)); } exec.shutdown(); } }
Consumer.java
import java.io.BufferedReader; import java.io.IOException; import java.io.InputStreamReader; import java.io.PrintWriter; import java.net.Socket; import java.util.concurrent.BlockingQueue; import java.util.concurrent.atomic.AtomicInteger; import com.alibaba.fastjson.JSON; public class Consumer implements Runnable { private static final AtomicInteger count = new AtomicInteger(); private final BlockingQueue<Socket> queue; private final String name; public Consumer(BlockingQueue<Socket> queue, String name) { super(); this.queue = queue; this.name = name; } public void run() { Thread.currentThread().setName(name); System.out.println(name + " start to run."); while (true) { try { consume(queue.take()); } catch (Exception e) { e.printStackTrace(); } } } private void consume(Socket socket) throws IOException { try { socket.setTcpNoDelay(true); BufferedReader br = getReader(socket); PrintWriter pw = getWriter(socket); for (String msg = br.readLine(); msg != null; msg = br.readLine()) { System.out.println(Thread.currentThread().getName() + " receive msg : " + msg + " from " + socket.getRemoteSocketAddress() + ":" + socket.getPort()); String response = echoMsg(msg); pw.println(response); pw.flush(); System.out.println(Thread.currentThread().getName() + " send msg: " + response); if (msg.equalsIgnoreCase("bye")) { break; } } } finally { if (null != socket) { try { socket.close(); System.out.println(Thread.currentThread().getName() + " closes " + socket); } catch (IOException e) { e.printStackTrace(); } } } } private PrintWriter getWriter(Socket socket) throws IOException { return new PrintWriter(socket.getOutputStream()); } private BufferedReader getReader(Socket socket) throws IOException { return new BufferedReader( new InputStreamReader(socket.getInputStream())); } private String echoMsg(String request) { TransportMessage msg = JSON .parseObject(request, TransportMessage.class); return msg.getCode() + " --> world_" + count.getAndIncrement(); } }
=== socket client
SocketClient.java
import java.io.BufferedReader; import java.io.IOException; import java.io.InputStreamReader; import java.io.PrintWriter; import java.net.Socket; import java.net.UnknownHostException; import com.alibaba.fastjson.JSON; public class SocketClient { private final Socket socket; public SocketClient(String host, int port) throws UnknownHostException, IOException { this.socket = new Socket(host, port); this.socket.setTcpNoDelay(true); this.socket.setTrafficClass(0x08 | 0x10); this.socket.setReuseAddress(true); } private PrintWriter getWriter(Socket socket) throws IOException { return new PrintWriter(socket.getOutputStream()); } private BufferedReader getReader(Socket socket) throws IOException { return new BufferedReader( new InputStreamReader(socket.getInputStream())); } private void talk() { try { BufferedReader br = getReader(socket); PrintWriter pw = getWriter(socket); for (int i = 0; i < 100; i++) { TransportMessage tm = new TransportMessage(i, "Hello_" + i); String msg = JSON.toJSONString(tm); pw.println(msg); pw.flush(); System.out.println("send msg : " + msg); System.out.println("receive msg : " + br.readLine()); } } catch (Exception e) { e.printStackTrace(); } finally { if (null != socket) { try { socket.close(); } catch (IOException ex) { ex.printStackTrace(); } } } } public static void main(String[] args) throws UnknownHostException, IOException { String host = "192.168.123.53"; int port = 7777; // int port = 10007; for (int i = 0; i < 100000; i++) { new SocketClient(host, port).talk(); } } }
相关推荐
在线教学质量评价系统可以方便和全面地收集教师教学工作的数据,提供师生网上评教的评分结果,快速集中收集各方面的评教信息,使教务管理部门能够及时了解教学动态和师资情况,为教务老师提供相关决策支持,为职称评聘提供教学工作质量的科学依据,同时减轻了教务老师的工作量。
python-3.10.7-amd64.zip
自研扩散模型高光谱修复网络 基于MST_Plus_Plus 网络改造。 试验数据 扩散模型loss初步测试降到了0.005,比不加扩散loss小了20倍, 训练入口 train_cos_img.py
企业数据治理之数据安全治理方案
这是历年的毕业设计的项目,基于Android的一个红外防盗报警。需要自己添加蜂鸣器和热释电的硬件访问服务。
短视频用户价值研究报告2022
基于springboot的java毕业&课程设计
可运行源码(含数据库脚本)+开发文档+lw(高分毕设项目) java期末大作业毕业设计项目管理系统计算机软件工程大数据专业 内容概要:首先在日常的出行中,老旧城区道路狭窄,容易造成车辆的堵塞,每天早晚,接送孩子的车辆数密集,会造成相应的交通堵塞情况。而同样的,在停车的管理上,一方面我国的停车场面积较少,停车位一位难求,特别是在现在的一些小区里,为了抢停车位而产生的矛盾也日益突出。另一方面在停车场的管理上也存在着较大的管理问题,进车容易出车难是当下的停车场所出现的主要问题。而现在的停车场管理系统眼花缭乱,效果水平也良莠不齐,停车场的管理是当下各大城市的公共设施发展的一大难题,而国家、各大省市也都开 全套项目源码+详尽文档,一站式解决您的学习与项目需求。 适用人群: 计算机、通信、人工智能、自动化等专业的学生、老师及从业者。 使用场景及目标: 无论是毕设、期末大作业还是课程设计,一键下载,轻松部署,助您轻松完成项目。 项目代码经过调试测试,确保直接运行,节省您的时间和精力。 其他说明: 项目整体具有较高的学习借鉴价值,基础能力强的可以在此基础上修改调整,以实现不同的功能。
基于springboot的java毕业&课程设计
微信小程序设计之相关行业源码及图文导入教程
网络游戏开发是一项很大的工程,需要很多综合性的知识。这对于刚刚入门的开发者来说很难理解。本论文从研究开发一个模仿泡泡堂网络游戏的例子出发,讲述网络游戏开发中用到的一些最基本的知识和设计思想,使大家清晰的理解游戏开发的过程。 整个设计中利用java中的swing编程,结合游戏的操作流程,对整个游戏进行精心的设计和大量的测试,实现游戏软件服务器端和客户端的开发,为玩家提供一个友好美观的操作界面,并添加聊天等功能以增加玩家之间的互动性,此外实现了可编辑场景地图的功能,使得游戏内容的更加丰富,玩家交互性更好,确保了游戏更具有趣味性、灵活性,以满足玩家对这款网络游戏的要求。
外东洪路中段.m4a
软考3333333333
附件是Elasticsearch 的全文搜索功能使用方法,文件绿色安全,请大家放心下载,仅供交流学习使用,无任何商业目的!
1.版本:matlab2014/2019a/2021a 2.附赠案例数据可直接运行matlab程序。 3.代码特点:参数化编程、参数可方便更改、代码编程思路清晰、注释明细。 4.适用对象:计算机,电子信息工程、数学等专业的大学生课程设计、期末大作业和毕业设计。
基于springboot的java毕业&课程设计
CCNP TSHOOT 642-832 Official Certification Guide
MySQL8.4.0 LTS(mysql-server_8.4.0-1ubuntu22.04_amd64.deb-bundle.tar)适用于Ubuntu 22.04 Linux (x86, 64-bit)
课设毕设基于SpringBoot+Vue的某银行OA系统 LW+PPT+源码可运行.zip