阻塞队列步步升华
1、首先我们定义一个简单的队列:一个线程不断地往队列中放数据,另一个线程从队列中取数据。这个队列没有容量上限,如果放数据的速度持续快于取数据的速度,队列会无限增长,存在内存溢出(OutOfMemoryError)的危险。
import java.util.Random;
import java.util.Vector;

/**
 * Unbounded blocking queue of random integers, guarded by the instance monitor.
 *
 * <p>{@link #put()} never blocks, so the backing vector grows without limit whenever
 * producers outpace consumers. {@link #get()} blocks while the queue is empty.
 *
 * <p>The published listing placed this class in package {@code com.zte}; declare that
 * package when saving the class into a matching directory layout.
 */
public class SimpleQueue {

    /** Shared generator; {@link Random} is safe for use from several threads. */
    private static final Random RANDOM = new Random();

    /** Backing storage; only touched while holding this object's monitor. */
    final Vector<Integer> vector = new Vector<Integer>();

    /**
     * Appends one random value in {@code [0, 1000)} and wakes one waiting consumer.
     */
    public synchronized void put() {
        System.out.println(Thread.currentThread().getName() + ",开始放数据。");
        vector.add(RANDOM.nextInt(1000));
        System.out.println(Thread.currentThread().getName() + ",已放完数据。");
        System.out.println("队列中还有:" + vector.size());
        // notify() only tells one waiting thread that it may compete for the monitor;
        // it cannot actually acquire it until this method returns and releases it.
        // Only consumers ever wait here, and each put adds exactly one element,
        // so waking a single thread is sufficient.
        notify();
    }

    /**
     * Removes and returns the oldest value, blocking while the queue is empty.
     *
     * <p>An interrupt received while waiting does not abort the call (the signature
     * has no checked exception); instead the interrupt status is restored before
     * returning so the caller can still observe it.
     *
     * @return the value at the head of the queue, never {@code null}
     */
    public synchronized Integer get() {
        System.out.println(Thread.currentThread().getName() + ",开始取数据。");
        boolean interrupted = false;
        try {
            // Always re-check the condition after waking: wake-ups may be spurious.
            while (vector.isEmpty()) {
                try {
                    wait();
                } catch (InterruptedException e) {
                    // wait() cleared the flag; remember it and keep waiting.
                    interrupted = true;
                }
            }
            // Remove by index: avoids the linear equals() search of remove(Object).
            Integer head = vector.remove(0);
            System.out.println(Thread.currentThread().getName() + ",已取完数据。");
            System.out.println("队列中还有:" + vector.size());
            return head;
        } finally {
            if (interrupted) {
                Thread.currentThread().interrupt();
            }
        }
    }

    /**
     * Demo: one consumer thread and one producer thread, each pausing for a random
     * time below one second between operations.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        final SimpleQueue simpleQueue = new SimpleQueue();
        // The consumer is started first, as in the original listing.
        startWorker(new Runnable() {
            @Override
            public void run() {
                simpleQueue.get();
            }
        }, 1000);
        startWorker(new Runnable() {
            @Override
            public void run() {
                simpleQueue.put();
            }
        }, 1000);
    }

    /** Starts a thread that repeats {@code action} forever with a random pause after each run. */
    private static void startWorker(final Runnable action, final int maxPauseMillis) {
        new Thread(new Runnable() {
            @Override
            public void run() {
                try {
                    while (true) {
                        action.run();
                        Thread.sleep(RANDOM.nextInt(maxPauseMillis));
                    }
                } catch (InterruptedException e) {
                    // Treat interruption as a request to stop; keep the flag set.
                    Thread.currentThread().interrupt();
                }
            }
        }).start();
    }
}
import java.util.Random;

/**
 * Bounded blocking queue of random integers backed by a ring buffer and guarded by
 * the instance monitor, intended for several producers and several consumers.
 *
 * <p>{@link #put()} blocks while the buffer is full; {@link #get()} blocks while it
 * is empty.
 *
 * <p>The published listing placed this class in package {@code com.zte}; declare that
 * package when saving the class into a matching directory layout.
 */
