论坛首页 Java企业应用论坛

Java版工人-监工模式

浏览 3900 次
精华帖 (0) :: 良好帖 (0) :: 新手帖 (0) :: 隐藏帖 (0)
作者 正文
   发表时间:2014-03-12  

    之前在学习erlang OTP的时候,看到在OTP中实现了工人-监工模式,就是在定义一个工作者进程的时候,同时为器分配了一个overseer(监工),监工啥事儿也不作,就专门负责工作者进程是否正常工作,有无任务异常情况发生,当时看到在机制觉得不以为然觉得就这么几行代码没什么大不了的。

 

    当我最近用java代码来实现分布式编程的时候发现要做一个稳定的,可靠的系统并不是那么容易,之前在单机系统中默认不会不会出任何问题的共享变量操作,进程间通信。在分布式系统中zookeeper锁操作异常,远程socke长连接断连,一切都变得不是那么可靠了,甚至要默许这种异常成为常态。

 

     但是系统还要保证是4个9的稳定性,按照以前的策略是,发生异常的地方,进行函数重试等,于是乎系统的代码开始变得冗余,杂乱起来。

 

     erlang OTP的策略是,默认异常是一种常态,出问题了没有关系,只要将整个进程重启,重启的逻辑由监工进程负责,这样对于工作进程内部执行的逻辑还是高内聚的,清晰的。初始化的时候还可以设置监工进程的重启规则,比如一分钟之内如果超过5次异常,就认为系统中的崩溃啦,就会执行终止业务进程的逻辑。

 

   按照这个逻辑,我在java中也简单实现了监工-工人模式,代码如下:

 

 

import java.util.concurrent.ExecutorService;
import java.uil.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;

/**
 * 工人和监工模式实现
 */
public class TaskOverseer {
	private final OverSeer oversee;

	private final long timeInterval;

	private static final ExecutorService threadPool = Executors
			.newCachedThreadPool();

	private int maxErrorCount;

	public TaskOverseer(long duration, TimeUnit timeunit, int maxErrorCount) {
		super();
		this.maxErrorCount = maxErrorCount;
		this.timeInterval = timeunit.toMillis(duration);
		this.oversee = new OverSeer();
		this.oversee.setDaemon(true);
		this.oversee.start();

	}

	private final AtomicInteger count = new AtomicInteger();
	private final AtomicLong timestampe = new AtomicLong(System
			.currentTimeMillis());

	protected void startWork() throws Exception {

	}

	protected void recycleResource() {

	}

	private class OverSeer extends Thread {

		public OverSeer() {
			super("Overseer thhread");
		}

		@Override
		public void run() {
			try {
				while (true) {
					synchronized (this) {
						System.out.println("launch a worker task");
						threadPool.execute(new Worker());
						this.wait();
					}
				}

			} catch (InterruptedException e) {

				throw new RuntimeException(e);
			}
		}
	}

	private class Worker implements Runnable {
		@Override
		public void run() {

			try {
				startWork();
			} catch (Throwable e) {
				synchronized (oversee) {
					recycleResource();

					final long currentTimestamp = System.currentTimeMillis();
					if ((currentTimestamp <= (timestampe.get() + timeInterval))
							&& count.incrementAndGet() > maxErrorCount) {
						// 终止监工进程
						oversee.interrupt();
						throw new RuntimeException("between 1 min "
								+ count.get() + " errors occur", e);
					} else {

						if (currentTimestamp > (timestampe.get() + timeInterval)) {
							count.set(0);
							timestampe.set(currentTimestamp);
						}
					}

					oversee.notifyAll();
				}

			}
		}
	}

	
}

 

    只需继承TaskOverseer类,在构造函数上设置,在多少时间内抛最多多少个异常,如果超过了这个异常数目,系统就会终止。然后按照业务需要覆写startWork这个函数,在里面添加一些业务代码。

​   例如:

public static void main(String[] args) throws Exception {

		TaskOverseer overseer = new TaskOverseer(1l, TimeUnit.MINUTES, 4) {
			@Override
			protected void startWork() throws Exception {
				Thread.sleep(1000 * 20);
				throw new Exception("i am die");
			}
		};

		Thread.sleep(10000 * 99);
	}

 

按照如上构造函数中定义的,一分钟之内最多发生四次异常,超过这个数目系统就会奔溃。

startWork方法中会20秒抛出一个异常,一分钟之内最多抛三个异常,所以系统将这样一直运行下去,如果将线程sleep的时间小于15秒的话,系统一段时间之后就会自动奔溃。

​如果用这样的策略来处理分布式环境下的异常,相信可以构建出一个更加稳定健壮的系统。

论坛首页 Java企业应用版

跳转论坛:
Global site tag (gtag.js) - Google Analytics