首页 诗词 字典 板报 句子 名言 友答 励志 学校 网站地图
当前位置: 首页 > 教程频道 > 开发语言 > 编程 >

行列阻塞浅析

2012-09-08 
队列阻塞浅析? 这几天所做的项目中涉及到了队列阻塞机制,通过研究整理如下。在这里和大家分享。? ? ? ?队列

队列阻塞浅析

? 这几天所做的项目中涉及到了队列阻塞机制,通过研究整理如下。在这里和大家分享。

? ? ? ?队列以一种先进先出的方式。如果你向一个已经满了的阻塞队列中添加一个元素,或是从一个空的阻塞队列中移除一个元素,将导致线程阻塞。在多线程进行合作时,阻塞队列是很有用的工具。工作者线程可以定期的把中间结果存到阻塞队列中。而其他工作者线程把中间结果取出并在将来修改它们。队列会自动平衡负载。如果第一个线程集运行的比第二个慢,则第二个线程集在等待结果时就会阻塞。如果第一个线程集运行的快,那么它将等待第二个线程集赶上来。

  下面的程序展示了如何使用阻塞队列来控制线程集。程序在一个目录及它的所有子目录下搜索所有文件,打印出包含指定关键字的文件列表。?  java.util.concurrent包提供了阻塞队列的4个变种:LinkedBlockingQueue、ArrayBlockingQueue、PriorityBlockingQueue和DelayQueue。我们用的是ArrayBlockingQueue。ArrayBlockingQueue在构造时需要给定容量,并可以选择是否需要公平性。如果公平参数被设置了,等待时间最长的线程会优先得到处理。通常,公平性会使你在性能上付出代价,只有在的确非常需要的时候再使用它。?  生产者线程枚举在所有子目录下的所有文件并把它们放到一个阻塞队列中。这个操作很快,如果队列没有设上限的话,很快它就包含了没有找到的文件。?  我们同时还启动了大量的搜索线程。每个搜索线程从队列中取出一个文件,打开它,打印出包含关键字的所有行,然后取出下一个文件。我们使用了一个小技巧来在工作结束后终止线程。为了发出完成信号,枚举线程把一个虚拟对象放入队列。(这类似于在行李输送带上放一个写着“最后一个包”的虚拟包。)当搜索线程取到这个虚拟对象时,就将其放回并终止。? 在这个程序中,我们使用队列数据结构作为一种同步机制。
    import?java.io.*;??import?java.util.*;??import?java.util.concurrent.*;????public?class?BlockingQueueTest??{?????public?static?void?main(String[]?args)?????{????????Scanner?in?=?new?Scanner(System.in);????????System.out.print("Enter?base?directory?(e.g.?/usr/local/jdk1.6.0/src):?");????????String?directory?=?in.nextLine();????????System.out.print("Enter?keyword?(e.g.?volatile):?");????????String?keyword?=?in.nextLine();??????????final?int?FILE_QUEUE_SIZE?=?10;????????final?int?SEARCH_THREADS?=?100;??????????BlockingQueue<File>?queue?=?new?ArrayBlockingQueue<File>(FILE_QUEUE_SIZE);??????????FileEnumerationTask?enumerator?=?new?FileEnumerationTask(queue,?new?File(directory));????????new?Thread(enumerator).start();????????for?(int?i?=?1;?i?<=?SEARCH_THREADS;?i++)???????????new?Thread(new?SearchTask(queue,?keyword)).start();?????}??}????/**??*?This?task?enumerates?all?files?in?a?directory?and?its?subdirectories.??*/??class?FileEnumerationTask?implements?Runnable??{?????/**?????*?Constructs?a?FileEnumerationTask.?????*?@param?queue?the?blocking?queue?to?which?the?enumerated?files?are?added?????*?@param?startingDirectory?the?directory?in?which?to?start?the?enumeration?????*/?????public?FileEnumerationTask(BlockingQueue<File>?queue,?File?startingDirectory)?????{????????this.queue?=?queue;????????this.startingDirectory?=?startingDirectory;?????}???????public?void?run()?????{????????try????????{???????????enumerate(startingDirectory);???????????queue.put(DUMMY);????????}????????catch?(InterruptedException?e)????????{????????}?????}???????/**?????*?Recursively?enumerates?all?files?in?a?given?directory?and?its?subdirectories?????*?@param?directory?the?directory?in?which?to?start?????*/?????public?void?enumerate(File?directory)?throws?InterruptedException?????{????????File[]?files?=?directory.listFiles();????????for?(File?file?:?files)????????{???????????if?(file.isDirectory())?enumerate(file);???????????else?queue.put(file);????????}?????}???????public?static?File?DUMMY?=?new?File("");???????private?BlockingQueue<File>?queue;?????private?File?startingDirectory;??}????/**??*?This?task?searches?files?for?a?given?keyword.??*/??class?SearchTask?implements?Runnable??{?????/**?????*?Constructs?a?SearchTask.?????*?@param?queue?the?queue?from?which?to?take?files?????*?@param?keyword?the?keyword?to?look?for?????*/?????public?SearchTask(BlockingQueue<File>?queue,?String?keyword)?????{????????this.queue?=?queue;????????this.keyword?=?keyword;?????}???????public?void?run()?????{????????try????????{???????????boolean?done?=?false;???????????while?(!done)???????????{??????????????File?file?=?queue.take();??????????????if?(file?==?FileEnumerationTask.DUMMY)??????????????{?????????????????queue.put(file);?????????????????done?=?true;??????????????}??????????????else?search(file);???????????????????????}????????}????????catch?(IOException?e)????????{???????????e.printStackTrace();????????}????????catch?(InterruptedException?e)????????{????????}???????????}???????/**?????*?Searches?a?file?for?a?given?keyword?and?prints?all?matching?lines.?????*?@param?file?the?file?to?search?????*/?????public?void?search(File?file)?throws?IOException?????{????????Scanner?in?=?new?Scanner(new?FileInputStream(file));????????int?lineNumber?=?0;????????while?(in.hasNextLine())????????{???????????lineNumber++;???????????String?line?=?in.nextLine().trim();???????????if?(line.contains(keyword))?System.out.printf("%s:%d????%s%n",?file.getPath(),?lineNumber,?line);????????}????????in.close();?????}???????private?BlockingQueue<File>?queue;?????private?String?keyword;??}?

热点排行