首页 诗词 字典 板报 句子 名言 友答 励志 学校 网站地图
当前位置: 首页 > 教程频道 > 开发语言 > 编程 >

(9) 执行器

2012-10-24 
(九) 执行器线程池:1.使用线程池(Thread Pool)的前提是,程序中创建了大量的生命周期很短的线程。构建一个新

(九) 执行器

线程池:
1.使用线程池(Thread Pool)的前提是,程序中创建了大量的生命周期很短的线程。构建一个新的线程是有一定代价的,因为涉及与操作系统的交互。
2.另一个使用线程池的理由是减少并发线程的数目。创建大量线程会大大降低性能甚至使虚拟机崩溃。所以,当有一个会创建许多线程的算法,应该使用一个线程数“固定的”线程池以限制并发总数。
3.线程池中包含许多准备运行的空闲线程,将Runnable对象交给线程池,就会有一个线程调用run方法。当run方法退出时,线程不会死亡,而是在池中准备为下一个请求服务。

执行器(Executor)类中有许多静态工厂方法用来创建线程池
??? ??? 方法??? ??? ??? ??? ??? ??? ??? ??? ??? ??? 描述
newCachedThreadPool??? ??? ??? ??? ??? 必要时创建新线程;空闲线程会被保留60秒。
newFixedThreadPool??? ??? ??? ??? ??? 该池包含固定数量的线程;空闲线程会一直被保留。
newSingleThreadPool??? ??? ??? ??? ??? 只有一个线程的“池”,该线程顺序执行每一个提交的任务
newScheduledThreadPool??? ??? ??? ??? 用于预定执行而构建的固定线程池,替代java.util.Timer
newSingleThreadScheduledExecutor??? 用于预定执行而构建的单线程“池”

1.线程池
(1)newCachedThreadPool方法构建了一个线程池,对于每个任务,如果有空闲线程可用,立即让它执行任务,如果没有可用的空闲线程,则创建一个新线程。
(2)newFixedThreadPool方法构建一个具有固定大小的线程池。如果提交的任务数多余空闲的线程数,那么把得不到服务的任务放置到队列中,当其他任务完成后在运行它。
(3)newSingleThreadPool方式是一个退化了的大小为1的线程池。由一个线程执行提交的任务,一个接着一个。
以上三个方法返回实现了ExecutorService接口的ThreadPoolExecutor类的对象。

3种将Runnable对象或Callable对象提交给ExecutorService的方法
(1)Future<?> submit(Runnable task)
该方法返回Future<?>,该对象可以调用isDone,cancel或isCancelled方法,但是当get方法完成时,只是简单的返回null。
(2)Future<T> submit(Runnable task, T result)
该方法提交Runnable对象,并且Future的get方法完成时返回指定的result对象。
(3)Future<T> submit(Callable<T> task)
该方法提交Callable对象,并且Future的get方法将在计算结果准备好的时候得到结果。

shutdown : 当用完一个线程池的时候,需要调用shutdown方法。该方法启动该池的关闭序列。被关闭的执行器不再接受新的任务。当所有任务都完成之后,线程池中的线程死亡。
shutdownNow : 该池取消尚未开始的所有任务并试图中断正在运行的线程。

连接线程池的步骤:
(1)调用Executors类中的静态方法newCachedThreadPool或newFixedThreadPool。
(2)调用submit提交Runnable或Callable对象。
(3)如果想要取消一个任务,或如果提交Callable对象,那就要保存好返回的Future对象。
(4)当不再提交任何任务时,调用shutdown。

?

