"相对平均"分布算法备忘
package com.test.demo.zookeeper;import java.io.BufferedReader;import java.io.InputStreamReader;import java.util.*;public class WorkersBalanceMain { private List<String> servers = new ArrayList<String>(); private Map<String, List<String>> current = new HashMap<String, List<String>>(); private Set<String> workers = new HashSet<String>(); public static void main(String[] args) { BufferedReader br = new BufferedReader(new InputStreamReader(System.in)); String line; Set<String> servers = new HashSet<String>(); WorkersBalanceMain balancer = new WorkersBalanceMain(); try { while ((line = br.readLine()) != null) { if (line.startsWith("addWorker")) { balancer.addWorkers(line); } else if (line.startsWith("addServer")) { balancer.addServers(line); } else { System.out.println("???"); continue; } balancer.rebalance(); } } catch (Exception e) { e.printStackTrace(); } System.out.println("--END---"); } public void addServers(String source) { int index = source.indexOf(" "); if (index == -1) { return; } String[] values = source.substring(index + 1).split(" "); if (values == null || values.length == 0) { return; } for (String server : values) { servers.add(server); if(current.get(server) == null){ current.put(server,new ArrayList<String>()); } } } public void addWorkers(String source) { int index = source.indexOf(" "); if (index == -1) { return; } String[] values = source.substring(index + 1).split(" "); if (values == null || values.length == 0) { return; } //当有新的worker提交时,将咱有一台机器接管 String sid = servers.get(0); List<String> sw = current.get(sid); if(sw == null){ current.put(sid,new ArrayList<String>()); } for (String worker : values) { workers.add(worker); sw.add(worker); } } public void rebalance() { try { if (workers.isEmpty()) { return; } for (String sid : servers) { if (current.get(sid) == null) { current.put(sid, new ArrayList<String>()); } } //根据每个sid上的worker个数,整理成一个排序的map TreeMap<Integer, List<String>> counterMap = new TreeMap<Integer, List<String>>(); for (Map.Entry<String, List<String>> entry : current.entrySet()) { int total = entry.getValue().size(); List<String> sl = counterMap.get(total); if (sl == null) { sl = new ArrayList<String>(); counterMap.put(total, sl); } sl.add(entry.getKey());//sid } int totalWorkers = workers.size(); int totalServers = current.keySet().size(); int avg = totalWorkers / totalServers;//每个server实例可以接管任务的平均数 while (true) { Map.Entry<Integer, List<String>> gt = counterMap.higherEntry(avg); //大于平均数的列表, >= avg + 1 Map.Entry<Integer, List<String>> lt = counterMap.lowerEntry(avg); //与平均数差值为2的 <= arg - 1 //允许任务个数与avg上线浮动1各个,不是绝对的平均 if (gt == null || lt == null) { break; } Integer gtKey = gt.getKey(); Integer ltKey = lt.getKey(); if (gt.getKey() - lt.getKey() < 2) { break; } if (gt.getValue().size() == 0) { counterMap.remove(gt.getKey()); } if (lt.getValue().size() == 0) { counterMap.remove(lt.getKey()); } Iterator<String> it = gt.getValue().iterator(); //sid列表 while (it.hasNext()) { String _fromSid = it.next(); List<String> _currentWorkers = current.get(_fromSid); if (_currentWorkers == null || _currentWorkers.isEmpty()) { it.remove(); current.remove(_fromSid); continue; } List<String> _ltServers = lt.getValue(); if (_ltServers.isEmpty()) { counterMap.remove(ltKey); break; } //取出需要交换出去的任务id int _currentWorkersSize = _currentWorkers.size(); String _wid = _currentWorkers.get(_currentWorkersSize - 1); String _toSid = _ltServers.get(0); //从_fromSid的worker列表中移除低workerId //注意:移除最后一个,和_ltWorkers.add(_wid)对应,_ltWorkers将新任务添加到list的尾部 //即从尾部移除,从尾部添加,基本保证"原任务,最少迁移次数" _currentWorkers.remove(_currentWorkersSize - 1); it.remove(); _ltServers.remove(0); //将此workerId添加到_toSid的worker列表中 List<String> _ltWorkers = current.get(_toSid); if (_ltWorkers == null) { _ltWorkers = new ArrayList<String>(); current.put(_toSid, _ltWorkers); } _ltWorkers.add(_wid); //将gt的key降低一个数字 List<String> _next = counterMap.get(gtKey - 1); if (_next == null) { _next = new ArrayList<String>(); counterMap.put(gtKey - 1, _next); } _next.add(_fromSid); //将lt的key提升一个数字 List<String> _prev = counterMap.get(ltKey + 1); //从lt的countMap中移除,因为它将被放置在key + 1的新位置 Iterator<String> _ltIt = _ltServers.iterator(); while (_ltIt.hasNext()) { if (_ltIt.next().equalsIgnoreCase(_toSid)) { _ltIt.remove(); break; } } if (_prev == null) { _prev = new ArrayList<String>(); counterMap.put(ltKey + 1, _prev); } _prev.add(_toSid); } } //dump info for (Map.Entry<String, List<String>> entry : current.entrySet()) { System.out.println("Sid:" + entry.getKey()); System.out.println(entry.getValue().toString()); } } catch (Exception e) { e.printStackTrace(); } }}
?