`
Donald_Draper
  • 浏览: 951192 次
社区版块
存档分类
最新评论

netty 单线程事件执行器初始化

阅读更多
netty 事件执行器组和事件执行器定义及抽象实现:http://donald-draper.iteye.com/blog/2391257
netty 多线程事件执行器组:http://donald-draper.iteye.com/blog/2391270
netty 多线程事件循环组:http://donald-draper.iteye.com/blog/2391276
netty 抽象调度事件执行器:http://donald-draper.iteye.com/blog/2391379
引言:
前面一篇文章我们看了抽象调度事件执行器,来回顾一下:
      调度任务ScheduledFutureTask,内部有一个任务调度延时变量deadlineNanos,用于记录下一次调度的延时时间;调度任务间隔时间periodNanos为0,调度任务非周期性任务,大于0,周期性调度,小于零,固定延时调度;对于创建Runnable形式的调度,要先包装成Callable任务;调度任务执行时,对于非周期性任务,则直接执行,而周期性与间歇性任务,计算任务下一次任务调度的延时时间,如果调度任务没取消,则添加调度任务到关联的调度事件执行器调度任务队列。
      抽象调度事件执行器AbstractScheduledEventExecutor,内部有一个调度任务队列scheduledTaskQueue(PriorityQueue),用于存储待调度的任务。抽象调度事件执行器无论是调度任务线程,周期性任务,还是间歇性任务,先将任务包装成调度任务ScheduledFutureTask,然后委托给#schedule(final ScheduledFutureTask<V> task)方法,#schedule方法首先判断线程是否在当前事务循环,如果在,则添加调度任务到调度任务队列,否则直接创建一个线程,完成添加调度任务到调度任务队列工作;移除调度任务的思想与调度任务相同,只不过执行移除操作。

今天我们来看单线程事件执行器:
package io.netty.util.concurrent;
import io.netty.util.internal.ObjectUtil;
import io.netty.util.internal.SystemPropertyUtil;
import io.netty.util.internal.UnstableApi;
import io.netty.util.internal.logging.InternalLogger;
import io.netty.util.internal.logging.InternalLoggerFactory;

import java.lang.Thread.State;
import java.util.ArrayList;
import java.util.Collection;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.Semaphore;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;

/**
 * Abstract base class for {@link OrderedEventExecutor}'s that execute all its submitted tasks in a single thread.
 *
 */
public abstract class SingleThreadEventExecutor extends AbstractScheduledEventExecutor implements OrderedEventExecutor {
    //最大执行任数,最小为16
    static final int DEFAULT_MAX_PENDING_EXECUTOR_TASKS = Math.max(16,
            SystemPropertyUtil.getInt("io.netty.eventexecutor.maxPendingTasks", Integer.MAX_VALUE));

    private static final InternalLogger logger =
            InternalLoggerFactory.getInstance(SingleThreadEventExecutor.class);
    //事件执行器状态
    private static final int ST_NOT_STARTED = 1;//就绪
    private static final int ST_STARTED = 2;//开始
    private static final int ST_SHUTTING_DOWN = 3;//正在关闭
    private static final int ST_SHUTDOWN = 4;//已关闭
    private static final int ST_TERMINATED = 5;//终止
    //唤醒任务
    private static final Runnable WAKEUP_TASK = new Runnable() {
        @Override
        public void run() {
            // Do nothing.
        }
    };
    //空任务
    private static final Runnable NOOP_TASK = new Runnable() {
        @Override
        public void run() {
            // Do nothing.
        }
    };
    //事件执行器状态
    private static final AtomicIntegerFieldUpdater<SingleThreadEventExecutor> STATE_UPDATER =
            AtomicIntegerFieldUpdater.newUpdater(SingleThreadEventExecutor.class, "state");
    //线程属性
    private static final AtomicReferenceFieldUpdater<SingleThreadEventExecutor, ThreadProperties> PROPERTIES_UPDATER =
            AtomicReferenceFieldUpdater.newUpdater(
                    SingleThreadEventExecutor.class, ThreadProperties.class, "threadProperties");
   //任务队列
    private final Queue<Runnable> taskQueue;

    private volatile Thread thread;//当前事件执行器线程
    @SuppressWarnings("unused")//线程属性
    private volatile ThreadProperties threadProperties;
    private final Executor executor;
    private volatile boolean interrupted;//是否中断

    private final Semaphore threadLock = new Semaphore(0);//事件执行器关闭信号量
    private final Set<Runnable> shutdownHooks = new LinkedHashSet<Runnable>();//关闭Hooks任务
    private final boolean addTaskWakesUp;
    private final int maxPendingTasks;//最大执行器任务
    private final RejectedExecutionHandler rejectedExecutionHandler;//任务拒绝策略

    private long lastExecutionTime;//上次执行器时间

    @SuppressWarnings({ "FieldMayBeFinal", "unused" })
    private volatile int state = ST_NOT_STARTED;

    private volatile long gracefulShutdownQuietPeriod;//关闭间隔QuietPeriod
    private volatile long gracefulShutdownTimeout;//关闭超时时间
    private long gracefulShutdownStartTime;//关闭开始时间
    //终止异步任务结果
    private final Promise<?> terminationFuture = new DefaultPromise<Void>(GlobalEventExecutor.INSTANCE);
}

从上面可以看出单线程事件执行器SingleThreadEventExecutor,内部主要有一个状态变量STATE_UPDATER(AtomicIntegerFieldUpdater),执行器状态以供有4中就绪,开始,正在关闭,已关闭,终止;一个任务队列taskQueue存放待执行的任务线程;一个执行器执行任务taskQueue(LinkedBlockingQueue);一个事件执行器关闭信号量threadLock控制事件执行器的关闭;一个是高可见线程thread,指定当前事件执行器线程,用于判断IO操作线程是否在当前事件循环中;

再来看构造
/**
 * Create a new instance
 *
 * @param parent            the {@link EventExecutorGroup} which is the parent of this instance and belongs to it
 父事件执行器
 * @param threadFactory     the {@link ThreadFactory} which will be used for the used {@link Thread}
 线程工厂
 * @param addTaskWakesUp    {@code true} if and only if invocation of {@link #addTask(Runnable)} will wake up the
 *                          executor thread
 添加任务时,是否唤醒执行器
 */
protected SingleThreadEventExecutor(
        EventExecutorGroup parent, ThreadFactory threadFactory, boolean addTaskWakesUp) {
    this(parent, new ThreadPerTaskExecutor(threadFactory), addTaskWakesUp);
}

/**
 * Create a new instance
 *
 * @param parent            the {@link EventExecutorGroup} which is the parent of this instance and belongs to it
 * @param threadFactory     the {@link ThreadFactory} which will be used for the used {@link Thread}
 * @param addTaskWakesUp    {@code true} if and only if invocation of {@link #addTask(Runnable)} will wake up the
 *                          executor thread
 * @param maxPendingTasks   the maximum number of pending tasks before new tasks will be rejected.
 最大任务数
 * @param rejectedHandler   the {@link RejectedExecutionHandler} to use.
 拒绝任务策略
 */
protected SingleThreadEventExecutor(
        EventExecutorGroup parent, ThreadFactory threadFactory,
        boolean addTaskWakesUp, int maxPendingTasks, RejectedExecutionHandler rejectedHandler) {
    this(parent, new ThreadPerTaskExecutor(threadFactory), addTaskWakesUp, maxPendingTasks, rejectedHandler);
}

/**
 * Create a new instance
 *
 * @param parent            the {@link EventExecutorGroup} which is the parent of this instance and belongs to it
 * @param executor          the {@link Executor} which will be used for executing
 * @param addTaskWakesUp    {@code true} if and only if invocation of {@link #addTask(Runnable)} will wake up the
 *                          executor thread
 */
protected SingleThreadEventExecutor(EventExecutorGroup parent, Executor executor, boolean addTaskWakesUp) {
    this(parent, executor, addTaskWakesUp, DEFAULT_MAX_PENDING_EXECUTOR_TASKS, RejectedExecutionHandlers.reject());
}

/**
 * Create a new instance
 *
 * @param parent            the {@link EventExecutorGroup} which is the parent of this instance and belongs to it
 * @param executor          the {@link Executor} which will be used for executing
 * @param addTaskWakesUp    {@code true} if and only if invocation of {@link #addTask(Runnable)} will wake up the
 *                          executor thread
 * @param maxPendingTasks   the maximum number of pending tasks before new tasks will be rejected.
 * @param rejectedHandler   the {@link RejectedExecutionHandler} to use.
 */
protected SingleThreadEventExecutor(EventExecutorGroup parent, Executor executor,
                                    boolean addTaskWakesUp, int maxPendingTasks,
                                    RejectedExecutionHandler rejectedHandler) {
    super(parent);
    this.addTaskWakesUp = addTaskWakesUp;
    this.maxPendingTasks = Math.max(16, maxPendingTasks);
    this.executor = ObjectUtil.checkNotNull(executor, "executor");
    //创建任务度列
    taskQueue = newTaskQueue(this.maxPendingTasks);
    rejectedExecutionHandler = ObjectUtil.checkNotNull(rejectedHandler, "rejectedHandler");
}

/**
 * Create a new {@link Queue} which will holds the tasks to execute. This default implementation will return a
 * {@link LinkedBlockingQueue} but if your sub-class of {@link SingleThreadEventExecutor} will not do any blocking
 * calls on the this {@link Queue} it may make sense to {@code @Override} this and return some more performant
 * implementation that does not support blocking operations at all.
 创建一个队列,存放任务,以待执行,默认实现为LinkedBlockingQueue
 */
protected Queue<Runnable> newTaskQueue(int maxPendingTasks) {
    return new LinkedBlockingQueue<Runnable>(maxPendingTasks);
}


从上面来看,单线程事件执行器构造,主要是初始化父事件执行器,最大任务数,事件执行器,任务队列和任务拒绝策略。由于单线程事件执行器为顺序执行器OrderedEventExecutor,其主要通过taskQueue为LinkedBlockQueue保证任务的顺序执行。



总结:


      单线程事件执行器SingleThreadEventExecutor,内部主要有一个状态变量STATE_UPDATER(AtomicIntegerFieldUpdater),执行器状态以供有4中就绪,开始,正在关闭,已关闭,终止;一个任务队列taskQueue存放待执行的任务线程;一个执行器执行任务taskQueue(LinkedBlockingQueue);一个事件执行器关闭信号量threadLock控制事件执行器的关闭;一个是高可见线程thread,指定当前事件执行器线程,用于判断IO操作线程是否在当前事件循环中;
    单线程事件执行器构造,主要是初始化父事件执行器,最大任务数,事件执行器,任务队列和任务拒绝策略,默认拒绝策略为直接抛出拒绝执行器异常。由于单线程事件执行器为顺序执行器OrderedEventExecutor,其主要通过taskQueue为LinkedBlockQueue保证任务的顺序执行。


附:
在单线程事件执行器的变量声明和构造中有几点,需要我们关注,分别如下:
1.线程工厂
protected SingleThreadEventExecutor(
        EventExecutorGroup parent, ThreadFactory threadFactory,
        boolean addTaskWakesUp, int maxPendingTasks, RejectedExecutionHandler rejectedHandler) {
    this(parent, new ThreadPerTaskExecutor(threadFactory), addTaskWakesUp, maxPendingTasks, rejectedHandler);
}

//ThreadPerTaskExecutor 线程执行器
import java.util.concurrent.Executor;
import java.util.concurrent.ThreadFactory;

public final class ThreadPerTaskExecutor implements Executor {
    private final ThreadFactory threadFactory;//线程工厂

    public ThreadPerTaskExecutor(ThreadFactory threadFactory) {
        if (threadFactory == null) {
            throw new NullPointerException("threadFactory");
        }
        this.threadFactory = threadFactory;
    }
    //执行任务
    @Override
    public void execute(Runnable command) {
        //将任务包装成
        threadFactory.newThread(command).start();
    }
}

//默认线程工厂DefaultThreadFactory
package io.netty.util.concurrent;

import io.netty.util.internal.StringUtil;

import java.util.Locale;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicInteger;

/**
 * A {@link ThreadFactory} implementation with a simple naming rule.
 */
public class DefaultThreadFactory implements ThreadFactory {

    private static final AtomicInteger poolId = new AtomicInteger();//线程池id生成器

    private final AtomicInteger nextId = new AtomicInteger();//线程id生成器
    private final String prefix;//线程名前缀
    private final boolean daemon;//是否为守候模式
    private final int priority;//优先级
    protected final ThreadGroup threadGroup;//线程组

    public DefaultThreadFactory(Class<?> poolType) {
        this(poolType, false, Thread.NORM_PRIORITY);
    }

    public DefaultThreadFactory(String poolName) {
        this(poolName, false, Thread.NORM_PRIORITY);
    }

    public DefaultThreadFactory(Class<?> poolType, boolean daemon) {
        this(poolType, daemon, Thread.NORM_PRIORITY);
    }

    public DefaultThreadFactory(String poolName, boolean daemon) {
        this(poolName, daemon, Thread.NORM_PRIORITY);
    }

    public DefaultThreadFactory(Class<?> poolType, int priority) {
        this(poolType, false, priority);
    }

    public DefaultThreadFactory(String poolName, int priority) {
        this(poolName, false, priority);
    }

    public DefaultThreadFactory(Class<?> poolType, boolean daemon, int priority) {
        this(toPoolName(poolType), daemon, priority);
    }

    public static String toPoolName(Class<?> poolType) {
        if (poolType == null) {
            throw new NullPointerException("poolType");
        }

        String poolName = StringUtil.simpleClassName(poolType);
        switch (poolName.length()) {
            case 0:
                return "unknown";
            case 1:
                return poolName.toLowerCase(Locale.US);
            default:
                if (Character.isUpperCase(poolName.charAt(0)) && Character.isLowerCase(poolName.charAt(1))) {
                    return Character.toLowerCase(poolName.charAt(0)) + poolName.substring(1);
                } else {
                    return poolName;
                }
        }
    }

    public DefaultThreadFactory(String poolName, boolean daemon, int priority, ThreadGroup threadGroup) {
        if (poolName == null) {
            throw new NullPointerException("poolName");
        }
        if (priority < Thread.MIN_PRIORITY || priority > Thread.MAX_PRIORITY) {
            throw new IllegalArgumentException(
                    "priority: " + priority + " (expected: Thread.MIN_PRIORITY <= priority <= Thread.MAX_PRIORITY)");
        }

        prefix = poolName + '-' + poolId.incrementAndGet() + '-';
        this.daemon = daemon;
        this.priority = priority;
        this.threadGroup = threadGroup;
    }
    public DefaultThreadFactory(String poolName, boolean daemon, int priority) {
        //如果系统安全管理器没有配置,则为线程当前所属的组,否则获取安全管理器配置的线程组
        this(poolName, daemon, priority, System.getSecurityManager() == null ?
                Thread.currentThread().getThreadGroup() : System.getSecurityManager().getThreadGroup());
    }
    //创建任务线程
    @Override
    public Thread newThread(Runnable r) {
        Thread t = newThread(new DefaultRunnableDecorator(r), prefix + nextId.incrementAndGet());
        try {
            if (t.isDaemon() != daemon) {
                t.setDaemon(daemon);
            }

            if (t.getPriority() != priority) {
                t.setPriority(priority);
            }
        } catch (Exception ignored) {
            // Doesn't matter even if failed to set.
        }
        return t;
    }
    //创建FastThreadLocalThread
    protected Thread newThread(Runnable r, String name) {
        return new FastThreadLocalThread(threadGroup, r, name);
    }
    //任务包装线程
    private static final class DefaultRunnableDecorator implements Runnable {

        private final Runnable r;

        DefaultRunnableDecorator(Runnable r) {
            this.r = r;
        }

        @Override
        public void run() {
            try {
                r.run();
            } finally {
	       //释放线程的所有本地变量
                FastThreadLocal.removeAll();
            }
        }
    }
}


2.
//终止异步任务结果
private final Promise<?> terminationFuture = new DefaultPromise<Void>(GlobalEventExecutor.INSTANCE);

//全局事务执行器 GlobalEventExecutor
import io.netty.util.internal.logging.InternalLogger;
import io.netty.util.internal.logging.InternalLoggerFactory;

import java.util.Queue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

/**
 * Single-thread singleton {@link EventExecutor}.  It starts the thread automatically and stops it when there is no
 * task pending in the task queue for 1 second.  Please note it is not scalable to schedule large number of tasks to
 * this executor; use a dedicated executor.
 GlobalEventExecutor为线程单例事件执行器。自动开始任务,当在一秒内没有任务添加到队列时,停止。
 注意此执行不适合调度大量的任务。调度大量任务,可以用专用的执行器
 */
public final class GlobalEventExecutor extends AbstractScheduledEventExecutor {

    private static final InternalLogger logger = InternalLoggerFactory.getInstance(GlobalEventExecutor.class);
    //调度间隔
    private static final long SCHEDULE_QUIET_PERIOD_INTERVAL = TimeUnit.SECONDS.toNanos(1);
   //全局事件执行器实例
    public static final GlobalEventExecutor INSTANCE = new GlobalEventExecutor();
    //任务队列
    final BlockingQueue<Runnable> taskQueue = new LinkedBlockingQueue<Runnable>();
    //调度任务
    final ScheduledFutureTask<Void> quietPeriodTask = new ScheduledFutureTask<Void>(
            this, Executors.<Void>callable(new Runnable() {
        @Override
        public void run() {
            // NOOP
        }
    }, null), ScheduledFutureTask.deadlineNanos(SCHEDULE_QUIET_PERIOD_INTERVAL), -SCHEDULE_QUIET_PERIOD_INTERVAL);

    // because the GlobalEventExecutor is a singleton, tasks submitted to it can come from arbitrary threads and this
    // can trigger the creation of a thread from arbitrary thread groups; for this reason, the thread factory must not
    // be sticky about its thread group
    // visible for testing
    线程工厂为默认的线程工程,在第二点中我们简单说一下
    final ThreadFactory threadFactory =
            new DefaultThreadFactory(DefaultThreadFactory.toPoolName(getClass()), false, Thread.NORM_PRIORITY, null);
    //任务线程
    private final TaskRunner taskRunner = new TaskRunner();
    //事件执行器状态
    private final AtomicBoolean started = new AtomicBoolean();
    volatile Thread thread;
    //异步终止任务线程
    private final Future<?> terminationFuture = new FailedFuture<Object>(this, new UnsupportedOperationException());

    private GlobalEventExecutor() {
        scheduledTaskQueue().add(quietPeriodTask);
    }

    /**
     * Take the next {@link Runnable} from the task queue and so will block if no task is currently present.
     *从任务队列获取任务,任务队列为空,则阻塞,直到有任务
     * @return {@code null} if the executor thread has been interrupted or waken up.
     */
    Runnable takeTask() {
        BlockingQueue<Runnable> taskQueue = this.taskQueue;
        for (;;) {
	    //获取调度任务
            ScheduledFutureTask<?> scheduledTask = peekScheduledTask();
            if (scheduledTask == null) {
                Runnable task = null;
                try {
		    //如果调度任务队列为空,则从任务队列中take一个任务
                    task = taskQueue.take();
                } catch (InterruptedException e) {
                    // Ignore
                }
                return task;
            } else {
	        //否则获取调度任务延时
                long delayNanos = scheduledTask.delayNanos();
                Runnable task;
                if (delayNanos > 0) {
                    try {
		        //如果延时大于0,则从任务队列延时poll一个任务
                        task = taskQueue.poll(delayNanos, TimeUnit.NANOSECONDS);
                    } catch (InterruptedException e) {
                        // Waken up.
                        return null;
                    }
                } else {
		    //否者直接从队列拉取一个任务
                    task = taskQueue.poll();
                }

                if (task == null) {
		    //从调度任务队列抓取调度任务,添加到任务队列
                    fetchFromScheduledTaskQueue();
                    task = taskQueue.poll();
                }

                if (task != null) {
                    return task;
                }
            }
        }
    }
    //从调度任务队列抓取调度任务,添加到任务队列
    private void fetchFromScheduledTaskQueue() {
        long nanoTime = AbstractScheduledEventExecutor.nanoTime();
	//拉取调度任务
        Runnable scheduledTask = pollScheduledTask(nanoTime);
        while (scheduledTask != null) {
	    //将调度任务添加到任务队列
            taskQueue.add(scheduledTask);
            scheduledTask = pollScheduledTask(nanoTime);
        }
    }

    /**
     * Return the number of tasks that are pending for processing.
     *返回任务队列中的任务数量
     * [b]Be aware that this operation may be expensive as it depends on the internal implementation of the
     * SingleThreadEventExecutor. So use it was care![/b]
     */
    public int pendingTasks() {
        return taskQueue.size();
    }

    /**
     * Add a task to the task queue, or throws a {@link RejectedExecutionException} if this instance was shutdown
     * before.
     添加任务到任务队列,如果事件执行器已关闭,则抛出拒绝执行器异常
     */
    private void addTask(Runnable task) {
        if (task == null) {
            throw new NullPointerException("task");
        }
        taskQueue.add(task);
    }
   //判断线程是否在当前事件循环中
    @Override
    public boolean inEventLoop(Thread thread) {
        return thread == this.thread;
    }
   //关闭事件执行器
    @Override
    public Future<?> shutdownGracefully(long quietPeriod, long timeout, TimeUnit unit) {
        return terminationFuture();
    }
   //返回关闭异步任务结果
    @Override
    public Future<?> terminationFuture() {
        return terminationFuture;
    }
     //关闭事件执行器
    @Override 
    @Deprecated
    public void shutdown() {
        throw new UnsupportedOperationException();
    }
    //下面3个方法为判断事件执行器是否关闭,正在关闭,已终止
    @Override
    public boolean isShuttingDown() {
        return false;
    }

    @Override
    public boolean isShutdown() {
        return false;
    }

    @Override
    public boolean isTerminated() {
        return false;
    }
    //不支持超时终止
    @Override
    public boolean awaitTermination(long timeout, TimeUnit unit) {
        return false;
    }

    /**
     * Waits until the worker thread of this executor has no tasks left in its task queue and terminates itself.
     * Because a new worker thread will be started again when a new task is submitted, this operation is only useful
     * when you want to ensure that the worker thread is terminated [b]after[/b] your application is shut
     * down and there's no chance of submitting a new task afterwards.
     等待执行器的工作线程没有任务可执行,终止。当任务提交到执行器时,新建一个工作线程,当你想保证在工作线程终止前,没有任务提交到
     执行器,这个方法非常有用。
     *
     * @return {@code true} if and only if the worker thread has been terminated
     */
    public boolean awaitInactivity(long timeout, TimeUnit unit) throws InterruptedException {
        if (unit == null) {
            throw new NullPointerException("unit");
        }

        final Thread thread = this.thread;
        if (thread == null) {
            throw new IllegalStateException("thread was not started");
        }
	//  * Waits at most {@code millis} milliseconds for this thread to
        //  * die. A timeout of {@code 0} means to wait forever.
        thread.join(unit.toMillis(timeout));
        return !thread.isAlive();
    }
   //执行任务
    @Override
    public void execute(Runnable task) {
        if (task == null) {
            throw new NullPointerException("task");
        }
       //添加任务到任务队列
        addTask(task);
        if (!inEventLoop()) {
	    //如果任务线程不在当前事件循环中,则直接执行线程
            startThread();
        }
    }
   //根据任务
    private void startThread() {
        //事件执行器已开始
        if (started.compareAndSet(false, true)) {
	   //创建任务执行线程
            Thread t = threadFactory.newThread(taskRunner);
            // Set the thread before starting it as otherwise inEventLoop() may return false and so produce
            // an assert error.
            // See https://github.com/netty/netty/issues/4357
            thread = t;
	    //直接启动任务执行线程
            t.start();
        }
    }
    //任务线程
    final class TaskRunner implements Runnable {
        @Override
        public void run() {
            for (;;) {
	       //从任务队列take任务
                Runnable task = takeTask();
                if (task != null) {
                    try {
		        //任务不为空,则执行任务
                        task.run();
                    } catch (Throwable t) {
                        logger.warn("Unexpected exception from the global event executor: ", t);
                    }
                    //如果任务非quietPeriodTask,跳出当前循环
                    if (task != quietPeriodTask) {
                        continue;
                    }
                }
               //获取调度任务队列
                Queue<ScheduledFutureTask<?>> scheduledTaskQueue = GlobalEventExecutor.this.scheduledTaskQueue;
                // Terminate if there is no task in the queue (except the noop task).
                if (taskQueue.isEmpty() && (scheduledTaskQueue == null || scheduledTaskQueue.size() == 1)) {
		    //如果任务队列为空且调度任务队列为空或调度任务队列只有一个待调度的任务
                    // Mark the current thread as stopped.
                    // The following CAS must always success and must be uncontended,
                    // because only one thread should be running at the same time.
		    //设置事件执行器已关闭
                    boolean stopped = started.compareAndSet(true, false);
                    assert stopped;

                    // Check if there are pending entries added by execute() or schedule*() while we do CAS above.
                    if (taskQueue.isEmpty() && (scheduledTaskQueue == null || scheduledTaskQueue.size() == 1)) {
                        // A) No new task was added and thus there's nothing to handle
                        //    -> safe to terminate because there's nothing left to do
                        // B) A new thread started and handled all the new tasks.
                        //    -> safe to terminate the new thread will take care the rest
                        break;
                    }

                    // There are pending tasks added again.
                    if (!started.compareAndSet(false, true)) {
                        // startThread() started a new thread and set 'started' to true.
                        // -> terminate this thread so that the new thread reads from taskQueue exclusively.
			//如果线程不能重新开始,则终止线程,这样做的目的是保证工作线程互质地,从任务队列消费任务。
                        break;
                    }

                    // New tasks were added, but this worker was faster to set 'started' to true.
                    // i.e. a new worker thread was not started by startThread().
                    // -> keep this thread alive to handle the newly added entries.
                }
            }
        }
    }
}

从上面可以看出全局事件执行器,执行任务时,首先添加任务到任务队列,如果线程在当前事务循环中,则等待执行,否则启动一个工作线程TaskRunner,不断从任务队列中take任务执行。如果任务队列为空,且调度任务队列为空或调度任务队列只有一个待调度的任务,则关闭事件执行器,这样做的目的是保证工作线程互质地从任务队列消费任务。


3.
protected SingleThreadEventExecutor(EventExecutorGroup parent, Executor executor, boolean addTaskWakesUp) {
    this(parent, executor, addTaskWakesUp, DEFAULT_MAX_PENDING_EXECUTOR_TASKS, RejectedExecutionHandlers.reject());
}

//任务拒绝执行策略RejectedExecutionHandlers
package io.netty.util.concurrent;

import io.netty.util.internal.ObjectUtil;

import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.LockSupport;

/**
 * Expose helper methods which create different {@link RejectedExecutionHandler}s.
 */
public final class RejectedExecutionHandlers {
    //直接抛出异常
    private static final RejectedExecutionHandler REJECT = new RejectedExecutionHandler() {
        @Override
        public void rejected(Runnable task, SingleThreadEventExecutor executor) {
            throw new RejectedExecutionException();
        }
    };

    private RejectedExecutionHandlers() { }

    /**
     * Returns a {@link RejectedExecutionHandler} that will always just throw a {@link RejectedExecutionException}.
     */
    public static RejectedExecutionHandler reject() {
        return REJECT;
    }

    /**
     * Tries to backoff when the task can not be added due restrictions for an configured amount of time. This
     * is only done if the task was added from outside of the event loop which means
     * {@link EventExecutor#inEventLoop()} returns {@code false}.
     当事件执行器不在当前事件循环的处理策略,
     */
    public static RejectedExecutionHandler backoff(final int retries, long backoffAmount, TimeUnit unit) {
        ObjectUtil.checkPositive(retries, "retries");
        final long backOffNanos = unit.toNanos(backoffAmount);
        return new RejectedExecutionHandler() {
            @Override
            public void rejected(Runnable task, SingleThreadEventExecutor executor) {
	        //如果执行器不在当前事件循环中,则尝试唤醒执行器,并清空任务队列,然后添加任务到执行器任务队列
                if (!executor.inEventLoop()) {
                    for (int i = 0; i < retries; i++) {
                        // Try to wake up the executor so it will empty its task queue.
			//唤醒执行器,清空任务队列
                        executor.wakeup(false);
                        
                        LockSupport.parkNanos(backOffNanos);
			//添加任务到执行器任务队列
                        if (executor.offerTask(task)) {
                            return;
                        }
                    }
                }
                // Either we tried to add the task from within the EventLoop or we was not able to add it even with
                // backoff.
                throw new RejectedExecutionException();
            }
        };
    }
}

//SingleThreadEventExecutor
   protected void wakeup(boolean inEventLoop) {
        if (!inEventLoop || state == ST_SHUTTING_DOWN) {
            // Use offer as we actually only need this to unblock the thread and if offer fails we do not care as there
            // is already something in the queue.
	    //添加唤醒任务线程到队列
            taskQueue.offer(WAKEUP_TASK);
        }
    }

//RejectedExecutionHandler
package io.netty.util.concurrent;

/**
 * Similar to {@link java.util.concurrent.RejectedExecutionHandler} but specific to {@link SingleThreadEventExecutor}.
 */
public interface RejectedExecutionHandler {

    /**
     * Called when someone tried to add a task to {@link SingleThreadEventExecutor} but this failed due capacity
     * restrictions.
     */
    void rejected(Runnable task, SingleThreadEventExecutor executor);
}

4.线程属性
@SuppressWarnings("unused")//线程属性
private volatile ThreadProperties threadProperties;


//线程属性ThreadProperties,一看就明白,就不说了
package io.netty.util.concurrent;

/**
 * Expose details for a {@link Thread}.
 */
public interface ThreadProperties {
    /**
     * @see Thread#getState()
     */
    Thread.State state();
    /**
     * @see Thread#getPriority()
     */
    int priority();

    /**
     * @see Thread#isInterrupted()
     */
    boolean isInterrupted();
    /**
     * @see Thread#isDaemon()
     */
    boolean isDaemon();
    /**
     * @see Thread#getName()
     */
    String name();
    /**
     * @see Thread#getId()
     */
    long id();
    /**
     * @see Thread#getStackTrace()
     */
    StackTraceElement[] stackTrace();
    /**
     * @see Thread#isAlive()
     */
    boolean isAlive();
}


来看单线程执行器内部的实现:
private static final class DefaultThreadProperties implements ThreadProperties {
        private final Thread t;
        DefaultThreadProperties(Thread t) {
            this.t = t;
        }
        @Override
        public State state() {
            return t.getState();
        }
        @Override
        public int priority() {
            return t.getPriority();
        }
        @Override
        public boolean isInterrupted() {
            return t.isInterrupted();
        }
        @Override
        public boolean isDaemon() {
            return t.isDaemon();
        }
        @Override
        public String name() {
            return t.getName();
        }
        @Override
        public long id() {
            return t.getId();
        }
        @Override
        public StackTraceElement[] stackTrace() {
            return t.getStackTrace();
        }
        @Override
        public boolean isAlive() {
            return t.isAlive();
        }
    }
0
0
分享到:
评论

相关推荐

Global site tag (gtag.js) - Google Analytics