首页 诗词 字典 板报 句子 名言 友答 励志 学校 网站地图
当前位置: 首页 > 教程频道 > 开发语言 > 编程 >

生产者消费者模型的演化

2012-10-28 
生产者消费者模型的演变想复习一下生产者和消费者通过Java代码如何实现,网上搜集了一个,《Thinking in Java

生产者消费者模型的演变

想复习一下生产者和消费者通过Java代码如何实现,网上搜集了一个,《Thinking in Java》上面有两个,实现各有侧重。与大家分享,也当自己学习。

?

介绍:

生产者、消费者简单说这个模型核心角色有3个,即生产者、消费者、产品(关键区)。

生产者和消费者对产品(关键区)的操作时要互斥,保证并发时的正确性。

?

?

代码实现:

网络上最常见也是最简单的实现,直接对关键区加锁。生产者、消费者公共使用主线程MultiThread的container。通过对container进行synchronized控制,使用wait,notify来控制进程间的交替。

这种实现方式实现需要注意的有以下几点:

1.对关键区进行synchronized控制。

2.只有线程获取了关键区的访问权,才可以通过关键区对象调用notify,notifyAll之类的方法。否则就会抛出IllegalMonitorStateException的异常。

3.wait,notify为Object的方法,在该模式下都是关键区对象,如container来调用相关方法,而非其他线程。

?

消费者代码

