并发容器——BlockingQueue相关类
java.util.concurrent提供了多种并发容器,总体上来说有4类
Queue类:BlockingQueue ConcurrentLinkedQueueMap类:ConcurrentMapSet类:ConcurrentSkipListSet CopyOnWriteArraySetList类:CopyOnWriteArrayList
接下来一系列文章,我会对每一类的源码进行分析,试图让它们的实现机制完全暴露在大家面前。这篇主要是BlockingQueue及其相关类。
先给出结构图:
下面我按这样的顺序来展开:
1、BlockingQueue2、ArrayBlockingQueue 2.1 添加新元素的方法:add/put/offer
2.2 该类的几个实例变量:takeIndex/putIndex/count/
2.3 Condition实现3、LinkedBlockingQueue 4、PriorityBlockingQueue5、DelayQueue<E extends Delayed>6、BlockingDque+LinkedBlockingQueue
其中前两个分析的尽量详细,为了方便大家看,基本贴出了所有相关源码。后面几个就用尽量用文字论述,如果看得吃力,建议对着jdk的源码看。
1、BlockingQueue
BlockingQueue继承了Queue,Queu是先入先出(FIFO),BlockingQueue是JDK 5.0新引入的。
根据队列null/full时的表现,BlockingQueue的方法分为以下几类:
至于为什么要使用并发容器,一个典型的例子就是生产者-消费者的例子,为了精简本文篇幅,放到附件中见附件:“生产者-消费者 测试.rar”。
另外,BlockingQueue接口定义的所有方法实现都是线程安全的,它的实现类里面都会用锁和其他控制并发的手段保证这种线程安全,但是这些类同时也实现了Collection接口(主要是AbstractQueue实现),所以会出现BlockingQueue的实现类也能同时使用Conllection接口方法,而这时会出现的问题就是像addAll,containsAll,retainAll和removeAll这类批量方法的实现不保证线程安全,举个例子就是addAll 10个items到一个ArrayBlockingQueue,可能中途失败但是却有几个item已经被放进这个队列里面了。
2、ArrayBlockingQueue
ArrayBlockingQueue创建的时候需要指定容量capacity(可以存储的最大的元素个数,因为它不会自动扩容)以及是否为公平锁(fair参数)。
在创建ArrayBlockingQueue的时候默认创建的是非公平锁,不过我们可以在它的构造函数里指定。这里调用ReentrantLock的构造函数创建锁的时候,调用了:
public ReentrantLock(boolean fair) {
sync = (fair)? new FairSync() : new NonfairSync();
}
FairSync/ NonfairSync是ReentrantLock的内部类:
线程按顺序请求获得公平锁,而一个非公平锁可以闯入,如果锁的状态可用,请求非公平锁的线程可在等待队列中向前跳跃,获得该锁。内部锁synchronized没有提供确定的公平性保证。
分三点来讲这个类:
2.1 添加新元素的方法:add/put/offer2.2 该类的几个实例变量:takeIndex/putIndex/count/2.3 Condition实现
2.1 添加新元素的方法:add/put/offer
首先,谈到添加元素的方法,首先得分析以下该类同步机制中用到的锁:
lock = new ReentrantLock(fair);notEmpty = lock.newCondition();//Condition Variable 1notFull = lock.newCondition();//Condition Variable 2
1、public boolean offer(E e) { if (e == null) throw new NullPointerException(); final ReentrantLock lock = this.lock;//每个对象对应一个显示的锁 lock.lock();//请求锁直到获得锁(不可以被interrupte) try { if (count == items.length)//如果队列已经满了 return false; else { insert(e); return true; } } finally { lock.unlock();// }}看insert方法:private void insert(E x) { items[putIndex] = x;//增加全局index的值。/*Inc方法体内部:final int inc(int i) { return (++i == items.length)? 0 : i; }这里可以看出ArrayBlockingQueue采用从前到后向内部数组插入的方式插入新元素的。如果插完了,putIndex可能重新变为0(在已经执行了移除操作的前提下,否则在之前的判断中队列为满) */ putIndex = inc(putIndex); ++count; notEmpty.signal();//wake up one waiting thread}
2public void put(E e) throws InterruptedException { if (e == null) throw new NullPointerException(); final E[] items = this.items; final ReentrantLock lock = this.lock; lock.lockInterruptibly();//请求锁直到得到锁或者变为interrupted try { try { while (count == items.length)//如果满了,当前线程进入noFull对应的等waiting状态 notFull.await(); } catch (InterruptedException ie) { notFull.signal(); // propagate to non-interrupted thread throw ie; } insert(e); } finally { lock.unlock(); }}
3、public boolean offer(E e, long timeout, TimeUnit unit) throws InterruptedException { if (e == null) throw new NullPointerException();long nanos = unit.toNanos(timeout); final ReentrantLock lock = this.lock; lock.lockInterruptibly(); try { for (;;) { if (count != items.length) { insert(e); return true; } if (nanos <= 0) return false; try {//如果没有被 signal/interruptes,需要等待nanos时间才返回 nanos = notFull.awaitNanos(nanos); } catch (InterruptedException ie) { notFull.signal(); // propagate to non-interrupted thread throw ie; } } } finally { lock.unlock(); } }
4、public boolean add(E e) {return super.add(e); }父类:public boolean add(E e) { if (offer(e)) return true; else throw new IllegalStateException("Queue full"); }
用三个数字来维护这个队列中的数据变更:/** items index for next take, poll or remove */ private int takeIndex; /** items index for next put, offer, or add. */ private int putIndex; /** Number of items in the queue */ private int count;
private E extract() { final E[] items = this.items; E x = items[takeIndex]; items[takeIndex] = null;//移除已经被提取出的元素 takeIndex = inc(takeIndex);//策略和添加元素时相同 --count; notFull.signal();//提醒其他在notFull这个Condition上waiting的线程可以尝试工作了 return x; }
public final void await() throws InterruptedException { if (Thread.interrupted()) throw new InterruptedException(); //加一个新的condition等待节点 Node node = addConditionWaiter();//释放自己的锁 int savedState = fullyRelease(node); int interruptMode = 0; while (!isOnSyncQueue(node)) { //如果当前线程 等待状态时CONDITION,park住当前线程,等待condition的signal来解除 LockSupport.park(this); if ((interruptMode = checkInterruptWhileWaiting(node)) != 0) break; } if (acquireQueued(node, savedState) && interruptMode != THROW_IE) interruptMode = REINTERRUPT; if (node.nextWaiter != null) unlinkCancelledWaiters(); if (interruptMode != 0) reportInterruptAfterWait(interruptMode); }
private final PriorityQueue<E> q; private final ReentrantLock lock = new ReentrantLock(true); private final Condition notEmpty = lock.newCondition();