
A Simple Implementation of Task Scheduling

2012-11-12 

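I put together a simple task-scheduling example. The main idea and features are as follows:
It simulates two machines, a master and a slave. The master holds a task queue and starts a thread that polls this queue at a fixed interval; when the queue contains tasks, the master calls the slave's interface to hand them to the slave for processing. The slave is also configured with a heartbeat: it starts a thread that periodically calls a master interface, which sets the master's global variable lastalivetime to the current time. The master in turn runs a thread that periodically computes the difference between the current time and lastalivetime; if the difference exceeds 2 seconds, the slave is considered dead.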
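The liveness rule described above can be isolated into a few lines. The following is only a minimal sketch under stated assumptions: the class HeartbeatMonitor and its methods are hypothetical names that do not appear in the project, and the clock is passed in as a parameter so the rule can be checked without waiting in real time.

```java
import java.util.concurrent.atomic.AtomicLong;

/** Hypothetical helper (not part of the original project) that captures the 2-second rule. */
class HeartbeatMonitor {
    private final long timeoutMillis;
    // Timestamp of the most recent heartbeat; AtomicLong makes it safe to share between threads.
    private final AtomicLong lastAliveMillis;

    HeartbeatMonitor(long timeoutMillis, long startMillis) {
        this.timeoutMillis = timeoutMillis;
        this.lastAliveMillis = new AtomicLong(startMillis);
    }

    /** Called each time a heartbeat from the slave arrives. */
    void beat(long nowMillis) {
        lastAliveMillis.set(nowMillis);
    }

    /** The slave counts as alive while the gap since the last heartbeat does not exceed the timeout. */
    boolean isAlive(long nowMillis) {
        return nowMillis - lastAliveMillis.get() <= timeoutMillis;
    }

    public static void main(String[] args) {
        HeartbeatMonitor monitor = new HeartbeatMonitor(2000, 0);
        monitor.beat(1000);
        System.out.println(monitor.isAlive(2500)); // gap of 1500 ms, prints true
        System.out.println(monitor.isAlive(3500)); // gap of 2500 ms, prints false
    }
}
```

In a running system the callers would pass System.currentTimeMillis() for the time arguments.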
The implementation is as follows:

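The classes in the Master package run on the master machine, and the classes in the Slave package run on the slave machine.

The two services are simulated as follows:

1. Simulating the master service:

Use the Jetty plugin in pom.xml and set its port to 8081, as follows: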

<plugins>
    <!-- Jetty plugin -->
    <plugin>
        <groupId>org.mortbay.jetty</groupId>
        <artifactId>maven-jetty-plugin</artifactId>
        <version>6.1.22</version>
        <configuration>
            <contextPath>/</contextPath>
            <connectors>
                <connector implementation="org.mortbay.jetty.nio.SelectChannelConnector">
                    <port>8081</port>
                    <maxIdleTime>60000</maxIdleTime>
                </connector>
            </connectors>
            <requestLog implementation="org.mortbay.jetty.NCSARequestLog">
                <filename>target/access.log</filename>
                <retainDays>90</retainDays>
                <append>false</append>
                <extended>false</extended>
                <logTimeZone>GMT+8:00</logTimeZone>
            </requestLog>
            <systemProperties>
                <systemProperty>
                    <name>productionMode</name>
                    <value>false</value>
                </systemProperty>
            </systemProperties>
        </configuration>
    </plugin>
</plugins>

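This example implements the ServletContextListener interface so that the relevant classes start automatically once the server has started, so web.xml is configured as follows: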

<?xml version="1.0" encoding="GB2312"?>
<web-app id="WebApp_ID" version="2.4"
    xmlns="http://java.sun.com/xml/ns/j2ee" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xsi:schemaLocation="http://java.sun.com/xml/ns/j2ee http://java.sun.com/xml/ns/j2ee/web-app_2_4.xsd">

    <servlet>
        <servlet-name>test</servlet-name>
        <servlet-class>org.springframework.web.servlet.DispatcherServlet</servlet-class>
        <load-on-startup>1</load-on-startup>
    </servlet>

    <servlet-mapping>
        <servlet-name>test</servlet-name>
        <url-pattern>/service/*</url-pattern>
    </servlet-mapping>

    <context-param>
        <param-name>contextConfigLocation</param-name>
        <param-value>classpath*:/spring/*.xml</param-value>
    </context-param>

    <listener>
        <listener-class>org.springframework.web.context.ContextLoaderListener</listener-class>
    </listener>

    <!-- Runs automatically when the server starts -->
    <listener>
        <listener-class>com.TaskScheduling.Master.AutoRun</listener-class>
    </listener>

</web-app>

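Run the mvn jetty:run command to start the master service.

2. Simulating the slave machine:

Apart from web.xml, no code needs to change. The web.xml configuration that has to be modified is: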

    <listener>
        <listener-class>com.TaskScheduling.Slave.SlaveAutoRun</listener-class>
    </listener>

Run mvn clean install and copy the generated WAR file into %jetty_home%/webapps. Jetty's default port is 8080. To start Jetty, run this command in %jetty_home%: java -jar start.jar.

The relevant code is as follows:

web.xml:

<?xml version="1.0" encoding="GB2312"?>
<web-app id="WebApp_ID" version="2.4"
    xmlns="http://java.sun.com/xml/ns/j2ee" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xsi:schemaLocation="http://java.sun.com/xml/ns/j2ee http://java.sun.com/xml/ns/j2ee/web-app_2_4.xsd">

    <servlet>
        <servlet-name>test</servlet-name>
        <servlet-class>org.springframework.web.servlet.DispatcherServlet</servlet-class>
        <load-on-startup>1</load-on-startup>
    </servlet>

    <servlet-mapping>
        <servlet-name>test</servlet-name>
        <url-pattern>/service/*</url-pattern>
    </servlet-mapping>

    <context-param>
        <param-name>contextConfigLocation</param-name>
        <param-value>classpath*:/spring/*.xml</param-value>
    </context-param>

    <listener>
        <listener-class>org.springframework.web.context.ContextLoaderListener</listener-class>
    </listener>

    <welcome-file-list>
        <welcome-file>/jsp/login.jsp</welcome-file>
    </welcome-file-list>

    <listener>
        <listener-class>com.TaskScheduling.Master.AutoRun</listener-class>
        <!-- <listener-class>com.TaskScheduling.Slave.SlaveAutoRun</listener-class> -->
    </listener>

</web-app>

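%classpath%/spring/autorun.xml:

<?xml version="1.0" encoding="utf-8"?>
<beans default-autowire="byName"
    xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xmlns:aop="http://www.springframework.org/schema/aop" xmlns:util="http://www.springframework.org/schema/util"
    xsi:schemaLocation="http://www.springframework.org/schema/beans
        http://www.springframework.org/schema/beans/spring-beans-2.0.xsd
        http://www.springframework.org/schema/aop
        http://www.springframework.org/schema/aop/spring-aop-2.0.xsd
        http://www.springframework.org/schema/util
        http://www.springframework.org/schema/util/spring-util-2.5.xsd">

    <!-- The class attributes are missing from the published listing. The values below are
         inferred from the classes shown later in this article; the exporter class is assumed
         to be Spring's HessianServiceExporter, since the clients use HessianProxyFactory. -->
    <bean id="jobService" class="com.TaskScheduling.Master.JobService" />
    <bean id="slaveHeartBeat" class="com.TaskScheduling.Master.impl.SlaveHeartBeatImpl" />
    <bean id="heartBeat" class="com.TaskScheduling.Slave.HeartBeat" />
    <!-- Not in the published listing, but AutoRun looks this bean up by name -->
    <bean id="heartBeatCheck" class="com.TaskScheduling.Master.HeartBeatCheck" />

    <bean id="testService" class="com.TaskScheduling.Slave.service.TestService" />

    <bean name="/HeatBeat" class="org.springframework.remoting.caucho.HessianServiceExporter">
        <property name="service" ref="slaveHeartBeat" />
        <property name="serviceInterface" value="com.TaskScheduling.Master.SlaveHeartBeat" />
    </bean>

    <bean name="/JobProcess" class="org.springframework.remoting.caucho.HessianServiceExporter">
        <property name="service" ref="jobProcess" />
        <property name="serviceInterface" value="com.TaskScheduling.Slave.JobProcess" />
    </bean>

    <bean id="jobProcess" class="com.TaskScheduling.Slave.impl.JobProcessImpl" />

</beans>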

Master classes:

com.TaskScheduling.Master.SlaveHeartBeat:

package com.TaskScheduling.Master;

public interface SlaveHeartBeat {
    public String check();
}


com.TaskScheduling.Master.impl.SlaveHeartBeatImpl:

package com.TaskScheduling.Master.impl;

import java.util.Date;

import com.TaskScheduling.Master.MasterGlobalResource;
import com.TaskScheduling.Master.SlaveHeartBeat;

public class SlaveHeartBeatImpl implements SlaveHeartBeat {

    /**
     * The slave calls this master interface periodically to update the
     * lastalivetime value held in MasterGlobalResource.
     */
    public String check() {
        MasterGlobalResource.setLastalivetime(new Date());
        System.out.println(MasterGlobalResource.getLastalivetime());
        return "OK";
    }
}

com.TaskScheduling.Master.AutoRun:

package com.TaskScheduling.Master;

import java.io.File;
import java.io.FileWriter;
import java.io.IOException;

import javax.servlet.ServletContextEvent;
import javax.servlet.ServletContextListener;
import javax.servlet.http.HttpServlet;

import org.springframework.context.ApplicationContext;
import org.springframework.context.support.ClassPathXmlApplicationContext;

@SuppressWarnings("serial")
public class AutoRun extends HttpServlet implements ServletContextListener {

    /**
     * Runs automatically before the server stops.
     */
    @Override
    public void contextDestroyed(ServletContextEvent arg0) {
        DeleteFile();
    }

    /**
     * Runs automatically after the server starts.
     */
    @Override
    public void contextInitialized(ServletContextEvent arg0) {
        WriteFile();
        /*
         * The Spring context can also be obtained with
         * ApplicationContext app = WebApplicationContextUtils.getWebApplicationContext(arg0.getServletContext());
         * Having AutoRun implement ApplicationContextAware does not work, however:
         * AutoRun is a ServletContextListener instantiated by the servlet container
         * rather than by Spring, so the context would never be injected and would be null.
         */
        ApplicationContext app = new ClassPathXmlApplicationContext("classpath*:spring/*.xml");
        // Poll the task queue
        MasterGlobalResource.getExecutor().execute((JobService) app.getBean("jobService"));
        // Check whether the slave machine is alive
        MasterGlobalResource.getExecutor().execute((HeartBeatCheck) app.getBean("heartBeatCheck"));
    }

    public void WriteFile() {
        try {
            System.out.println("write");
            FileWriter fw = new FileWriter("c:/WriteData.txt");
            // Write the strings to the file
            fw.write("Hello World!");
            fw.write("Hello Everyone!");
            fw.close();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    public void DeleteFile() {
        File f = new File("c:/WriteData.txt");
        // Delete the file if it exists
        if (f.exists()) {
            f.delete();
        }
    }
}

com.TaskScheduling.Master.HeartBeatCheck:

package com.TaskScheduling.Master;

import java.util.Date;

public class HeartBeatCheck implements Runnable {

    @Override
    public void run() {
        while (true) {
            Date lastAliveTime = MasterGlobalResource.getLastalivetime();
            Date nowTime = new Date();
            System.out.println("nowTime=" + nowTime.getTime() + "   lastAliveTime=" + lastAliveTime.getTime());
            // If the slave has not called the master interface to update lastalivetime
            // for more than 2 seconds, the slave is considered dead
            if (nowTime.getTime() - lastAliveTime.getTime() > 2000) {
                System.out.println("slave has no feedback in 2 second...");
            }
            try {
                // Check once per second
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
}

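HeartBeatCheck drives its periodic check with while(true) and Thread.sleep. As an alternative sketch, not the author's code, the same fixed-interval behaviour can be expressed with the JDK's ScheduledExecutorService. The names PeriodicCheckDemo and runPeriodically are hypothetical, the lambda syntax assumes Java 8 or later, and the run is bounded to a fixed number of executions only so that the example terminates.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

/** Hypothetical demo class; it is not part of the original project. */
class PeriodicCheckDemo {

    /**
     * Runs the given check at a fixed rate until it has executed `times` times.
     * Returns true if all executions finished before a generous deadline.
     */
    static boolean runPeriodically(Runnable check, int times, long periodMillis) {
        ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
        CountDownLatch remaining = new CountDownLatch(times);
        try {
            scheduler.scheduleAtFixedRate(() -> {
                check.run();
                remaining.countDown();
            }, 0, periodMillis, TimeUnit.MILLISECONDS);
            // Wait for the requested number of runs, with some slack for scheduling delays
            return remaining.await(periodMillis * times + 5000, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        } finally {
            // Stop the scheduler so that no further checks are executed
            scheduler.shutdownNow();
        }
    }

    public static void main(String[] args) {
        boolean finished = runPeriodically(() -> System.out.println("checking slave..."), 3, 1000);
        System.out.println("finished=" + finished);
    }
}
```

A long-running service would keep the scheduler alive instead of counting executions, and would call shutdownNow() from contextDestroyed.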
com.TaskScheduling.Master.JobService:

package com.TaskScheduling.Master;

import com.TaskScheduling.Slave.JobProcess;
import com.caucho.hessian.client.HessianProxyFactory;

/**
 * Task scheduling.
 */
public class JobService implements Runnable {

    private HessianProxyFactory factory = new HessianProxyFactory();

    @Override
    public void run() {
        try {
            // Add 10 tasks to the task queue
            Thread.sleep(5000);
            for (int i = 0; i < 10; i++) {
                ZephyrJobWrapper job = new ZephyrJobWrapper();
                job.setJobId(String.valueOf(i));
                job.setJobName(String.valueOf(i));
                job.setJobBeanName("testService"); // name of the task bean
                job.setJobMethodName("getJobInfo"); // name of the task method
                job.setJobArgs(new Object[] { "hello", "xj" + String.valueOf(i) }); // method arguments
                MasterGlobalResource.setWaitingJobQueue(job);
                try {
                    Thread.sleep(100);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
            // The slave's task-processing interface
            JobProcess jobProcess = (JobProcess) factory.create(JobProcess.class,
                    "http://localhost:8080/rmi.test/service/JobProcess");
            // Poll the task queue
            while (true) {
                System.out.println("size=" + MasterGlobalResource.getWaitingJobQueue().size());
                ZephyrJobWrapper job;
                // If the queue holds tasks, hand them to the slave through the Hessian interface.
                // poll() removes each task so that it is not dispatched again on the next pass.
                while ((job = MasterGlobalResource.getWaitingJobQueue().poll()) != null) {
                    jobProcess.process(job);
                }
                // Poll once per second
                Thread.sleep(1000);
            }
        } catch (Exception e) {
            System.out.println(e);
            System.out.println("job process failed");
        }
    }
}

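One pass of the polling loop can be tried without Hessian or a second machine. The sketch below is an illustration under assumptions: QueueDrainDemo, Worker and dispatchAll are hypothetical names, Worker stands in for the remote JobProcess interface, and plain strings stand in for ZephyrJobWrapper. It removes each job with poll() as it is dispatched, so a later pass does not send the same job again.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.LinkedBlockingQueue;

/** Hypothetical demo class; it is not part of the original project. */
class QueueDrainDemo {

    /** Stand-in for the remote slave interface. */
    interface Worker {
        String process(String jobId);
    }

    /** Removes every waiting job, hands it to the worker, and returns the results in queue order. */
    static List<String> dispatchAll(Queue<String> waiting, Worker worker) {
        List<String> results = new ArrayList<String>();
        String jobId;
        // poll() returns null once the queue is empty, which ends the pass
        while ((jobId = waiting.poll()) != null) {
            results.add(worker.process(jobId));
        }
        return results;
    }

    public static void main(String[] args) {
        Queue<String> waiting = new LinkedBlockingQueue<String>();
        waiting.add("0");
        waiting.add("1");
        System.out.println(dispatchAll(waiting, jobId -> "done-" + jobId)); // prints [done-0, done-1]
        System.out.println(waiting.size()); // prints 0
    }
}
```

Iterating over the queue with a for-each loop, by contrast, leaves the jobs in place, so every pass would dispatch them again.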
com.TaskScheduling.Master.MasterGlobalResource:

package com.TaskScheduling.Master;

import java.util.Date;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

/**
 * Global variables of the master machine.
 */
public class MasterGlobalResource {

    /**
     * Time of the slave's last heartbeat. Declared volatile because it is
     * written by the Hessian request thread and read by HeartBeatCheck.
     */
    private static volatile Date lastalivetime = new Date();

    /**
     * Thread pool.
     */
    private static ThreadPoolExecutor executor = new ThreadPoolExecutor(
            10,
            10,
            0L,
            TimeUnit.MILLISECONDS,
            new LinkedBlockingQueue<Runnable>(),
            new ThreadPoolExecutor.DiscardPolicy());

    /**
     * Task queue.
     */
    private static LinkedBlockingQueue<ZephyrJobWrapper> waitingJobQueue = new LinkedBlockingQueue<ZephyrJobWrapper>();

    public static ThreadPoolExecutor getExecutor() {
        return executor;
    }

    public static LinkedBlockingQueue<ZephyrJobWrapper> getWaitingJobQueue() {
        return waitingJobQueue;
    }

    public static void setWaitingJobQueue(ZephyrJobWrapper job) {
        waitingJobQueue.add(job);
    }

    public static Date getLastalivetime() {
        return lastalivetime;
    }

    public static void setLastalivetime(Date lastalivetime) {
        MasterGlobalResource.lastalivetime = lastalivetime;
    }
}

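com.TaskScheduling.Master.ZephyrJobWrapper:

package com.TaskScheduling.Master;

import java.io.Serializable;

/**
 * Job definition class.
 */
public class ZephyrJobWrapper implements Serializable {

    private static final long serialVersionUID = 6453296403466572073L;

    public String jobId;
    public String jobName;
    // The three fields below and their accessors are missing from the published listing.
    // They are added here because JobService and JobProcessImpl call them.
    public String jobBeanName;
    public String jobMethodName;
    public Object jobArgs;

    public String getJobId() { return jobId; }

    public void setJobId(String jobId) { this.jobId = jobId; }

    public String getJobName() { return jobName; }

    public void setJobName(String jobName) { this.jobName = jobName; }

    public String getJobBeanName() { return jobBeanName; }

    public void setJobBeanName(String jobBeanName) { this.jobBeanName = jobBeanName; }

    public String getJobMethodName() { return jobMethodName; }

    public void setJobMethodName(String jobMethodName) { this.jobMethodName = jobMethodName; }

    public Object getJobArgs() { return jobArgs; }

    public void setJobArgs(Object jobArgs) { this.jobArgs = jobArgs; }
}
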
Slave classes:

com.TaskScheduling.Slave.JobProcess:

package com.TaskScheduling.Slave;

import com.TaskScheduling.Master.ZephyrJobWrapper;

public interface JobProcess {
    public String process(ZephyrJobWrapper job);
}

com.TaskScheduling.Slave.impl.JobProcessImpl:

package com.TaskScheduling.Slave.impl;

import java.lang.reflect.Method;

import org.springframework.beans.BeansException;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;

import com.TaskScheduling.Master.ZephyrJobWrapper;
import com.TaskScheduling.Slave.JobProcess;

/**
 * Processes the tasks that the master assigns to the slave.
 */
public class JobProcessImpl implements JobProcess, ApplicationContextAware {

    private ApplicationContext applicationContext;

    @Override
    public String process(ZephyrJobWrapper job) {
        String result = "success";
        Object jobBean = applicationContext.getBean(job.getJobBeanName());
        Class<?> clazz = jobBean.getClass();
        Method jobMethod = null;
        Object[] args = null;
        for (Method method : clazz.getMethods()) {
            Class<?>[] paramTypes = method.getParameterTypes();
            if (method.getName().equals(job.getJobMethodName())) {
                Object jobArgs = job.getJobArgs();
                // When an Object[] is passed, match on the method name and the parameter count
                if (jobArgs != null && jobArgs.getClass() == Object[].class) {
                    args = (Object[]) jobArgs;
                    if (args.length == paramTypes.length) {
                        jobMethod = method;
                        break;
                    }
                }
                // With no argument, or an argument of another type, match on the method name only
                else {
                    jobMethod = method;
                    break;
                }
            }
        }
        try {
            if (jobMethod != null) {
                if (args == null) {
                    jobMethod.invoke(jobBean, job.getJobArgs());
                } else {
                    jobMethod.invoke(jobBean, args);
                }
            } else {
                result = "failed";
            }
        } catch (Exception e) {
            result = "failed";
        }
        // Report the actual outcome rather than always claiming success
        System.out.println("complete process job " + job.getJobName() + ": " + result);
        return result;
    }

    @Override
    public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
        this.applicationContext = applicationContext;
    }
}
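The lookup in JobProcessImpl selects a method by name and parameter count and then invokes it reflectively. The following standalone sketch shows that technique without Spring. ReflectiveInvokeDemo, Greeter, findMethod and invokeByName are hypothetical names introduced for illustration, and Greeter.join returns its result (unlike TestService.getJobInfo, which prints it) so that the outcome can be inspected.

```java
import java.lang.reflect.Method;

/** Hypothetical demo class; it is not part of the original project. */
class ReflectiveInvokeDemo {

    /** Sample target bean, loosely modelled on TestService. */
    public static class Greeter {
        public String join(String a, String b) {
            return a + "---------" + b;
        }
    }

    /** Returns the first public method with the given name and parameter count, or null. */
    static Method findMethod(Class<?> type, String name, int argCount) {
        for (Method method : type.getMethods()) {
            if (method.getName().equals(name) && method.getParameterTypes().length == argCount) {
                return method;
            }
        }
        return null;
    }

    /** Looks the method up by name and argument count, then invokes it on the target. */
    static Object invokeByName(Object target, String name, Object... args) {
        Method method = findMethod(target.getClass(), name, args.length);
        if (method == null) {
            throw new IllegalArgumentException("no method " + name + " with " + args.length + " parameters");
        }
        try {
            return method.invoke(target, args);
        } catch (ReflectiveOperationException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(invokeByName(new Greeter(), "join", "hello", "xj0")); // prints hello---------xj0
    }
}
```

Matching on the parameter count alone cannot tell overloads with the same arity apart; comparing the parameter types as well would be needed for that.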

com.TaskScheduling.Slave.HeartBeat:

package com.TaskScheduling.Slave;

import com.TaskScheduling.Master.SlaveHeartBeat;
import com.caucho.hessian.client.HessianProxyFactory;

/**
 * Calls the master interface once per second to update the last-alive time,
 * which the master uses for heartbeat monitoring.
 */
public class HeartBeat implements Runnable {

    private HessianProxyFactory factory = new HessianProxyFactory();

    @Override
    public void run() {
        while (true) {
            try {
                SlaveHeartBeat heatBeat = (SlaveHeartBeat) factory.create(SlaveHeartBeat.class,
                        "http://localhost:8081/service/HeatBeat");
                String result = heatBeat.check();
                System.out.println(result);
            } catch (Exception e) {
                System.out.println("slave cannot connect to master...check master run well");
            }
            // Sleep outside the try block above so that a failed call
            // does not turn the loop into a busy retry
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return;
            }
        }
    }
}

com.TaskScheduling.Slave.SlaveAutoRun:

package com.TaskScheduling.Slave;

import javax.servlet.ServletContextEvent;
import javax.servlet.ServletContextListener;
import javax.servlet.http.HttpServlet;

import org.springframework.context.ApplicationContext;
import org.springframework.context.support.ClassPathXmlApplicationContext;

@SuppressWarnings("serial")
public class SlaveAutoRun extends HttpServlet implements ServletContextListener {

    @Override
    public void contextDestroyed(ServletContextEvent arg0) {
    }

    @Override
    public void contextInitialized(ServletContextEvent arg0) {
        ApplicationContext app = new ClassPathXmlApplicationContext("classpath*:spring/*.xml");
        // Heartbeat; getExecutor() is static, so it is called on the class directly
        SlaveGlobalResource.getExecutor().execute((HeartBeat) app.getBean("heartBeat"));
    }
}

com.TaskScheduling.Slave.SlaveGlobalResource:

package com.TaskScheduling.Slave;

import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

/**
 * Global variables of the slave.
 */
public class SlaveGlobalResource {

    public static ThreadPoolExecutor executor = new ThreadPoolExecutor(
            10,
            10,
            0L,
            TimeUnit.MILLISECONDS,
            new LinkedBlockingQueue<Runnable>(),
            new ThreadPoolExecutor.DiscardPolicy());

    public static ThreadPoolExecutor getExecutor() {
        return executor;
    }
}

com.TaskScheduling.Slave.service.TestService:

package com.TaskScheduling.Slave.service;

/**
 * The task class that the slave has to process.
 */
public class TestService {

    public void getJobInfo(String str1, String str2) {
        System.out.println(str1 + "---------" + str2);
    }
}
