`
Donald_Draper
  • 浏览: 954452 次
社区版块
存档分类
最新评论

netty 事件执行器组和事件执行器定义及抽象实现

阅读更多
netty Inboudn/Outbound通道Invoker:http://donald-draper.iteye.com/blog/2388233
netty 异步任务-ChannelFuture:http://donald-draper.iteye.com/blog/2388297
netty 管道线定义-ChannelPipeline:http://donald-draper.iteye.com/blog/2388453
netty 默认Channel管道线初始化:http://donald-draper.iteye.com/blog/2388613
netty 默认Channel管道线-添加通道处理器:http://donald-draper.iteye.com/blog/2388726
netty 默认Channel管道线-通道处理器移除与替换:http://donald-draper.iteye.com/blog/2388793
netty 默认Channel管道线-Inbound和Outbound事件处理:http://donald-draper.iteye.com/blog/2389148
netty 通道处理器上下文定义:http://donald-draper.iteye.com/blog/2389214
netty 通道处理器上下文:http://donald-draper.iteye.com/blog/2389299
引言:
     在前面的文章中,当IO事件发生,Channel管道线处理相关事件的方法,如果管道线事件执行器处理当前事件循环组中,则直接执行,否则从事件执行器组映射关系childExecutors(Map<EventExecutorGroup, EventExecutor>)中获取事件执行器对应的事件执行器,并将IO事件的相关操作委托给事件执行器,这个事件执行器就是构造通道处理器上下文时的事件执行器executor(EventExecutor)。
从今天开始我们来看一下事件执行器组,事件执行器的还以及作用:
在前面的实例中,创建netty服务端以下一段代码:
/*
 * EventLoopGroup(多线程事件loop),处理IO操作,这里我们用了两个事件loop
 * 第一个用于处理器监听连接请求,第二个用于数据的传输;
 * 具体线程是多少依赖于事件loop的具体实现
 * */
 EventLoopGroup bossGroup = new NioEventLoopGroup(); // (1)
 EventLoopGroup workerGroup = new NioEventLoopGroup();
 try {
 	//ServerBootstrap,用于配置服务端,一般为ServerSocket通道
     ServerBootstrap serverBoot = new ServerBootstrap(); 
     serverBoot.group(bossGroup, workerGroup)
      .channel(NioServerSocketChannel.class) 
      .childHandler(new ChannelInitializer<SocketChannel>() { 
          @Override
          public void initChannel(SocketChannel ch) throws Exception {
         	//添加通道处理器到通道关联的管道,准确的中文翻译为管道线, 此管道线与Mina中过滤链十分相似,
         	//ChannelInitializer用于配置通道的管道线,ChannelPipeline
         	 ChannelPipeline pipeline = ch.pipeline();
              if (sslCtx != null) {
             	 pipeline.addLast(sslCtx.newHandler(ch.alloc()));
              }
              pipeline.addLast(new LoggingHandler(LogLevel.INFO));
              pipeline.addLast(new EchoServerHandler());
          }
      })
      .option(ChannelOption.SO_BACKLOG, 128)//socket监听器连接队列大小、
      .childOption(ChannelOption.SO_KEEPALIVE, true); //保活,此配置针对ServerSocket通道接收连接产生的Socket通道
     InetSocketAddress inetSocketAddress = new InetSocketAddress(ip,port);
     // 绑定地址,开始监听
     ChannelFuture f = serverBoot.bind(inetSocketAddress).sync();
     log.info("=========Server is start=========");
     //等待,直到ServerSocket关闭
     f.channel().closeFuture().sync();
 } finally {
     workerGroup.shutdownGracefully();
     bossGroup.shutdownGracefully();
 }

其中用到了NioEventLoopGroup;
在客户端:
EventLoopGroup workerGroup = new NioEventLoopGroup();
try {
	//Bootstrap,用于配置客户端,一般为Socket通道
    Bootstrap bootstrap = new Bootstrap();
    bootstrap.group(workerGroup)
     .channel(NioSocketChannel.class)
     .handler(new ChannelInitializer<SocketChannel>() {
         @Override
         protected void initChannel(SocketChannel ch) throws Exception {
        	 //添加安全套接字处理器和通道处理器到
             ChannelPipeline pipeline = ch.pipeline();
             if (sslCtx != null) {
            	 pipeline.addLast(sslCtx.newHandler(ch.alloc(), ip, port));
             }
             pipeline.addLast(new LoggingHandler(LogLevel.INFO));
             pipeline.addLast(new EchoClientHandler());
         }
     });
    InetSocketAddress inetSocketAddress = new InetSocketAddress(ip,port);
    //连接socket地址
    ChannelFuture f = bootstrap.connect(inetSocketAddress).sync();
    log.info("=========Client is start=========");
    //等待,直到连接关闭
    f.channel().closeFuture().sync();
} finally {
	workerGroup.shutdownGracefully();
}

同样也用到了NioEventLoopGroup,
我们来看一下事件循环组EventLoopGroup和事件执行器组EventExecutorGroup及事件执行器EventExecutor的关系;
/**
 * {@link MultithreadEventLoopGroup} implementations which is used for NIO {@link Selector} based {@link Channel}s.
 Nio事件循环组NioEventLoopGroup为多线程事件循环组的事件,主要用于基于通道的Nio选择器相关操作。
 */
public class NioEventLoopGroup extends MultithreadEventLoopGroup {

/**
 * Abstract base class for {@link EventLoopGroup} implementations that handles their tasks with multiple threads at
 * the same time.
 多线程事件循环组MultithreadEventLoopGroup为事件循环组的实现,可以在同一时间多线程处理任务。
 */
public abstract class MultithreadEventLoopGroup extends MultithreadEventExecutorGroup implements EventLoopGroup {


/**
 * Abstract base class for {@link EventExecutorGroup} implementations that handles their tasks with multiple threads at
 * the same time.
 多线程事件执行器组MultithreadEventExecutorGroup可以在同一时间多线程处理任务。
 */
public abstract class MultithreadEventExecutorGroup extends AbstractEventExecutorGroup {


/**
 * Abstract base class for {@link EventExecutorGroup} implementations.
 事件循环中的抽象实现
 */
public abstract class AbstractEventExecutorGroup implements EventExecutorGroup {

/**
 * The {@link EventExecutorGroup} is responsible for providing the {@link EventExecutor}'s to use
 * via its {@link #next()} method. Besides this, it is also responsible for handling their
 * life-cycle and allows shutting them down in a global fashion.
 事件执行器组通道next方法提供事件执行器。除此之外,负责他们的生命循环,并允许以全局的方式关闭
 *
 */
public interface EventExecutorGroup extends ScheduledExecutorService, Iterable<EventExecutor> {

调度执行器ScheduledExecutorService为JUC包中的执行器服务,用迭代器Iterable<EventExecutor>管理组内的
事件执行器。



/**
 * The {@link EventExecutor} is a special {@link EventExecutorGroup} which comes
 * with some handy methods to see if a {@link Thread} is executed in a event loop.
 * Besides this, it also extends the {@link EventExecutorGroup} to allow for a generic
 * way to access methods.
事件执行器EventExecutor是一个特殊的事件执行器组,如果线程在事件循环中执行,事件执行器可以处理
相关的操作。除此之外,拓展了事件执行器组的相关方法,可以用一般的方式访问事件执行器组的相关方法。
 *
 */
public interface EventExecutor extends EventExecutorGroup {


再来看事件循环组的另一个分支EventLoopGroup

/**
 * Special {@link EventExecutorGroup} which allows registering {@link Channel}s that get
 * processed for later selection during the event loop.
 事件循环组为一个特殊的事件执行器组,可以注册通道,以便在事件循环中,被后面的选择操作处理器。
 *
 */
public interface EventLoopGroup extends EventExecutorGroup {


从上面可以看出事件循环组EventLoopGroup为一个特殊的事件执行器组EventExecutorGroup,可以注册通道,以便在事件循环中,被后面的选择操作处理器。事件执行器组继承了JUC的调度执行器服务ScheduledExecutorService,用迭代器Iterable<EventExecutor>管理组内的事件执行器。事件执行器是一个特殊的事件执行器组。Nio多线程事件循环NioEventLoopGroup可以理解为多线程版MultithreadEventExecutorGroup的事件执行器组。


package io.netty.util.concurrent;

import java.util.Iterator;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

/**
 * The {@link EventExecutorGroup} is responsible for providing the {@link EventExecutor}'s to use
 * via its {@link #next()} method. Besides this, it is also responsible for handling their
 * life-cycle and allows shutting them down in a global fashion.
 事件执行器组通道next方法提供事件执行器。除此之外,负责他们的生命循环,并允许以全局的方式关闭
 *
 */
public interface EventExecutorGroup extends ScheduledExecutorService, Iterable<EventExecutor> {

    /**
     * Returns {@code true} if and only if all {@link EventExecutor}s managed by this {@link EventExecutorGroup}
     * are being {@linkplain #shutdownGracefully() shut down gracefully} or was {@linkplain #isShutdown() shut down}.
     当且仅当事件执行器组关联的事件执行器被#shutdownGracefully或#isShutdown关闭时,此方法返回true
     */
    boolean isShuttingDown();

    /**
     * Shortcut method for {@link #shutdownGracefully(long, long, TimeUnit)} with sensible default values.
     *#shutdownGracefully超时方法的快捷方式,超时时间为默认值
     * @return the {@link #terminationFuture()}
     返回的为#terminationFuture方法的异步任务
     */
    Future<?> shutdownGracefully();

    /**
     * Signals this executor that the caller wants the executor to be shut down.  Once this method is called,
     * {@link #isShuttingDown()} starts to return {@code true}, and the executor prepares to shut itself down.
     * Unlike {@link #shutdown()}, graceful shutdown ensures that no tasks are submitted for <i>'the quiet period'</i>
     * (usually a couple seconds) before it shuts itself down.  If a task is submitted during the quiet period,
     * it is guaranteed to be accepted and the quiet period will start over.
     通知执行器,调用希望关闭执行器。一旦方法被调用,#isShuttingDown方法返回true,执行器准备关闭。不像#shutdown方法,
     graceful方式的关闭,确保在关闭前,没有任务在默认间隔内提交到执行器。如果任务在默认间隔内被提交,执行器必须能够保证在默认间隔内任务可以被接收,猜测应该是任务可以在默认间隔内执行完,所以接收。
     *
     * @param quietPeriod the quiet period as described in the documentation
     默认间隔
     * @param timeout     the maximum amount of time to wait until the executor is {@linkplain #shutdown()}
     *                    regardless if a task was submitted during the quiet period
     不管任务在默认间隔内被提交,执行调用#shutdown方法关闭执行器,最大超时等待时间
     * @param unit        the unit of, {@code quietPeriod} and {@code timeout}
     *默认间隔和超时时间单元
     * @return the {@link #terminationFuture()}
     */
    Future<?> shutdownGracefully(long quietPeriod, long timeout, TimeUnit unit);

    /**
     * Returns the {@link Future} which is notified when all {@link EventExecutor}s managed by this
     * {@link EventExecutorGroup} have been terminated.
     当事件执行器管理的所有事件执行器terminated时,异步任务结果Future将会被通知
     */
    Future<?> terminationFuture();

    /**
     * @deprecated {@link #shutdownGracefully(long, long, TimeUnit)} or {@link #shutdownGracefully()} instead.
     已弃用,被#shutdownGracefully方法代替
     */
    @Override
    @Deprecated
    void shutdown();

    /**
     * @deprecated {@link #shutdownGracefully(long, long, TimeUnit)} or {@link #shutdownGracefully()} instead.
     已弃用,被#shutdownGracefully方法代替
     */
    @Override
    @Deprecated
    List<Runnable> shutdownNow();

    /**
     * Returns one of the {@link EventExecutor}s managed by this {@link EventExecutorGroup}.
     返回事件执行器组管理的下一个事件执行器
     */
    EventExecutor next();
   //返回事件执行器组管理的事件执行器迭代器
    @Override
    Iterator<EventExecutor> iterator();
   //下面方法与JUC调度执行器服务相同,就不说了,JUC分类文章中已说
    @Override
    Future<?> submit(Runnable task);

    @Override
    <T> Future<T> submit(Runnable task, T result);

    @Override
    <T> Future<T> submit(Callable<T> task);

    @Override
    ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit);

    @Override
    <V> ScheduledFuture<V> schedule(Callable<V> callable, long delay, TimeUnit unit);

    @Override
    ScheduledFuture<?> scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit);

    @Override
    ScheduledFuture<?> scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit);
}

从上面可以看出,事件执行器组EventExecutorGroup主要提供了关闭事件执行器组管理的执行器的相关方法,获取
事件执行器组管理的事件执行器和执行任务线程方法。
再来看事件执行器:
package io.netty.util.concurrent;

/**
 * The {@link EventExecutor} is a special {@link EventExecutorGroup} which comes
 * with some handy methods to see if a {@link Thread} is executed in a event loop.
 * Besides this, it also extends the {@link EventExecutorGroup} to allow for a generic
 * way to access methods.
 *
 */
public interface EventExecutor extends EventExecutorGroup {

    /**
     * Returns a reference to itself.
     返回事件执行器组的下一个事件执行器
     */
    @Override
    EventExecutor next();

    /**
     * Return the {@link EventExecutorGroup} which is the parent of this {@link EventExecutor},
     返回所属的事件执行器组
     */
    EventExecutorGroup parent();

    /**
     * Calls {@link #inEventLoop(Thread)} with {@link Thread#currentThread()} as argument
     判断当前线程是否在事件循环中
     */
    boolean inEventLoop();

    /**
     * Return {@code true} if the given {@link Thread} is executed in the event loop,
     * {@code false} otherwise.
     如果指定的线程在当前事件循环中执行,则返回true
     */
    boolean inEventLoop(Thread thread);

    /**
     * Return a new {@link Promise}.
     创建一个可写的异步任务结果
     */
    <V> Promise<V> newPromise();

    /**
     * Create a new {@link ProgressivePromise}.
     创建一个可写的异步任务解读结果
     */
    <V> ProgressivePromise<V> newProgressivePromise();

    /**
     * Create a new {@link Future} which is marked as succeeded already. So {@link Future#isSuccess()}
     * will return {@code true}. All {@link FutureListener} added to it will be notified directly. Also
     * every call of blocking methods will just return without blocking.
     创建一个已经标记成功的异步任务结果。所以任务的Future#isSuccess方法,返回true。所有监控任务的监听器将会被
     直接通知。所有阻塞方法调用,将会无阻塞直接返回。
     */
    <V> Future<V> newSucceededFuture(V result);

    /**
     * Create a new {@link Future} which is marked as failed already. So {@link Future#isSuccess()}
     * will return {@code false}. All {@link FutureListener} added to it will be notified directly. Also
     * every call of blocking methods will just return without blocking.
     创建一个已经标记失败的异步任务结果。所以任务的Future#isSuccess方法,返回false。所有监控任务的监听器将会被
     直接通知。所有阻塞方法调用,将会无阻塞直接返回。
     */
    <V> Future<V> newFailedFuture(Throwable cause);
}

从上面可以看出事件执行器EventExecutor为一个特殊的事件执行器组EventExecutorGroup,提供了获取事件执行器组的下一个事件执行器方法,判断线程是否在当前事件循环中以及创建可写的异步任务结果和进度结果,及已经成功失败的
异步结果。

再来看一下抽象事件执行器组AbstractEventExecutorGroup:
package io.netty.util.concurrent;

import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

import static io.netty.util.concurrent.AbstractEventExecutor.*;


/**
 * Abstract base class for {@link EventExecutorGroup} implementations.
 */
public abstract class AbstractEventExecutorGroup implements EventExecutorGroup {
    //所有与调度执行器关联的提交任务和调度任务方法,直接委托给事件执行器的下一个事件执行器相应方法执行。
    @Override
    public Future<?> submit(Runnable task) {
        return next().submit(task);
    }

    @Override
    public <T> Future<T> submit(Runnable task, T result) {
        return next().submit(task, result);
    }

    @Override
    public <T> Future<T> submit(Callable<T> task) {
        return next().submit(task);
    }

    @Override
    public ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit) {
        return next().schedule(command, delay, unit);
    }

    @Override
    public <V> ScheduledFuture<V> schedule(Callable<V> callable, long delay, TimeUnit unit) {
        return next().schedule(callable, delay, unit);
    }

    @Override
    public ScheduledFuture<?> scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit) {
        return next().scheduleAtFixedRate(command, initialDelay, period, unit);
    }

    @Override
    public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit) {
        return next().scheduleWithFixedDelay(command, initialDelay, delay, unit);
    }
   //graceful方式关闭事件执行器组
    @Override
    public Future<?> shutdownGracefully() {
        return shutdownGracefully(DEFAULT_SHUTDOWN_QUIET_PERIOD, DEFAULT_SHUTDOWN_TIMEOUT, TimeUnit.SECONDS);
    }

    /**
     * @deprecated {@link #shutdownGracefully(long, long, TimeUnit)} or {@link #shutdownGracefully()} instead.
     已启用
     */
    @Override
    @Deprecated
    public abstract void shutdown();

    /**
     * @deprecated {@link #shutdownGracefully(long, long, TimeUnit)} or {@link #shutdownGracefully()} instead.
     已启用
     */
    @Override
    @Deprecated
    public List<Runnable> shutdownNow() {
        shutdown();
        return Collections.emptyList();
    }

    @Override
    public <T> List<java.util.concurrent.Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
            throws InterruptedException {
        return next().invokeAll(tasks);
    }

    @Override
    public <T> List<java.util.concurrent.Future<T>> invokeAll(
            Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit) throws InterruptedException {
        return next().invokeAll(tasks, timeout, unit);
    }

    @Override
    public <T> T invokeAny(Collection<? extends Callable<T>> tasks) throws InterruptedException, ExecutionException {
        return next().invokeAny(tasks);
    }

    @Override
    public <T> T invokeAny(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit)
            throws InterruptedException, ExecutionException, TimeoutException {
        return next().invokeAny(tasks, timeout, unit);
    }

    @Override
    public void execute(Runnable command) {
        next().execute(command);
    }
}

我们来看一下抽象事件执行器的关闭方法
 //graceful方式关闭事件执行器组
@Override
public Future<?> shutdownGracefully() {
    return shutdownGracefully(DEFAULT_SHUTDOWN_QUIET_PERIOD, DEFAULT_SHUTDOWN_TIMEOUT, TimeUnit.SECONDS);
}


来看一下这些默认值在哪里定义:
/**
 * Abstract base class for {@link EventExecutor} implementations.
 */
public abstract class AbstractEventExecutor extends AbstractExecutorService implements EventExecutor {
    private static final InternalLogger logger = InternalLoggerFactory.getInstance(AbstractEventExecutor.class);

    static final long DEFAULT_SHUTDOWN_QUIET_PERIOD = 2;
    static final long DEFAULT_SHUTDOWN_TIMEOUT = 15;

    private final EventExecutorGroup parent;
    private final Collection<EventExecutor> selfCollection = Collections.<EventExecutor>singleton(this);
    ...
}



实际定义在抽象事件执行器中,默认关闭间隔为2s,超时时间为25s。

抽象事件执行器组AbstractEventExecutorGroup,所有与调度执行器关联的提交任务和调度任务方法,直接委托给事件执行器的下一个事件执行器相应方法执行。graceful方式关闭事件执行器组,默认关闭间隔为2s,超时时间为25s。

再来看一下抽象事件执行器AbstractEventExecutor,
package io.netty.util.concurrent;

import io.netty.util.internal.logging.InternalLogger;
import io.netty.util.internal.logging.InternalLoggerFactory;

import java.util.Collection;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.AbstractExecutorService;
import java.util.concurrent.Callable;
import java.util.concurrent.RunnableFuture;
import java.util.concurrent.TimeUnit;

/**
 * Abstract base class for {@link EventExecutor} implementations.
 继承了抽象执行器服务AbstractExecutorService
 */
public abstract class AbstractEventExecutor extends AbstractExecutorService implements EventExecutor {
    private static final InternalLogger logger = InternalLoggerFactory.getInstance(AbstractEventExecutor.class);

    static final long DEFAULT_SHUTDOWN_QUIET_PERIOD = 2;//关闭执行器默认间隔
    static final long DEFAULT_SHUTDOWN_TIMEOUT = 15;//关闭执行器超时等待时间

    private final EventExecutorGroup parent;//所属事件执行器组
    //当前事件执行器单例集
    private final Collection<EventExecutor> selfCollection = Collections.<EventExecutor>singleton(this);

    protected AbstractEventExecutor() {
        this(null);
    }

    protected AbstractEventExecutor(EventExecutorGroup parent) {
        this.parent = parent;
    }

    @Override
    public EventExecutorGroup parent() {
        return parent;
    }
    //next方法返回的为自己
    @Override
    public EventExecutor next() {
        return this;
    }
    //判断当前方法是否在事件循环中
    @Override
    public boolean inEventLoop() {
        return inEventLoop(Thread.currentThread());
    }

    @Override
    public Iterator<EventExecutor> iterator() {
        return selfCollection.iterator();
    } 
    //关闭执行器
    @Override
    public Future<?> shutdownGracefully() {
        return shutdownGracefully(DEFAULT_SHUTDOWN_QUIET_PERIOD, DEFAULT_SHUTDOWN_TIMEOUT, TimeUnit.SECONDS);
    }

    /**
     * @deprecated {@link #shutdownGracefully(long, long, TimeUnit)} or {@link #shutdownGracefully()} instead.
     */
    @Override
    @Deprecated
    public abstract void shutdown();

    /**
     * @deprecated {@link #shutdownGracefully(long, long, TimeUnit)} or {@link #shutdownGracefully()} instead.
     */
    @Override
    @Deprecated
    public List<Runnable> shutdownNow() {
        shutdown();
        return Collections.emptyList();
    }
    //创建异步任务结果
    @Override
    public <V> Promise<V> newPromise() {
        return new DefaultPromise<V>(this);
    }

    @Override
    public <V> ProgressivePromise<V> newProgressivePromise() {
        return new DefaultProgressivePromise<V>(this);
    }

    @Override
    public <V> Future<V> newSucceededFuture(V result) {
        return new SucceededFuture<V>(this, result);
    }

    @Override
    public <V> Future<V> newFailedFuture(Throwable cause) {
        return new FailedFuture<V>(this, cause);
    }
    //提交任务线程,直接委托给父类抽象执行器服务
    @Override
    public Future<?> submit(Runnable task) {
        return (Future<?>) super.submit(task);
    }

    @Override
    public <T> Future<T> submit(Runnable task, T result) {
        return (Future<T>) super.submit(task, result);
    }

    @Override
    public <T> Future<T> submit(Callable<T> task) {
        return (Future<T>) super.submit(task);
    }

    @Override
    protected final <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) {
        return new PromiseTask<T>(this, runnable, value);
    }

    @Override
    protected final <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
        return new PromiseTask<T>(this, callable);
    }
    //从下面来看,不支持延时调度的周期间歇性调度任务线程
    @Override
    public ScheduledFuture<?> schedule(Runnable command, long delay,
                                       TimeUnit unit) {
        throw new UnsupportedOperationException();
    }

    @Override
    public <V> ScheduledFuture<V> schedule(Callable<V> callable, long delay, TimeUnit unit) {
        throw new UnsupportedOperationException();
    }

    @Override
    public ScheduledFuture<?> scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit) {
        throw new UnsupportedOperationException();
    }

    @Override
    public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit) {
        throw new UnsupportedOperationException();
    }

    /**
     * Try to execute the given {@link Runnable} and just log if it throws a {@link Throwable}.
     安全地执行给定任务线程,捕捉抛出的异常
     */
    protected static void safeExecute(Runnable task) {
        try {
            task.run();
        } catch (Throwable t) {
            logger.warn("A task raised an exception. Task: {}", task, t);
        }
    }
}

从上面来看,抽象事件执行器,继承了抽象执行器服务AbstractExecutorService,提交任务线程,直接委托给父类抽象执行器服务,不支持延时调度的周期间歇性调度任务线程,多个一个安全地执行给定任务线程方法,捕捉执行过程中抛出的异常。由于抽象的事件执行器是一个特殊的事件执行器组,内部事件执行器selfCollection(Collections.<EventExecutor>singleton(this)),是自己单例集,next方法返回的是自己。

总结:


     事件循环组EventLoopGroup为一个特殊的事件执行器组EventExecutorGroup,可以注册通道,以便在事件循环中,被后面的选择操作处理器。事件执行器组继承了JUC的调度执行器服务ScheduledExecutorService,用迭代器Iterable<EventExecutor>管理组内的事件执行器。事件执行器是一个特殊的事件执行器组。Nio多线程事件循环NioEventLoopGroup可以理解为多线程版MultithreadEventExecutorGroup的事件执行器组。
    事件执行器组EventExecutorGroup主要提供了关闭事件执行器组管理的执行器的相关方法,获取事件执行器组管理的事件执行器和执行任务线程方法。
    事件执行器EventExecutor为一个特殊的事件执行器组EventExecutorGroup,提供了获取事件执行器组的下一个事件执行器方法,判断线程是否在当前事件循环中以及创建可写的异步任务结果和进度结果,及已经成功失败的异步结果。
    抽象事件执行器组AbstractEventExecutorGroup,所有与调度执行器关联的提交任务和调度任务方法,直接委托给事件执行器组的下一个事件执行器相应方法执行。graceful方式关闭事件执行器组,默认关闭间隔为2s,超时时间为25s,具体定义在抽象事件执行器AbstractEventExecutor中。
    抽象事件执行器,继承了抽象执行器服务AbstractExecutorService,提交任务线程,直接委托给父类抽象执行器服务,不支持延时调度的周期间歇性调度任务线程,多个一个安全地执行给定任务线程方法,捕捉执行过程中抛出的异常。由于抽象的事件执行器是一个特殊的事件执行器组,内部事件执行器selfCollection(Collections.<EventExecutor>singleton(this)),是自己单例集,next方法返回的是自己。
0
0
分享到:
评论

相关推荐

Global site tag (gtag.js) - Google Analytics