监控服务程序调度算法实现
package com.wole.monitor;import java.util.HashMap;import java.util.HashSet;import java.util.List;import java.util.Map;import java.util.Queue;import java.util.Set;import java.util.concurrent.Callable;import java.util.concurrent.CompletionService;import java.util.concurrent.ConcurrentHashMap;import java.util.concurrent.Executor;import java.util.concurrent.ExecutorCompletionService;import java.util.concurrent.Future;import java.util.concurrent.LinkedBlockingQueue;import java.util.concurrent.PriorityBlockingQueue;import java.util.concurrent.SynchronousQueue;import java.util.concurrent.ThreadPoolExecutor;import java.util.concurrent.TimeUnit;import java.util.concurrent.atomic.AtomicBoolean;import java.util.concurrent.atomic.AtomicLong;import org.eclipse.jetty.util.ConcurrentHashSet;import org.slf4j.Logger;import org.slf4j.LoggerFactory;import org.springframework.context.ApplicationContext;import org.springframework.context.support.ClassPathXmlApplicationContext;import com.wole.monitor.dao.ServiceDao;import com.wole.servicemonitor.util.ServiceUtils;/** * 管理并调度某一个服务数据源的监控池 * @author yzygenuine * */public class MonitorsManage {private final static Logger logger = LoggerFactory.getLogger(MonitorsManage.class);private ServiceDao dao;/** * 执行的一个并发池 */private Executor commExecutor = new ThreadPoolExecutor(0, Integer.MAX_VALUE, 5, TimeUnit.SECONDS,new SynchronousQueue<Runnable>());/** * */private CompletionService<Response> completionService = new ExecutorCompletionService<Response>(commExecutor);/** * 正在执行中的MonitorService集合 */private ConcurrentHashSet<MonitorService> currentSet = new ConcurrentHashSet<MonitorService>();/** * 等待优先级队列 */private Queue<MonitorService> sleepQueue = new PriorityBlockingQueue<MonitorService>();/** * 执行队列 */private Queue<MonitorService> executeQueue = new LinkedBlockingQueue<MonitorService>();/** * 是否关闭 */private AtomicBoolean isClose = new AtomicBoolean(false);/** * 生产者启动时间 */private AtomicLong startTime = new AtomicLong(0);/** * 相对于启动的间隔时间 */private AtomicLong intervalTime = new AtomicLong(0);public void close() {logger.info("closing................");isClose.compareAndSet(false, true);}public void init() {logger.info("初始化");}public void work() {logger.info("开始工作");// 生产者启动工作Thread productThread = new Thread(new ProductMonitor(1000));// 消费者启动工作Thread consumerThread = new Thread(new ConsumerMonitor(1000));// 回收者启动工作Thread recoverThread = new Thread(new RecoverMonitor(1000));// 启动定时加载数据工作Thread refreshThread = new Thread(new RefreshMonitorService(60000, dao));productThread.start();consumerThread.start();recoverThread.start();refreshThread.start();}/** * 生产者 * * @author yzygenuine * */class ProductMonitor implements Runnable {long sleepTime = 1000;public ProductMonitor(long sleepTime) {this.sleepTime = sleepTime;}@Overridepublic void run() {logger.info("生产者开启工作");// 开始进行定时监控long now = System.currentTimeMillis();long lastTime = now;startTime.addAndGet(now);try {do {Thread.sleep(sleepTime);logger.debug("生产者休息{}ms", sleepTime);now = System.currentTimeMillis();intervalTime.addAndGet(now - lastTime);while (sleepQueue.size() > 0) {MonitorService service = sleepQueue.peek();if (service.getCurrentTime() - intervalTime.get() < 1) {service = sleepQueue.poll();// 出队并检查是否被删除,如果没被删除则进入执行队列if (!currentSet.contains(service)) {logger.info("service {} 已被删除,不加入执行队列了", service.toString());continue;}executeQueue.add(service);} else {logger.debug("还有{}秒可执行", service.getCurrentTime() - intervalTime.get());break;}}if (sleepQueue.size() <= 0) {logger.debug("生产队列为空");}lastTime = now;} while (!isClose.get());} catch (Exception e) {logger.error("", e);}}}/** * 消费者 * * @author yzygenuine * */class ConsumerMonitor implements Runnable {long sleepTime = 1000;public ConsumerMonitor(long sleepTime) {this.sleepTime = sleepTime;if (sleepTime < 1000) {throw new RuntimeException("请配置sleepTime值大一些");}}@Overridepublic void run() {logger.info("消费者开启工作");try {do {Thread.sleep(sleepTime);logger.debug("消费者休息{}ms", sleepTime);while (executeQueue.size() > 0) {final MonitorService service = executeQueue.poll();completionService.submit(new ExecuteCallable(service));}logger.debug("消费队列为空");} while (!isClose.get());} catch (Exception e) {logger.error("", e);}}}/** * 执行回调类 * * @author yzygenuine * */class ExecuteCallable implements Callable<Response> {final MonitorService service;public ExecuteCallable(MonitorService service) {this.service = service;}@Overridepublic Response call() throws Exception {logger.debug("执行");Map<String, String> r = new HashMap<String, String>();Response response = new Response();response.service = service;response.response = r;Monitor m = MonitorFactory.getMonitor(service);response.isNeedWarn = m.isNeedWarnging(service, r);if (response.isNeedWarn) {response.isSucToNotify = m.sendNotify(service, r);}return response;}}/** * 回收者 * * @author yzygenuine * */class RecoverMonitor implements Runnable {private long sleepTime = 1000;private long count = 0;public RecoverMonitor(long sleepTime) {this.sleepTime = sleepTime;if (sleepTime < 1000) {throw new RuntimeException("请配置sleepTime值大一些");}}@Overridepublic void run() {logger.info("回收者开启工作");try {do {// Thread.sleep(sleepTime);Future<Response> response = completionService.take();// 重置后进入休眠队列MonitorService s = response.get().service;if (!currentSet.contains(s)) {logger.info("service {} 已被删除,不回收了", s.toString());continue;}// 当前程序已运动的时间+相对间隔时间=绝对的间隔时间s.setCurrentTime(s.getIntervalTime() + intervalTime.get());sleepQueue.add(s);count++;logger.info("回收,当前回收数量:" + count);} while (!isClose.get());} catch (Exception e) {logger.error("", e);}}}/** * 加载新的数据 * * @author yzygenuine * */class RefreshMonitorService implements Runnable {private long sleepTime = 1000;private ServiceDao dao;public RefreshMonitorService(long sleepTime, ServiceDao dao) {this.sleepTime = sleepTime;if (sleepTime < 60000) {logger.warn("刷新加载数据的间隔时间不能太短");throw new RuntimeException("刷新加载数据的间隔时间不能太短");}this.dao = dao;}private void firstLoad() {List<MonitorService> monitorService = dao.getService();logger.info("加载记录:" + monitorService.size());// 将被监控服务加入优先级队列里for (int j = 0; j < monitorService.size(); j++) {MonitorService service = monitorService.get(j);// 初始化好时间service.setCurrentTime(service.getIntervalTime() + intervalTime.get());currentSet.add(service);sleepQueue.add(service);}}@Overridepublic void run() {logger.info("读取新的service开启工作");firstLoad();try {do {logger.info("定时加载新的数据监听者休息{}ms", sleepTime);Thread.sleep(sleepTime);logger.info("##########开始执行更新数据############");// 加载新的所有所数据 ,与当前的数据比较List<MonitorService> deleteList = dao.deleteService();List<MonitorService> addList = dao.incrementalService();logger.info("删除旧的数据共:{}", deleteList.size());currentSet.removeAll(deleteList);logger.info("增加新的数据共:{}", addList.size());currentSet.addAll(addList);logger.info("更新后的currentSet size:{}", currentSet.size());for (MonitorService service : addList) {// 初始化绝对间隔时间service.setCurrentTime(service.getIntervalTime() + intervalTime.get());sleepQueue.add(service);}logger.info("########这一轮更新结束");} while (!isClose.get());} catch (Exception e) {logger.error("", e);}}}/** * 响应的封装类 * * @author yzygenuine * */class Response {public Map<String, String> response;public MonitorService service;public boolean isNeedWarn;public boolean isSucToNotify;}public void setDao(ServiceDao dao) {this.dao = dao;}}