有一个项目用来负责调度集群中的"cron任务",比如一个application中可以配置N个定时任务,这些任务信息最终注册到zookeeper上,并开发了一系列代码用于维护这些任务的"活性";当applicaton中一个server故障,那么这个server上接管的任务,需要迁移到其他server上,如果多个server存活的话,还需要这些任务能够"均衡"的分布.
其中"负载均衡",很好理解,比如有6个任务,3个server,那么就需要每个server上尽可能的运行2个任务;其实这个事情想起来很简单,但是做起来似乎有些不得不考虑的问题:
1) "相对平均"怎么设计
2) 迁移任务时,是否会丢失任务的触发时机;比如一个任务凌晨3点执行,刚好此时运行了一次"均衡",任务在原来的server上没有触发,在新的server上又过了时间..
3) 迁移任务时,还需要考虑"最少移动"次数,不能大面积迁移任务;只能从"负载高"的server上迁移到"负载低"的.
例如:
sid1: w1 w2 w3 w4
sid2: w5
sid3:w6
期望迁移之后:
sid1:w1 w2
sid2:w5 w3
sid3:w4 w6
而不是(这种结果迁移的面积太大,只需要把"多余"的任务迁移出去即可,而不是重新洗牌再均衡)
sid1:w6 w5
sid2:w2 w3
sid3:w1 w4
经过提取,"相对平均"的设计代码如下,仅作备忘:
package com.test.demo.zookeeper; import java.io.BufferedReader; import java.io.InputStreamReader; import java.util.*; public class WorkersBalanceMain { private List<String> servers = new ArrayList<String>(); private Map<String, List<String>> current = new HashMap<String, List<String>>(); private Set<String> workers = new HashSet<String>(); public static void main(String[] args) { BufferedReader br = new BufferedReader(new InputStreamReader(System.in)); String line; Set<String> servers = new HashSet<String>(); WorkersBalanceMain balancer = new WorkersBalanceMain(); try { while ((line = br.readLine()) != null) { if (line.startsWith("addWorker")) { balancer.addWorkers(line); } else if (line.startsWith("addServer")) { balancer.addServers(line); } else { System.out.println("???"); continue; } balancer.rebalance(); } } catch (Exception e) { e.printStackTrace(); } System.out.println("--END---"); } public void addServers(String source) { int index = source.indexOf(" "); if (index == -1) { return; } String[] values = source.substring(index + 1).split(" "); if (values == null || values.length == 0) { return; } for (String server : values) { servers.add(server); if(current.get(server) == null){ current.put(server,new ArrayList<String>()); } } } public void addWorkers(String source) { int index = source.indexOf(" "); if (index == -1) { return; } String[] values = source.substring(index + 1).split(" "); if (values == null || values.length == 0) { return; } //当有新的worker提交时,将咱有一台机器接管 String sid = servers.get(0); List<String> sw = current.get(sid); if(sw == null){ current.put(sid,new ArrayList<String>()); } for (String worker : values) { workers.add(worker); sw.add(worker); } } public void rebalance() { try { if (workers.isEmpty()) { return; } for (String sid : servers) { if (current.get(sid) == null) { current.put(sid, new ArrayList<String>()); } } //根据每个sid上的worker个数,整理成一个排序的map TreeMap<Integer, List<String>> counterMap = new TreeMap<Integer, List<String>>(); for (Map.Entry<String, List<String>> entry : current.entrySet()) { int total = entry.getValue().size(); List<String> sl = counterMap.get(total); if (sl == null) { sl = new ArrayList<String>(); counterMap.put(total, sl); } sl.add(entry.getKey());//sid } int totalWorkers = workers.size(); int totalServers = current.keySet().size(); int avg = totalWorkers / totalServers;//每个server实例可以接管任务的平均数 while (true) { Map.Entry<Integer, List<String>> gt = counterMap.higherEntry(avg); //大于平均数的列表, >= avg + 1 Map.Entry<Integer, List<String>> lt = counterMap.lowerEntry(avg); //与平均数差值为2的 <= arg - 1 //允许任务个数与avg上线浮动1各个,不是绝对的平均 if (gt == null || lt == null) { break; } Integer gtKey = gt.getKey(); Integer ltKey = lt.getKey(); if (gtKey - ltKey < 2) { break; } if (gt.getValue().size() == 0) { counterMap.remove(gt.getKey()); } if (lt.getValue().size() == 0) { counterMap.remove(lt.getKey()); } Iterator<String> it = gt.getValue().iterator(); //sid列表 while (it.hasNext()) { String _fromSid = it.next(); List<String> _currentWorkers = current.get(_fromSid); if (_currentWorkers == null || _currentWorkers.isEmpty()) { it.remove(); current.remove(_fromSid); continue; } List<String> _ltServers = lt.getValue(); if (_ltServers.isEmpty()) { counterMap.remove(ltKey); break; } //取出需要交换出去的任务id int _currentWorkersSize = _currentWorkers.size(); String _wid = _currentWorkers.get(_currentWorkersSize - 1); String _toSid = _ltServers.get(0); //从_fromSid的worker列表中移除低workerId //注意:移除最后一个,和_ltWorkers.add(_wid)对应,_ltWorkers将新任务添加到list的尾部 //即从尾部移除,从尾部添加,基本保证"原任务,最少迁移次数" _currentWorkers.remove(_currentWorkersSize - 1); it.remove(); _ltServers.remove(0); //将此workerId添加到_toSid的worker列表中 List<String> _ltWorkers = current.get(_toSid); if (_ltWorkers == null) { _ltWorkers = new ArrayList<String>(); current.put(_toSid, _ltWorkers); } _ltWorkers.add(_wid); //将gt的key降低一个数字 List<String> _next = counterMap.get(gtKey - 1); if (_next == null) { _next = new ArrayList<String>(); counterMap.put(gtKey - 1, _next); } _next.add(_fromSid); //将lt的key提升一个数字 List<String> _prev = counterMap.get(ltKey + 1); //从lt的countMap中移除,因为它将被放置在key + 1的新位置 Iterator<String> _ltIt = _ltServers.iterator(); while (_ltIt.hasNext()) { if (_ltIt.next().equalsIgnoreCase(_toSid)) { _ltIt.remove(); break; } } if (_prev == null) { _prev = new ArrayList<String>(); counterMap.put(ltKey + 1, _prev); } _prev.add(_toSid); } } //dump info for (Map.Entry<String, List<String>> entry : current.entrySet()) { System.out.println("Sid:" + entry.getKey()); System.out.println(entry.getValue().toString()); } } catch (Exception e) { e.printStackTrace(); } } }
相关推荐
卡口摄像机 圆形分布算法 根据一个中心点和半径输入摄像机数进行圆形显示
这是一个用C++做备忘录算法,备忘录方法是动态规划方法的变形。与动态规划算法不同的是,备忘录方法的递归方式是自顶向下的,而动态规划算法则是自底向上的
讲解了分布估计算法的基本原理,附带一个基于matlab实现的例子代码
STL算法备忘单+来自STL算法视频系列的示例代码_C++_下载.zip
分布估计算法PPT,讲解了分布估计算法的基本原理
该资源详细地给出了分布估计算法的PBIL算法的具体流程。供学习分布估计算的人参考使用。
使用C++实现的一个简单的分布估计算法,可实现多峰函数的最优化
代码 复杂网络中度分布优化算法程序代码 复杂网络中度分布优化算法程序代码 复杂网络中度分布优化算法程序代码 复杂网络中度分布优化算法程序代码 复杂网络中度分布优化算法程序代码 复杂网络中度分布优化算法程序...
分布估计算法综述
【智能优化算法】基于广义正态分布优化算法求解单目标优化问题附matlab代码.zip
分布估计算法是目前研究比较热的一种算法,分布估计算法工具箱中含有各种算法模型:单变量,多变量以及贝叶斯网络,贝叶斯树等模型的学习,抽样。
计算智能,pyhton:粒子群算法和分布估计算法,代码有注释
毫升备忘单我写的一些机器学习算法备忘单可能有错误
简单地介绍了分布估计算法 并以两个比较简单的分布估计算法为例
STM32Cubemx配置ADC多通道DMA转换,平均滤波算法,将结果显示在OLED屏幕
相对定位算法无需事先布置信标节点的特点适用于对节点硬件成本和能耗,以及节点分布等方面有限制的无线传感器网络。综合分析了现有典型的相对定位算法,通过对算法在通信量、定位覆盖率和定位精度三个方面进行综合...
matlab程序,PBIL 分布估计 EDA算法
它从全部空闲区中找出能满足作业要求的、且大小最小的空闲分区,这种方法能使碎片尽量小。为适应此算法,空闲分区表(空闲区链)中的空闲分区要按从小到大进行排序,自表头开始查找到第一个满足要求的自由..
TIA博途SCL语言_滑动平均值滤波算法_FB库文件