`

【梳理】多线程任务管理器的实现

    博客分类:
  • J2EE
阅读更多

实现解耦为:任务队列管理和线程管理

本文分三部分:任务控制、线程池控制及Demo

类图:



 
类变量:

 

    private int capacity; // Task queue capacity; addTask() only enforces the limit when this is > 0
    private long timeout; // Task timeout in ms; used as the delay when a task is scheduled on the timer

    private HashMap tasks;// Task container: maps a task's primary key to its TaskContainer
    private ArrayList readyTasks; // Keys of tasks that are ready to be processed; also used as the wait/notify monitor

 

1、任务控制部分

createTaskManager:创建任务管理器(静态)

 

	/**
	 * Builds a TaskManager and starts its dispatcher.
	 *
	 * @param name
	 *            name given to the dispatcher and, through it, to the thread pool
	 * @param capacity
	 *            task queue capacity; must not be negative
	 * @param timeout
	 *            task timeout in milliseconds; must not be negative
	 * @param processor
	 *            handler invoked for each task; must not be null
	 * @param processorNum
	 *            number of processing threads; must be greater than zero
	 * @return the started TaskManager, or null when any argument is invalid
	 */
    public static TaskManager createTaskManager(String name, int capacity, long timeout, TaskProcessor processor, int processorNum) {
        // Reject an invalid configuration up front; the caller receives null in that case.
        boolean invalid = capacity < 0 || timeout < 0 || processor == null || processorNum <= 0;
        if (invalid) {
            return null;
        }

        TaskManager manager = new TaskManager(capacity, timeout);
        manager.start(name, processor, processorNum);
        return manager;
    }

 构造Task管理器:

 

注:包含一个Timer 定时延时任务处理器,用于任务超时处理

 

    /**
     * Initializes an empty manager: stores the limits, creates the two task
     * containers and creates the timer used for task timeout handling.
     *
     * @param capacity task queue capacity
     * @param timeout  task timeout in milliseconds
     */
    protected TaskManager(int capacity, long timeout) {
        this.timeout = timeout;
        this.capacity = capacity;

        // Key -> container map, and the list of keys that are ready to run.
        this.tasks = new HashMap();
        this.readyTasks = new ArrayList();

        // Timer(true) runs its thread as a daemon, so it never keeps the JVM alive.
        timer = new Timer(true);
    }

 Task管理器启动mgt.start() :初始化线程控制器并启动分配线程处理任务

 

 

    /**
     * Creates the ProcessorControllor bound to this manager and starts it.
     *
     * @param name         name passed to the controller
     * @param processor    handler invoked for each task
     * @param processorNum number of processing threads
     */
    private void start(String name, TaskProcessor processor, int processorNum) {
        new ProcessorControllor(name, processorNum, processor, this).start();
    }

 线程控制器的构造:

 

 

    /**
     * Wires the controller to its task manager and prepares the worker pool.
     *
     * @param name         controller name, also used to name the thread pool
     * @param processorNum number of worker threads to create in the pool
     * @param processor    handler invoked for each task
     * @param manager      task manager that supplies the ready tasks
     */
    public ProcessorControllor(String name, int processorNum, TaskProcessor processor, TaskManager manager) {
        super(name);
        setDaemon(true);

        this.processor = processor;
        this.processorNum = processorNum;
        this.taskManager = manager;

        // The pool creates the requested number of worker threads.
        this.pool = new ThreadPool(name, processorNum);
        // Adapter handed to the pool; it forwards each task to the processor.
        this.threadPoolTask = new ThreadPoolTask(processor);
    }

 2、线程池控制部分

 

线程池初始化:

 

    public ThreadPool(String name, int maxNum)
    {
		if (maxNum > 0)
        {
			threads = new WorkThread[maxNum];
			for(int i = 0; i < maxNum; i++)
            {
				threads[i] = new WorkThread(name, i);
				threads[i].start();
			}

            statistic_start_time = System.currentTimeMillis();

			TraceManager.TrDebug(null, "ThreadPool:" + name + " " + maxNum + " threads created.");
		}
	}

 工作线程:WorkThread

 

工作线程完成初始化并启动后即进入阻塞(等待)状态;线程池发现其空闲时,会把任务分配给它并将其唤醒,由它执行任务。

 

	/**
	 * Pool worker thread. It waits until a task is assigned to it, runs the task
	 * through toWork(), clears its slot and then calls workThreadDone(), forever.
	 */
	private class WorkThread extends Thread
    {
		// Task currently assigned to this worker; null while no task is assigned
		private Task task = null;
		// Parameter passed to the task when it is run
		private TaskParameter parameter = null;

		/**
		 * Names the thread "ThreadPool_" + name + "_" + index.
		 */
		public 	WorkThread(String name, int index)
        {
			super("ThreadPool_" + name + "_" + index);
		}
		/**
		 * Worker loop; never returns.
		 */
		public void run()
        {
			TraceManager.TrDebug(null, Thread.currentThread().getName() + " started!");

			while(true)
            {

				try
                {
					synchronized(this)
                    {
						// Block until a task has been assigned to this worker.
						// The assignment is not visible in this excerpt; presumably
						// isIdle(...) sets the fields and notifies this monitor.
						while(task == null)
                        {
							this.wait();
						}

					}

					// Run the task outside the monitor
					toWork();

					// Clear the slot so the worker has no task assigned again
					synchronized(this)
                    {
						task = null;
						parameter = null;
					}

					// Defined outside this excerpt; presumably tells the pool that
					// a worker has become free - confirm
					workThreadDone();

				}
                catch(InterruptedException ex)
                {
					// NOTE(review): the interrupt is only logged and the loop continues,
					// so interrupting a worker does not stop it - confirm this is intended
					TraceManager.TrException(null, ex);
				}
                catch(Exception ex)
                {
					TraceManager.TrException(null, ex);
				}

			}
		}
			
	}

 线程Processor任务器:

 

 

/**
 * ThreadTask adapter that forwards a task to a TaskProcessor: a task that
 * reports itself as timed out goes to {@code timeout}, any other task goes
 * to {@code process}.
 */
class ThreadPoolTask implements ThreadTask {
    TaskProcessor processor;

    public ThreadPoolTask(TaskProcessor processor) {
        this.processor = processor;
    }

    /**
     * Handles one task.
     *
     * @param parameter the task to handle; it is cast to Task
     */
    public void run(TaskParameter parameter) {
        try {
            final Task current = (Task) parameter;
            if (!current.isTimeout()) {
                processor.process(current);
                return;
            }
            processor.timeout(current);
        } catch (Exception failure) {
            // Failures are traced and not propagated to the caller.
            CAP.trException(failure);
        }
    }
}

 

线程池分配线程处理任务:

 

ProcessorControllor.run()

 

    /**
     * Dispatcher loop: takes the next ready task from the task manager and
     * hands it to a worker thread of the pool. Never returns.
     */
    public void run() {
        CAP.trDebug(this.getName() + " started.");
        while (true) {
            // Blocks until the task manager has a ready task.
            Task task = taskManager.processorControllorWait();
            if (task == null) {
                continue;
            }

            // getThread() returns -1 when it gave up waiting for a free worker.
            // The task has already been taken off the ready list at this point,
            // so dropping it here would lose it silently. Keep retrying until a
            // worker accepts it; a negative result is never treated as success.
            while (pool.getThread(threadPoolTask, task, true) < 0) {
                CAP.trError(this.getName() + " found no idle worker thread; retrying dispatch.");
            }
        }
    }

从Task管理器中移除就绪任务 

 

TaskManager.processorControllorWait():

 

    /**
     * Blocks until the ready list is non-empty, takes the first key from it and
     * returns the result of {@code removeTask} for that key.
     *
     * @return whatever {@code removeTask} returns for the first ready key
     */
    protected Task processorControllorWait() {
        Object nextKey;
        synchronized (readyTasks) {
            // Exceptions raised while waiting are traced and the wait is resumed.
            while (readyTasks.isEmpty()) {
                try {
                    readyTasks.wait();
                } catch (Exception waitFailure) {
                    CAP.trException(waitFailure);
                }
            }
            nextKey = readyTasks.remove(0);
        }

        // The task lookup happens outside the readyTasks monitor.
        return this.removeTask(nextKey);
    }

 将就绪任务交由空闲WorkThread处理:

ThreadPool.getThread()

 

	/**
	 * Offers a task to the worker threads and returns the index of the worker
	 * that accepted it.
	 *
	 * In blocking mode the caller waits on this pool's monitor, re-scanning the
	 * workers after each wake-up, for at most MAX_WAIT_TIME milliseconds in total.
	 *
	 * @param task      the unit of work to run; null is rejected
	 * @param parameter the parameter handed to the task
	 * @param blocked   true to wait for a free worker, false to delegate to the
	 *                  two-argument getThread(task, parameter)
	 * @return the index of the accepting worker, or -1 when task is null, the
	 *         wait timed out, or the calling thread was interrupted
	 */
	public int getThread(Task task, TaskParameter parameter, boolean blocked)
	{
		if (task == null)
		{
			return -1;
		}
		if (!blocked)
		{
			return getThread(task, parameter);
		}

		synchronized (this)
		{
			final long startTime = System.currentTimeMillis();
			while (true)
			{
				for (int i = 0; i < threads.length; i++)
				{
					// isIdle(...) is defined outside this excerpt; presumably it
					// assigns the task to the worker when the worker is free
					if (threads[i].isIdle(task, parameter))
					{
						return i;
					}
				}

				long elapsedTime = System.currentTimeMillis() - startTime;
				if (elapsedTime >= MAX_WAIT_TIME)
				{
					TraceManager.TrDebug(null,
						Thread.currentThread().getName() +
						" waiting too long and will give up waiting.");
					break;
				}

				TraceManager.TrDebug(null,
					Thread.currentThread().getName() + " to wait.");
				try
				{
					// to block the calling thread for the remaining time budget.
					this.wait(MAX_WAIT_TIME - elapsedTime);
				}
				catch (InterruptedException ex)
				{
					// The interrupt flag is intentionally not restored: a caller
					// that retries would otherwise fail its next wait() at once.
					TraceManager.TrException(null, ex);
					break;
				}

				// Logged only after a real wait(), never on the give-up path.
				TraceManager.TrDebug(null,
					Thread.currentThread().getName() +
					" waked up!");
			}
		}

		return -1;
	}

 WorkThread唤醒,由Processor处理任务:

 

WorkThread.toWork()

 

 

		/**
		 * Runs the assigned task with its parameter, tracing before and after.
		 * Anything the task throws is traced and not propagated.
		 */
		private void toWork()
		{
			final Thread self = Thread.currentThread();
			TraceManager.TrDebug(null, self.getName() + ": to run user task.");

			try
			{
				this.task.run(parameter);
			}
			catch (Throwable failure)
			{
				TraceManager.TrException(null, failure);
			}

			// The name is read again here rather than cached from the first trace.
			TraceManager.TrDebug(null, self.getName() + ": user task done.");
		}

 以上完成了Task队列、线程池的构造,现在可以添加任务到Task队列:

TaskManager.addTask

 

    /**
     * Registers a task with the manager and, when it is already ready, queues
     * its key for processing.
     *
     * @param task       the task to add; it and its primary key must not be null
     * @param priority   HIGH_PRIORITY puts the key at the front of the ready
     *                   list, LOW_PRIORITY appends it; other values are rejected
     * @param taskStatus the key is queued as ready only for Task.STATUS_READY
     * @return OK on success, ERROR_INVALID_PARAMETER for bad arguments,
     *         ERROR_KEY_DUPLICATED when the key is already registered, or
     *         ERROR_CAPACITY_EXCEEDED when a positive capacity has been reached
     */
    public int addTask(Task task, int priority, int taskStatus) {
        if (task == null) {
            return ERROR_INVALID_PARAMETER;
        }
        final Object taskKey = task.getPrimaryKey();
        final boolean knownPriority = priority == HIGH_PRIORITY || priority == LOW_PRIORITY;
        if (taskKey == null || !knownPriority) {
            return ERROR_INVALID_PARAMETER;
        }

        synchronized (tasks) {
            if (tasks.get(taskKey) != null) {
                CAP.trError("Queue key is duplicated.");
                return ERROR_KEY_DUPLICATED;
            }

            // The capacity limit applies only when capacity is positive.
            if (capacity > 0) {
                final int size = tasks.size();
                if (size >= capacity) {
                    CAP.trError("Queue capacity exceeded and one task rejected!");
                    return ERROR_CAPACITY_EXCEEDED;
                }
                CAP.trDebug("Total tasks:" + size + "/" + capacity);
            }

            // Register the container and schedule it on the timer after `timeout` ms.
            final TaskContainer container = new TaskContainer(task);
            tasks.put(taskKey, container);
            timer.schedule(container, timeout);
        }

        if (taskStatus != Task.STATUS_READY) {
            return OK;
        }

        synchronized (readyTasks) {
            // Index 0 puts the key at the front; index size() appends it.
            final int position = (priority == HIGH_PRIORITY) ? 0 : readyTasks.size();
            readyTasks.add(position, taskKey);
            readyTasks.notifyAll();
        }
        return OK;
    }

 

3、Demo

自定义Task,实现抽象方法getPrimaryKey()

PrimaryKey为task容器的唯一主键

/**
 * Demo task with a stable, unique primary key.
 *
 * The key is assigned once, when the task is constructed, so every call to
 * getPrimaryKey() returns the same value. Keys are based on the current time
 * in milliseconds and are bumped when needed so that two tasks created within
 * the same millisecond never share a key.
 */
public class MyTask extends Task {

	// Last key handed out; only touched inside the synchronized nextKey().
	private static long lastKey = 0L;

	// Key of this task, fixed at construction time.
	private final Long primaryKey = Long.valueOf(nextKey());

	/**
	 * Returns the current time in milliseconds, or the previous key plus one
	 * when the clock has not advanced past the last key handed out.
	 */
	private static synchronized long nextKey() {
		long now = System.currentTimeMillis();
		lastKey = (now > lastKey) ? now : lastKey + 1;
		return lastKey;
	}

	/**
	 * @return the key assigned to this task at construction; never null
	 */
	@Override
	public Object getPrimaryKey() {
		return primaryKey;
	}

}

 自定义task 处理类,实现TaskProcessor接口

public class MyTaskProcessor implements TaskProcessor {

	public void process(Task task) {

		System.out.println(task.getPrimaryKey() + " processing...");

	}

	public void timeout(Task task) {
		System.out.println(task.getPrimaryKey() + " timeout...");
	}

 创建任务管理器和添加任务:

/**
 * Demo entry point: creates a task manager and submits one ready task to it.
 */
public class MyManager {
	public static void main(String[] args) {

		// Queue capacity 5, 10-second task timeout, 2 processing threads.
		final long timeoutMillis = 10 * 1000;
		final TaskProcessor processor = new MyTaskProcessor();
		final TaskManager manager =
				TaskManager.createTaskManager("MyManager", 5, timeoutMillis, processor, 2);

		final Task demoTask = new MyTask();
		manager.addTask(demoTask, TaskManager.HIGH_PRIORITY, Task.STATUS_READY);

	}
}

 

 

 

 

 

 

 

 

  • 大小: 104.5 KB
分享到:
评论

相关推荐

Global site tag (gtag.js) - Google Analytics