实现解耦为:任务队列管理和线程管理
文分三部分:任务控制、线程池控制及Demo
类图:
类变量:
private int capacity; // 任务队列容量 private long timeout; // 任务超时时间(ms) private HashMap tasks;// task容器 private ArrayList readyTasks; // 准备就绪的待处理任务
1、任务控制部分
createTaskManager:创建任务管理器(静态)
/** * 创建TaskManager * * @param name * Task名称 * @param capacity * Task队列容量 * @param timeout * 任务超时时间 * @param processor * 任务处理器 * @param processorNum * 处理线程数 * @return TaskManager实例 */ public static TaskManager createTaskManager(String name, int capacity, long timeout, TaskProcessor processor, int processorNum) { TaskManager mgt = null; if (capacity >=0 && timeout >=0 && processor != null && processorNum > 0) { // 初始化管理器 mgt = new TaskManager(capacity, timeout); mgt.start(name, processor, processorNum); } return mgt; }
构造Task管理器:
注:包含一个Timer 定时延时任务处理器,用于任务超时处理
protected TaskManager(int capacity, long timeout ) { this.capacity = capacity; this.timeout = timeout; tasks = new HashMap(); readyTasks = new ArrayList(); // 任务定时器(超时任务处理) timer = new Timer(true); }
Task管理器启动mgt.start() :初始化线程控制器并启动分配线程处理任务
private void start(String name, TaskProcessor processor, int processorNum) { ProcessorControllor control = new ProcessorControllor(name, processorNum, processor, this); control.start(); }
线程控制器的构造:
public ProcessorControllor(String name, int processorNum, TaskProcessor processor, TaskManager manager){ super(name); this.taskManager = manager; this.processorNum = processorNum; this.processor = processor; this.setDaemon(true); // 线程池 创建指定数目的Thread、调度空闲线程 pool = new ThreadPool(name, this.processorNum); // 负责处理任务 threadPoolTask = new ThreadPoolTask(this.processor); }
2、线程池控制部分
线程池初始化:
public ThreadPool(String name, int maxNum) { if (maxNum > 0) { threads = new WorkThread[maxNum]; for(int i = 0; i < maxNum; i++) { threads[i] = new WorkThread(name, i); threads[i].start(); } statistic_start_time = System.currentTimeMillis(); TraceManager.TrDebug(null, "ThreadPool:" + name + " " + maxNum + " threads created."); } }
工作线程:WorkThread
做初始化,线程启动后处于阻塞状态,等待线程池抓取,空闲时交由任务处理
private class WorkThread extends Thread { // 工作线程在处理的任务task private Task task = null; private TaskParameter parameter = null; public WorkThread(String name, int index) { super("ThreadPool_" + name + "_" + index); } public void run() { TraceManager.TrDebug(null, Thread.currentThread().getName() + " started!"); while(true) { try { synchronized(this) { // 阻塞 直到线程空闲时将任务加入到task while(task == null) { this.wait(); } } toWork(); synchronized(this) { task = null; parameter = null; } workThreadDone(); } catch(InterruptedException ex) { TraceManager.TrException(null, ex); } catch(Exception ex) { TraceManager.TrException(null, ex); } } } }
线程Processor任务器:
class ThreadPoolTask implements ThreadTask { TaskProcessor processor; public ThreadPoolTask(TaskProcessor processor) { this.processor = processor; } public void run(TaskParameter parameter) { try { Task task = (Task)parameter; if (task.isTimeout()) { processor.timeout(task); }else { processor.process(task); } }catch(Exception ex){ CAP.trException(ex); } } }
线程池分配线程处理任务:
ProcessorControllor.run()
public void run() { CAP.trDebug(this.getName() + " started."); Task task = null; while (true) { // 抓取taskManager中准备就绪的任务 task = taskManager.processorControllorWait(); if (task != null) { // 获取空闲工作线程,唤醒线程 pool.getThread(threadPoolTask, task, true); } } }
从Task管理器中移除就绪任务
TaskManager.processorControllorWait():
protected Task processorControllorWait() { Task task = null; Object key = null; synchronized(readyTasks) { while (readyTasks.size() == 0) { try { readyTasks.wait(); }catch(Exception ex){ CAP.trException(ex); } } key = readyTasks.remove(0); } task = this.removeTask(key); return task; }
将就绪任务交由空闲WorkThread处理:
ThreadPool.getThread()
public int getThread(Task task, TaskParameter parameter, boolean blocked) { if(task == null) { return -1; } if (!blocked) { return getThread(task, parameter); } synchronized(this) { boolean over = false; long startTime = System.currentTimeMillis(); long elapsedTime = 0; while (!over) { for(int i = 0; i < threads.length; i++) { // 获得空闲线程 if (threads[i].isIdle(task, parameter)) { return i; } } try { // to block the calling thread. elapsedTime = System.currentTimeMillis() - startTime; if (elapsedTime < MAX_WAIT_TIME) { TraceManager.TrDebug(null, Thread.currentThread().getName() + " to wait."); this.wait(MAX_WAIT_TIME - elapsedTime); } else { over = true; TraceManager.TrDebug(null, Thread.currentThread().getName() + " waiting too long and will give up waiting."); } TraceManager.TrDebug(null, Thread.currentThread().getName() + " waked up!"); } catch(InterruptedException ex) { over = true; TraceManager.TrException(null, ex); } } } return -1; }
WorkThread唤醒,由Porcessor处理任务:
WorkThread.toWork()
private void toWork() { TraceManager.TrDebug(null, Thread.currentThread().getName() + ": to run user task."); try { this.task.run(parameter); } catch(Throwable ex) { TraceManager.TrException(null, ex);; } TraceManager.TrDebug(null, Thread.currentThread().getName() + ": user task done."); }
以上完成了Task队列、线程池的构造,现在可以添加任务到Task队列:
TaskManager.addTask
public int addTask(Task task, int priority, int taskStatus) { int rst = ERROR_INVALID_PARAMETER; if (task == null) { return rst; } Object taskKey = task.getPrimaryKey(); if (taskKey == null) { return rst; } if (priority != HIGH_PRIORITY && priority != LOW_PRIORITY) { return rst; } synchronized(tasks) { if (tasks.get(taskKey) != null) { CAP.trError("Queue key is duplicated."); return ERROR_KEY_DUPLICATED; } if (capacity > 0) { int size = tasks.size(); if (size >= capacity) { CAP.trError("Queue capacity exceeded and one task rejected!"); return ERROR_CAPACITY_EXCEEDED; } CAP.trDebug("Total tasks:" + size + "/" + capacity); } TaskContainer container = new TaskContainer(task); tasks.put(taskKey, container); timer.schedule(container, timeout); } if (taskStatus == Task.STATUS_READY) { synchronized(readyTasks) { if (priority == HIGH_PRIORITY ) { readyTasks.add(0, taskKey); }else { readyTasks.add(taskKey); } readyTasks.notifyAll(); } } rst = OK; return rst; }
3、Demo
自定义Task,实现抽象方法getPrimaryKey()
PrimaryKey为task容器的唯一主键
public class MyTask extends Task { @Override public Object getPrimaryKey() { return System.currentTimeMillis(); } }
自定义task 处理类,实现TaskProcessor接口
public class MyTaskProcessor implements TaskProcessor { public void process(Task task) { System.out.println(task.getPrimaryKey() + " processing..."); } public void timeout(Task task) { System.out.println(task.getPrimaryKey() + " timeout..."); }
创建任务管理器和添加任务:
public class MyManager { public static void main(String[] args) { TaskManager mgt = TaskManager.createTaskManager("MyManager", 5, 10 * 1000, new MyTaskProcessor(), 2); mgt.addTask(new MyTask(), TaskManager.HIGH_PRIORITY, Task.STATUS_READY); } }
相关推荐
一个简易的任务管理器。实现了进程的管理,应用程序的管理,添加了鼠标右键菜单。使用了多线程以及对系统api的使用。另外还利用GDI+实现了性能管理视图。另外还免费赠送一个皮肤dll(DotNetSkin)经过多次测试,完全...
C#多线程实现进程管理(同步执行)
C#多线程实现了简单的任务管理器的功能; 是初学多线程同学的一个很好的例子,有助于进一步了解多线程;
12.如何实现多线程多任务?(Visual C++编程 源代码)12.如何实现多线程多任务?(Visual C++编程 源代码)12.如何实现多线程多任务?(Visual C++编程 源代码)12.如何实现多线程多任务?(Visual C++编程 源代码)...
1. SpringBoot 自定义线程池以及多线程间的异步调用(@Async、@EnableAsync) 2.Java多线程之定时任务 以及 SpringBoot多线程实现定时任务 3.@EnableScheduling 与 @Scheduled
多线程执行任务具体实现方式;
简单winform 多线程 多任务管理 demo 可根据业务进一步扩展开发,如:下载器等
C#多线程多任务管理模型
易语言源码多线程多任务下载软件.rar
很容易上手的一个.Net多线程的定时任务功能,Demo很容易上手,完成了以天、周、月等的计划任务
多线程多任务多线程多任务,简单易用,用MFC写的多线程的小程序。
多任务多线程下载器源码,e语言编写。
C#多线程并行管理,通过Task实现,可对单个任务进行暂停,继续以及停止等操作,每个任务均有单独的进度条显示 同时执行的任务个数可以自行设置
作用:模仿线程池操作,管理多线程任务,超时,以及完成任务的回调。如果有bug自行处理,服务器挂机一天跑了三千万个线程投递没有出现什么异常。资源作者:。流云思水。资源界面:。资源下载:。
定时器与多线程任务调度器使用定时器与多线程任务调度器使用
第二章多任务和多线程第二章多任务和多线程第二章多任务和多线程第二章多任务和多线程第二章多任务和多线程第二章多任务和多线程第二章多任务和多线程第二章多任务和多线程第二章多任务和多线程第二章多任务和多线程...
主要实现了,多个线程任务在同时执行的情况下,保证线程任务顺序的问题。更通俗来说,就是保证Thread1一定在thread2,thread3之后才能执行。另外,代码里我写了详细的注释,和测试的效果,绝对让你能看懂。还有我传的...
输入进程的PID,在任务管理器中可以添加PID列即可查看到,然后输入要监控的间隔。启动后即可监控,停止后会在程序目录生成一个日志。
java多线程模拟队列实现排队叫号,多线程模拟排队叫号取号 java多线程模拟队列实现排队叫号,多线程模拟排队叫号取号