import java.io.File;import java.util.Scanner;import java.util.concurrent.ExecutorService;import java.util.concurrent.Executors;import java.util.concurrent.Future;import java.util.concurrent.ThreadPoolExecutor;public class FutureTest {public static void main(String[] args) {String dirctory = "";String keyword = null;File dir = new File(dirctory);Scanner in = new Scanner(System.in);while(!dir.exists() || !dir.isDirectory()){System.out.println("Enter base directory (e.g. D:\\WorkSpace\\Demo):");dirctory = in.nextLine();dir = new File(dirctory);}while(keyword==null || "".equals(keyword)){System.out.println("Enter keyword(e.g. python):");keyword = in.nextLine();}System.out.println("The directory: "+dir.getPath()+", The keyword: "+keyword);//1.使用FutureTask方式//MatchCounter counter = new MatchCounter(new File(dirctory), keyword);//FutureTask<Integer> task = new FutureTask<Integer>(counter);//Thread t = new Thread(task);//t.start();//2.使用线程池ExecutorService pool = Executors.newCachedThreadPool();MatchCounter counter = new MatchCounter(dir, keyword, pool);Future<Integer> result = pool.submit(counter);try{System.out.println(result.get() + " matching files.");}catch(Exception e){e.printStackTrace();}//2.使用线程池应关闭线程池pool.shutdown();int largestPoolSize = ((ThreadPoolExecutor)pool).getLargestPoolSize();System.out.println("largest pool size: " + largestPoolSize);}}

?

public class MatchCounter implements Callable<Integer>{private ExecutorService pool;private File directory;private String keyword;private int count;public MatchCounter(File directory, String keyword){this.directory = directory;this.keyword = keyword;count = 0;}public MatchCounter(File directory, String keyword, ExecutorService pool){this.directory = directory;this.keyword = keyword;this.pool = pool;count = 0;}@Overridepublic Integer call() throws Exception {try{File[] files = directory.listFiles();List<Future<Integer>> results = new ArrayList<Future<Integer>>();for(File file:files){if(file.isDirectory()){//1.通过FutureTask//MatchCounter counter = new MatchCounter(file, keyword);//FutureTask task = new FutureTask(counter);//results.add(task);//Thread t = new Thread(task);//t.start();//2.通过线程池MatchCounter counter = new MatchCounter(file, keyword, pool);results.add(pool.submit(counter));}else{if(search(file)){count++;}}}for(Future<Integer> result:results){try{count += result.get();}catch(Exception e){e.printStackTrace();}}}catch(Exception e){e.printStackTrace();}return count;}private boolean search(File file) {try{Scanner in = new Scanner(new FileInputStream(file));boolean found = false;while(!found && in.hasNextLine()){String line = in.nextLine();if(line.contains(keyword)){found = true;}}in.close();return found;}catch(Exception e){return false;}}}

?

?

2.预订执行
ScheduledExecutorService接口具有为预定执行(Scheduled Execution)或重复执行任务而设计的方法。它是一种允许使用线程池机制的java.util.Timer的泛化。
Executor类的newScheduledThreadPool和newSingleThreadScheduledExecutor方法实现了ScheduledExecutorService接口的对象。
可以预订Runnable或Callable在初始的延迟之后之后只运行一次。也可以预定一个Runnable对象周期性的运行。

3.控制任务组
使用执行器有更有实际意义的原因
(1)可以在执行器中使用shutdownNow方法取消所有的任务。
(2)invokeAny方法提交所有对象到一个Callable对象的集合中,并返回某个已经完成了的任务的结果。
(3)invokeAll方法提交所有的对象到一个Callable对象的集合中,并返回一个Future对象的列表,代表所有任务的解决方案。
e.g.??? List<Callable<T>> task = ...;
??? ??? List<Future<T>> results = executor.invokeAll(tasks);
??? ??? for(Future<T> result : results){
??? ??? ??? processFuture(result.get());
??? ??? }
这个方法的缺点是,如果第一个任务占用了很多时间执行,则可能不得不等待。可以用ExecutorCompletionService来进行排列。
(4)ExecutorCompletionService的构建方法
e.g.??? ExecutorCompletionService service = new ExecutorCompletionService(executor);
??? ??? for(Callable<T> task:tasks){
??? ??? ??? service.submit(task);
??? ??? }
??? ??? for(int i=0;i<tasks.size();i++){
??? ??? ??? processFuture(service.task().get());
??? ??? }

热点排行