`
jianfulove
  • 浏览: 118264 次
  • 性别: Icon_minigender_1
  • 来自: 湛江
社区版块
存档分类
最新评论

多线程处理耗时任务的封装方式

阅读更多

 

工作中用到的处理多任务的多线程实现,以下仅为简略书写以便备忘、

 

package need.most.time.producer;

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

import need.most.time.ProcessorWorkerThread;

public class ProcessingThreadsDispatcher {
	protected int maxQueueSize = new Long(Runtime.getRuntime().maxMemory() / 400000L).intValue();
	int def_pool_size = 100;
	private String defPluginsThreadsPool = "default-threads-pool";
     //这里用一个map来封装,也就是可能有多种处理耗时任务的processor处理器,每一个处理器对应一个处理线程沲也就最合理
	private Map<String, ProcessingThreads<ProcessorWorkerThread>> workerThreads = new ConcurrentHashMap<String, ProcessingThreads<ProcessorWorkerThread>>(32);
	
	ProcessorWorkerThread worker;
	ProcessingThreads<ProcessorWorkerThread> pt;
	{
		worker = new ProcessorWorkerThread();
		pt = new ProcessingThreads<ProcessorWorkerThread>(worker, def_pool_size, maxQueueSize, defPluginsThreadsPool);
		workerThreads.put(defPluginsThreadsPool, pt);
	}
	
	
	//主要的方法入口
	private void walk(final Object arg) {
		//processor 为处理器,专处理耗时任务
		Object processor = new Object();
        String processorId=defPluginsThreadsPool;//应该每一个processor都提供一个唯一的name,这样就可以得到相对应processor
		ProcessingThreads<ProcessorWorkerThread> pt = workerThreads.get(processorId);

		if (pt == null) {
			pt = workerThreads.get(defPluginsThreadsPool);
		}
		if (pt.addItem(processor,arg)) {
			//记录哪个处理器处理的哪个对象等
		}
		
	}
}

 

 

 

package need.most.time;

// 通用的处理器实现
public class ProcessorWorkerThread extends WorkerThread {

	//一般这里就会执行真实代表的处理器实现方法
	@Override
	public void process(QueueItem item) {
		
		Object processor=item.getProcessor();
		Object arg=item.getArg();
		//processor拿着arg对象进行处理耗时任务。
		//processor.process(...)
		System.out.println("do finish");
	}

	@Override
	public WorkerThread getNewInstance() {
		ProcessorWorkerThread worker = new ProcessorWorkerThread();
		return worker;
	}

}

 

 

package need.most.time;

import java.util.concurrent.LinkedBlockingQueue;
//线程处理抽象类
public abstract class WorkerThread extends Thread {

	private LinkedBlockingQueue<QueueItem> queue = null;
	private boolean stopped = false;

	public abstract void process(QueueItem item);
	public abstract WorkerThread getNewInstance();
	public boolean offer(QueueItem item) {
		return queue.offer(item);
	}
	
	public void setQueueMaxSize(int maxSize) {
		LinkedBlockingQueue<QueueItem> oldQueue = queue;

		queue = new LinkedBlockingQueue<QueueItem>(maxSize);

		if (oldQueue != null) {
			queue.addAll(oldQueue);
		}
	}

	@Override
	public void run() {
		QueueItem item = null;

		while (!stopped) {
			try {
				item = queue.take();

				long start = System.currentTimeMillis();

				process(item);

				long end = System.currentTimeMillis() - start;
				System.out.println("do time=" + (end - start) + "ms");
			} catch (Exception e) {
			}

		}

	}
	
	
	

}

 

 

 

 

package need.most.time.producer;

import java.util.ArrayList;

import need.most.time.QueueItem;
import need.most.time.WorkerThread;
//处理线程沲类,些类维护多条处理线程
public class ProcessingThreads<E extends WorkerThread> {
	private int numWorkerThreads = 1;
	private ArrayList<E> workerThreads = null;
	private String name = null;

	public ProcessingThreads(E worker, int numWorkerThreads, int maxQueueSize, String name) {
		this.numWorkerThreads = numWorkerThreads;
		this.workerThreads = new ArrayList<E>(numWorkerThreads);
		this.name = name;
		// 生成指定数量的工作线程
		for (int j = 0; j < numWorkerThreads; j++) {
			WorkerThread t = worker.getNewInstance();

			t.setQueueMaxSize(maxQueueSize);
			t.setDaemon(true);
			t.setName(name + " Queue Worker " + j);
			t.start();
			workerThreads.add((E) t);
		}
	}

	public boolean addItem(Object processor, Object arg) {
		boolean ret = false;
		QueueItem item = new QueueItem(processor, arg);
		//这里应该指定哪一个用户他的唯一标识,这样可以每次处理同一个用户都是由同一个线程来执行
		ret = workerThreads.get(Math.abs(processor.hashCode()) % numWorkerThreads).offer(item);
		return ret;
	}

}

 

 

 

 

 

 

package need.most.time;
//封装一些处理参数bean
public class QueueItem {
	private Object arg;
	private Object processor;
	public QueueItem(Object arg, Object processor) {
		super();
		this.arg = arg;
		this.processor = processor;
	}
	public Object getArg() {
		return arg;
	}
	public void setArg(Object arg) {
		this.arg = arg;
	}
	public Object getProcessor() {
		return processor;
	}
	public void setProcessor(Object processor) {
		this.processor = processor;
	}
	

}

 

 

分享到:
评论

相关推荐

    基于Qt的多线程流水线异步服务器稳定版

    数据库被作为资源管理,支持在多线程的条件下,使用数据库资源。 5、 框架界面。尽管常见的服务运行时表现为一个后台进程,但为了更好的演示服务器的功能,避免繁琐的配置,还是需要一个图形界面来显示状态、设置...

    libcurl长连接高并发高性能封装测试分析源代码

    项目中需要用到Curl频繁...单个线程下载速度毕竟有限,使用多线程去调用接口。实现高并发高性能,需要考虑资源分配和冲突的问题。 (3) 异步调用。和socket异步调用的原理类似。同步调用会阻塞等待,造成CPU占用率高。

    Android带进度条的下载图片示例(AsyncTask异步任务)

    android本身是一个多线程的操作系统,我们不能把所有的操作都放在主线程中操作 ,比如一些耗时操作。如果放在主线程中 会造成阻塞 而当阻塞事件过长时 系统会抛出anr异常。所以我们要使用异步任务。android为我们...

    Java 面经手册.zip

    多线程支持: Java内置了对多线程的支持,允许程序同时执行多个任务。这对于开发需要高并发性能的应用程序(如服务器端应用、网络应用等)非常重要。 自动内存管理(垃圾回收): Java具有自动内存管理机制,通过...

    Java网络爬虫EggJava.zip

    Egg是一个通用,多线程的Java爬虫框架。 Egg简单小巧,api非常简单,容易上手。 Egg性能不错,并实现多种请求方式。 能够比较快的响应使用者的需求 速度说明 实测数据,在20M无线网下(隔了堵...

    Qthread_moveTothread.rar

    参照Gt5官方说明文档,在控制台中实现了一个交互终端,编写了线程类,封装了QThread,将耗时函数类Worker采用moveToThread方法放到QThread线程中。并通过命令字控制线程的开启和关闭。

    vue-typescript-template:Vue + TypeScript + Elment-ui + Axios + WebSocket + animate.css + mockjs 的 Vue 项目模版

    Web Workers 的引入(开启一个线程、分担主线程的计算压力、在处理特别耗时的任务中特别有用) WebSocket 的嵌入(双向通讯) 多页面配置 Element-ui(表格、搜索、分页组件的封装、主题、国际化等) git commit 提交...

    Android 消息机制详解及实例代码

    Android 消息机制 1.概述 Android应用启动时,会默认有一个主线程(UI线程),在这个线程中会关联一个消息队列(MessageQueue),所有的操作都会被封装成消息队列然后交给...在子线程中完成耗时操作,很多情况下需要更

    Android加载对话框同时异步执行实现方法

    在实现上比较的烦琐,为简化此方法,花了点时间封装了Thread和Handler,现在通过简单的代码就可以实现相同的功能,而把更多精力放到业务逻辑处理上! 效果如图: 代码如下: LoadingDialog loadingDialog = new ...

    eaasyexcel批量导入+校验

    1.数据模型层面进行了封装,使用简单 2.重写了07版本的Excel解析代码,减低内存消耗,可以有效避免OOM 3.只能操作Excel 4.不能读取图片 5.读写Excel,数据在excel文件,程序&lt;实体类,MAP&gt;两个载体之间 互相流转

    精易模块[源码] V5.15

    1、新增“线程_枚举”枚举指定进程ID中所有线程列表,成功返回线程数量,失败返回零。 2、删除“文件_取图标”与"文件_取图标句柄"功能重复。 3、优化“系统_创建桌面快捷方式”流程代码,感谢易友[ds9660]反馈。 4...

    精易官方免费模块v3.60版

    因为该命令调用很多的子程序,涉及代码太多,维护时间耗时2天多,也因此拖慢了3.51的发布 故决定,先发布3.51版本,本命令可能隐藏的BUG,我们继续研究,看能否找出真正的问题所在 精易模块 V3.50 what’s new:...

    C#全能速查宝典

    2.1.18 MDI窗体——多文档界面 143 2.1.19 MdiChildren属性——获取子窗体的数组 146 2.1.20 MdiParent属性——设置父窗体 147 2.1.21 MinimizeBox属性——是否显示最小化按钮 147 2.1.22 Minimum属性——数字显示框...

    java基础入门教程

    工 业 界 不 少 人 预 言 :"Java 语 言 的 出 现 ,将 会 引 起 一 场软 件革 命 ",这 是 因 为 传统 的 软 件 往 往 都 是 与 具 体 的 实现 环 境 有 关 ,换 了 一 个 环 境 就需 要 作 一 番 改 动 ,耗时 费 力 ,...

Global site tag (gtag.js) - Google Analytics