package com.thread.test.masterworker;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
public class Master {
//存放全部工作任务队列
protected ConcurrentLinkedQueue<Task> workerQueue = new ConcurrentLinkedQueue();
//存放全部的工作对象
protected Map<String, Thread> workers = new HashMap<String, Thread>();
//接收每个工作者工作后的结果
protected ConcurrentHashMap<String, Object> resultMap = new ConcurrentHashMap();
//构造方法
public Master(Worker worker, int workCounts) {
//处理结果的引用 用于任务的提交
worker.setResultMap(this.resultMap);
//任务队列 需要有任务的引用 用于任务的领取
worker.setWorkerQueue(this.workerQueue);
for (int i = 0; i < workCounts; i++) {
workers.put("任务" + i, new Thread(worker));
}
}
//判断所有的子任务是否结束
protected boolean isComplete() {
for (Map.Entry<String, Thread> set : workers.entrySet()) {
if (set.getValue().getState() != Thread.State.TERMINATED) {
return false;
}
}
return true;
}
//总共有多少个未完成工作
protected int isNotCompletedCount() {
int count = 0;
for (Map.Entry<String, Thread> set : workers.entrySet()) {
if (set.getValue().getState() != Thread.State.TERMINATED) {
count++;
}
}
return count;
}
//提交任务
public void submit(Task task) {
workerQueue.add(task);
}
//启动任务
public void execute() {
for (Map.Entry<String, Thread> set : workers.entrySet()) {
set.getValue().start();
}
}
public Integer getResult() {
Integer result = 0;
for (Map.Entry result_ : resultMap.entrySet()) {
result += Integer.valueOf(result_.getValue().toString());
}
return result;
}
}
package com.thread.test.masterworker;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentLinkedQueue;
public class Worker implements Runnable {
//1.首先要实现runnble接口
//Master 的引用
//存放执行结果
private Map<String, Object> resultMap = new HashMap<String, Object>();
//工作着队列
ConcurrentLinkedQueue<Task> workerQueue = new ConcurrentLinkedQueue<Task>();
public void run() {
while (true) {
Task task = workerQueue.poll();
if (task == null) {
break;
}
Object object = hanlder(task);
resultMap.put(String.valueOf(task.getId()), task.getPrice());
}
}
public Object hanlder(Task task) {
return null;
}
public Map<String, Object> getResultMap() {
return resultMap;
}
public void setResultMap(Map<String, Object> resultMap) {
this.resultMap = resultMap;
}
public void setWorkerQueue(ConcurrentLinkedQueue<Task> workerQueue) {
this.workerQueue = workerQueue;
}
}
package com.thread.test.masterworker;
public class Task {
private int id;
private String name;
private long price;
public int getId() {
return id;
}
public void setId(int id) {
this.id = id;
}
public String getName() {
return name;
}
public void setName(String name) {
this.name = name;
}
public long getPrice() {
return price;
}
public void setPrice(long price) {
this.price = price;
}
}
package com.thread.test.masterworker;
public class MyWorker extends Worker {
@Override
public Object hanlder(Task task) {
return 0;
}
}
package com.thread.test.masterworker;
import java.util.Random;
public class Main {
public static void main(String[] args) {
Master master = new Master(new MyWorker(), 10);
Random random = new Random();
for (int i = 0; i < 100; i++) {
Task task = new Task();
task.setId(i);
task.setName("task_" + i);
task.setPrice(random.nextInt());
master.submit(task);
}
master.execute();
while (true) {
if (master.isComplete()) {
System.out.println(master.getResult());
break;
} else
System.out.println("还有" + master.isNotCompletedCount() + "个任务未完成");
}
}
}
分享到:
相关推荐
(注意,本资源附带书中源代码可供参考) 多线程与并发处理是程序设计好坏优劣的重要课题,本书通过浅显易懂的文字与实例来介绍Java线程相关的设计模式概念,并且通过实际的Java程序范例和 UML图示来一一解说,书中...
结合 Etcd 与 MongoDB 实现一个基于 Master-Worker 分布式 架构的 crontab 系统.zip
Master-Worker是一个很好用的设计模式, 它可以把一个大工作分成很多小工作去执行, 等所有小工作都回来以后, 将所有的小工作结果做一个统整, 得到最终结果, 就很像专题小组有五个人, 把系统分成四等分, 每个人负责一...
目录: 漫谈UML Introduction 1 Java语言的线程 Introduction 2 多线程...总结 多线程程序设计的模式语言 附录A 练习问题的解答 附录B Java的内存模型 附录C Java线程的优先级 附录D 线程相关的主要API 附录E 参考文献
Master-Worker模式的并行关联规则挖掘算法,苗锡奎,,随着信息技术迅速发展,数据库的规模不断扩大,从而产生了大量的数据。如果使用传统的数据挖掘技术从这庞大的数据中挖掘出有价值
java多线程设计模式 线程的创建和重起 线程的同步 wait/notify/sleep机制 Worker Pattern
用C#编写的master-worker并行任务系统 手动部署系统 您只需要源文件 克隆或下载此存储库 现在,您需要编译Master并启动它: 编译它 制作自己的config.json文件: { " wsport " : 6969 , " mainport " : 9696 , ...
cpp_master_worker依赖第三方库地址 loghelper : 这是我一直在使用的基于boost.log的日志库; concurrentqueue : A fast multi-producer, multi-consumer lock-free concurrent queue for C++11 libzmq : ZeroMQ ...
介绍使用c++同步zookeeper客户端接口的Zookeeper master-worker实现Zookeeper C++ 客户端c++ 客户端实现基于zookeeper c 绑定客户端和boost。 与c-binding客户端相比,增加了一些便利:(1)所有客户端请求同步发送...
主要介绍了php实现 master-worker 守护多进程模式的实例代码,代码简单易懂,非常不错,具有一定的参考借鉴价值 ,需要的朋友可以参考下
│ 高并发编程第二阶段36讲、多线程Worker-Thread设计模式-上.mp4 │ 高并发编程第二阶段37讲、多线程Worker-Thread设计模式-上.mp4 │ 高并发编程第二阶段38讲、多线程Active Objects设计模式(接受异步消息的...
36种最新设计模式整理 Design Pattern: Simple Factory 模式 Design Pattern: Abstract Factory 模式 Design Pattern: Builder 模式 Design Pattern: Factory Method 模式 Design Pattern: Prototype 模式 ...
Apache的prefork模式和worker模式该户.docx
│ 高并发编程第二阶段36讲、多线程Worker-Thread设计模式-上.mp4 │ 高并发编程第二阶段37讲、多线程Worker-Thread设计模式-上.mp4 │ 高并发编程第二阶段38讲、多线程Active Objects设计模式(接受异步消息的...
prefork的无线程设计在某些情况下将比worker更有优势:它可以使用那些没有处理好线程安全的第三方模块,并 且对于那些线程调试困难的平台而言,它也更容易调试一些。 worker模式使用多个子进程,每个子进程有多个...
命令php think worker:gateway在windows下运行...根据GatewayWorker-for-win提供的demo修改的 本资源依赖GatewayWorker扩展,请先安装扩展。 使用方法,把解压后的文件夹放到项目根目录,双击start_for_win.bat,启动
master进程为主进程,启动过程中读取conf配置,根据每个应用配置中的ip和端口创建监听socket,然后再根据配置中的进程数创建对应数量的子进程即worker进程,worker进程会自动继承master进程创建的监听socket,使得...
apache2的worker工作模式配置及MaxClients不足问题解决
在centos7系统下自动部署k8s,默认是1主2worker;可通过修改相关变量增加worker节点; 需要对照自己网络环境进行相关修改内部参数;...网络或设备性能不好时建议先在worker执行此脚本、再在master节点运行