首页 诗词 字典 板报 句子 名言 友答 励志 学校 网站地图
当前位置: 首页 > 教程频道 > 开发语言 > 编程 >

BlockingQueue实现生产者消费者模式

2013-03-21 
BlockingQueue实现生产者消费者模式 package com.sunrise.mywork2.concurrent.queue; import java.io.File;

BlockingQueue实现生产者消费者模式
package com.sunrise.mywork2.concurrent.queue;

import java.io.File;
import java.io.FileFilter;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

public class IndexingService
{
private static final int CAPACITY = 1000;
private static final File POISON = new File("");
private final IndexerThread consumer = new IndexerThread();
private final CrawlerThread producer = new CrawlerThread();
private final BlockingQueue<File> queue;
private final FileFilter fileFilter;
private final File root;

public IndexingService(File root,final FileFilter fileFilter){
?this.root = root;
?this.queue = new LinkedBlockingQueue<File>(CAPACITY);
?this.fileFilter = new FileFilter()
?{
? public boolean accept(File pathname)
? {
? ?return pathname.isDirectory()||fileFilter.accept(pathname);
? }
?};
}

private boolean alreadyIndexed(File f){
?return false;
}

class CrawlerThread extends Thread{
?public void run(){
? try{
? ?crawl(root);
? }catch (InterruptedException e) {

? }finally{
? ?while(true){
? ? try{
? ? ?queue.put(POISON);
? ? ?break;
? ? }catch (InterruptedException e1) {
? ? }
? ?}
? }
?}

?private void crawl(File root) throws InterruptedException{
? File[] entries = root.listFiles(fileFilter);
? if(entries!=null){
? ?for(File entry:entries){
? ? if(entry.isDirectory())
? ? ?crawl(entry);
? ? else if(!alreadyIndexed(entry))
? ? ?queue.put(entry);
? ?}
? }
?}
}

class IndexerThread extends Thread{
?public void run(){
? try{
? ?while(true){
? ? File file = queue.take();
? ? if(file == POISON)
? ? ?break;
? ? else
? ? ?indexFile(file);
? ?}
? }catch (InterruptedException e) {
? }
?}

?public void indexFile(File file){
? //TODO
? System.out.println(file.getName());
?}
}

public void start(){
?producer.start();
?consumer.start();
}

public void stop(){
?producer.interrupt();
}

public void awaitTermination() throws InterruptedException{
?consumer.join();
}


/**
?* test it
?* @param args
?*/
public static void main(String[] args)
{
?File root = new File("F:\");
?FileFilter fileFilter = new FileFilter()
?{

? public boolean accept(File f)
? {
? ?return f.getName().lastIndexOf(".java")>-1;
? }
?};
?IndexingService is = new IndexingService(root, fileFilter);
?is.start();

}
}

热点排行