package ProductAndConsume; import java.util.List; public class Consume implements Runnable{ private List container = null; private int count; public Consume(List lst){ this.container = lst; } public void run() { while(true){ synchronized (container) { if(container.size()== 0){ try { container.wait();//容器为空,放弃锁,等待生产 } catch (InterruptedException e) { e.printStackTrace(); } } try { Thread.sleep(100); } catch (InterruptedException e) { e.printStackTrace(); } container.remove(0); container.notify(); System.out.println("我吃了"+(++count)+"个"); } } } } 

?生产者:

package ProductAndConsume; import java.util.List; public class Product implements Runnable { private List container = null; private int count; public Product(List lst) { this.container = lst; } public void run() { while (true) { synchronized (container) { if (container.size() > MultiThread.MAX) { //如果容器超过了最大值,就不要在生产了,等待消费 try { container.wait(); } catch (InterruptedException e) { e.printStackTrace(); } } try { Thread.sleep(100); } catch (InterruptedException e) { e.printStackTrace(); } container.add(new Object()); container.notify(); System.out.println("我生产了"+(++count)+"个"); } } } } 

?主线程,包括公共资源。

package ProductAndConsume; import java.util.ArrayList; import java.util.List; public class MultiThread { private List container = new ArrayList(); public final static int MAX = 5; public static void main(String args[]){ MultiThread m = new MultiThread(); new Thread(new Consume(m.getContainer())).start();new Thread(new Consume(m.getContainer())).start();new Thread(new Product(m.getContainer())).start(); new Thread(new Consume(m.getContainer())).start(); new Thread(new Product(m.getContainer())).start(); } public List getContainer() { return container; } public void setContainer(List container) { this.container = container; }}

??

Thinking in Java上面关于消费者、生产者的实现与上面方式基本相同。唯一不同的是,线程直接对自身加锁,而非关键区。因为线程中保留了对关键区的一个引用。

这种设计方式很严谨,同时考虑了信号量丢失的问题。最后的打印结果也很值得分析。

Out of food, closingOrder up! Chef interruptedWaitPerson interrupted?

当食物数量满了之后,关闭线程池被关闭,interrupt所有线程。此时线程在调用sleep或wait方法,就会抛出InterruptedException异常,从而打印出上面的结果。

实现代码如下:

package TIJ4PAC;/** * 生产者,消费者完整的例子 *///: concurrency/Restaurant.java// The producer-consumer approach to task cooperation.import java.util.concurrent.ExecutorService;import java.util.concurrent.Executors;import java.util.concurrent.TimeUnit;class Meal {  private final int orderNum;  public Meal(int orderNum) { this.orderNum = orderNum; }  public String toString() { return "Meal " + orderNum; }}class WaitPerson implements Runnable {  private Restaurant restaurant;  public WaitPerson(Restaurant r) { restaurant = r; }  public void run() {    try {      while(!Thread.interrupted()) {        synchronized(this) {          while(restaurant.meal == null)            wait(); // ... for the chef to produce a meal,防止其他服务员突然闯入,夺走订单        }        System.out.println("Waitperson got " + restaurant.meal);        synchronized(restaurant.chef) {          restaurant.meal = null;          restaurant.chef.notifyAll(); // Ready for another        }      }    } catch(InterruptedException e) {      System.out.println("WaitPerson interrupted");    }  }}class Chef implements Runnable {  private Restaurant restaurant;  private int count = 0;  public Chef(Restaurant r) { restaurant = r; }  public void run() {    try {      while(!Thread.interrupted()) {        synchronized(this) {          while(restaurant.meal != null)            wait(); // ... for the meal to be taken,防止其他初始突然闯入,夺走机会        }        if(++count == 10) {          System.out.println("Out of food, closing");          restaurant.exec.shutdownNow();        }        System.out.println("Order up! ");        synchronized(restaurant.waitPerson) {          restaurant.meal = new Meal(count);          restaurant.waitPerson.notifyAll();        }        TimeUnit.MILLISECONDS.sleep(100);      }    } catch(InterruptedException e) {      System.out.println("Chef interrupted");    }  }}public class Restaurant {  Meal meal;  ExecutorService exec = Executors.newCachedThreadPool();  WaitPerson waitPerson = new WaitPerson(this);  Chef chef = new Chef(this);  public Restaurant() {    exec.execute(chef);    exec.execute(waitPerson);  }  public static void main(String[] args) {    new Restaurant();  }} /* Output:Order up! Waitperson got Meal 1Order up! Waitperson got Meal 2Order up! Waitperson got Meal 3Order up! Waitperson got Meal 4Order up! Waitperson got Meal 5Order up! Waitperson got Meal 6Order up! Waitperson got Meal 7Order up! Waitperson got Meal 8Order up! Waitperson got Meal 9Out of food, closingOrder up! Chef interruptedWaitPerson interrupted*///:~

?

最后一种实现,采用了BlockingQueue的方式,采用这种方式的好处就是,生产者、消费者无需再对产品(关键区)进行加锁控制了。BlockingQueue的put和take方法都会自动对关键区进行互斥的,无需编码者手动控制。代码结构简单许多。

下面这个例子挺有意思,通过线程模拟制作吐司面包,然后抹黄油,最后加果酱,吃吐司的过程。老外就是幽默呀~

代码如下:

package BlockingQueue;//: concurrency/ToastOMatic.java// A toaster that uses queues.import java.util.Random;import java.util.concurrent.ExecutorService;import java.util.concurrent.Executors;import java.util.concurrent.LinkedBlockingQueue;import java.util.concurrent.TimeUnit;class Toast {public enum Status { DRY, BUTTERED, JAMMED }private Status status = Status.DRY;private final int id;public Toast(int idn) { id = idn; }public void butter() { status = Status.BUTTERED; }//抹黄油public void jam() { status = Status.JAMMED; }//涂果酱public Status getStatus() { return status; }public int getId() { return id; }public String toString() {return "Toast " + id + ": " + status;}}class ToastQueue extends LinkedBlockingQueue<Toast> {}class Toaster implements Runnable {//制作土司的线程private ToastQueue toastQueue;private int count = 0;private Random rand = new Random(47);public Toaster(ToastQueue tq) { toastQueue = tq; }public void run() {try {while(!Thread.interrupted()) {TimeUnit.MILLISECONDS.sleep(100 + rand.nextInt(500));// Make toastToast t = new Toast(count++);System.out.println(t);// Insert into queuetoastQueue.put(t);}} catch(InterruptedException e) {System.out.println("Toaster interrupted");}System.out.println("Toaster off");}}// Apply butter to toast:class Butterer implements Runnable {//抹黄油的人private ToastQueue dryQueue, butteredQueue;public Butterer(ToastQueue dry, ToastQueue buttered) {dryQueue = dry;butteredQueue = buttered;}public void run() {try {while(!Thread.interrupted()) {// Blocks until next piece of toast is available:Toast t = dryQueue.take();t.butter();System.out.println(t);butteredQueue.put(t);}} catch(InterruptedException e) {System.out.println("Butterer interrupted");}System.out.println("Butterer off");}}// Apply jam to buttered toast:class Jammer implements Runnable {//擦果酱的人private ToastQueue butteredQueue, finishedQueue;public Jammer(ToastQueue buttered, ToastQueue finished) {butteredQueue = buttered;finishedQueue = finished;}public void run() {try {while(!Thread.interrupted()) {// Blocks until next piece of toast is available:Toast t = butteredQueue.take();t.jam();System.out.println(t);finishedQueue.put(t);}} catch(InterruptedException e) {System.out.println("Jammer interrupted");}System.out.println("Jammer off");}}// Consume the toast:class Eater implements Runnable {//吃吐司的人private ToastQueue finishedQueue;private int counter = 0;public Eater(ToastQueue finished) {finishedQueue = finished;}public void run() {try {while(!Thread.interrupted()) {// Blocks until next piece of toast is available:Toast t = finishedQueue.take();// Verify that the toast is coming in order,// and that all pieces are getting jammed:if(t.getId() != counter++ ||t.getStatus() != Toast.Status.JAMMED) {System.out.println(">>>> Error: " + t);System.exit(1);} elseSystem.out.println("Chomp! " + t);}} catch(InterruptedException e) {System.out.println("Eater interrupted");}System.out.println("Eater off");}}public class ToastOMatic {public static void main(String[] args) throws Exception {ToastQueue dryQueue = new ToastQueue(),butteredQueue = new ToastQueue(),finishedQueue = new ToastQueue();//生成3个队列。分别是干吐司,抹了黄油的吐司,抹了果酱的吐司(完成的吐司)ExecutorService exec = Executors.newCachedThreadPool();exec.execute(new Toaster(dryQueue));//制作exec.execute(new Butterer(dryQueue, butteredQueue));//抹黄油exec.execute(new Jammer(butteredQueue, finishedQueue));//抹果酱exec.execute(new Eater(finishedQueue));//吃TimeUnit.SECONDS.sleep(5);exec.shutdownNow();}} /* (Execute to see output) *///:~//使用BolckingQueue 简化明显,在使用显式的wait和notifyAll方法时存在的类和类直接的耦合被消除了,每一个类都和它的BlockinQueue通讯

?

就与大家分享到这里,代码如附件,如有错误欢迎指正。

热点排行