public class MitThreadBufferArraySimpleQueue {

    private static final int DEFAULT_CAPACITY = 20;

    /** Shared generator; {@link Random} is safe for use from several threads. */
    private static final Random RANDOM = new Random();

    /** Ring buffer storage; a {@code null} slot is free. */
    final Integer[] array;
    /** Index of the slot the next {@link #put()} writes. */
    int putPostion;
    /** Index of the slot the next {@link #get()} reads. */
    int getPostion;
    /** Number of occupied slots. */
    int count;

    /** Creates a queue with the default capacity of 20 elements. */
    public MitThreadBufferArraySimpleQueue() {
        this(DEFAULT_CAPACITY);
    }

    /**
     * Creates a queue holding at most {@code capacity} elements.
     *
     * @param capacity maximum number of buffered elements
     * @throws IllegalArgumentException if {@code capacity} is not positive
     */
    public MitThreadBufferArraySimpleQueue(int capacity) {
        if (capacity <= 0) {
            throw new IllegalArgumentException("capacity must be positive: " + capacity);
        }
        array = new Integer[capacity];
    }

    /**
     * Stores one random value in {@code [0, 1000)}, blocking while the buffer is full.
     *
     * <p>An interrupt received while waiting does not abort the call; the interrupt
     * status is restored before returning.
     */
    public synchronized void put() {
        boolean interrupted = false;
        try {
            // Loop, not "if": another producer may have refilled the buffer before
            // this thread re-acquired the monitor, and wake-ups may be spurious.
            while (count == array.length) {
                try {
                    wait();
                } catch (InterruptedException e) {
                    // Never fall through into the critical section on interrupt.
                    interrupted = true;
                }
            }
            System.out.println(Thread.currentThread().getName() + ",开始放数据。");
            array[putPostion] = RANDOM.nextInt(1000);
            putPostion = (putPostion + 1) % array.length;
            count++;
            System.out.println(Thread.currentThread().getName() + ",已放完数据。");
            System.out.println("队列中还有:" + count + "个对象");
            // Producers and consumers share one wait set, so notify() could wake
            // another producer and leave every consumer asleep. Woken threads can
            // only proceed after this method returns and releases the monitor.
            notifyAll();
        } finally {
            if (interrupted) {
                Thread.currentThread().interrupt();
            }
        }
    }

    /**
     * Removes and returns the oldest value, blocking while the buffer is empty.
     *
     * <p>An interrupt received while waiting does not abort the call; the interrupt
     * status is restored before returning.
     *
     * @return the oldest buffered value, never {@code null}
     */
    public synchronized Integer get() {
        boolean interrupted = false;
        try {
            // Loop, not "if": another consumer may already have taken the element.
            while (count == 0) {
                try {
                    wait();
                } catch (InterruptedException e) {
                    interrupted = true;
                }
            }
            System.out.println(Thread.currentThread().getName() + ",开始取数据。");
            Integer taken = array[getPostion];
            array[getPostion] = null; // release the slot's reference
            getPostion = (getPostion + 1) % array.length;
            count--;
            System.out.println(Thread.currentThread().getName() + ",已取完数据。");
            System.out.println("队列中还有:" + count + "个对象");
            notifyAll();
            return taken;
        } finally {
            if (interrupted) {
                Thread.currentThread().interrupt();
            }
        }
    }

