Java 生产者消费者的实现方式
1. 使用同步队列解决任务协调
2. 使用常规的 Lock、Condition 解决任务协调
3. 使用 synchronized 加锁机制
import java.util.LinkedList;
import java.util.List;
import java.util.Random;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

// NOTE: the three demo entry points below share one compilation unit. Java
// allows at most one public top-level class per file (and it must match the
// file name), so all of them are package-private; each can still be launched
// with `java <ClassName>`.

// ---------------------------------------------------------------------------
// Approach 1: coordination through a thread-safe queue.
// ---------------------------------------------------------------------------

/**
 * Demo entry point for approach 1: one producer and one consumer share a
 * {@link CoinBox}; both are interrupted after one millisecond.
 */
class ProducerAndConsumer {
    public static void main(String[] args) throws Exception {
        CoinBox box = new CoinBox();
        ExecutorService exec = Executors.newCachedThreadPool();
        exec.execute(new Consumer(box));
        exec.execute(new Producer(box));
        TimeUnit.MILLISECONDS.sleep(1);
        // shutdownNow() interrupts the workers, which ends their run() loops.
        exec.shutdownNow();
    }
}

/** Immutable coin identified by an integer id. */
class Coin {
    private final int id;

    public Coin(int idn) {
        id = idn;
    }

    @Override
    public String toString() {
        return "Coin [id=" + id + "]";
    }
}

/** Shared container of coins backed by a {@link LinkedBlockingQueue}. */
class CoinBox {
    final LinkedBlockingQueue<Coin> box = new LinkedBlockingQueue<Coin>();

    /**
     * Adds a coin to the box and logs it.
     *
     * @param coin the coin to store
     */
    public void producess(Coin coin) {
        box.add(coin);
        System.out.println("投入硬币:" + coin);
    }

    /**
     * Removes the head coin if there is one and logs the result. An empty box
     * is reported on stderr (with a {@code null} coin) instead of blocking.
     */
    public void consumer() {
        // NOTE(review): poll() never blocks, so the consumer thread spins while
        // the box is empty. box.take() would block instead, but then the
        // resulting InterruptedException must restore the interrupt flag so the
        // caller's loop can still terminate.
        Coin coin = box.poll();
        if (coin == null) {
            System.err.println("拿走硬币:" + coin);
        } else {
            System.out.println("拿走硬币:" + coin);
        }
    }
}

/** Repeatedly takes coins out of a {@link CoinBox} until interrupted. */
class Consumer implements Runnable {
    private final CoinBox box;

    public Consumer(CoinBox box) {
        this.box = box;
    }

    @Override
    public void run() {
        while (!Thread.interrupted()) {
            box.consumer();
        }
    }
}

/** Repeatedly puts pseudo-random coins into a {@link CoinBox} until interrupted. */
class Producer implements Runnable {
    private final CoinBox box;
    // Fixed seed: every run produces the same sequence of coin ids.
    private final Random rand = new Random(47);

    public Producer(CoinBox box) {
        this.box = box;
    }

    @Override
    public void run() {
        while (!Thread.interrupted()) {
            Coin coin = new Coin(rand.nextInt(500));
            box.producess(coin);
        }
    }
}

// ---------------------------------------------------------------------------
// Approach 2: coordination through an explicit Lock and two Conditions.
// ---------------------------------------------------------------------------

/**
 * Demo entry point for approach 2: one producer and one consumer share a
 * bounded {@link Store}.
 */
class ProducerConsumerImpl {
    public static void main(String[] args) {
        Store store = new Store();
        ExecutorService proConServiceExecutor = Executors.newFixedThreadPool(2);
        Producer1 producer = new Producer1(store);
        Consumer1 consumer = new Consumer1(store);
        try {
            proConServiceExecutor.execute(producer);
            proConServiceExecutor.execute(consumer);
        } finally {
            // shutdown() only stops accepting new tasks; it does not interrupt
            // the two workers, so they keep running.
            proConServiceExecutor.shutdown();
        }
    }
}

/**
 * Bounded pool of products guarded by a {@link ReentrantLock}. Producers wait
 * while the pool is full and consumers wait while it is empty.
 */
class Store {
    private static final int POOL_SIZE = 2;

    private final Lock lock = new ReentrantLock();
    private final Condition producerCondition = lock.newCondition();
    private final Condition consumerCondition = lock.newCondition();
    private final int[] productPool = new int[POOL_SIZE];
    private int productNumber = 0;

