`

使用Java多线程实现任务分发

    博客分类:
  • Java
阅读更多

使用Java多线程实现任务分发

http://developer.51cto.com  2009-08-13 09:07  佚名  网络转载
  • 摘要:本文向你介绍使用Java多线程技术实现任务列表指派到线程的思路和执行过程,作者通过任务分发器、任务执行和工作线程实现了这个功能。
  • 标签:Java多线程

      多线程下载由来已久,如 FlashGet、NetAnts 等工具,它们都是依懒于 HTTP 协议的支持(Range 字段指定请求内容范围),首先能读取出请求内容 (即欲下载的文件) 的大小,划分出若干区块,把区块分段分发给每个线程去下载,线程从本段起始处下载数据及至段尾,多个线程下载的内容最终会写入到同一个文件中。

只研究有用的,工作中的需求:要把多个任务分派给Java的多个线程去执行,这其中就会有一个任务列表指派到线程的策略思考:已知:1. 一个待执行的任务列表,2. 指定要启动的线程数;问题是:每个线程实际要执行哪些任务。

使用Java多线程实现这种任务分发的策略是:任务列表连续按线程数分段,先保证每线程平均能分配到的任务数,余下的任务从前至后依次附加到线程中--只是数量上,实际每个线程执行的任务都还是连续的。如果出现那种僧多(线程) 粥(任务) 少的情况,实际启动的线程数就等于任务数,一挑一。这里只实现了每个线程各扫自家门前雪,动作快的完成后眼见别的线程再累都是爱莫能助。

实现及演示代码如下:由三个类实现,写在了一个 Java 文件中:TaskDistributor 为任务分发器,Task 为待执行的任务,WorkThread 为自定的工作线程。代码中运用了命令模式,如若能配以监听器,用上观察者模式来控制 UI 显示就更绝妙不过了,就能实现像下载中的区块着色跳跃的动感了,在此定义下一步的着眼点了。

代码中有较为详细的注释,看这些注释和执行结果就很容易理解的。main() 是测试方法

  1. package com.unmi.common;   
  2. import java.util.ArrayList;   
  3. import java.util.List;   
  4. /**   
  5. * 指派任务列表给线程的分发器   
  6. * @author Unmi   
  7. * QQ: 1125535 Email: fantasia@sina.com   
  8. * MSN: kypfos@msn.com 2008-03-25   
  9. */   
  10. public class TaskDistributor {   
  11. /**   
  12. * 测试方法   
  13. * @param args   
  14. */   
  15. public static void main(String[] args) {   
  16. //初始化要执行的任务列表   
  17. List taskList = new ArrayList();   
  18. for (int i = 0; i < 108; i++) {   
  19. taskList.add(new Task(i));   
  20. }   
  21. //设定要启动的工作线程数为 5 个   
  22. int threadCount = 5;   
  23. List[] taskListPerThread = distributeTasks(taskList, threadCount);   
  24. System.out.println("实际要启动的工作线程数:"+taskListPerThread.length);   
  25. for (int i = 0; i < taskListPerThread.length; i++) {   
  26. Thread workThread = new WorkThread(taskListPerThread[i],i);   
  27. workThread.start();   
  28. }   
  29. }   
  30. /**   
  31. * 把 List 中的任务分配给每个线程,先平均分配,剩于的依次附加给前面的线程   
  32. * 返回的数组有多少个元素 (List) 就表明将启动多少个工作线程   
  33. * @param taskList 待分派的任务列表   
  34. * @param threadCount 线程数   
  35. * @return 列表的数组,每个元素中存有该线程要执行的任务列表   
  36. */   
  37. public static List[] distributeTasks(List taskList, int threadCount) {   
  38. // 每个线程至少要执行的任务数,假如不为零则表示每个线程都会分配到任务   
  39. int minTaskCount = taskList.size() / threadCount;   
  40. // 平均分配后还剩下的任务数,不为零则还有任务依个附加到前面的线程中   
  41. int remainTaskCount = taskList.size() % threadCount;   
  42. // 实际要启动的线程数,如果工作线程比任务还多   
  43. // 自然只需要启动与任务相同个数的工作线程,一对一的执行   
  44. // 毕竟不打算实现了线程池,所以用不着预先初始化好休眠的线程   
  45. int actualThreadCount = minTaskCount > 0 ? threadCount : remainTaskCount;   
  46. // 要启动的线程数组,以及每个线程要执行的任务列表   
  47. List[] taskListPerThread = new List[actualThreadCount];   
  48. int taskIndex = 0;   
  49. //平均分配后多余任务,每附加给一个线程后的剩余数,重新声明与 remainTaskCount   
  50. //相同的变量,不然会在执行中改变 remainTaskCount 原有值,产生麻烦   
  51. int remainIndces = remainTaskCount;   
  52. for (int i = 0; i < taskListPerThread.length; i++) {   
  53. taskListPerThread[i] = new ArrayList();   
  54. // 如果大于零,线程要分配到基本的任务   
  55. if (minTaskCount > 0) {   
  56. for (int j = taskIndex; j < minTaskCount + taskIndex; j++) {   
  57. taskListPerThread[i].add(taskList.get(j));   
  58. }   
  59. taskIndex += minTaskCount;   
  60. }   
  61. // 假如还有剩下的,则补一个到这个线程中   
  62. if (remainIndces > 0) {   
  63. taskListPerThread[i].add(taskList.get(taskIndex++));   
  64. remainIndces--;   
  65. }   
  66. }   
  67. // 打印任务的分配情况   
  68. for (int i = 0; i < taskListPerThread.length; i++) {   
  69. System.out.println("线程 " + i + " 的任务数:" +    
  70.  
  71. taskListPerThread[i].size() + " 区间["   
  72. + taskListPerThread[i].get(0).getTaskId() + ","   
  73. + taskListPerThread[i].get(taskListPerThread[i].size() - 1).getTaskId() + "]");   
  74. }   
  75. return taskListPerThread;   
  76. }   
  77. }   
  78. /**   
  79. * 要执行的任务,可在执行时改变它的某个状态或调用它的某个操作   
  80. * 例如任务有三个状态,就绪,运行,完成,默认为就绪态   
  81. * 要进一步完善,可为 Task 加上状态变迁的监听器,因之决定UI的显示   
  82. */   
  83. class Task {   
  84. public static final int READY = 0;   
  85. public static final int RUNNING = 1;   
  86. public static final int FINISHED = 2;   
  87. private int status;   
  88. //声明一个任务的自有业务含义的变量,用于标识任务   
  89. private int taskId;   
  90. //任务的初始化方法   
  91. public Task(int taskId){   
  92. this.status = READY;   
  93. this.taskId = taskId;   
  94. }   
  95. /**   
  96. * 执行任务   
  97. */   
  98. public void execute() {   
  99. // 设置状态为运行中   
  100. setStatus(Task.RUNNING);   
  101. System.out.println("当前线程 ID 是:" + Thread.currentThread().getName()   
  102. +" | 任务 ID 是:"+this.taskId);   
  103. // 附加一个延时   
  104. try {   
  105. Thread.sleep(1000);   
  106. catch (InterruptedException e) {   
  107. e.printStackTrace();   
  108. }   
  109. // 执行完成,改状态为完成   
  110. setStatus(FINISHED);   
  111. }   
  112. public void setStatus(int status) {   
  113. this.status = status;   
  114. }   
  115. public int getTaskId() {   
  116. return taskId;   
  117. }   
  118. }   
  119. /**   
  120. * 自定义的工作线程,持有分派给它执行的任务列表   
  121. */   
  122. class WorkThread extends Thread {   
  123. //本线程待执行的任务列表,你也可以指为任务索引的起始值   
  124. private List taskList = null;   
  125. private int threadId;   
  126. /**   
  127. * 构造工作线程,为其指派任务列表,及命名线程 ID   
  128. * @param taskList 欲执行的任务列表   
  129. * @param threadId 线程 ID   
  130. */   
  131. public WorkThread(List taskList,int threadId) {   
  132. this.taskList = taskList;   
  133. this.threadId = threadId;   
  134. }   
  135. /**   
  136. * 执行被指派的所有任务   
  137. */   
  138. public void run() {   
  139. for (Task task : taskList) {   
  140. task.execute();   
  141. }   
  142. }   

执行结果如下,注意观察每个Java多线程分配到的任务数量及区间。直到所有的线程完成了所分配到的任务后程序结束:

  1. 线程 0 的任务数:22 区间[0,21]   
  2. 线程 1 的任务数:22 区间[22,43]   
  3. 线程 2 的任务数:22 区间[44,65]   
  4. 线程 3 的任务数:21 区间[66,86]   
  5. 线程 4 的任务数:21 区间[87,107]   
  6. 实际要启动的工作线程数:5   
  7. 当前线程 ID 是:Thread-0 | 任务 ID 是:0   
  8. 当前线程 ID 是:Thread-1 | 任务 ID 是:22   
  9. 当前线程 ID 是:Thread-2 | 任务 ID 是:44   
  10. 当前线程 ID 是:Thread-3 | 任务 ID 是:66   
  11. 当前线程 ID 是:Thread-4 | 任务 ID 是:87   
  12. 当前线程 ID 是:Thread-0 | 任务 ID 是:1   
  13. 当前线程 ID 是:Thread-1 | 任务 ID 是:23   
  14. 当前线程 ID 是:Thread-2 | 任务 ID 是:45 

上面坦白来只算是基本功夫,贴出来还真见笑了。还有更为复杂的功能.

像Java多线程的下载工具的确更充分利用了网络资源,而且像 FlashGet、NetAnts 都实现了:假如某个线程下载完了欲先所分配段的内容之后,会帮其他线程下载未完成数据,直到任务完成;或某一下载线程的未完成段区间已经很小了,用不着别人来帮忙时,这就涉及到任务的进一步分配。再如,以上两个工具都能动态增加、减小或中止线程,越说越复杂了,它们原本比这复杂多了,这些实现可能定义各种队列来实现,如未完成任务队列、下载中任务队列和已完成队列等。

分享到:
评论

相关推荐

    消息分发框架(基于JAVA阻塞队列实现、 生产者消费者模型)

    消息分发框架,基于java阻塞队列实现,生产者消费者模型 可用于任务分发,服务器消息消息,以及网络IO 性能优化,多线程

    把多个任务分派给多个线程去执行

    由三个类实现,写在了一个 Java 文件中:TaskDistributor 为任务分发器,Task 为待执行的任务,WorkThread 为自定的工作线程。代码中运用了命令模式,如若能配以监听器,用上观察者模式来控制 UI 显示就更绝妙不过了...

    CsqDownloadManager:一个支持多任务、任务多线程、断点续传的下载管理器,适用于较大文件下载

    CsqDownloadManager概述一个支持多任务、任务多线程、断点续传的下载管理器,适用于较大文件下载。minSdkVersion 9。特点:1、支持多个下载任务(默认最多3个)同时下载,如果任务支持断点续传,可以配置此任务多...

    [Java]三分钟入门多线程

    在多处理机系统中情况确实如上所属,所有的任务在同一时刻分发给不同的cpu执行,这确实可以大幅度提高程序的执行效率。 但是在单处理机系统中我们却要换一个角度考虑并发。单处理机系统只有一个cpu,必然不能同时...

    iOS启动框架,支持启动生命周期分发,启动任务分布式注册,同时,统计各启动任务消耗的时间,暴露给外部使用。.zip

    iOS启动框架,支持启动生命周期分发,启动任务分布式注册,启动任务依赖,支持设置启动任务在各生命周期的优先级,执行所在线程以及对其他启动任务的依赖。根据启动任务的依赖关系,检测循环依赖,并发执行,提升...

    Java并发编程(学习笔记).xmind

    (1)如果设计正确,多线程程序可以通过提高处理器资源的利用率来提升系统吞吐率 (2)建模简单:通过使用线程可以讲复杂并且异步的工作流进一步分解成一组简单并且同步的工作流,每个工作流在一个单独的线程...

    d-concurrent:通过编写多线程的方式来编写分布式程序

    模仿 concurrent 多线程处理方式,进行分布式调用的 java 类库。使用该类库您可以轻松的设计分布式应用,比如 高可用异地容错应用、多任务分发应用... 一、使用场景 1.1、 高可用异地容错应用 举例:有这样的需求,...

    基于opencv和多线程的别踩白块儿游戏辅助.zip

    多语言支持:尽管OpenCV主要使用C++编写,但它提供了丰富的API绑定,支持包括C、Python、Java、MATLAB、JavaScript等多种编程语言,方便不同领域的开发者使用。 开源与免费:OpenCV遵循BSD开源许可证发布,用户...

    基于pyqt5和opencv的多线程图像(视频)处理.zip

    多语言支持:尽管OpenCV主要使用C++编写,但它提供了丰富的API绑定,支持包括C、Python、Java、MATLAB、JavaScript等多种编程语言,方便不同领域的开发者使用。 开源与免费:OpenCV遵循BSD开源许可证发布,用户...

    ZMQ/ZeroMQ使用手册

    发布-订阅、任务分发、请求-应答等。ZMQ 的快速足以胜任集群应用产品。它 的异步 I/O 机制让你能够构建多核应用程序,完成异步消息处理任务。ZMQ 有 着多语言支持,并能在几乎所有的操作系统上运行。ZMQ 是 iMatix ...

    java微信公众号MVC开发框架

    jwx是开源的java公众号开发MVC框架,基于spring配置文件和微信消息或事件注解,通过微信上下文处理一个或多个微信公众号服务请求。目的主要有两个,其一生封装微信请求xml消息为java实体对象,将返回对象转换为xml...

    Vert.x应用开发实例教程

    一个Vert.x有一个或多个事件循环线程组成,线程最大数量为主机有效的CPU核数。 Event Loop Vertical:事件的业务处理线程,存在于Event Loop中,用于处理非阻塞短任务。 Worker Vertical : 事件的业务处理线程,用于...

    javaSE代码实例

    第16章 多线程——Java中的并发协作 343 16.1 线程的基本知识 343 16.1.1 多线程编程的意义 343 16.1.2 定义自己的线程 344 16.1.3 创建线程对象 345 16.1.4 启动线程 347 16.1.5 同时使用多个线程 ...

    netty面试专题-答案-一起学习

    NIO 的特点:事件驱动模型、单线程处理多任务、非阻塞 I/O,I/O 读写不再阻塞,而是返 回 0、基于 block 的传输比基于流的传输更高效、更高级的 IO 函数 zero-copy、IO 多路复用 大大提高了 Java 网络应用的可伸缩性...

    Mobli_SMS_Service

    注释: 请确保您的 MessageBrokerImpl 是线程安全的,因为多个生产者和消费者可以同时使用代理字数不应该区分大小写###Part II – 日志记录 您的团队领导要求您使用通用日志记录框架(例如 log4j)向应用程序添加...

    Linux环境数据库管理员指南

    1.3.2 多任务 4 1.4 为什么选择 Linux 6 1.4.1 何时使用 Linux 6 1.4.2 服务器与工作站 6 1.4.3 推荐的硬件 7 1.4.4 移植到 Linux工作站 7 1.5 Linux分发包 8 1.6 升级或移植前的考虑 10 1.6.1 硬件兼容性 11 1.6.2 ...

    springCloud

    消息代理是一个中间件产品,它的核心是一个消息的路由程序,用来实现接收和分发消息,并根据设定好的消息处理流来转发给正确的应用。它包括独立的通信和消息传递协议,能够实现组织内部和组织间的网络通信。设计代理...

    LiveEventBus:EventBus for Android,消息总线,基于LiveData,具有生命周期感知能力,支持Sticky,支持AndroidX,支持跨进程,支持跨APP

    LiveEventBus LiveEventBus是一款Android...线程分发 EventBus :cross_mark: :check_mark_button: :check_mark_button: :cross_mark: :cross_mark: :check_mark_button: RxBus :cross_mark: :cross_mark: :check_mar

    精通websphere MQ

    分发列表 (Distribution List) ................................................................ 26 1.2.7 进程定义 (Process)................................................................................

Global site tag (gtag.js) - Google Analytics