首页 诗词 字典 板报 句子 名言 友答 励志 学校 网站地图
当前位置: 首页 > 教程频道 > 开发语言 > 编程 >

ThreadPool的兑现

2012-10-23 
ThreadPool的实现最近看hadoop的时候,无意中看到了concurrent包中的类,于是打算好好研究一下线程安全方面

ThreadPool的实现

最近看hadoop的时候,无意中看到了concurrent包中的类,于是打算好好研究一下线程安全方面的东西。先自己实现一个线程池。然后和sun自带的threadpool比较一下,看自己的实现有什么问题。

?1. ThreadPool 类,用于存储工作线程,在构造函数中创建线程,并启动线程

package com.fnk.threadpool;import java.util.Vector;public class ThreadPool {private Vector<Thread> threads;private static final int DEFAULT_THREAD_SIZE = 5;ThreadPool(ThreadWorkQueue workQueue) throws InvalidThreadParamException{this(workQueue,DEFAULT_THREAD_SIZE);}ThreadPool(ThreadWorkQueue workQueue,int threadSize) throws InvalidThreadParamException{if(workQueue == null || threadSize <= 0){throw new InvalidThreadParamException();}this.threads = new Vector<Thread>();for(int i = 0 ; i < threadSize; i++){threads.add(new WorkThread(workQueue));threads.get(i).start();}}}

?

?2.任务队列类

package com.fnk.threadpool;import java.util.Vector;/* * 任务队列类,用于存储任务。先到的任务,先执行 */public class ThreadWorkQueue {private Vector<WorkIntf> works;public  Object mutex = new Object();   ThreadWorkQueue(){works = new Vector<WorkIntf>();}/* * 任务进队列,如果原来的队列中的任务数为0,唤醒任务线程 */public void enQueue(WorkIntf work){if(works.size() == 0){works.add(work);synchronized (mutex) {mutex.notifyAll();}}else{works.add(work);}}/* * 任务出队列,如果队列中的任务数为0,让线程等待 */public WorkIntf deQueue(){if(works.size() == 0){synchronized (mutex) {try {mutex.wait();} catch (InterruptedException e) {// TODO Auto-generated catch blocke.printStackTrace();}}return null;}else{return works.remove(0);}}}

?

?3.任务接口类

package com.fnk.threadpool;public interface WorkIntf {public boolean doWork();}

?4. 工作线程类

package com.fnk.threadpool;//任务线程类 public class WorkThread extends Thread {ThreadWorkQueue workQueue;WorkThread(ThreadWorkQueue workQueue) {this.workQueue = workQueue;}public void run() {while (true) {WorkIntf work = null;//获取任务,如果任务队列中,work = workQueue.deQueue();//如果 有任务 ,那么就工作 if (work != null) {work.doWork();}}}}

?? 5. 异常类

package com.fnk.threadpool;public class InvalidThreadParamException extends Exception {/** *  */private static final long serialVersionUID = 1L;public InvalidThreadParamException() {super();}public InvalidThreadParamException(String message) {super(message);}public InvalidThreadParamException(String message, Throwable cause) {super(message, cause);}public InvalidThreadParamException(Throwable cause) {super(cause);}}

? 6. 测试

package com.fnk.threadpool;public class TestThreadPool {public static void main(String[] args) {try {ThreadWorkQueue workQueue = new ThreadWorkQueue();ThreadPool tp = new ThreadPool(workQueue);for(int i = 0; i < 10 ; i++){workQueue.enQueue(new WorkImpl(i));}} catch (InvalidThreadParamException e) {// TODO Auto-generated catch blocke.printStackTrace();}}}

?

?

?总结:缺点是不能动态的控制线程的个数,在线程池开启的时候就必须创建所以线程。

热点排行