LinkedBlockingQueue 例子
import java.util.Random;import java.util.concurrent.CountDownLatch;import java.util.concurrent.ExecutorService;import java.util.concurrent.Executors;import java.util.concurrent.LinkedBlockingQueue;import java.util.concurrent.TimeUnit;import java.util.concurrent.atomic.AtomicLong;import org.apache.commons.lang.RandomStringUtils;/** * @author zhaoqilong * @version 创建时间:2012-6-7 上午9:16:56 * */public class Test { private static LinkedBlockingQueue<String> queue =new LinkedBlockingQueue<String>(); // 线程控制开关 private final CountDownLatch latch = new CountDownLatch(1); //的线程池 private final ExecutorService pool; //AtomicLong 计数 生产数量 private final AtomicLong output = new AtomicLong(0); //AtomicLong 计数 销售数量 private final AtomicLong sales = new AtomicLong(0); //是否停止线程 private final boolean clear; public Test(boolean clear){ this.pool = Executors.newCachedThreadPool(); this.clear=clear; } public void service() throws InterruptedException{ Saler a=new Saler(queue, sales, latch, clear); pool.submit(a); Worker w=new Worker(queue, output, latch); pool.submit(w); latch.countDown(); } public static void main(String[] args) { Test t=new Test(false); try {t.service();} catch (InterruptedException e) {// TODO Auto-generated catch blocke.printStackTrace();} } class Saler implements Runnable{ private final LinkedBlockingQueue<String> queue; private final AtomicLong sales; private final CountDownLatch latch; private final boolean clear; public Saler(LinkedBlockingQueue<String> queue, AtomicLong sales, CountDownLatch latch, boolean clear){ this.queue = queue; this.sales = sales; this.latch = latch; this.clear = clear; }public void run() {try { latch.await(); // 放闸之前老实的等待着 for (;;) { sale(); Thread.sleep(500); } }catch (InterruptedException e) { if(clear) { // 响应中断请求后,如果有要求则销售完队列的产品后再终止线程 cleanWarehouse(); } else { System.out.println("Seller Thread will be interrupted..."); } } }public void sale(){System.out.println("==取take=");try {String item = queue.poll(50, TimeUnit.MILLISECONDS);System.out.println(item);if(item!=null){sales.incrementAndGet(); // 可以声明long型的参数获得返回值,作为日志的参数}} catch (InterruptedException e) {e.printStackTrace();} } /** * 销售完队列剩余的产品 */ private void cleanWarehouse() { try { while (queue.size() > 0) { sale(); } } catch (Exception ex) { System.out.println("Seller Thread will be interrupted..."); } } } /** * 生产者 * @author Administrator * */ class Worker implements Runnable{ private LinkedBlockingQueue<String> queue; private CountDownLatch latch; private AtomicLong output; public Worker(){ } public Worker(LinkedBlockingQueue<String> queue, AtomicLong output,CountDownLatch latch){ this.queue=queue; this.latch=latch; this.output=output; } public void run() { try { latch.await(); // 线程等待 for (;;) { work(); Thread.sleep(100); } }catch (InterruptedException e) { System.out.println("Worker thread will be interrupted..."); } } /** * 工作 */ public void work(){ try { String product=RandomStringUtils.randomAscii(3); boolean success=queue.offer(product, 100, TimeUnit.MILLISECONDS); if(success){ output.incrementAndGet();// 可以声明long型的参数获得返回值,作为日志的参数 } } catch (InterruptedException e) { e.printStackTrace(); } } }}