    /**
     * Demo: three consumer threads and three producer threads, each pausing for a
     * random time below one second between operations.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        final MitThreadBufferArraySimpleQueue bufferArraySimpleQueue =
                new MitThreadBufferArraySimpleQueue();
        for (int i = 0; i < 3; i++) {
            startWorker(new Runnable() {
                @Override
                public void run() {
                    bufferArraySimpleQueue.get();
                }
            }, 1000);
        }
        for (int j = 0; j < 3; j++) {
            startWorker(new Runnable() {
                @Override
                public void run() {
                    bufferArraySimpleQueue.put();
                }
            }, 1000);
        }
    }

    /** Starts a thread that repeats {@code action} forever with a random pause after each run. */
    private static void startWorker(final Runnable action, final int maxPauseMillis) {
        new Thread(new Runnable() {
            @Override
            public void run() {
                try {
                    while (true) {
                        action.run();
                        Thread.sleep(RANDOM.nextInt(maxPauseMillis));
                    }
                } catch (InterruptedException e) {
                    // Treat interruption as a request to stop; keep the flag set.
                    Thread.currentThread().interrupt();
                }
            }
        }).start();
    }
}
import java.util.Random;

/**
 * Bounded blocking queue of random integers backed by a ring buffer and guarded by
 * the instance monitor, intended for several producers and several consumers.
 *
 * <p>{@link #put()} blocks while the buffer is full; {@link #get()} blocks while it
 * is empty.
 *
 * <p>The published listing placed this class in package {@code com.zte}; declare that
 * package when saving the class into a matching directory layout.
 */
public class MitThreadBufferArraySimpleQueue {

    private static final int DEFAULT_CAPACITY = 20;

    /** Shared generator; {@link Random} is safe for use from several threads. */
    private static final Random RANDOM = new Random();

    /** Ring buffer storage; a {@code null} slot is free. */
    final Integer[] array;
    /** Index of the slot the next {@link #put()} writes. */
    int putPostion;
    /** Index of the slot the next {@link #get()} reads. */
    int getPostion;
    /** Number of occupied slots. */
    int count;

    /** Creates a queue with the default capacity of 20 elements. */
    public MitThreadBufferArraySimpleQueue() {
        this(DEFAULT_CAPACITY);
    }

    /**
     * Creates a queue holding at most {@code capacity} elements.
     *
     * @param capacity maximum number of buffered elements
     * @throws IllegalArgumentException if {@code capacity} is not positive
     */
    public MitThreadBufferArraySimpleQueue(int capacity) {
        if (capacity <= 0) {
            throw new IllegalArgumentException("capacity must be positive: " + capacity);
        }
        array = new Integer[capacity];
    }

    /**
     * Stores one random value in {@code [0, 1000)}, blocking while the buffer is full.
     *
     * <p>An interrupt received while waiting does not abort the call; the interrupt
     * status is restored before returning.
     */
    public synchronized void put() {
        boolean wasInterrupted = false;
        try {
            // Re-test the condition after every wake-up: a competing producer may
            // have filled the freed slot first, and wake-ups may be spurious.
            while (count == array.length) {
                try {
                    wait();
                } catch (InterruptedException e) {
                    // Do not write into a full buffer just because of an interrupt.
                    wasInterrupted = true;
                }
            }
            System.out.println(Thread.currentThread().getName() + ",开始放数据。");
            array[putPostion] = RANDOM.nextInt(1000);
            putPostion = (putPostion + 1) % array.length;
            count++;
            System.out.println(Thread.currentThread().getName() + ",已放完数据。");
            System.out.println("队列中还有:" + count + "个对象");
            // A single notify() might pick another producer from the shared wait
            // set; wake everyone and let each thread re-check its own condition.
            // Woken threads still need this method to return before they can run.
            notifyAll();
        } finally {
            if (wasInterrupted) {
                Thread.currentThread().interrupt();
            }
        }
    }

