`
dreamoftch
  • 浏览: 485166 次
  • 性别: Icon_minigender_1
  • 来自: 上海
社区版块
存档分类
最新评论

Java io nio netty三种方式实现简单聊天功能

阅读更多

 

netty和nio的比较:

http://news.cnblogs.com/n/205413/ 

 

一:首先是Java IO:

 

Server: 

 

package com.tch.test.chat.io;

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.PrintWriter;
import java.net.ServerSocket;
import java.net.Socket;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;

public class Server {

	private static AtomicInteger counter = new AtomicInteger(0);
	private static Map<Integer, PrintWriter> clients = new ConcurrentHashMap<>();
	private static ExecutorService pool = Executors.newCachedThreadPool();
	
	public static void main(String[] args) throws Throwable {
		ServerSocket serverSocket = new ServerSocket(8888);
		try {
			while(true){
				Socket socket = serverSocket.accept();
				System.out.println("a client connected ...");
				pool.execute(new ClientSocketHandler(socket));
			}
		} finally{
			serverSocket.close();
		}
	}
	
	private static class ClientSocketHandler implements Runnable{
		private Socket socket;
		private Integer id;
		public ClientSocketHandler(Socket socket){
			this.socket = socket;
			id = counter.incrementAndGet();
		}
		@Override
		public void run() {
			try {
				BufferedReader in = new BufferedReader(new InputStreamReader(socket.getInputStream()));
				PrintWriter out = new PrintWriter(socket.getOutputStream());
				clients.put(id, out);
				System.out.println("client num:" + clients.size());
				String msg = null;
				while((msg = in.readLine()) != null){
					System.out.println("receive msg '" + msg + "' from client-" + id);
					sendBack2Client("cllient-" + id + " said : " + msg);
					out.flush();
				}
			} catch (IOException e) {
				e.printStackTrace();
			}
		}
	}
	
	private static void sendBack2Client(String msg){
		for(PrintWriter client : clients.values()){
			client.println(msg);
			client.flush();
		}
	}
	
}

 

Client:

 

package com.tch.test.chat.io;

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.PrintWriter;
import java.net.Socket;

public class Client {

	public static void main(String[] args) throws Throwable {
		Socket socket = new Socket("localhost", 8888);
		try {
			BufferedReader in = new BufferedReader(new InputStreamReader(socket.getInputStream()));
			PrintWriter out = new PrintWriter(socket.getOutputStream());
			new Thread(new ServerResponseHandler(in)).start();
			BufferedReader userInput = new BufferedReader(new InputStreamReader(System.in));
			String msg = null;
			while((msg = userInput.readLine()) != null){
				out.println(msg);
				out.flush();
			}
		} finally{
			socket.close();
		}
	}
	
	private static class ServerResponseHandler implements Runnable{
		BufferedReader in;
		public ServerResponseHandler(BufferedReader in){
			this.in = in;
		}
		@Override
		public void run() {
			try {
				String msg = null;
				while((msg = in.readLine()) != null){
					System.out.println(msg);
				}
			} catch (IOException e) {
				e.printStackTrace();
			}
		}
	}
	
}

 

 

二:使用Java NIO:

 

Server:

 

package com.tch.test.chat.nio;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Set;

public class NioServer {

	private Set<SelectionKey> selectionKeys = null;
	private Iterator<SelectionKey> iterator = null;
	private Iterator<SocketChannel> iterator2 = null;
	private List<SocketChannel> clients = new ArrayList<SocketChannel>();
	private static Selector selector;
	
	static{
		 try {
			selector = Selector.open();
		} catch (IOException e) {
			e.printStackTrace();
		}
	}
	
	public static void main(String[] args) throws Exception {
		new NioServer().start();
	}
	
	private void start() throws Exception{
		initSeverSocketChannel();
		while(true){
			int ready = selector.select();
			if(ready > 0){
				selectionKeys = selector.selectedKeys();
				iterator = selectionKeys.iterator();
				while(iterator.hasNext()){
					SelectionKey selectionKey = iterator.next();
                    if(selectionKey.isAcceptable()){
                    	acceptClient(selectionKey);
                    }else if(selectionKey.isReadable()){
                    	readMsg(selectionKey);
                    }
					iterator.remove();
				}
			}
		}
	}
	
	private void initSeverSocketChannel() throws Exception{
		ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
		serverSocketChannel.bind(new InetSocketAddress(7878));
		serverSocketChannel.configureBlocking(false);
		serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
	}
	
	private void acceptClient(SelectionKey selectionKey) throws Exception{
		ServerSocketChannel serverSocketChannel = (ServerSocketChannel)selectionKey.channel();
		SocketChannel socketChannel = serverSocketChannel.accept();
        socketChannel.configureBlocking(false);
        socketChannel.register(selector, SelectionKey.OP_READ);
        clients.add(socketChannel);
        System.out.println("a client connected ...");
	}
	
	private void readMsg(SelectionKey selectionKey) throws Exception{
		ByteBuffer buffer = ByteBuffer.allocate(1024);
		SocketChannel socketChannel = (SocketChannel)selectionKey.channel();
        buffer.clear();
        socketChannel.read(buffer);
        buffer.flip();
        iterator2 = clients.iterator();
        SocketChannel socketChannel2 = null;
        while(iterator2.hasNext()){
            socketChannel2 = iterator2.next();
            while(buffer.hasRemaining()){
                socketChannel2.write(buffer);
            }
            buffer.rewind();
        }
	}
	
}

 

Client:

 

package com.tch.test.chat.nio;

import java.awt.GridLayout;
import java.awt.event.ActionEvent;
import java.awt.event.ActionListener;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
import java.util.Set;

import javax.swing.JButton;
import javax.swing.JFrame;
import javax.swing.JTextArea;
import javax.swing.JTextField;

public class NioClient extends JFrame{

	private static final long serialVersionUID = 1L;
	private JTextArea area = new JTextArea();
	private JTextField textField = new JTextField();
	private JButton button = new JButton("Send Message");
	private static Selector selector;
	
	static{
		 try {
			selector = Selector.open();
		} catch (IOException e) {
			e.printStackTrace();
		}
	}

	public static void main(String[] args) throws Exception {
		NioClient client = new NioClient();
		client.start();
	}
	
	private void start() throws Exception{
		
		initFrame();
		
		initSocketChannel();
		while(true){
			int ready = selector.select();
			if(ready > 0){
				Set<SelectionKey> selectionKeys = selector.selectedKeys();
				Iterator<SelectionKey> iterator = selectionKeys.iterator();
				while(iterator.hasNext()){
					SelectionKey selectionKey = iterator.next();
					if(selectionKey.isReadable()){
						
						readMsg(selectionKey);
					}
					iterator.remove();
				}
			}
		}
	}
	
	private void readMsg(SelectionKey selectionKey) throws Exception{
		ByteBuffer buffer = ByteBuffer.allocate(1024);
		SocketChannel socketChannel = (SocketChannel)selectionKey.channel();
		socketChannel.read(buffer);
		buffer.flip();
		area.setText(area.getText().trim()+"\n"+new String(buffer.array(),0,buffer.limit(),"utf-8"));
		buffer.clear();
	}
	
	private void initFrame(){
		setBounds(200, 200, 300, 400);
		setLayout(new GridLayout(3, 1));
		add(area);
		add(textField);
		add(button);
		setDefaultCloseOperation(EXIT_ON_CLOSE);
		setVisible(true);
	}
	
	private void initSocketChannel() throws Exception{
		SocketChannel socketChannel = SocketChannel.open(new InetSocketAddress("localhost", 7878));
		socketChannel.configureBlocking(false);
		socketChannel.register(selector, SelectionKey.OP_READ);
		button.addActionListener(new MyActionListener(socketChannel));
	}
	
	private class MyActionListener implements ActionListener{
		
		private SocketChannel socketChannel;
		
		public MyActionListener(SocketChannel socketChannel){
			this.socketChannel = socketChannel;
		}
		@Override
		public void actionPerformed(ActionEvent event) {
			try {
				ByteBuffer buffer = ByteBuffer.allocate(1024);
				String message = textField.getText();
				if(message == null || message.trim().isEmpty()){
					System.out.println("empty message");
					return;
				}
				textField.setText("");
				buffer.put(message.getBytes("utf-8"));
				buffer.flip();
				while(buffer.hasRemaining()){
					socketChannel.write(buffer);
				}
				buffer.clear();
			} catch (Exception e) {
				e.printStackTrace();
			}
		}
	}
	
}

 

 

三:最后是使用netty(参考jar包里面example里面的例子):

 

pom.xml添加依赖:

 

		<dependency>
			<groupId>io.netty</groupId>
			<artifactId>netty-all</artifactId>
			<version>4.0.36.Final</version>
		</dependency>

 

Server:

 

/*
 * Copyright 2012 The Netty Project
 *
 * The Netty Project licenses this file to you under the Apache License,
 * version 2.0 (the "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at:
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
 * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
 * License for the specific language governing permissions and limitations
 * under the License.
 */
package com.tch.test.chat.netty;

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.example.telnet.TelnetServer;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;

/**
 * Simple SSL chat server modified from {@link TelnetServer}.
 */
public final class NettyServer {

    private static final int PORT = 8992;

    public static void main(String[] args) throws Exception {
        EventLoopGroup bossGroup = new NioEventLoopGroup(1);
        EventLoopGroup workerGroup = new NioEventLoopGroup();
        try {
            ServerBootstrap b = new ServerBootstrap();
            b.group(bossGroup, workerGroup)
             .channel(NioServerSocketChannel.class)
             .handler(new LoggingHandler(LogLevel.INFO))
             .childHandler(new MyServerChannelInitializer());

            b.bind(PORT).sync().channel().closeFuture().sync();
        } finally {
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }
    }
}
/*
 * Copyright 2012 The Netty Project
 *
 * The Netty Project licenses this file to you under the Apache License,
 * version 2.0 (the "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at:
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
 * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
 * License for the specific language governing permissions and limitations
 * under the License.
 */
package com.tch.test.chat.netty;

import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.socket.SocketChannel;
import io.netty.handler.codec.DelimiterBasedFrameDecoder;
import io.netty.handler.codec.Delimiters;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;

/**
 * Creates a newly configured {@link ChannelPipeline} for a new channel.
 */
public class MyServerChannelInitializer extends ChannelInitializer<SocketChannel> {

    @Override
    public void initChannel(SocketChannel ch) throws Exception {
        ChannelPipeline pipeline = ch.pipeline();
        // On top of the SSL handler, add the text line codec.
        pipeline.addLast(new DelimiterBasedFrameDecoder(8192, Delimiters.lineDelimiter()));
        pipeline.addLast(new StringDecoder());
        pipeline.addLast(new StringEncoder());

        // and then business logic.
        pipeline.addLast(new MyServerHandler());
    }
}

 

/*
 * Copyright 2012 The Netty Project
 *
 * The Netty Project licenses this file to you under the Apache License,
 * version 2.0 (the "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at:
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
 * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
 * License for the specific language governing permissions and limitations
 * under the License.
 */
package com.tch.test.chat.netty;

import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;

import java.net.UnknownHostException;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;

/**
 * Handles a server-side channel.
 */
public class MyServerHandler extends SimpleChannelInboundHandler<String> {

	private static AtomicInteger counter = new AtomicInteger(0);
	private static Map<Channel, Integer> channels = new ConcurrentHashMap<Channel, Integer>();

    @Override
    public void channelActive(final ChannelHandlerContext ctx) throws UnknownHostException {
    	Integer channelNum = counter.incrementAndGet();
    	channels.put(ctx.channel(), channelNum);
    	ctx.writeAndFlush("Hello user-" + channelNum + "\r\n");
    }

    @Override
    public void channelRead0(ChannelHandlerContext ctx, String msg) throws Exception {
        // Send the received message to all channels but the current one.
        for (Map.Entry<Channel, Integer> entry : channels.entrySet()) {
        	Channel c = entry.getKey();
            if (c != ctx.channel()) {
                c.writeAndFlush("user-" + channels.get(ctx.channel()) + " said: " + msg + '\n');
            } else {
                c.writeAndFlush("[you] said: " + msg + '\n');
            }
        }

        // Close the connection if the client has sent 'bye'.
        if ("bye".equals(msg.toLowerCase())) {
            ctx.close();
        }
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
        cause.printStackTrace();
        ctx.close();
    }
}

 

 

Client:

 

/*
 * Copyright 2012 The Netty Project
 *
 * The Netty Project licenses this file to you under the Apache License,
 * version 2.0 (the "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at:
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
 * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
 * License for the specific language governing permissions and limitations
 * under the License.
 */
package com.tch.test.chat.netty;

import io.netty.bootstrap.Bootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.example.telnet.TelnetClient;

import java.io.BufferedReader;
import java.io.InputStreamReader;

/**
 * Simple SSL chat client modified from {@link TelnetClient}.
 */
public final class NettyClient {

	private static final String HOST = "127.0.0.1";
	private static final int PORT = 8992;

    public static void main(String[] args) throws Exception {
        EventLoopGroup group = new NioEventLoopGroup();
        try {
            Bootstrap b = new Bootstrap();
            b.group(group)
             .channel(NioSocketChannel.class)
             .handler(new MyClientChannelInitializer());

            // Start the connection attempt.
            Channel ch = b.connect(HOST, PORT).sync().channel();

            // Read commands from the stdin.
            ChannelFuture lastWriteFuture = null;
            BufferedReader in = new BufferedReader(new InputStreamReader(System.in));
            for (;;) {
                String line = in.readLine();
                if (line == null) {
                    break;
                }

                // Sends the received line to the server.
                lastWriteFuture = ch.writeAndFlush(line + "\r\n");

                // If user typed the 'bye' command, wait until the server closes
                // the connection.
                if ("bye".equals(line.toLowerCase())) {
                    ch.closeFuture().sync();
                    break;
                }
            }

            // Wait until all messages are flushed before closing the channel.
            if (lastWriteFuture != null) {
                lastWriteFuture.sync();
            }
        } finally {
            // The connection is closed automatically on shutdown.
            group.shutdownGracefully();
        }
    }
}

 

 

/*
 * Copyright 2012 The Netty Project
 *
 * The Netty Project licenses this file to you under the Apache License,
 * version 2.0 (the "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at:
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
 * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
 * License for the specific language governing permissions and limitations
 * under the License.
 */
package com.tch.test.chat.netty;

import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.socket.SocketChannel;
import io.netty.handler.codec.DelimiterBasedFrameDecoder;
import io.netty.handler.codec.Delimiters;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;

/**
 * Creates a newly configured {@link ChannelPipeline} for a new channel.
 */
public class MyClientChannelInitializer extends ChannelInitializer<SocketChannel> {
    @Override
    public void initChannel(SocketChannel ch) throws Exception {
        ChannelPipeline pipeline = ch.pipeline();
        // On top of the SSL handler, add the text line codec.
        pipeline.addLast(new DelimiterBasedFrameDecoder(8192, Delimiters.lineDelimiter()));
        pipeline.addLast(new StringDecoder());
        pipeline.addLast(new StringEncoder());

        // and then business logic.
        pipeline.addLast(new MyClientHandler());
    }
}

 

 

/*
 * Copyright 2012 The Netty Project
 *
 * The Netty Project licenses this file to you under the Apache License,
 * version 2.0 (the "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at:
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
 * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
 * License for the specific language governing permissions and limitations
 * under the License.
 */
package com.tch.test.chat.netty;

import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;

/**
 * Handles a client-side channel.
 */
public class MyClientHandler extends SimpleChannelInboundHandler<String> {

    @Override
    public void channelRead0(ChannelHandlerContext ctx, String msg) throws Exception {
        System.out.println(msg);
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
        cause.printStackTrace();
        ctx.close();
    }
}

 

 

OK,以上就是不同的方式实现聊天。

 

 

分享到:
评论

相关推荐

Global site tag (gtag.js) - Google Analytics