    /**
     * Adds one product, waiting while the pool is full. If the calling thread
     * is interrupted while waiting, nothing is produced and the thread's
     * interrupt flag is left set.
     */
    public void produce() {
        // The lock must be held before reading productNumber or calling
        // await(); awaiting without it throws IllegalMonitorStateException.
        lock.lock();
        try {
            while (productNumber >= POOL_SIZE) {
                System.out.println("productNumber=" + productNumber);
                System.out.println("Pool is full, producer " + Thread.currentThread().getName() + " wait...");
                producerCondition.await();
            }
            productPool[productNumber++] = 1;
            System.out.println(Thread.currentThread().getName() + " Porduced a product....");
            consumerCondition.signalAll();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            lock.unlock();
        }
    }

    /**
     * Removes one product, waiting while the pool is empty. If the calling
     * thread is interrupted while waiting, nothing is consumed and the
     * thread's interrupt flag is left set.
     */
    public void consume() {
        lock.lock();
        try {
            while (productNumber <= 0) {
                System.out.println("productNumber=" + productNumber);
                System.out.println("Pool is empty, consumer " + Thread.currentThread().getName() + " wait...");
                consumerCondition.await();
            }
            productNumber--;
            System.out.println(Thread.currentThread().getName() + " Consumed a product....");
            producerCondition.signalAll();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            lock.unlock();
        }
    }
}

/** Consumes one product from a {@link Store} every two seconds until interrupted. */
class Consumer1 implements Runnable {
    private final Store pcl;

    public Consumer1(Store pcl) {
        this.pcl = pcl;
    }

    @Override
    public void run() {
        while (!Thread.currentThread().isInterrupted()) {
            pcl.consume();
            try {
                Thread.sleep(2000);
            } catch (InterruptedException e) {
                // Restore the flag so the loop condition ends the task.
                Thread.currentThread().interrupt();
            }
        }
    }
}

/** Produces one product into a {@link Store} every second until interrupted. */
class Producer1 implements Runnable {
    private final Store pcl;

    public Producer1(Store pcl) {
        this.pcl = pcl;
    }

    @Override
    public void run() {
        while (!Thread.currentThread().isInterrupted()) {
            pcl.produce();
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                // Restore the flag so the loop condition ends the task.
                Thread.currentThread().interrupt();
            }
        }
    }
}

// ---------------------------------------------------------------------------
// Approach 3: coordination through synchronized / wait / notifyAll.
// ---------------------------------------------------------------------------

/**
 * Demo entry point for approach 3: one depositor and one withdrawer share a
 * {@link Bank}; both are interrupted after three milliseconds.
 */
class PiggyMoney {
    public static void main(String[] args) throws InterruptedException {
        Bank bank = new Bank();
        ExecutorService exec = Executors.newCachedThreadPool();
        exec.execute(new CoinIn(bank));
        exec.execute(new CoinOut(bank));
        TimeUnit.MILLISECONDS.sleep(3);
        // shutdownNow() returns the tasks that never started executing.
        List<Runnable> results = exec.shutdownNow();
        for (Runnable task : results) {
            System.out.println(task);
        }
    }
}

/** Unbounded FIFO store of {@link Money} guarded by the instance monitor. */
class Bank {
    private final List<Money> bank = new LinkedList<Money>();

    /** Deposits a newly created {@link Money} and wakes any waiting withdrawers. */
    public synchronized void coinIn() {
        Money money = new Money();
        bank.add(money);
        System.out.println("主淫存入" + money);
        notifyAll();
    }

    /**
     * Withdraws the oldest {@link Money}, waiting while the bank is empty. If
     * the calling thread is interrupted while waiting, nothing is withdrawn
     * and the thread's interrupt flag is left set.
     */
    public synchronized void coinOut() {
        try {
            // Loop, not `if`: wait() may return spuriously, so the emptiness
            // check has to be repeated after every wakeup.
            while (bank.isEmpty()) {
                System.err.println("主淫,您不能透支啊....");
                wait();
            }
        } catch (InterruptedException e) {
            // Keep the interrupt visible so the caller's loop can stop.
            Thread.currentThread().interrupt();
            return;
        }
        Money money = bank.remove(0);
        System.out.println("主淫取出" + money);
    }
}

/** Repeatedly withdraws from a {@link Bank} until interrupted. */
class CoinOut implements Runnable {
    private final Bank bank;

    public CoinOut(Bank bank) {
        this.bank = bank;
    }

    @Override
    public void run() {
        while (!Thread.interrupted()) {
            bank.coinOut();
        }
    }
}

/** Repeatedly deposits into a {@link Bank} until interrupted. */
class CoinIn implements Runnable {
    private final Bank bank;

    public CoinIn(Bank bank) {
        this.bank = bank;
    }

    @Override
    public void run() {
        while (!Thread.interrupted()) {
            bank.coinIn();
        }
    }
}

/** Unit of money tagged with a sequential id, starting at 1. */
class Money {
    // Not atomic; in this file instances are only created inside the
    // synchronized Bank.coinIn().
    private static int counter = 1;

    private final int id = counter++;

    @Override
    public String toString() {
        return "Money [id=" + id + "]";
    }
}