    /**
     * Removes and returns the oldest value, blocking while the buffer is empty.
     *
     * <p>An interrupt received while waiting does not abort the call; the interrupt
     * status is restored before returning.
     *
     * @return the oldest buffered value, never {@code null}
     */
    public synchronized Integer get() {
        boolean wasInterrupted = false;
        try {
            // Re-test after every wake-up: a competing consumer may have emptied
            // the buffer again.
            while (count == 0) {
                try {
                    wait();
                } catch (InterruptedException e) {
                    wasInterrupted = true;
                }
            }
            System.out.println(Thread.currentThread().getName() + ",开始取数据。");
            Integer taken = array[getPostion];
            array[getPostion] = null; // release the slot's reference
            getPostion = (getPostion + 1) % array.length;
            count--;
            System.out.println(Thread.currentThread().getName() + ",已取完数据。");
            System.out.println("队列中还有:" + count + "个对象");
            notifyAll();
            return taken;
        } finally {
            if (wasInterrupted) {
                Thread.currentThread().interrupt();
            }
        }
    }

    /**
     * Demo: three consumer threads and three producer threads, each pausing for a
     * random time below one second between operations.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        final MitThreadBufferArraySimpleQueue bufferArraySimpleQueue =
                new MitThreadBufferArraySimpleQueue();
        for (int i = 0; i < 3; i++) {
            startWorker(new Runnable() {
                @Override
                public void run() {
                    bufferArraySimpleQueue.get();
                }
            }, 1000);
        }
        for (int j = 0; j < 3; j++) {
            startWorker(new Runnable() {
                @Override
                public void run() {
                    bufferArraySimpleQueue.put();
                }
            }, 1000);
        }
    }

    /** Starts a thread that repeats {@code action} forever with a random pause after each run. */
    private static void startWorker(final Runnable action, final int maxPauseMillis) {
        new Thread(new Runnable() {
            @Override
            public void run() {
                try {
                    while (true) {
                        action.run();
                        Thread.sleep(RANDOM.nextInt(maxPauseMillis));
                    }
                } catch (InterruptedException e) {
                    // Treat interruption as a request to stop; keep the flag set.
                    Thread.currentThread().interrupt();
                }
            }
        }).start();
    }
}
import java.util.Random;

/**
 * Bounded blocking queue of random integers backed by a ring buffer, intended for
 * several producers and several consumers.
 *
 * <p>Despite its name, this class synchronizes on the instance monitor
 * ({@code synchronized} with {@code wait}/{@code notifyAll}); it does not use
 * {@code java.util.concurrent.locks.Condition}.
 *
 * <p>{@link #put()} blocks while the buffer is full; {@link #get()} blocks while it
 * is empty.
 *
 * <p>The published listing placed this class in package {@code com.zte}; declare that
 * package when saving the class into a matching directory layout.
 */
public class ConditionMitThreadBufferArraySimpleQueue {

    private static final int DEFAULT_CAPACITY = 20;

    /** Shared generator; {@link Random} is safe for use from several threads. */
    private static final Random RANDOM = new Random();

    /** Ring buffer storage; a {@code null} slot is free. */
    final Integer[] array;
    /** Index of the slot the next {@link #put()} writes. */
    int putPostion;
    /** Index of the slot the next {@link #get()} reads. */
    int getPostion;
    /** Number of occupied slots. */
    int count;

    /** Creates a queue with the default capacity of 20 elements. */
    public ConditionMitThreadBufferArraySimpleQueue() {
        this(DEFAULT_CAPACITY);
    }

    /**
     * Creates a queue holding at most {@code capacity} elements.
     *
     * @param capacity maximum number of buffered elements
     * @throws IllegalArgumentException if {@code capacity} is not positive
     */
    public ConditionMitThreadBufferArraySimpleQueue(int capacity) {
        if (capacity <= 0) {
            throw new IllegalArgumentException("capacity must be positive: " + capacity);
        }
        array = new Integer[capacity];
    }

