`

深入JDK源代码之定时操作Timer类和TimerTask类实现

    博客分类:
  • java
阅读更多
    Timer类是一种线程设施,可以用来实现某一个时间或某一段时间后安排某一个任务执行一次或定期重复执行。该功能和TimerTask配合使用。TimerTask类用于实现由Timer安排的一次或重复执行的某个任务。每一个Timer对象对应的是一个线程,因此计时器所执行的任务应该迅速完成,否则会延迟后续的任务执行。
  一、 深入JDK源代码TimerTask类,发现这个类是个抽象类比较简单,有四个常量表示定时器任务的状态,还有一个Object类型lock对象,相当一把锁,控制线程对定时器任务状态的同步访问。
  nextExecutionTime 这个成员变量用到记录该任务下次执行时间, 其格式和System.currentTimeMillis()一致.
这个值是作为任务队列中任务排序的依据. 任务调试者执行每个任务前会对这个值作处理,重新计算下一次任务执行时间,并为这个变量赋值.
  period 用来描述任务的执行方式: 0表示不重复执行的任务. 正数表示固定速率执行的任务. 负数表示固定延迟执行的任务.
(固定速率: 不考虑该任务上一次执行情况,始终从开始时间算起的每period执行下一次.   固定延迟: 考虑该任务一次执行情况,在上一次执行后period执行下一次).
代码如下:
public abstract class TimerTask implements Runnable {
	//这个对象是用来控制访问TimerTask内部构件。锁机制
    final Object lock = new Object();
    //定时器任务的状态
    int state = VIRGIN;
    //定时器任务默认的状态,表示还没有被安排
    static final int VIRGIN = 0;
    //表示定时器任务被安排了
    static final int SCHEDULED   = 1;
    //表示定时器任务执行
    static final int EXECUTED    = 2;
    //表示定时器任务取消
    static final int CANCELLED   = 3;
    //下次执行任务时间
    long nextExecutionTime;
    long period = 0;

    protected TimerTask() {
    }
   // 此计时器任务要执行的操作。
    public abstract void run();

    // 取消此计时器任务。
    public boolean cancel() {
        synchronized(lock) {
            boolean result = (state == SCHEDULED);
            state = CANCELLED;
            return result;
        }
    }
    // 返回此任务最近实际 执行的已安排 执行时间。
    public long scheduledExecutionTime() {
        synchronized(lock) {
            return (period < 0 ? nextExecutionTime + period
                               : nextExecutionTime - period);
        }
    }
}

二、深入JDK源代码之Timer类,Timer中最主要由三个部分组成: 任务 TimerTask 、  任务队列: TaskQueue queue任务调试者:TimerThread thread
1.任务队列 TaskQueue,它是Timer的一个内部类。
    事实上任务队列是一个数组, 采用平衡二叉堆来实现他的优先级调度, 并且是一个小顶堆. 需要注意的是, 这个堆中queue[n] 的孩子是queue[2*n] 和 queue[2*n+1].
    任务队列的优先级按照TimerTask类的成员变量nextExecutionTime值来排序(注意, 这里的任务指的是那些交由定时器来执行的, 继承TimerTask的对象).
    在任务队列中, nextExecutionTime最小就是所有任务中最早要被调度来执行的, 所以被安排在queue[1] (假设任务队列非空).
    对于堆中任意一个节点n, 和他的任意子孙节点d,一定遵循: n.nextExecutionTime <= d.nextExecutionTime.
// 任务队列
class TaskQueue {
	// 计时器任务数组,默认大小为128
	private TimerTask[] queue = new TimerTask[128];

	private int size = 0;

	int size() {
		return size;
	}

	// 加入队列
	void add(TimerTask task) {
		// Grow backing store if necessary
		if (size + 1 == queue.length)
			// 队列以两倍的速度扩容
			queue = Arrays.copyOf(queue, 2 * queue.length);

		queue[++size] = task;
		fixUp(size);
	}

	// 获取队列的地二个元素,即第一个任务,第一个元素存储的是
	TimerTask getMin() {
		return queue[1];
	}

	TimerTask get(int i) {
		return queue[i];
	}

	// 消除头任务从优先队列。
	void removeMin() {
		queue[1] = queue[size];
		queue[size--] = null; // Drop extra reference to prevent memory leak
		fixDown(1);
	}

	/**
	 * Removes the ith element from queue without regard for maintaining the
	 * heap invariant. Recall that queue is one-based, so 1 <= i <= size.
	 */
	void quickRemove(int i) {
		// 断言,在这里只起测试作用
		assert i <= size;

		queue[i] = queue[size];
		queue[size--] = null; // Drop extra ref to prevent memory leak
	}

	/**
	 * Sets the nextExecutionTime associated with the head task to the specified
	 * value, and adjusts priority queue accordingly.
	 */
	void rescheduleMin(long newTime) {
		queue[1].nextExecutionTime = newTime;
		fixDown(1);
	}

	/**
	 * Returns true if the priority queue contains no elements.
	 */
	boolean isEmpty() {
		return size == 0;
	}

	/**
	 * Removes all elements from the priority queue.
	 */
	void clear() {
		// Null out task references to prevent memory leak
		for (int i = 1; i <= size; i++)
			queue[i] = null;

		size = 0;
	}

	// 进行队列中任务优先级调整. fixUp方法的作用是尽量将队列中指定位置(k)的任务向队列前面移动,
	// 即提高它的优先级. 因为新加入的方法很有可能比已经在任务队列中的其它任务要更早执行.
	private void fixUp(int k) {
		while (k > 1) {
			int j = k >> 1;// 左移一位,相当于除以2
			if (queue[j].nextExecutionTime <= queue[k].nextExecutionTime)
				break;
			TimerTask tmp = queue[j];
			queue[j] = queue[k];
			queue[k] = tmp;
			k = j;
		}
	}

	// 从任务队列中移除一个任务的过程, 首先直接将当前任务队列中最后一个任务赋给queue[1],
	// 然后将队列中任务数量--, 最后和上面类似, 但是这里是调用fixDown(int k)方法了, 尽量将k位置的任务向队列后面移动.
	private void fixDown(int k) {
		int j;
		while ((j = k << 1) <= size && j > 0) {
			if (j < size
					&& queue[j].nextExecutionTime > queue[j + 1].nextExecutionTime)
				j++; // j indexes smallest kid
			if (queue[k].nextExecutionTime <= queue[j].nextExecutionTime)
				break;
			TimerTask tmp = queue[j];
			queue[j] = queue[k];
			queue[k] = tmp;
			k = j;
		}
	}

	/**
	 * Establishes the heap invariant (described above) in the entire tree,
	 * assuming nothing about the order of the elements prior to the call.
	 */
	void heapify() {
		for (int i = size / 2; i >= 1; i--)
			fixDown(i);
	}
}
    

2.任务调度 TimerThread
// 计时器线程
class TimerThread extends Thread {

	// 新任务是否被安排
	boolean newTasksMayBeScheduled = true;

	// 任务队列
	private TaskQueue queue;

	TimerThread(TaskQueue queue) {
		this.queue = queue;
	}

	public void run() {
		try {
			mainLoop();
		} finally {
			// Someone killed this Thread, behave as if Timer cancelled
			synchronized (queue) {
				newTasksMayBeScheduled = false;
				queue.clear(); // Eliminate obsolete references
			}
		}
	}

	private void mainLoop() {
		while (true) {
			try {
				TimerTask task;
				boolean taskFired;
				synchronized (queue) {
					// Wait for queue to become non-empty
					while (queue.isEmpty() && newTasksMayBeScheduled)
						queue.wait();
					if (queue.isEmpty())
						break; // Queue is empty and will forever remain; die

					// Queue nonempty; look at first evt and do the right thing
					long currentTime, executionTime;
					task = queue.getMin();
					synchronized (task.lock) {
						if (task.state == TimerTask.CANCELLED) {
							queue.removeMin();
							continue; // No action required, poll queue again
						}
						currentTime = System.currentTimeMillis();
						executionTime = task.nextExecutionTime;
						if (taskFired = (executionTime <= currentTime)) {
							if (task.period == 0) { // Non-repeating, remove
								queue.removeMin();
								task.state = TimerTask.EXECUTED;
							} else { // Repeating task, reschedule
								queue
										.rescheduleMin(task.period < 0 ? currentTime
												- task.period
												: executionTime + task.period);
							}
						}
					}
					if (!taskFired) // Task hasn't yet fired; wait
						queue.wait(executionTime - currentTime);
				}
				if (taskFired) // Task fired; run it, holding no locks
					task.run();
			} catch (InterruptedException e) {
			}
		}
	}
}

3.Timer类的主体和主要对外提供的方法
import java.util.*;
import java.util.Date;

public class Timer {
	// 定时任务队列
	private TaskQueue queue = new TaskQueue();

	// 计时器线程
	private TimerThread thread = new TimerThread(queue);

	private Object threadReaper = new Object() {
		protected void finalize() throws Throwable {
			synchronized (queue) {
				thread.newTasksMayBeScheduled = false;
				queue.notify(); // In case queue is empty.
			}
		}
	};

	// ID号作为线程的ID
	private static int nextSerialNumber = 0;

	private static synchronized int serialNumber() {
		return nextSerialNumber++;
	}

	public Timer() {
		this("Timer-" + serialNumber());
	}

	// 创建一个新计时器,可以指定其相关的线程作为守护程序运行。
	public Timer(boolean isDaemon) {
		this("Timer-" + serialNumber(), isDaemon);
	}

	public Timer(String name) {
		thread.setName(name);
		thread.start();
	}

	// 创建一个新计时器,其相关的线程具有指定的名称,并且可以指定作为守护程序运行。
	public Timer(String name, boolean isDaemon) {
		thread.setName(name);
		thread.setDaemon(isDaemon);
		thread.start();
	}

	// 安排在指定延迟后执行指定的任务。时间单位毫秒
	public void schedule(TimerTask task, long delay) {
		if (delay < 0)
			throw new IllegalArgumentException("Negative delay.");
		sched(task, System.currentTimeMillis() + delay, 0);
	}

	// 安排在指定的时间执行指定的任务。
	public void schedule(TimerTask task, Date time) {
		sched(task, time.getTime(), 0);
	}

	// 安排指定的任务从指定的延迟后开始进行重复的固定延迟执行。
	public void schedule(TimerTask task, long delay, long period) {
		if (delay < 0)
			throw new IllegalArgumentException("Negative delay.");
		if (period <= 0)
			throw new IllegalArgumentException("Non-positive period.");
		sched(task, System.currentTimeMillis() + delay, -period);
	}

	// 安排指定的任务在指定的时间开始进行重复的固定延迟执行。
	public void schedule(TimerTask task, Date firstTime, long period) {
		if (period <= 0)
			throw new IllegalArgumentException("Non-positive period.");
		sched(task, firstTime.getTime(), -period);
	}

	// 安排指定的任务在指定的延迟后开始进行重复的固定速率执行。
	public void scheduleAtFixedRate(TimerTask task, long delay, long period) {
		if (delay < 0)
			throw new IllegalArgumentException("Negative delay.");
		if (period <= 0)
			throw new IllegalArgumentException("Non-positive period.");
		sched(task, System.currentTimeMillis() + delay, period);
	}

	// 安排指定的任务在指定的时间开始进行重复的固定速率执行。
	public void scheduleAtFixedRate(TimerTask task, Date firstTime, long period) {
		if (period <= 0)
			throw new IllegalArgumentException("Non-positive period.");
		sched(task, firstTime.getTime(), period);
	}
	private void sched(TimerTask task, long time, long period) {
		if (time < 0)
			throw new IllegalArgumentException("Illegal execution time.");
		// 同步代码块 ,对queue的访问需要同步
		synchronized (queue) {
			if (!thread.newTasksMayBeScheduled)
				throw new IllegalStateException("Timer already cancelled.");
			// 同步代码块,需要获得task的lock,锁
			synchronized (task.lock) {
				if (task.state != TimerTask.VIRGIN)
					throw new IllegalStateException(
							"Task already scheduled or cancelled");
				// 任务接下来执行的时刻
				task.nextExecutionTime = time;
				// 任务执行时间间隔周期
				task.period = period;
				// 任务已经安排,等待执行
				task.state = TimerTask.SCHEDULED;
			}
			// 加入计时器等待任务队列
			queue.add(task);
			//
			if (queue.getMin() == task)
				// 唤醒在此对象监视器上等待的单个线程。
				queue.notify();
		}
	}

	// 终止此计时器,丢弃所有当前已安排的任务。
	public void cancel() {
		synchronized (queue) {
			thread.newTasksMayBeScheduled = false;
			queue.clear();
			queue.notify(); // In case queue was already empty.
		}
	}

	// 从此计时器的任务队列中移除所有已取消的任务。
	public int purge() {
		int result = 0;

		synchronized (queue) {
			for (int i = queue.size(); i > 0; i--) {
				if (queue.get(i).state == TimerTask.CANCELLED) {
					queue.quickRemove(i);
					result++;
				}
			}

			if (result != 0)
				queue.heapify();
		}

		return result;
	}
}

参考:http://japi.iteye.com/blog/1022656
0
3
分享到:
评论

相关推荐

Global site tag (gtag.js) - Google Analytics