`
coolxing
  • 浏览: 870141 次
  • 性别: Icon_minigender_1
  • 来自: 北京
博客专栏
9a45b66b-c585-3a35-8680-2e466b75e3f8
Java Concurre...
浏览量:95963
社区版块
存档分类
最新评论

停止基于线程的Service--JCIP7.2读书笔记

阅读更多

[本文是我对Java Concurrency In Practice 7.2的归纳和总结.  转载请注明作者和出处,  如有谬误, 欢迎在评论中指正. ]

以ExecutorService为例, 该类内部封装有多个线程, 类外部无法直接停止这些线程. 相反, 外部调用Service的shutDown和shutDownNow方法关闭Service, 而Service负责停止其拥有的线程.

大多数server应用会使用到log, 下例中的LogWriter是一个使用生产者消费者模式构建的log service, 需要打印log的线程将待打印的内容加入到阻塞队列中, 而logger线程则不断的从阻塞队列中取出数据输出:

public class LogWriter {
	private final BlockingQueue<String> queue;
	private final LoggerThread logger;

	public LogWriter(Writer writer) {
		this.queue = new LinkedBlockingQueue<String>(CAPACITY);
		this.logger = new LoggerThread(writer);
	}

	public void start() {
		logger.start();
	}

	/**
	 * 需要打印数据的线程调用该方法, 将待打印数据加入阻塞队列
	 */
	public void log(String msg) throws InterruptedException {
		queue.put(msg);
	}

	/**
	 * 负责从阻塞队列中取出数据输出的线程
	 */
	private class LoggerThread extends Thread {
		private final PrintWriter writer;
		// ...
		public void run() {
			try {
				while (true)
					writer.println(queue.take());
			} catch (InterruptedException ignored) {
			} finally {
				writer.close();
			}
		}
	}
}

LogWriter内部封装有LoggerThread线程, 所以LogWriter是一个基于线程构建的Service. 根据ExecutorService的经验, 我们需要在LogWriter中提供停止LoggerThread线程的方法. 看起来这并不是很难, 我们只需在LogWriter中添加shutDown方法:

/**
 * 该方法用于停止LoggerThread线程
 */
public void shutDown() {
	logger.interrupt();
}

当LogWriter.shutDown方法被调用时, LoggerThread线程的中断标记被设置为true, 之后LoggerThread线程执行queue.take()方法时会抛出InterruptedException异常, 从而使得LoggerThread线程结束.

然而这样的shutDown方法并不是很恰当: 

1. 丢弃了队列中尚未来得及输出的数据.

2. 更严重的是, 假如线程A对LogWriter.log方法的调用因为队列已满而阻塞, 此时停止LoggerThread线程将导致线程A永远阻塞在queue.put方法上.

对上例的改进:

public class LogWriter {
	private final BlockingQueue<String> queue;
	private final LoggerThread loggerThread;
	private final PrintWriter writer;

	/**
	 * 表示是否关闭Service
	 */
	private boolean isShutdown;
	/**
	 * 队列中待处理数据的数量
	 */
	private int reservations;

	public void start() {
		loggerThread.start();
	}

	public void shutDown() {
		synchronized (this) {
			isShutdown = true;
		}
		loggerThread.interrupt();
	}

	public void log(String msg) throws InterruptedException {
		synchronized (this) {
			// service已关闭后调用log方法直接抛出异常
			if (isShutdown)
				throw new IllegalStateException("Service has been shut down");
			++reservations;
		}
		// BlockingQueue本身就是线程安全的, put方法的调用不在同步代码块中
		// 我们只需要保证isShutdown和reservations是线程安全的即可
		queue.put(msg);
	}

	private class LoggerThread extends Thread {
		public void run() {
			try {
				while (true) {
					try {
						synchronized (this) {
							// 当service已关闭且处理完队列中的所有数据时才跳出while循环
							if (isShutdown && reservations == 0)
								break;
						}
						String msg = queue.take();
						synchronized (this) {
							--reservations;
						}
						writer.println(msg);
					} catch (InterruptedException e) {
						// 发生InterruptedException异常时不应该立刻跳出while循环
						// 而应该继续输出log, 直到处理完队列中的所有数据
					}
				}
			} finally {
				writer.close();
			}
		}
	}
}

上面的处理显得过于复杂, 利用ExecutorService可以编写出相对更简洁的程序:

public class LogService {
	/**
	 * 创建只包含单个线程的线程池, 提交给该线程池的任务将以串行的方式逐个运行
	 * 本例中, 此线程用于执行打印log的任务
	 */
	private final ExecutorService exec = Executors.newSingleThreadExecutor();
	private final PrintWriter writer;

	public void start() {
	}

	public void shutdown() throws InterruptedException {
		try {
			// 关闭ExecutorService后再调用其awaitTermination将导致当前线程阻塞, 直到所有已提交的任务执行完毕, 或者发生超时
			exec.shutdown();
			exec.awaitTermination(TIMEOUT, UNIT);
		} finally {
			writer.close();
		}
	}

	public void log(String msg) {
		try {
			// 线程池关闭后再调用其execute方法将抛出RejectedExecutionException异常
			exec.execute(new WriteTask(msg));
		} catch (RejectedExecutionException ignored) {
		}
	}
	
	private final class WriteTask implements Runnable {
		private String msg;
		
		public WriteTask(String msg) {
			this.msg = msg;
		}

		@Override
		public void run() {
			writer.println(msg);
		}
	}
}
2
2
分享到:
评论

相关推荐

Global site tag (gtag.js) - Google Analytics