    /**
     * Stores one random value in {@code [0, 1000)}, blocking while the buffer is full.
     *
     * <p>An interrupt received while waiting does not abort the call; the interrupt
     * status is restored before returning.
     */
    public synchronized void put() {
        boolean interrupted = false;
        try {
            // Guard with a loop: the buffer may be full again by the time this
            // thread re-acquires the monitor, and wake-ups may be spurious.
            while (count == array.length) {
                try {
                    wait();
                } catch (InterruptedException e) {
                    // Keep waiting; proceeding here would overfill the buffer.
                    interrupted = true;
                }
            }
            System.out.println(Thread.currentThread().getName() + ",开始放数据。");
            array[putPostion] = RANDOM.nextInt(1000);
            putPostion = (putPostion + 1) % array.length;
            count++;
            System.out.println(Thread.currentThread().getName() + ",已放完数据。");
            System.out.println("队列中还有:" + count + "个对象");
            // Producers and consumers wait on the same monitor, so wake all of
            // them; each re-checks its own condition once this method releases
            // the monitor.
            notifyAll();
        } finally {
            if (interrupted) {
                Thread.currentThread().interrupt();
            }
        }
    }

    /**
     * Removes and returns the oldest value, blocking while the buffer is empty.
     *
     * <p>An interrupt received while waiting does not abort the call; the interrupt
     * status is restored before returning.
     *
     * @return the oldest buffered value, never {@code null}
     */
    public synchronized Integer get() {
        boolean interrupted = false;
        try {
            // Guard with a loop: another consumer may have taken the element first.
            while (count == 0) {
                try {
                    wait();
                } catch (InterruptedException e) {
                    interrupted = true;
                }
            }
            System.out.println(Thread.currentThread().getName() + ",开始取数据。");
            Integer taken = array[getPostion];
            array[getPostion] = null; // release the slot's reference
            getPostion = (getPostion + 1) % array.length;
            count--;
            System.out.println(Thread.currentThread().getName() + ",已取完数据。");
            System.out.println("队列中还有:" + count + "个对象");
            notifyAll();
            return taken;
        } finally {
            if (interrupted) {
                Thread.currentThread().interrupt();
            }
        }
    }

    /**
     * Demo: three consumer threads and three producer threads, each pausing for a
     * random time below one second between operations.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        final ConditionMitThreadBufferArraySimpleQueue bufferArraySimpleQueue =
                new ConditionMitThreadBufferArraySimpleQueue();
        for (int i = 0; i < 3; i++) {
            startWorker(new Runnable() {
                @Override
                public void run() {
                    bufferArraySimpleQueue.get();
                }
            }, 1000);
        }
        for (int j = 0; j < 3; j++) {
            startWorker(new Runnable() {
                @Override
                public void run() {
                    bufferArraySimpleQueue.put();
                }
            }, 1000);
        }
    }

    /** Starts a thread that repeats {@code action} forever with a random pause after each run. */
    private static void startWorker(final Runnable action, final int maxPauseMillis) {
        new Thread(new Runnable() {
            @Override
            public void run() {
                try {
                    while (true) {
                        action.run();
                        Thread.sleep(RANDOM.nextInt(maxPauseMillis));
                    }
                } catch (InterruptedException e) {
                    // Treat interruption as a request to stop; keep the flag set.
                    Thread.currentThread().interrupt();
                }
            }
        }).start();
    }
}
import java.util.Random;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

/**
 * Bounded blocking queue of random integers backed by a ring buffer and guarded by
 * a {@link ReentrantLock} with separate conditions for readers and writers.
 *
 * <p>{@link #put()} blocks while the buffer is full; {@link #get()} blocks while it
 * is empty.
 *
 * <p>The published listing placed this class in package {@code com.zte}; declare that
 * package when saving the class into a matching directory layout.
 */
public class BufferArraySimpleQueue {

    private static final int DEFAULT_CAPACITY = 20;

    /** Shared generator; {@link Random} is safe for use from several threads. */
    private static final Random RANDOM = new Random();

    /** Ring buffer storage; a {@code null} slot is free. */
    final Integer[] array;
    /** Index of the slot the next {@link #put()} writes. */
    int putPostion;
    /** Index of the slot the next {@link #get()} reads. */
    int getPostion;
    /** Number of occupied slots. */
    int count;

