`
aids198311
  • 浏览: 58850 次
  • 性别: Icon_minigender_1
  • 来自: 杭州
社区版块
存档分类
最新评论

仿照jetty的nio原理写了个例子

    博客分类:
  • nio
nio 
阅读更多
看了好些天的nio和jetty源码,写了个例子。
太晚了,先直接贴代码了,注释不是很全。
更新了代码的解释和2个疑问在最下面
package com.daizuan.jetty;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.text.DateFormat;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.Set;
import java.util.concurrent.ConcurrentLinkedQueue;

/**
 * 
 * 
 * @author daizuan
 */
public class SimpleJettyServer {

    private final ConcurrentLinkedQueue<Object> _changes_con       = new ConcurrentLinkedQueue<Object>();
    private final ConcurrentLinkedQueue<Object> _changes_req       = new ConcurrentLinkedQueue<Object>();
    private static int                          DEFAULT_BUFFERSIZE = 16;
    private static String                       DEFAULT_CHARSET    = "GBK";
    private ServerSocketChannel                 channel;
    private Selector                            selector;
    private int                                 port;
    private static final String                 EXIT               = "exit";
    private static final String                 FORMAT             = "yyyy-MM-dd HH:mm:ss";

    public SimpleJettyServer(int port) throws IOException{
        this.port = port;
        this.channel = ServerSocketChannel.open();
        this.selector = Selector.open();

    }

    private String getTime() {
        DateFormat df = new SimpleDateFormat(FORMAT);
        return df.format(new Date());

    }

    public void listen() throws IOException { // 服务器开始监听端口,提供服务
        channel.socket().bind(new InetSocketAddress(port)); // 将scoket榜定在制定的端口上
        channel.configureBlocking(true);
        new Thread(new ConnectionHander()).start();
        new Thread(new RequestExecutor()).start();
        new Thread(new RequestHander()).start();

    }

    class ConnectionHander implements Runnable {

        @Override
        public void run() {
            System.out.println("ConnectionHander:connection Hander start......");
            while (true) {
                // 分发连接事件
                SocketChannel sc = null;
                try {
                    // 这里阻塞监听连接事件
                    sc = channel.accept();
                    sc.configureBlocking(false);
                    _changes_con.add(sc);
                    // 释放selector的锁,以便接收注册信息
                    selector.wakeup();
                    System.out.println("listener:a client in![" + sc.socket().getRemoteSocketAddress() + "]");
                } catch (Exception e) {
                    // TODO Auto-generated catch block
                    e.printStackTrace();
                }

            }

        }

    }

    /**
     * @author daizuan 分发请求
     */
    class RequestHander implements Runnable {

        @Override
        public void run() {
            System.out.println("RequestHander:Request Hander start......");
            while (true) {
                try {
                    // 阻塞,直到有请求进来
                    selector.select();
                    // 因为注册信息需要获取selector的锁,所以需要放在这里处理注册信息
                    int changes = _changes_con.size();
                    Object change = null;
                    while (changes-- > 0 && (change = _changes_con.poll()) != null) {
                        try {
                            if (change instanceof SocketChannel) {
                                SocketChannel sc = (SocketChannel) change;
                                String id = "[" + sc.socket().getRemoteSocketAddress() + "] ";
                                sc.register(selector, SelectionKey.OP_READ, id);
                                write(sc, "hello:" + id + ",please input something!\n");
                                System.out.println("a client connected!" + id);
                            }
                        } catch (Exception e) {
                            e.printStackTrace();
                        }
                    }
                    Set<SelectionKey> keys = selector.selectedKeys();
                    // 处理请求信息,扔进请求队列
                    for (SelectionKey key : keys) {
                        if (key.isReadable()) {
                            // 取消注册事件
                            key.interestOps(0);
                            _changes_req.add(key);
                        }
                    }
                    selector.selectedKeys().clear();
                } catch (Exception e) {
                    e.printStackTrace();
                }

            }

        }

    }

    private void close(SocketChannel sc) {
        if (sc != null && sc.socket() != null) {
            try {
                sc.socket().close();
            } catch (IOException e) {
                // TODO Auto-generated catch block
                e.printStackTrace();
            }
        }
        if (sc != null) {
            try {
                sc.close();
            } catch (IOException e) {
                // TODO Auto-generated catch block
                e.printStackTrace();
            }
        }

    }

    private void write(SocketChannel sc, String str) {
        try {
            sc.write(ByteBuffer.wrap(str.getBytes(DEFAULT_CHARSET)));
        } catch (IOException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }
    }

    /**
     * 数组扩容
     * 
     * @param src byte[] 源数组数据
     * @param size int 扩容的增加量
     * @return byte[] 扩容后的数组
     */
    private byte[] grow(byte[] src, int size) {
        byte[] tmp = new byte[src.length + size];
        System.arraycopy(src, 0, tmp, 0, src.length);
        return tmp;
    }

    private String parseRequest(SocketChannel sc) throws IOException {
        ByteBuffer bbuffer = ByteBuffer.allocate(DEFAULT_BUFFERSIZE);
        int count = 0;
        int off = 0;
        byte[] data = new byte[DEFAULT_BUFFERSIZE * 10];
        bbuffer.clear();
        // 循环一次性吧所有数据读完,否则可能buffer满了,数据未读完
        while ((count = sc.read(bbuffer)) > 0) {
            bbuffer.flip();
            if ((off + count) > data.length) {
                data = grow(data, DEFAULT_BUFFERSIZE * 10);
            }
            byte[] buf = bbuffer.array();
            System.arraycopy(buf, 0, data, off, count);
            off += count;
        }

        if (count < 0) {
            return null;
        }

        byte[] req = new byte[off];
        System.arraycopy(data, 0, req, 0, off);
        return new String(req, DEFAULT_CHARSET).trim();

    }

    private void interestKey(SelectionKey key, int OP) {
        key.interestOps(OP);
        selector.wakeup();
    }

    private boolean needToCanncel(String request) {
        return EXIT.equals(request);
    }

    /**
     * @author daizuan 处理请求
     */
    class RequestExecutor implements Runnable {

        @Override
        public void run() {
            System.out.println("RequestExecutor:Request Executor start......");
            while (true) {
                int changes = _changes_req.size();
                Object change = null;
                while (changes-- > 0 && (change = _changes_req.poll()) != null) {
                    try {
                        if (change instanceof SelectionKey) {
                            SelectionKey key = (SelectionKey) change;
                            SocketChannel sc = (SocketChannel) key.channel();
                            // 解析出请求
                            String request = parseRequest(sc);
                            // 客户端退出
                            if (request == null) {
                                close(sc);
                                continue;
                            }
                            String id = (String) key.attachment();
                            System.out.println("read [" + request + "] from " + id);
                            if (needToCanncel(request)) {
                                System.out.println(id + "I am die!");
                                close(sc);
                                continue;
                            }
                            // 向客户端写一些信息
                            write(sc, "[" + getTime() + "] " + request + "\n");
                            // 重新设置key,需要获得锁。所以从阻塞唤醒selector
                            interestKey(key, SelectionKey.OP_READ);
                            // 如果不想继续,再这里关掉吧
                            // sc.socket().close();
                            // sc.close();
                        }

                    } catch (Exception e) {
                        e.printStackTrace();
                    }
                }

            }

        }
    }

    public static void main(String[] args) throws IOException {
        // System.out.println("server start.........");
        SimpleJettyServer server = new SimpleJettyServer(6789);
        server.listen(); // 服务器开始监听端口,提供服务
    }

}





代码说起来太枯燥,我们可以吧这个server看作一辆行驶中的公交车,车上的每个乘客看成是一个客户端,同时有三个工作人员:观察员,售票员,服务员,负责维护这辆车。

1.观察员(ConnectionHander):
他就负责站在车门口盯着车外,只要发现有想要人上车(客户端发起与服务端连接的请求),就负责把这个人带进汽车。如:有个叫张三的家伙被他带上了车,他会告诉张三:“喂,和车门口的其他人(已经在_changes_con队列中的请求)一起,在车门口排队站好了,别到处跑(把连接扔进队列_changes_con),一会儿售票员会来找你登记的(注册SelectionKey.OP_READ事件)”。然后观察员大吼一声告诉售票员,别发呆(selector.select(),这个方法这里是阻塞的,所以说成发呆)啦,快来登记(selector.wakeup())新乘客。接着观察员继续傻傻的等待(阻塞的等待),一心一意的!直到他发现又有人要上车。

2.售票员(RequestHander):
售票员往车门口看了一眼(遍历_changes_con队列)。果然有个叫张三的小伙子正在门口乖乖的等着,售票员走过去说:“喂,新上来的,来登记吧”,于是,售票员就吧张三的信息登记(sc.register(selector, SelectionKey.OP_READ, id))到她的小本子(selector.keys()这个小本子是售票员专有的,其他人无法操作)上。登记的信息有:到哪一站下车(SelectionKey.OP_READ),张三携带了什么物品(第三个参数)。登记完了张三并且告诉他:“你可以去睡觉了,到站了我会通知你的。”。OK,于是张三就找到个位置,呼呼大睡。不用担心坐过了站。
登记完了新乘客,售票员还得时刻注意车外,是不是到了有人要下车的地方。
很快,到了张三要下车的地方,售票员立刻把张三的名字写在一块黑板上(selector.selectedKeys()),当然黑板上可能不止张三一个人的名字,因为有可能其他人也在这个地方下车。然后叫醒要下车的人,并一个一个更改登记信息(key.interestOps(0)),然后告诉它们:“到后门口排队站好了(_changes_req.add(key)),一会服务员会来给你们开门,让你们下去的”。最后,服务员擦掉黑板上的信息(selector.selectedKeys().clear())

3.服务员(RequestExecutor):
服务员负责处理下车(客户端提交上来的请求)这件事情。她热情的接待了张三(这里是parseRequest方法),并认真的回答了张三提出的各种问题(处理客户端的请求,这里是write这个方法,通过socketchannel向客户端写一些信息)。然后为张三打开车门,让他下车(sc.socket().close(),sc.close())。当然,如果这个时候张三反悔了,说:“我不想下车了,我再做一会儿再下去”。服务员也会热情的帮更改登记信息(interestKey方法),并告诉售票员(selector.wakeup()),:“把张三这小子的信息改一下 key.interestOps(OP)”。OK,张三又可以继续到车上睡大觉了!如果到了他要下车的地方,售票员会再次执行之前相同的操作。

遇到的问题

问题1:处理完请求后,继续注册会卡住,也就是卡在sc.register(selector, SelectionKey.OP_READ, id)
原因:socketChannel注册到selector,需要获取selector的keys的锁,而此时,selector正阻塞在selector.select()方法上面。如果没有新的请求进来,会一直持有keys的锁。注册的时候拿不到keys的锁会阻塞卡住。
解决1:再注册之前手动调用一次selector.wakeup(),让selector和注册这2个线程去竟争
解决2:考虑倒注册操作的开销比较小,完全可以放到selecotr的同一个线程里面(jetty就是这么做的)。请求处理完后,直接调用key的interestOps方法,然后selector.wakeup(),第二次select操作的时候,会重新刷新keys。注册生效。

问题2:socketchannel.read(Bytebuffer)读取含有中文的buffer,然后用CharBuffer charBuffer = decoder.decode(bbuffer)操作会抛出异常 java.nio.charset.UnmappableCharacterException: Input length = 2
原因:客户端的输入编码不对。我用的terminal的编码为UTF-8
解决:设置一下即可。

问题3:输入中文过长的时候,还是偶尔会出现java.nio.charset.MalformedInputException: Input length = 1
原因:中文占用2个字节,如果只读到奇数个字节,解码就会出错。
解决:最好一次性读完所有的输入。所以用while循环来使劲读。每次读的放到数组里面,直到读到的为0,即认为没有信息了。然后解码。见parseRequest方法

2个疑问,期待你的解答!

1.实际上,由于网络原因parseRequest里面count = sc.read(bbuffer)可能读到的确实为0,但是却不是读完了所有信息,api上说读到的为-1即可认为已经读完,可经测试,即使读完了,也没有出现-1的情况,最后一次始终为0。谁能解释一下?

2.频繁调用wakeUp。看到wakeUp的原理,发现么次都是往连接里面写入一个字节,然后selector发现有io进来了,就解除阻塞。资料上说实际应用中这个开销还是不容忽视的。有木有好的办法减少调用?

3.RequestExecutor这个撕循环,会占用100%的cpu,咋办呢?

写了一个改进版,但是问题1让然无法解决,思考中。。。。http://aids198311.iteye.com/blog/1113471




分享到:
评论
1 楼 name327 2014-11-26  
迟到三年多的回答,大概都不需要了,可以给其他人看,我的回答只代表我自己的理解,可能有不正确的地方

1.实际上,由于网络原因parseRequest里面count = sc.read(bbuffer)可能读到的确实为0,但是却不是读完了所有信息,api上说读到的为-1即可认为已经读完,可经测试,即使读完了,也没有出现-1的情况,最后一次始终为0。谁能解释一下?

sc.read()方法api上有说,如果selector监听到sc上有读取事件发生,那么只能保证你在调用sc.read()方法时候,至少读取一个字节,并且该方法会立即返回(这里只考虑非阻塞模式)
至于读取到-1  只有在客户端close了连接,服务端会收到一个读取事件,这时候会读取到-1
博主说的已经读完了 , 要有标志判断完了, 或者报文头加包长度,猜测这里博主说的读完了,是根据人类常识理解客户端该发送的数据发送完毕了,但是这时候程序并不知道有没有发送完毕,需要告诉程序发送完毕的规则


2.频繁调用wakeUp。看到wakeUp的原理,发现么次都是往连接里面写入一个字节,然后selector发现有io进来了,就解除阻塞。资料上说实际应用中这个开销还是不容忽视的。有木有好的办法减少调用?

wakeup是当调用select()方法,阻塞时候,可以使该方法select()可以立即返回,等下次循环再次调用select() 就可以 处理cancel的selectkey,当然可能还可能有其他的操作

3.RequestExecutor这个撕循环,会占用100%的cpu,咋办呢?
cpu 100% 的原因可能是ConcurrentLinkedQueue的对象 _changes_req只是线程安全的,但不是 在调用 poll方法时候 如果没有有效的对象就调用wait()方法,等到有对象放入的时候调用notifyAll ,从而让线程休眠的操作导致的

相关推荐

    Jetty工作原理

    Jetty 目前的是一个比较被看好的 Servlet 引擎,它的架构比较简单,也是一个可扩展性和非常灵活的应用服务器,它有一个基本数据模型,这个数据模型就是 Handler,所有可以被扩展的组件都可以作为一个 Handler,添加...

    从Jetty、Tomcat和Mina中提炼NIO构架网络服务器的经典模式

    从Jetty、Tomcat和Mina中提炼NIO构架网络服务器的经典模式.doc

    maven+jetty +ssh 项目例子

    maven+jetty 的ssh框架例子 直接运行 run:jetty

    jetty 例子, 就一个demo 还有jar

    jetty demo 简单 jetty eclipse demo .jetty 轻量级的web容器,在服务器开发中经常用到,在做服务器与服务器之间通讯的时候 。

    Jetty for Mac java demo 用java写的Mac环境下 jetty小例子

    自己用jetty写的java小例子,在mac 环境下,具体的解说在:https://www.cnblogs.com/aspirant/p/9445542.html

    jetty整合springmvc例子

    参考的官网的jetty例子,项目用的maven整合springmvc例子。亲测通过!

    Android-I-Jetty服务器部署例子代码

    Android-I-Jetty服务器部署例子代码,第一运行起来I-Jetty服务器以后,一定想要例子测试一下;

    gradle的jetty插件使用例子

    gradle的jetty插件使用例子,详细参考:http://blog.csdn.net/xiejx618/article/details/38307289

    jfinal-jetty+idea例子

    jfinal-jetty+idea例子

    jetty相关的全部jar包

    jetty-security-9.4.8.v20171121.jar,jetty-io-9.4.8.v20171121.jar,jetty-continuation-9.4.8.v20171121.jar,jetty-client-9.4.8.v20171121.jar,jetty-jmx-9.4.8.v20171121.jar,jetty-plus-9.4.8.v20171121....

    基于jetty8 websocket的例子

    基于jetty8的聊天工具,不需要服务器能够和数据库交互的的可以忽略dbUtil

    JettyServer 例子

    自己写得学习jetty server的一个例子

    jetty,tomcat原理

    这边主要是对tomcat和jetty两个容器的相关实现原理进行整理,主要资源来自网络,作者只是起到整理的作用,这一块也是希望分享出去,给即将面临面试的同学一些帮助,希望所有人都能入职心仪的公司。

    eclipse jetty插件run-jetty-run-1.3.3

    eclipse jetty插件,从...下载run-jetty-run.zip文件,解压后再编写个links文件丢到eclipse的dropins目录下即可,省去了使用eclipse update方式安装的麻烦。 link文件样例如: path=d:\\eclipse_plugins\\run-jetty-run

    jetty 8及依赖包

    jetty8以及依赖包,学习的好代码,包括NIO和servlet的实现等

    jetty的四个包

    jetty所需的四个基本包,包含jetty-6.1.8.jar jetty-util-6.1.8 servlet-apt-2.5-6.1.8.jar start-6.1.8.jar

    Jetty中文手册

    这个wiki提供jetty的入门教程、基础配置、功能特性、优化、安全、JavaEE、监控、常见问题、故障排除帮助等等。它包含教程、使用手册、视频、特征描述、参考资料以及常见问题。 Jetty文档 ---------------- 入门...

    用jetty8.0写的websocket实现的简单聊天程序

    用jetty8.0写的websocket实现的简单聊天程序,供大家一起共同分享学习。

    Jetty多版本软件包

    Jetty软件包内容: jetty-distribution-9.4.51.v20230217.tar.gz jetty-distribution-9.4.51.v20230217.zip jetty-home-10.0.15.tar.gz jetty-home-10.0.15.zip jetty-home-11.0.15.tar.gz jetty-home-11.0.15.zip ...

    Jetty cometd(Continuation)学习笔记

    Jetty 7是Jetty奔向Eclipse后发布的第一个版本,本次的Jetty 7 RC2带给了我们一个十分诱人的新特性-支持跨域名Ajax请求。众所周知因为安全的原因,多数浏览器都限制了Ajax跨域请求和javascript加载的时候只能是与...

Global site tag (gtag.js) - Google Analytics