`
zddava
  • 浏览: 240365 次
  • 性别: Icon_minigender_1
  • 来自: 北京
社区版块
存档分类
最新评论

Tomcat NIO源代码分析(二) -- Poller

阅读更多
接着上面的流程,现在请求到了Poller的#register()方法

	public void register(final NioChannel socket) {
		socket.setPoller(this);
		// KeyAttachment是对NioChannel信息的包装,同样是非GC
		KeyAttachment key = keyCache.poll();
		final KeyAttachment ka = key != null ? key : new KeyAttachment(socket);
		ka.reset(this, socket, getSocketProperties().getSoTimeout());
		ka.setKeepAliveLeft(NioEndpoint.this.getMaxKeepAliveRequests());
		
		// PollerEvent的初始化,非GC Again
		PollerEvent r = eventCache.poll();
		// this is what OP_REGISTER turns into.
		// 读取数据的事件
		ka.interestOps(SelectionKey.OP_READ);
		if (r == null)
			r = new PollerEvent(socket, ka, OP_REGISTER);
		else
			r.reset(socket, ka, OP_REGISTER);
		
		// 把事件加到Poller
		addEvent(r);
	}

	public void addEvent(Runnable event) {
		// 把事件加入到队列中
		events.offer(event);
		// ++wakeupCounter
		if (wakeupCounter.incrementAndGet() == 0) selector.wakeup();
	}


其实也挺好懂的,就是把NioChannel作为OP_REGISTER事件注册到Poller,这样在Poller的#run()方法中就可以对加入Poller的事件进行处理了

	public void run() {
		while (running) {
			try {
				while (paused && (!close)) {
					try {
						Thread.sleep(100);
					} catch (InterruptedException e) {
						// Ignore
					}
				}
				boolean hasEvents = false;

				hasEvents = (hasEvents | events());
				// Time to terminate?
				if (close) {
					timeout(0, false);
					break;
				}
				try {
					if (!close) {
						if (wakeupCounter.get() > 0) {
							// 立刻返回 I/O 就绪的那些通道的键
							keyCount = selector.selectNow();
						} else {
							keyCount = selector.keys().size();
							// 这里把wakeupCounter设成-1,在addEvent的时候就会唤醒selector
							wakeupCounter.set(-1);
							// 使用阻塞的方式
							keyCount = selector.select(selectorTimeout);
						}
						wakeupCounter.set(0);
					}
					if (close) {
						timeout(0, false);
						selector.close();
						break;
					}
				} catch (NullPointerException x) {
					// sun bug 5076772 on windows JDK 1.5
					if (log.isDebugEnabled())
						log.debug("Possibly encountered sun bug 5076772 on windows JDK 1.5", x);
					if (wakeupCounter == null || selector == null)
						throw x;
					continue;
				} catch (CancelledKeyException x) {
					// sun bug 5076772 on windows JDK 1.5
					if (log.isDebugEnabled())
						log.debug("Possibly encountered sun bug 5076772 on windows JDK 1.5", x);
					if (wakeupCounter == null || selector == null)
						throw x;
					continue;
				} catch (Throwable x) {
					ExceptionUtils.handleThrowable(x);
					log.error("", x);
					continue;
				}
				// either we timed out or we woke up, process events first
				if (keyCount == 0)
					hasEvents = (hasEvents | events());

				Iterator<SelectionKey> iterator = keyCount > 0 ? selector.selectedKeys().iterator()
						: null;
				// Walk through the collection of ready keys and dispatch
				// any active event.
				while (iterator != null && iterator.hasNext()) {
					SelectionKey sk = iterator.next();
					// 这里的KeyAttachment实在#register()方法中注册的
					KeyAttachment attachment = (KeyAttachment) sk.attachment();
					attachment.access();
					iterator.remove();
					// 继续流程
					processKey(sk, attachment);
				}// while

				// process timeouts
				timeout(keyCount, hasEvents);
				if (oomParachute > 0 && oomParachuteData == null)
					checkParachute();
			} catch (OutOfMemoryError oom) {
				try {
					oomParachuteData = null;
					releaseCaches();
					log.error("", oom);
				} catch (Throwable oomt) {
					try {
						System.err.println(oomParachuteMsg);
						oomt.printStackTrace();
					} catch (Throwable letsHopeWeDontGetHere) {
						ExceptionUtils.handleThrowable(letsHopeWeDontGetHere);
					}
				}
			}
		}// while
		synchronized (this) {
			this.notifyAll();
		}
		stopLatch.countDown();

	}


这个方法有2个方法需要关注一下:#events()和#processKey():

	public boolean events() {
		boolean result = false;
		// synchronized (events) {
		Runnable r = null;
		// 返回是事件队列中是否有事件
		result = (events.size() > 0);
		while ((r = events.poll()) != null) {
			try {
				// 执行KeyEvent的#run()
				r.run();
				if (r instanceof PollerEvent) {
					((PollerEvent) r).reset();
					// 对KeyEvent进行回收
					eventCache.offer((PollerEvent) r);
				}
			} catch (Throwable x) {
				log.error("", x);
			}
		}
		// events.clear();
		// }
		return result;
	}


这里执行了SocketChannel对应的KeyEvent的#run()方法,在这个方法里给SocketChannel注册了OP_READ:

	public void run() {
		if (interestOps == OP_REGISTER) {
			try {
				// 给SocketChannel注册OP_READ
				socket.getIOChannel().register(socket.getPoller().getSelector(), SelectionKey.OP_READ,
						key);
			} catch (Exception x) {
				log.error("", x);
			}
		} else {
			// 这里应该是对comet进行支持的,暂时先不看

			......

		}// end if
	}// run


第二个是#processKey()方法,里边的很多流程我现在不是很关心,都略去了,

	protected boolean processKey(SelectionKey sk, KeyAttachment attachment) {
		boolean result = true;
		try {
			if (close) {
				cancelledKey(sk, SocketStatus.STOP, false);
			} else if (sk.isValid() && attachment != null) {
				attachment.access();// make sure we don't time out valid sockets
				sk.attach(attachment);// cant remember why this is here
				NioChannel channel = attachment.getChannel();
				if (sk.isReadable() || sk.isWritable()) {
					if (attachment.getSendfileData() != null) {
						processSendfile(sk, attachment, true, false);
					} else if (attachment.getComet()) {// 这里应该是对comet的支持
						......
					} else {
						// 这个分支是现在比较关心的
						if (isWorkerAvailable()) {// 这个好像还没实现
							// 这个#unreg()很巧妙,防止了通道对同一个事件不断select的问题
							unreg(sk, attachment, sk.readyOps());
							boolean close = (!processSocket(channel, null, true));
							if (close) {
								cancelledKey(sk, SocketStatus.DISCONNECT, false);
							}
						} else {
							result = false;
						}
					}
				}
			} else {
				// invalid key
				cancelledKey(sk, SocketStatus.ERROR, false);
			}
		} catch (CancelledKeyException ckx) {
			cancelledKey(sk, SocketStatus.ERROR, false);
		} catch (Throwable t) {
			ExceptionUtils.handleThrowable(t);
			log.error("", t);
		}
		return result;
	}

	protected void unreg(SelectionKey sk, KeyAttachment attachment, int readyOps) {
		reg(sk, attachment, sk.interestOps() & (~readyOps));
	}

	protected void reg(SelectionKey sk, KeyAttachment attachment, int intops) {
		sk.interestOps(intops);
		attachment.interestOps(intops);
		attachment.setCometOps(intops);
	}


这里的#unreg()方法据我理解应该很巧妙的解决了重复的IO事件问题,我自己写的测试用的NIO代码里就会有这个问题。

这样,流程就来到了Poller最后的#processSocket()方法了:

	public boolean processSocket(NioChannel socket, SocketStatus status, boolean dispatch) {
		try {
			KeyAttachment attachment = (KeyAttachment) socket.getAttachment(false);
			attachment.setCometNotify(false); // will get reset upon next reg
			// 使用SocketProcessor
			SocketProcessor sc = processorCache.poll();
			if (sc == null)
				sc = new SocketProcessor(socket, status);
			else
				sc.reset(socket, status);
			if (dispatch && getExecutor() != null)// 如果配置了ThreadPoolExecutor,那么使用它来执行
				getExecutor().execute(sc);
			else
				sc.run();
		} catch (RejectedExecutionException rx) {
			log.warn("Socket processing request was rejected for:" + socket, rx);
			return false;
		} catch (Throwable t) {
			ExceptionUtils.handleThrowable(t);
			// This means we got an OOM or similar creating a thread, or that
			// the pool and its queue are full
			log.error(sm.getString("endpoint.process.fail"), t);
			return false;
		}
		return true;
	}


这里SocketProcessor的#run()方法就不列出了,里边最后会通过下面的语句将流程转到Http11NioProtocol类,其中的handler就是对Http11NioProtocol的引用:

        SocketState state = SocketState.OPEN;
        state = (status==null)?handler.process(socket):handler.event(socket,status);


最后,对Acceptor和Poller的处理过程做个小结,见下图:


  • 大小: 27.2 KB
1
0
分享到:
评论

相关推荐

    xnio-nio-3.8.4.Final-API文档-中英对照版.zip

    赠送源代码:xnio-nio-3.8.4.Final-sources.jar; 赠送Maven依赖信息文件:xnio-nio-3.8.4.Final.pom; 包含翻译后的API文档:xnio-nio-3.8.4.Final-javadoc-API文档-中文(简体)-英语-对照版.zip; Maven坐标:org....

    xnio-nio-3.8.0.Final-API文档-中文版.zip

    赠送源代码:xnio-nio-3.8.0.Final-sources.jar; 赠送Maven依赖信息文件:xnio-nio-3.8.0.Final.pom; 包含翻译后的API文档:xnio-nio-3.8.0.Final-javadoc-API文档-中文(简体)版.zip; Maven坐标:org.jboss.xnio:...

    xnio-nio-3.8.4.Final-API文档-中文版.zip

    赠送源代码:xnio-nio-3.8.4.Final-sources.jar; 赠送Maven依赖信息文件:xnio-nio-3.8.4.Final.pom; 包含翻译后的API文档:xnio-nio-3.8.4.Final-javadoc-API文档-中文(简体)版.zip; Maven坐标:org.jboss.xnio:...

    xnio-nio-3.8.0.Final-API文档-中英对照版.zip

    赠送源代码:xnio-nio-3.8.0.Final-sources.jar; 赠送Maven依赖信息文件:xnio-nio-3.8.0.Final.pom; 包含翻译后的API文档:xnio-nio-3.8.0.Final-javadoc-API文档-中文(简体)-英语-对照版.zip; Maven坐标:org....

    httpcore-nio-4.4.15-API文档-中文版.zip

    赠送源代码:httpcore-nio-4.4.15-sources.jar 包含翻译后的API文档:httpcore-nio-4.4.15-javadoc-API文档-中文(简体)版.zip 对应Maven信息:groupId:org.apache.httpcomponents,artifactId:httpcore-nio,...

    httpcore-nio-4.4.6-API文档-中文版.zip

    赠送源代码:httpcore-nio-4.4.6-sources.jar 包含翻译后的API文档:httpcore-nio-4.4.6-javadoc-API文档-中文(简体)版.zip 对应Maven信息:groupId:org.apache.httpcomponents,artifactId:httpcore-nio,...

    httpcore-nio-4.4.10-API文档-中英对照版.zip

    赠送源代码:httpcore-nio-4.4.10-sources.jar; 赠送Maven依赖信息文件:httpcore-nio-4.4.10.pom; 包含翻译后的API文档:httpcore-nio-4.4.10-javadoc-API文档-中文(简体)-英语-对照版.zip; Maven坐标:org....

    httpcore-nio-4.4.15-API文档-中英对照版.zip

    赠送源代码:httpcore-nio-4.4.15-sources.jar 包含翻译后的API文档:httpcore-nio-4.4.15-javadoc-API文档-中文(简体)-英语-对照版.zip 对应Maven信息:groupId:org.apache.httpcomponents,artifactId:...

    手写 tomcat nio

    手写 tomcat nio http://knight-black-bob.iteye.com/blog/2408450

    httpcore-nio-4.4.10-API文档-中文版.zip

    赠送源代码:httpcore-nio-4.4.10-sources.jar; 赠送Maven依赖信息文件:httpcore-nio-4.4.10.pom; 包含翻译后的API文档:httpcore-nio-4.4.10-javadoc-API文档-中文(简体)版.zip; Maven坐标:org.apache....

    httpcore-nio-4.4.5-API文档-中文版.zip

    赠送源代码:httpcore-nio-4.4.5-sources.jar; 赠送Maven依赖信息文件:httpcore-nio-4.4.5.pom; 包含翻译后的API文档:httpcore-nio-4.4.5-javadoc-API文档-中文(简体)版.zip; Maven坐标:org.apache....

    httpcore-nio-4.4.14-API文档-中文版.zip

    赠送源代码:httpcore-nio-4.4.14-sources.jar; 赠送Maven依赖信息文件:httpcore-nio-4.4.14.pom; 包含翻译后的API文档:httpcore-nio-4.4.14-javadoc-API文档-中文(简体)版.zip; Maven坐标:org.apache....

    httpcore-nio-4.4.12-API文档-中文版.zip

    赠送源代码:httpcore-nio-4.4.12-sources.jar; 赠送Maven依赖信息文件:httpcore-nio-4.4.12.pom; 包含翻译后的API文档:httpcore-nio-4.4.12-javadoc-API文档-中文(简体)版.zip; Maven坐标:org.apache....

    httpcore-nio-4.4.6-API文档-中英对照版.zip

    赠送源代码:httpcore-nio-4.4.6-sources.jar; 赠送Maven依赖信息文件:httpcore-nio-4.4.6.pom; 包含翻译后的API文档:httpcore-nio-4.4.6-javadoc-API文档-中文(简体)-英语-对照版.zip; Maven坐标:org.apache....

    nio-multipart-parser-1.1.0.jar

    java运行依赖jar包

    httpcore-nio-4.4.4-API文档-中文版.zip

    赠送源代码:httpcore-nio-4.4.4-sources.jar; 赠送Maven依赖信息文件:httpcore-nio-4.4.4.pom; 包含翻译后的API文档:httpcore-nio-4.4.4-javadoc-API文档-中文(简体)版.zip; Maven坐标:org.apache....

    httpcore-nio-4.4.12-API文档-中英对照版.zip

    赠送源代码:httpcore-nio-4.4.12-sources.jar; 赠送Maven依赖信息文件:httpcore-nio-4.4.12.pom; 包含翻译后的API文档:httpcore-nio-4.4.12-javadoc-API文档-中文(简体)-英语-对照版.zip; Maven坐标:org....

    httpcore-nio-4.4.4-API文档-中英对照版.zip

    赠送源代码:httpcore-nio-4.4.4-sources.jar; 赠送Maven依赖信息文件:httpcore-nio-4.4.4.pom; 包含翻译后的API文档:httpcore-nio-4.4.4-javadoc-API文档-中文(简体)-英语-对照版.zip; Maven坐标:org.apache....

    httpcore-nio-4.4.5-API文档-中英对照版.zip

    赠送源代码:httpcore-nio-4.4.5-sources.jar; 赠送Maven依赖信息文件:httpcore-nio-4.4.5.pom; 包含翻译后的API文档:httpcore-nio-4.4.5-javadoc-API文档-中文(简体)-英语-对照版.zip; Maven坐标:org.apache....

Global site tag (gtag.js) - Google Analytics