首页 诗词 字典 板报 句子 名言 友答 励志 学校 网站地图
当前位置: 首页 > 教程频道 > 软件管理 > 软件架构设计 >

代码兑现生产者消费者

2013-03-19 
代码实现生产者消费者生产者线程?public class ReadThread implements Runnable{??? BlockingQueueString

代码实现生产者消费者

生产者线程

?

public class ReadThread implements Runnable
{
??? BlockingQueue<String> blocking;
??? ReadThread(BlockingQueue<String> blocking)
??? {
??????? this.blocking = blocking;
??? }
??? @Override
??? public void run()
??? {
??????? read();
??? }
??? public void read()
??? {
??????? try
??????? {
??????????? BufferedReader br = new BufferedReader(new InputStreamReader(
??????????????????? new FileInputStream(new File("D://1.txt"))));
??????????? String line = "";
??????????? while ((line = br.readLine()) != null)
??????????? {
??????????????? System.out.println("向队列中添加:"+line);
??????????????? while (!blocking.offer(line))
??????????????? {
??????????????????? System.out.println("队列已满 进行等待");
??????????????????? Thread.sleep(100);
??????????????? }
??????????? }
??????????? ParseFileThread.idDone = true;
??????? }
??????? catch (Exception e)
??????? {
??????????? e.printStackTrace();
??????? }
??? }
}

?

消费者线程

public class ParseThread implements Runnable
{
??? BlockingQueue<String> blocking;
???
??? ParseThread(BlockingQueue<String> blocking)
??? {
??????? this.blocking = blocking;
??? }
???
??? @Override
??? public void run()
??? {
??????? Parse();
??? }
???
??? public void Parse()
??? {
??????? ExecutorService pool = Executors.newFixedThreadPool(3);
??????? try
??????? {
??????????? for (int i = 0; i < 3; i++)
??????????? {
??????????????? pool.execute(new Runnable()
??????????????? {
??????????????????? @Override
??????????????????? public void run()
??????????????????? {
??????????????????????? while (true)
??????????????????????? {
??????????????????????????? String line = blocking.poll();
??????????????????????????? if(ParseFileThread.idDone && line==null)
??????????????????????????? {
??????????????????????????????? //已经处理完
??????????????????????????????? System.out.println("任务已经出来完~ 进行退出");
??????????????????????????????? return;
??????????????????????????? }
??????????????????????????? else if(!ParseFileThread.idDone && line==null)
??????????????????????????? {
??????????????????????????????? System.out.println("队列已空!进行等待");
??????????????????????????????? try
??????????????????????????????? {
??????????????????????????????????? Thread.sleep(100);
??????????????????????????????? }
??????????????????????????????? catch (InterruptedException e)
??????????????????????????????? {
??????????????????????????????????? e.printStackTrace();
??????????????????????????????? }
??????????????????????????? }else if(!ParseFileThread.idDone && line!=null)
??????????????????????????? {
?????????????????????????????? System.out.println("从队列中获取:"+line);
??????????????????????????? }
??????????????????????? }
???????????????????????
??????????????????? }
??????????????? });
??????????? }
???????????
??????? }
??????? catch (Exception e)
??????? {
??????????? e.printStackTrace();
??????? }
??????? pool.shutdown();
??? }
???
}

?

?

主函数

?

public class ParseFileThread
{
??? private static BlockingQueue<String> blocking = new LinkedBlockingQueue<String>(3);
???
??? public static Boolean idDone = false;
???
??? public static void main(String[] args)
??? {
??????? ExecutorService executor = Executors.newFixedThreadPool(2);
???????
??????? executor.execute(new ReadThread(blocking));
??????? executor.execute(new ParseThread(blocking));
???????
??????? executor.shutdown();

??? }
}

热点排行