`
yanguz123
  • 浏览: 557057 次
  • 性别: Icon_minigender_1
  • 来自: 杭州
社区版块
存档分类
最新评论

\(^_^)/ Java NIO、AIO通信

 
阅读更多

Java NIO 通信

 

 

服务器端:

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.Channel;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.nio.charset.Charset;

public class NIOServer {
	// 用于检测所有Channel状态的Selector
	private Selector selector = null;
	private static final int PORT = 30000;
	// 定义实现编码、解码的字符集对象
	private Charset charset = Charset.forName("UTF-8");

	public void init() throws Exception {
		selector = Selector.open();
		// 通过open方法来打开一个未绑定的ServerSocketChannel实例
		ServerSocketChannel server = ServerSocketChannel.open();
		InetSocketAddress isa = new InetSocketAddress("127.0.0.1", PORT);
		// 将该ServerSocketChannel绑定到指定IP地址
		server.bind(isa);
		// 设置ServerSocket以非阻塞方式工作
		server.configureBlocking(false);
		// 将server注册到指定Selector对象
		server.register(selector, SelectionKey.OP_ACCEPT);

		while (selector.select() > 0) {
			// 依次处理selector上的每个已选择的SelectionKey
			for (SelectionKey sk : selector.selectedKeys()) {
				// 从selector上的已选择Key集中删除正在处理的SelectionKey
				selector.selectedKeys().remove(sk);
				// 如果sk对应的Channel包含客户端的连接请求
				if (sk.isAcceptable()) {
					// 调用accept方法接受连接,产生服务器端的SocketChannel
					SocketChannel sc = server.accept();
					// 设置采用非阻塞模式
					sc.configureBlocking(false);
					// 将该SocketChannel也注册到selector
					sc.register(selector, SelectionKey.OP_READ);
					// 将sk对应的Channel设置成准备接受其他请求
					sk.interestOps(SelectionKey.OP_ACCEPT);
				}

				// 如果sk对应的Channel有数据需要读取
				if (sk.isReadable()) {
					// 获取该SelectionKey对应的Channel,该Channel中有可读的数据
					SocketChannel sc = (SocketChannel) sk.channel();
					// 定义准备执行读取数据的ByteBuffer
					ByteBuffer buffer = ByteBuffer.allocate(1024);
					StringBuilder content = new StringBuilder();
					// 开始读取数据
					try {
						while (sc.read(buffer) > 0) {
							buffer.flip();
							content.append(charset.decode(buffer));
						}
						// 打印从该sk对应的Channel里读取到的数据
						System.out.println("读取的数据:" + content);
						// 将sk对应的Channel设置成准备下一次读取
						sk.interestOps(SelectionKey.OP_READ);
					} catch (IOException ex) {
						// 如果捕捉到该sk对应的Channel出现了异常,即表明该Channel对应的Client出现了问题,所以从Selector中取消sk的注册
						// 从Selector中删除指定的SelectionKey
						sk.cancel();
						if (sk.channel() != null) {
							sk.channel().close();
						}

					}

					// 如果content的长度大于0,即聊天信息不为空
					if (content.toString().length() > 0) {
						// 遍历该selector里注册的所有SelectionKey
						for (SelectionKey key : selector.keys()) {
							// 获取该key对应的Channel
							Channel targetChannel = key.channel();
							// 如果该channel是SocketChannel对象
							if (targetChannel instanceof SocketChannel) {
								// 将读到的内容写入该Channel中
								SocketChannel dest = (SocketChannel) targetChannel;
								dest.write(charset.encode(content.toString()));
							}
						}
					}
				}
			}
		}

	}

	public static void main(String[] args) throws Exception {
		new NIOServer().init();
	}
}

 

 

客户端:

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import java.nio.charset.Charset;
import java.util.Scanner;

public class NIOClient {
	// 定义检测SocketChannel的Selector对象
	private Selector selector = null;
	private static final int PORT = 30000;
	// 定义处理编码和解码的字符集
	private Charset charset = Charset.forName("UTF-8");
	// 客户端SocketChannel
	private SocketChannel sc = null;

	public void init() throws IOException {
		selector = Selector.open();
		InetSocketAddress isa = new InetSocketAddress("127.0.0.1", PORT);
		// 调用open静态方法创建连接到指定主机的SocketChannel
		sc = SocketChannel.open(isa);
		// 设置该sc以非阻塞方式工作
		sc.configureBlocking(false);
		// 将SocketChannel对象注册到指定Selector
		sc.register(selector, SelectionKey.OP_READ);
		// 启动读取服务器端数据的线程
		new ClientThread().start();
		// 创建键盘输入流
		Scanner scan = new Scanner(System.in);
		while (scan.hasNextLine()) {
			// 读取键盘输入
			String line = scan.nextLine();
			// 将键盘输入的内容输出到SocketChannel中
			sc.write(charset.encode(line));
		}

	}

	// 定义读取服务器数据的线程
	private class ClientThread extends Thread {
		@Override
		public void run() {
			try {
				while (selector.select() > 0) {
					// 遍历每个有可用IO操作Channel对应的SelectionKey
					for (SelectionKey sk : selector.keys()) {
						// 删除正在处理的SelectionKey
						selector.selectedKeys().remove(sk);
						// 如果该SelectionKey对应的Channel中有可读的数据
						if (sk.isReadable()) {
							// 使用NIO读取Channel中的数据
							SocketChannel sc = (SocketChannel) sk.channel();
							ByteBuffer buff = ByteBuffer.allocate(1024);
							StringBuilder content = new StringBuilder();
							while (sc.read(buff) > 0) {
								sc.read(buff);
								buff.flip();
								content.append(charset.decode(buff));
							}
							// 打印输出读取的内容
							System.out.println("聊天信息:" + content.toString());
							// 为下一次读取作准备
							sk.interestOps(SelectionKey.OP_READ);
						}
					}
				}
			} catch (IOException e) {
				// TODO Auto-generated catch block
				e.printStackTrace();
			}
		}
	}

	public static void main(String[] args) throws IOException {
		new NIOClient().init();
	}
}

 

 

 

 

 

 

 

 

 

 

 

 

 

Java AIO 通信

 

 

服务器端

import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.AsynchronousChannelGroup;
import java.nio.channels.AsynchronousServerSocketChannel;
import java.nio.channels.AsynchronousSocketChannel;
import java.nio.channels.CompletionHandler;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class AIOServer
{
	static final int PORT = 30000;
	final static String UTF_8 = "utf-8";
	static List<AsynchronousSocketChannel> channelList
		= new ArrayList<>();
	public void startListen() throws InterruptedException,
		Exception 
	{
		// 创建一个线程池
		ExecutorService executor = Executors.newFixedThreadPool(20);
		// 以指定线程池来创建一个AsynchronousChannelGroup
		AsynchronousChannelGroup channelGroup = AsynchronousChannelGroup
			.withThreadPool(executor);
		// 以指定线程池来创建一个AsynchronousServerSocketChannel
		AsynchronousServerSocketChannel serverChannel 
			= AsynchronousServerSocketChannel.open(channelGroup)
			// 指定监听本机的PORT端口
			.bind(new InetSocketAddress(PORT));
		// 使用CompletionHandler接受来自客户端的连接请求
		serverChannel.accept(null, new AcceptHandler(serverChannel));  //①
	}   
	public static void main(String[] args)
		throws Exception
	{
		AIOServer server = new AIOServer();
		server.startListen();
	}
}
// 实现自己的CompletionHandler类
class AcceptHandler implements
	CompletionHandler<AsynchronousSocketChannel, Object>
{
	private AsynchronousServerSocketChannel serverChannel; 
	public AcceptHandler(AsynchronousServerSocketChannel sc)
	{
		this.serverChannel = sc;
	}
	// 定义一个ByteBuffer准备读取数据
	ByteBuffer buff = ByteBuffer.allocate(1024); 
	// 当实际IO操作完成时候触发该方法
	@Override
	public void completed(final AsynchronousSocketChannel sc
		, Object attachment)
	{
		// 记录新连接的进来的Channel
		AIOServer.channelList.add(sc);
		// 准备接受客户端的下一次连接
		serverChannel.accept(null , this);
		sc.read(buff , null 
			, new CompletionHandler<Integer,Object>()  //②
		{
			@Override
			public void completed(Integer result
				, Object attachment)
			{
				buff.flip();
				// 将buff中内容转换为字符串
				String content = StandardCharsets.UTF_8
					.decode(buff).toString();
				// 遍历每个Channel,将收到的信息写入各Channel中
				for(AsynchronousSocketChannel c : AIOServer.channelList)
				{
					try
					{
						c.write(ByteBuffer.wrap(content.getBytes(
							AIOServer.UTF_8))).get();
					}
					catch (Exception ex)
					{
						ex.printStackTrace();
					}
				}
				buff.clear();
				// 读取下一次数据
				sc.read(buff , null , this);
			}
			@Override
			public void failed(Throwable ex, Object attachment)
			{
				System.out.println("读取数据失败: " + ex);
				// 从该Channel读取数据失败,就将该Channel删除
				AIOServer.channelList.remove(sc);
			}
		});
	}
	@Override
	public void failed(Throwable ex, Object attachment)
	{
		System.out.println("连接失败: " + ex);
	}
}

 

 

 

客户端:

import java.awt.BorderLayout;
import java.awt.event.ActionEvent;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.AsynchronousChannelGroup;
import java.nio.channels.AsynchronousSocketChannel;
import java.nio.channels.CompletionHandler;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

import javax.swing.AbstractAction;
import javax.swing.Action;
import javax.swing.JButton;
import javax.swing.JFrame;
import javax.swing.JPanel;
import javax.swing.JScrollPane;
import javax.swing.JTextArea;
import javax.swing.JTextField;
import javax.swing.KeyStroke;

public class AIOClient {
	final static String UTF_8 = "utf-8";
	final static int PORT = 30000;
	// 与服务器端通信的异步Channel
	AsynchronousSocketChannel clientChannel;
	JFrame mainWin = new JFrame("多人聊天");
	JTextArea jta = new JTextArea(16, 48);
	JTextField jtf = new JTextField(40);
	JButton sendBn = new JButton("发送");

	public void init() {
		mainWin.setLayout(new BorderLayout());
		jta.setEditable(false);
		mainWin.add(new JScrollPane(jta), BorderLayout.CENTER);
		JPanel jp = new JPanel();
		jp.add(jtf);
		jp.add(sendBn);
		// 发送消息的Action,Action是ActionListener的子接口
		Action sendAction = new AbstractAction() {
			public void actionPerformed(ActionEvent e) {
				String content = jtf.getText();
				if (content.trim().length() > 0) {
					try {
						// 将content内容写入Channel中
						clientChannel.write(ByteBuffer.wrap(content.trim().getBytes(UTF_8))).get(); // ①
					} catch (Exception ex) {
						ex.printStackTrace();
					}
				}
				// 清空输入框
				jtf.setText("");
			}
		};
		sendBn.addActionListener(sendAction);
		// 将Ctrl+Enter键和"send"关联
		jtf.getInputMap().put(KeyStroke.getKeyStroke('\n', java.awt.event.InputEvent.CTRL_MASK), "send");
		// 将"send"和sendAction关联
		jtf.getActionMap().put("send", sendAction);
		mainWin.setDefaultCloseOperation(JFrame.EXIT_ON_CLOSE);
		mainWin.add(jp, BorderLayout.SOUTH);
		mainWin.pack();
		mainWin.setVisible(true);
	}

	public void connect() throws Exception {
		// 定义一个ByteBuffer准备读取数据
		final ByteBuffer buff = ByteBuffer.allocate(1024);
		// 创建一个线程池
		ExecutorService executor = Executors.newFixedThreadPool(80);
		// 以指定线程池来创建一个AsynchronousChannelGroup
		AsynchronousChannelGroup channelGroup = AsynchronousChannelGroup.withThreadPool(executor);
		// 以channelGroup作为组管理器来创建AsynchronousSocketChannel
		clientChannel = AsynchronousSocketChannel.open(channelGroup);
		// 让AsynchronousSocketChannel连接到指定IP、指定端口
		clientChannel.connect(new InetSocketAddress("127.0.0.1", PORT)).get();
		jta.append("---与服务器连接成功---\n");
		buff.clear();
		clientChannel.read(buff, null, new CompletionHandler<Integer, Object>() // ②
				{
					@Override
					public void completed(Integer result, Object attachment) {
						buff.flip();
						// 将buff中内容转换为字符串
						String content = StandardCharsets.UTF_8.decode(buff).toString();
						// 显示从服务器端读取的数据
						jta.append("某人说:" + content + "\n");
						buff.clear();
						clientChannel.read(buff, null, this);
					}

					@Override
					public void failed(Throwable ex, Object attachment) {
						System.out.println("读取数据失败: " + ex);
					}
				});
	}

	public static void main(String[] args) throws Exception {
		AIOClient client = new AIOClient();
		client.init();
		client.connect();
	}
}

 

 

分享到:
评论

相关推荐

    aio_bio_nio.rar

    java 网络通信 aio bio nio 例子 参考以下网址 https://blog.csdn.net/anxpp/article/details/51512200

    2024年Java常见的-BIO,NIO,AIO,Netty面试题

    主要是介绍java方面网络方面 网络通信方面的面试题,核心内容主要是关于BIO,NIO,AIO,Netty的面试,算是一套八股文吧,还是老话,该背的还是要背一背!!!

    Java编程中的IO模型详解:BIO,NIO,AIO的区别与实际应用场景分析

    IO模型决定了数据的传输方式,Java支持BIO,NIO,AIO三种IO模型。BIO是同步阻塞模型,特点是一对一的客户端与处理线程关系,适用场景是连接数量较小并且固定的,优点是编程简单,但对服务器资源要求高。NIO是同步非...

    Java讲义(第2版)

    深圳电信培训中心徐海蛟博士 Java/JavaEE 教学用的参考资料。...本书全面介绍了Java 7的二进制整数、菱形语法、增强switch语句、多异常捕获、自动关闭资源的try语句、JDBC4.1新特性、NIO.2、AIO等新特性。

    Java IO 体系.md

    - BIO NIO 和 AIO 的区别 - 什么是流 - 流的分类 - 节点流和处理流 - Java IO 的核心类 File - Java IO 流对象 - 字节流对象 - InputStream - OutputStream - 字符流对象 - Reader - Writer - 字节流与...

    疯狂java讲义

    《疯狂Java讲义(附光盘第2版)》全面介绍了Java 7的二进制整数、菱形语法、增强switch语句、多异常捕获、自动关闭资源的try语句、JDBC 4.1新特性、NIO.2、AIO等新特性。 与第1版类似,《疯狂Java讲义(附光盘第2版)》...

    基于javatcpsocket通信的拆包和装包源码-Netty-practice:Netty学习实践

    BIO/NIO/AIO基础 阻塞I/O 非阻塞I/O I/O复用 信号驱动的I/O 异步I/O Java I/O模型 同步阻塞IO 1:1同步阻塞IO通信模型 M:N形式的同步阻塞IO通信模型 非阻塞式IO模型(NIO) NIO+单线程Reactor模型 NIO+多线程Reactor...

    Java思维导图xmind文件+导出图片

    IO 的基本概念、NIO、AIO、BIO深入分析 NIO的核心设计思想 Netty产生的背景及应用场景分析 基于Netty实现的高性能IM聊天 基于Netty实现Dubbo多协议通信支持 Netty无锁化串行设计及高并发处理机制 手写实现多...

    Java 基础核心总结 +经典算法大全.rar

    BIO NIO 和 AIO 的区别什么是流 流的分类 节点流和处理流 Java IO 的核心类 File Java IO 流对象 字节流对象InputStream OutputStream 字符流对象Reader Writer 字节流与字符流的转换新潮的 NIO 缓冲区(Buffer)通道...

    Java源码,可运行的实战SpringBoot服务源码(实战中总结测试)

    「喜欢的自提」 可以直接运行的代码,里面是我在工作中用来学和和测试的代码,一般是得到想要的测试结果,就直接用于实战项目中...17、BIO、NIO、AIO、Reactor、netty相关学习总结和测试(客户端/服务端通信测试demo)

    Java高性能通信统一框架的设计

    Java 高性能通信统一框架的主要作用是封装底层I/O(input output),提供高级的 API(app1ication programming interface)操作,满足应用服务器迅速支持多种协议的需求。本文首先介绍了NIO(non-b1ocking IO)与BIO...

    了解java远程通讯技术的最好的文章

    了解java远程通讯技术的最好的文章, ...传输协议比较出名的有http、tcp、udp等等,http、tcp、udp都是在基于Socket概念上为某类应用场景而扩展出的传输协议,网络IO,主要有bio、nio、aio三种方式

    gRPC线程模型分析

    在JDK1.4推出JavaNIO之前,基于Java的所有Socket通信都采用了同步阻塞模式(BIO),这种一请求一应答的通信模型简化了上层的应用开发,但是在性能和可靠性方面却存在着巨大的瓶颈。因此,在很长一段时间里,大型的...

    leetcode题库-java-interview:Java研发基础相关

    Java-Interview 四大基本特性 重载与重写的区别 访问控制符 Object类方法 抽象类与接口 类初始化顺序 hashCode & equals == & equals this static 基本类型 & 包装类 String 泛型 内部类 集合类 ArrayList & ...

    daydayup:每天都在进步,每周都在总结,Java架构师成长之路。目前已经完成:MongoDB,Netty,Nginx,MySQL,Java,Redis,Shiro,Solr,SpringBoot,SpringData,SSO,Mybatis,Kotlin,还在持续更新中

    系列博客:ShiroShiro 是Java的安全框架,使用简单,功能强大 系列博客:NettyNetty 服务启动流程分析,拆包粘包,编解码技术,数据通信,心跳监测,BIO,NIO,AIO区别,常见面试题。 系列博客:MySQLMySQL 索引优化...

    socket.zip

    本人亲自编写测试通过的javaIO流、pipe通道、NIO、AIO等各种socket编程代码,可以让你更加深入的体会到线程间的通信,以及同步、异步、阻塞、非阻塞等各种实现

    copyFromITDragonBlog

    Netty 服务启动流程分析,拆包粘包,编解码技术,数据通信,心跳监测,BIO,NIO,AIO区别,常见面试题。 系列博客: MySQL MySQL 索引优化分析,行锁和表锁机制。 系列博客: ITDragon 有零碎知识: , 。点击进入.....

    libevent在MFC上的实现(工具:LibeventServer.exe)

    花了1周的时间研究各种TCP模型, 包括LINUX和WINDOWS平台等, 包括C&C++, C#, JAVA, GOLANG, ERLANG等居于IOCP, SELECT, POLL, EPOLL, KQUEUE, BIO, NIO, AIO, 并发语言等实现的同步或异步, 阻塞或非阻塞通信模型...

Global site tag (gtag.js) - Google Analytics