    /** Guards every field above. */
    final Lock lock = new ReentrantLock();
    /** Consumers wait here while the buffer is empty. */
    final Condition readCondition = lock.newCondition();
    /** Producers wait here while the buffer is full. */
    final Condition writeCondition = lock.newCondition();

    /** Creates a queue with the default capacity of 20 elements. */
    public BufferArraySimpleQueue() {
        this(DEFAULT_CAPACITY);
    }

    /**
     * Creates a queue holding at most {@code capacity} elements.
     *
     * @param capacity maximum number of buffered elements
     * @throws IllegalArgumentException if {@code capacity} is not positive
     */
    public BufferArraySimpleQueue(int capacity) {
        if (capacity <= 0) {
            throw new IllegalArgumentException("capacity must be positive: " + capacity);
        }
        array = new Integer[capacity];
    }

    /**
     * Stores one random value in {@code [0, 1000)}, blocking while the buffer is full.
     *
     * <p>The wait is not interruptible: an interrupt received while waiting leaves
     * the thread's interrupt status set when the call returns.
     */
    public void put() {
        lock.lock();
        try {
            // Loop, not "if": condition waits may wake spuriously, and falling
            // through on an interrupt would overwrite an unread slot.
            while (count == array.length) {
                writeCondition.awaitUninterruptibly();
            }
            System.out.println(Thread.currentThread().getName() + ",开始放数据。");
            array[putPostion] = RANDOM.nextInt(1000);
            putPostion = (putPostion + 1) % array.length;
            count++;
            System.out.println(Thread.currentThread().getName() + ",已放完数据。");
            System.out.println("队列中还有:" + count + "个对象");
            // signal() only marks one waiting consumer as eligible to run; it must
            // still re-acquire the lock, which happens after unlock() below.
            readCondition.signal();
        } finally {
            lock.unlock();
        }
    }

    /**
     * Removes and returns the oldest value, blocking while the buffer is empty.
     *
     * <p>The wait is not interruptible: an interrupt received while waiting leaves
     * the thread's interrupt status set when the call returns.
     *
     * @return the oldest buffered value, never {@code null}
     */
    public Integer get() {
        lock.lock();
        try {
            // Loop, not "if": re-check emptiness after every wake-up.
            while (count == 0) {
                readCondition.awaitUninterruptibly();
            }
            System.out.println(Thread.currentThread().getName() + ",开始取数据。");
            Integer taken = array[getPostion];
            array[getPostion] = null; // release the slot's reference
            getPostion = (getPostion + 1) % array.length;
            count--;
            System.out.println(Thread.currentThread().getName() + ",已取完数据。");
            System.out.println("队列中还有:" + count + "个对象");
            writeCondition.signal();
            return taken;
        } finally {
            lock.unlock();
        }
    }

    /**
     * Demo: one consumer pausing up to one second between reads and one producer
     * pausing up to half a second between writes.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        final BufferArraySimpleQueue bufferArraySimpleQueue = new BufferArraySimpleQueue();
        // The consumer is started first, as in the original listing.
        startWorker(new Runnable() {
            @Override
            public void run() {
                bufferArraySimpleQueue.get();
            }
        }, 1000);
        startWorker(new Runnable() {
            @Override
            public void run() {
                bufferArraySimpleQueue.put();
            }
        }, 500);
    }

    /** Starts a thread that repeats {@code action} forever with a random pause after each run. */
    private static void startWorker(final Runnable action, final int maxPauseMillis) {
        new Thread(new Runnable() {
            @Override
            public void run() {
                try {
                    while (true) {
                        action.run();
                        Thread.sleep(RANDOM.nextInt(maxPauseMillis));
                    }
                } catch (InterruptedException e) {
                    // Treat interruption as a request to stop; keep the flag set.
                    Thread.currentThread().interrupt();
                }
            }
        }).start();
    }
}