线程池示例代码版本2
包结构
src
test
TestThreadPool 测试类
thread
ThreadPool 线程池类
WorkThread 工作线程类
TaskMonitorThread 任务监测线程类
TaskTimeOutThread 任务超时监测线程类
task
TaskManager 任务管理器
WorkTask 任务接口
WorkTaskImp 正常任务类
WorkTaskAImp 异常任务类
WorkTaskBImp 超时任务类
event
AbstractEvent 任务事件类
BeginTaskEvent 任务执行开始事件类
EndTaskEvent 任务执行结束事件类
TaskRunTime 任务运行时间类
TaskTimeOutEvent 任务执行超时事件类
源代码
package test;import task.TaskManager;import task.WorkTask;import task.WorkTaskAImp;import task.WorkTaskBImp;import task.WorkTaskImp;import thread.ThreadPool;/** * 线程池测试类,测试功能如下: * 1、测试线程池创建功能 * 2、测试处理并发请求功能 * 3、测试关闭功能 **/public class TestThreadPool {public static void main(String[] args){//创建线程池,开启处理请求服务final int threadCount=10;ThreadPool pool=ThreadPool.getInstance();pool.init(threadCount);//接收客户端请求WorkTask task1=new WorkTaskBImp("执行超时任务...");TaskManager.addTask(task1);final int requestCount=20;for(int i=0;i<requestCount;i++){WorkTask task=new WorkTaskImp("执行第"+i+"个增加用户操作...");TaskManager.addTask(task);}WorkTask task2=new WorkTaskBImp("执行超时任务...");TaskManager.addTask(task2);for(int i=0;i<requestCount;i++){WorkTask task=new WorkTaskAImp("执行第"+i+"个修改用户异常操作...");TaskManager.addTask(task);}for(int i=0;i<requestCount;i++){WorkTask task=new WorkTaskImp("执行第"+i+"个删除用户操作...");TaskManager.addTask(task);}//关闭线程池try {Thread.sleep(2000);//为了显示处理请求效果pool.close();} catch (InterruptedException e) {// TODO Auto-generated catch blocke.printStackTrace();}}}package thread;import java.util.ArrayList;import java.util.List;import java.util.Vector;import event.BeginTaskEvent;import event.EndTaskEvent;import task.TaskManager;/** * 线程池类,功能如下: * 1、初始化线程池 * 2、获取空闲线程 * 3、任务运行,注册超时监测 * 4、任务结束,注销超时监测 * 5、关闭线程池 */public class ThreadPool {private int threadcount;private int GetIdleThreadPollTime=50;//获取空闲线程轮询间隔时间,可配置private static ThreadPool pool=new ThreadPool();//线程实例private Vector<WorkThread> threadlist=new Vector<WorkThread>();//工作线程列表private TaskMonitorThread mainThread;//任务监测线程private TaskTimeOutThread timeThread; //任务超时线程private boolean StopGetIdleThread=false; //单例模式private ThreadPool(){}public static synchronized ThreadPool getInstance(){return pool;}private void stopGetIdleThread(){StopGetIdleThread = true;}//初始化线程池public void init(int count){System.out.println("开始初始化线程池...");threadcount=count;for(int i=0;i<count;i++){WorkThread t=new WorkThread(new Integer(i));threadlist.add(t);t.start();}mainThread=new TaskMonitorThread(pool);mainThread.start();timeThread=new TaskTimeOutThread(pool);timeThread.start();System.out.println("结束初始化线程池...");}//获取空闲线程public WorkThread getIdleThread(){while(true){if (StopGetIdleThread) return null;synchronized(threadlist){for(int i=0;i<threadlist.size();i++){WorkThread t=(WorkThread)threadlist.get(i);if (t.getMyState().equals(WorkThread.IDlESTATE)){return t;}}}try {Thread.sleep(GetIdleThreadPollTime);//放弃CPU,若干时间后重新获取空闲线程} catch (InterruptedException e) {// TODO Auto-generated catch blocke.printStackTrace();}}}//任务运行,注册监测public void beginTaskRun(BeginTaskEvent begin){timeThread.beginTaskRun(begin);}//任务结束,注销监视public void endTaskRun(EndTaskEvent end){timeThread.endTaskRun(end);}//从工作线程表中移除线程public void removeWorkThread(WorkThread t){threadlist.remove(t);}//添加新的线程public void addWorkThread(){synchronized(threadlist){WorkThread t=new WorkThread(new Integer(++threadcount));threadlist.add(t);t.start();}}//关闭线程池public void close(){//停止获取空闲线程stopGetIdleThread();//关闭任务监测线程,不再接收请求mainThread.kill();//关闭超时监测线程timeThread.kill();//关闭工作线程,不再处理任务for(int i=0;i<threadlist.size();i++){WorkThread t=(WorkThread)threadlist.get(i);t.kill();}}}package thread;import task.WorkTask;import event.BeginTaskEvent;import event.EndTaskEvent;/** * 工作线程类,功能如下: * 1、执行业务方法,业务参数可动态设置 * 2、自身状态可设置、可获取 * 3、自我唤醒功能 * 4、自杀功能 */public final class WorkThread extends Thread{private boolean shutdown=false;private String info; //业务参数private Object threadKey;//线程标识private Object lock=new Object();//锁对象private String state; //线程状态private int waitExecFinishPollTime=500;//关闭线程时的轮询等待时间,可配置public static final String CREATESTATE="1";//创建状态public static final String RUNSTATE="2"; //运行状态public static final String IDlESTATE="3"; //空闲状态 private WorkTask nowTask; //当前任务//获取线程标识keypublic Object getThreadKey() {return threadKey;} //设置线程的任务 public void setWorkTask(WorkTask task){ this.nowTask=task; }//设置是否关闭线程private void setShutdown(boolean shutdown) {this.shutdown = shutdown;}//设置线程状态private void setMyState(String state){this.state=state;}//获取线程状态public String getMyState(){return state;}public WorkThread(Object key){System.out.println("正在创建工作线程...线程编号"+key.toString());this.threadKey=key;this.state=CREATESTATE;}@Overridepublic synchronized void start() {// TODO Auto-generated method stubsuper.start();setMyState(RUNSTATE);}public void run(){while(true){try {setMyState(IDlESTATE);synchronized(this){wait(); /*开始等待,直至被激活*/}} catch (InterruptedException e) {// TODO Auto-generated catch blocke.printStackTrace();}if (shutdown) break;try{new BeginTaskEvent(this,Thread.currentThread(),nowTask).execute();nowTask.execute();//执行业务new EndTaskEvent(this,Thread.currentThread(),nowTask).execute();}catch(Exception e){new EndTaskEvent(this,Thread.currentThread(),nowTask).execute();System.out.println(e.getMessage());}}}//重新激活线程public void activate(){synchronized(this){setMyState(RUNSTATE);notify();}}//关闭线程public void kill(){synchronized(this){ if (this.getMyState().equals(IDlESTATE)){//如果线程处于空闲状态,则直接关掉System.out.println("正在关闭工作线程...线程编号"+threadKey.toString()); this.setShutdown(true); this.activate(); }else if (this.getMyState().equals(RUNSTATE)){//如果线程处于运行状态,则执行完后再关掉System.out.println("正在等待线程执行业务完成...工作线程编号"+threadKey.toString()); while(this.getMyState().equals(RUNSTATE)){ try {Thread.sleep(waitExecFinishPollTime);//放弃CPU,若干时间后再检查线程状态} catch (InterruptedException e) {// TODO Auto-generated catch blocke.printStackTrace();} }System.out.println("正在关闭工作线程...线程编号"+threadKey.toString()); this.setShutdown(true); this.activate(); }}}}package thread;import task.TaskManager;import task.WorkTask;/** * 任务检测线程类 * 1、自杀功能 */public final class TaskMonitorThread extends Thread {private ThreadPool threadPool;private int GetWorkTaskPollTime=10;//监测任务轮询时间,可配置private boolean shutdown=false; public TaskMonitorThread(ThreadPool pool){System.out.println("正在创建任务监测线程...");this.threadPool=pool;}private void setShutDown(boolean b){this.shutdown=b;}@Overridepublic void run() {// TODO Auto-generated method stubwhile(true){if (shutdown) break;WorkTask task=TaskManager.getWorkTask();//看是否有任务请求if (task==null){try {Thread.sleep(GetWorkTaskPollTime);} catch (InterruptedException e) {// TODO Auto-generated catch blocke.printStackTrace();}}else{WorkThread t=threadPool.getIdleThread();//获取空闲线程if (t==null) break;t.setWorkTask(task);//设置线程任务task.setTaskThreadKey(t.getThreadKey());//为了显示任务当前线程t.activate();//激活空闲线程try {Thread.sleep(GetWorkTaskPollTime); } catch (InterruptedException e) {// TODO Auto-generated catch blocke.printStackTrace();}}}}//关闭线程public void kill(){System.out.println("正在关闭任务监测线程...");this.setShutDown(true);}}package thread;import java.util.Vector;import event.BeginTaskEvent;import event.EndTaskEvent;import event.TaskRunTime;import event.TaskTimeOutEvent;/** * 任务超时监测线程类 * 1、任务开始注册 * 2、任务完成注销 * 3、自杀功能 */public class TaskTimeOutThread extends Thread {private ThreadPool pool;private boolean shutdown=false;private Vector<TaskRunTime> taskruntimelist=new Vector<TaskRunTime>();//运行任务列表private int pollTime=500; //轮询时间private int TaskOutTime=2000; //任务过时时间public TaskTimeOutThread(ThreadPool pool){this.pool=pool;}@Overridepublic void run() {// TODO Auto-generated method stubwhile(!shutdown){synchronized(taskruntimelist){for(int i=0;i<taskruntimelist.size();i++){TaskRunTime t=(TaskRunTime) taskruntimelist.get(i);if (t.checkRunTimeOut(TaskOutTime)){taskruntimelist.remove(i);new TaskTimeOutEvent(t.getEvent()).execute();break;}}}try {sleep(pollTime);} catch (InterruptedException e) {// TODO Auto-generated catch blocke.printStackTrace();}}}//任务运行,开始监测public void beginTaskRun(BeginTaskEvent begin){taskruntimelist.add(new TaskRunTime(begin));}//任务正常结束public void endTaskRun(EndTaskEvent end){synchronized(taskruntimelist){for(int i=0;i<taskruntimelist.size();i++){BeginTaskEvent begin=((TaskRunTime) taskruntimelist.get(i)).getEvent();if (begin.equals(end)){taskruntimelist.remove(i);break;}}}}//自杀public void kill(){System.out.println("正在关闭超时监测线程...");while(taskruntimelist.size()>0){try {Thread.sleep(pollTime);} catch (InterruptedException e) {// TODO Auto-generated catch blocke.printStackTrace();}}shutdown=true;}}package task;import java.util.ArrayList;import java.util.List;/** * 任务管理器 * 1、添加任务 * 2、监测是否有新任务 */public class TaskManager {private static List taskQueue=new ArrayList<WorkTask>(); //任务队列 private TaskManager(){}//添加任务 public synchronized static void addTask(WorkTask task){taskQueue.add(task);}//判断是否有任务未执行public synchronized static WorkTask getWorkTask(){if (taskQueue.size()>0){return (WorkTask)taskQueue.remove(0);}elsereturn null;}}package task;/** * 任务接口 * 继承它来定义自己具体的工作任务 */public interface WorkTask {void execute() throws Exception; //执行工作任务void setTaskThreadKey(Object key);//设置任务线程编号}package task;/** * 任务类1 * 正常执行的工作任务 */public class WorkTaskImp implements WorkTask {protected String param;protected Object threadkey; //为了显示执行线程编号protected final int TaskExecTime=500; //任务执行时间public void execute() throws Exception {// TODO Auto-generated method stubSystem.out.println(param+"工作线程编号"+threadkey.toString()); Thread.sleep(TaskExecTime);}public WorkTaskImp(String param){this.param=param;}public void setTaskThreadKey(Object key){this.threadkey=key;}public String toString(){return param+"工作线程编号"+threadkey.toString();}}package task;/** * 任务类2 * 执行报异常的工作任务 */public class WorkTaskAImp extends WorkTaskImp{public WorkTaskAImp(String param) {super(param);// TODO Auto-generated constructor stub}public void execute() throws Exception {// TODO Auto-generated method stubthrow new Exception("运行WorkTaskAImp任务时出错");}}package task;/* * 任务类3 * 执行超时的工作任务 */public class WorkTaskBImp extends WorkTaskImp{public WorkTaskBImp(String param) {super(param);// TODO Auto-generated constructor stub}public void execute() throws Exception {// TODO Auto-generated method stubSystem.out.println("正在"+param); Thread.sleep(50000); //随便定义}}package event;import task.WorkTask;import thread.WorkThread;/* *任务抽象事件 */public abstract class AbstractEvent {protected WorkThread workthread;protected Thread nowthread;protected WorkTask nowtask;//事件触发public synchronized void execute(){};@Overridepublic boolean equals(Object obj) {// TODO Auto-generated method stubAbstractEvent other=(AbstractEvent)obj;return this.workthread==other.workthread&&this.nowtask==this.nowtask;};}package event;import task.WorkTask;import thread.ThreadPool;import thread.WorkThread;/* * 任务开始运行事件 */public class BeginTaskEvent extends AbstractEvent{public BeginTaskEvent(WorkThread workthread,Thread nowthread,WorkTask task){this.workthread=workthread;this.nowthread=nowthread;this.nowtask=task;}@Overridepublic void execute() {// TODO Auto-generated method stubThreadPool pool=ThreadPool.getInstance();pool.beginTaskRun(this);}}package event;import task.WorkTask;import thread.ThreadPool;import thread.WorkThread;/* * 任务运行结束事件 */public class EndTaskEvent extends AbstractEvent {public EndTaskEvent(WorkThread workthread,Thread nowthread,WorkTask task){this.workthread=workthread;this.nowthread=nowthread;this.nowtask=task;}@Overridepublic void execute() {// TODO Auto-generated method stubThreadPool pool=ThreadPool.getInstance();pool.endTaskRun(this);}}package event;/* * 任务运行时间类 */public class TaskRunTime {private long begintime;private long endtime;private BeginTaskEvent event;public TaskRunTime(BeginTaskEvent event){this.event=event;this.begintime=System.currentTimeMillis();this.endtime=this.begintime;}public BeginTaskEvent getEvent() {return event;}//检查是否超时public boolean checkRunTimeOut(long maxtime){endtime=System.currentTimeMillis();long cha=endtime-begintime;return cha>=maxtime;}}package event;import task.WorkTask;import thread.ThreadPool;import thread.WorkThread;/* * 任务超时事件 */public class TaskTimeOutEvent {private AbstractEvent event;public TaskTimeOutEvent(AbstractEvent event){this.event=event;}@SuppressWarnings("deprecation")public void execute() {// TODO Auto-generated method stubThreadPool pool=ThreadPool.getInstance();pool.addWorkThread();pool.removeWorkThread(event.workthread);Object obj=event.workthread.getThreadKey();System.out.println("正在停止工作超时线程...线程编号"+obj);event.nowthread.stop();}}