Java多线程(九)之ReentrantLock与Condition(转)
java.util.concurrent.lock 中的 Lock 框架是锁定的一个抽象,它允许把锁定的实现作为 Java 类,而不是作为语言的特性来实现。这就为 Lock 的多种实现留下了空间,各种实现可能有不同的调度算法、性能特性或者锁定语义。 ReentrantLock 类实现了 Lock ,它拥有与 synchronized 相同的并发性和内存语义,但是添加了类似锁投票、定时锁等候和可中断锁等候的一些特性。此外,它还提供了在激烈争用情况下更佳的性能。(换句话说,当许多 线程都想访问共享资源时,JVM 可以花更少的时候来调度线程,把更多时间用在执行线程上。)
?reentrant 锁意味着什么呢?简 单来说,它有一个与锁相关的获取计数器,如果拥有锁的某个线程再次得到锁,那么获取计数器就加1,然后锁需要被释放两次才能获得真正释放。这模仿了 synchronized 的语义;如果线程进入由线程已经拥有的监控器保护的 synchronized 块,就允许线程继续进行,当线程退出第二个(或者后续) synchronized 块的时候,不释放锁,只有线程退出它进入的监控器保护的第一个 synchronized 块时,才释放锁。?1.2?ReentrantLock与synchronized的比较??相同:ReentrantLock提供了synchronized类似的功能和内存语义。
不同:
(1)ReentrantLock 功能性方面更全面,比如时间锁等候,可中断锁等候,锁投票等,因此更有扩展性。在多个条件变量和高度竞争锁的地方,用ReentrantLock更合 适,ReentrantLock还提供了Condition,对线程的等待和唤醒等操作更加灵活,一个ReentrantLock可以有多个 Condition实例,所以更有扩展性。
(2)ReentrantLock 的性能比synchronized会好点。
(3)ReentrantLock提供了可轮询的锁请求,他可以尝试的去取得锁,如果取得成功则继续处理,取得不成功,可以等下次运行的时候处理,所以不容易产生死锁,而synchronized则一旦进入锁请求要么成功,要么一直阻塞,所以更容易产生死锁。
?
1.3?ReentrantLock扩展的功能??
1.3.1 实现可轮询的锁请求??在内部锁中,死锁是致命的——唯一的恢复方法是重新启动程序,唯一的预防方法是在构建程序时不要出错。而可轮询的锁获取模式具有更完善的错误恢复机制,可以规避死锁的发生。?条件变量很大一个程度上是为了解决Object.wait/notify/notifyAll难以使用的问题。
条件(也称为条件队列?或条件变量)为线程提供了一个含义,以便在某个状态条件现在可能为 true 的另一个线程通知它之前,一直挂起该线程(即让其“等待”)。因为访问此共享状态信息发生在不同的线程中,所以它必须受保护,因此要将某种形式的锁与该条件相关联。等待提供一个条件的主要属性是:以原子方式?释放相关的锁,并挂起当前线程,就像?Object.wait?做的那样。
上述API说明表明条件变量需要与锁绑定,而且多个Condition需要绑定到同一锁上。前面的Lock中提到,获取一个条件变量的方法是Lock.newCondition()。
?
[java] view plaincopy以上是Condition接口定义的方法,await*对应于Object.wait,signal对应于Object.notify,signalAll对应于Object.notifyAll。特别说明的是Condition的接口改变名称就是为了避免与Object中的wait/notify/notifyAll的语义和使用上混淆,因为Condition同样有wait/notify/notifyAll方法。
每一个Lock可以有任意数据的Condition对象,Condition是与Lock绑定的,所以就有Lock的公平性特性:如果是公平锁,线程为按照FIFO的顺序从Condition.await中释放,如果是非公平锁,那么后续的锁竞争就不保证FIFO顺序了。
一个使用Condition实现生产者消费者的模型例子如下。
?
[java] view plaincopy
- import?java.util.concurrent.locks.Condition;??import?java.util.concurrent.locks.Lock;??
- import?java.util.concurrent.locks.ReentrantLock;????
- public?class?ProductQueue<T>?{????
- ????private?final?T[]?items;????
- ????private?final?Lock?lock?=?new?ReentrantLock();????
- ????private?Condition?notFull?=?lock.newCondition();????
- ????private?Condition?notEmpty?=?lock.newCondition();????
- ????//??????private?int?head,?tail,?count;??
- ??????public?ProductQueue(int?maxSize)?{??
- ????????items?=?(T[])?new?Object[maxSize];??????}??
- ??????public?ProductQueue()?{??
- ????????this(10);??????}??
- ??????public?void?put(T?t)?throws?InterruptedException?{??
- ????????lock.lock();??????????try?{??
- ????????????while?(count?==?getCapacity())?{??????????????????notFull.await();??
- ????????????}??????????????items[tail]?=?t;??
- ????????????if?(++tail?==?getCapacity())?{??????????????????tail?=?0;??
- ????????????}??????????????++count;??
- ????????????notEmpty.signalAll();??????????}?finally?{??
- ????????????lock.unlock();??????????}??
- ????}????
- ????public?T?take()?throws?InterruptedException?{??????????lock.lock();??
- ????????try?{??????????????while?(count?==?0)?{??
- ????????????????notEmpty.await();??????????????}??
- ????????????T?ret?=?items[head];??????????????items[head]?=?null;//GC??
- ????????????//??????????????if?(++head?==?getCapacity())?{??
- ????????????????head?=?0;??????????????}??
- ????????????--count;??????????????notFull.signalAll();??
- ????????????return?ret;??????????}?finally?{??
- ????????????lock.unlock();??????????}??
- ????}????
- ????public?int?getCapacity()?{??????????return?items.length;??
- ????}????
- ????public?int?size()?{??????????lock.lock();??
- ????????try?{??????????????return?count;??
- ????????}?finally?{??????????????lock.unlock();??
- ????????}??????}??
- ??}??
在这个例子中消费take()需要 队列不为空,如果为空就挂起(await()),直到收到notEmpty的信号;生产put()需要队列不满,如果满了就挂起(await()),直到收到notFull的信号。
可能有人会问题,如果一个线程lock()对象后被挂起还没有unlock,那么另外一个线程就拿不到锁了(lock()操作会挂起),那么就无法通知(notify)前一个线程,这样岂不是“死锁”了?
?
2.1 await* 操作?上一节中说过多次ReentrantLock是独占锁,一个线程拿到锁后如果不释放,那么另外一个线程肯定是拿不到锁,所以在lock.lock()和lock.unlock()之间可能有一次释放锁的操作(同样也必然还有一次获取锁的操作)。我们再回头看代码,不管take()还是put(),在进入lock.lock()后唯一可能释放锁的操作就是await()了。也就是说await()操作实际上就是释放锁,然后挂起线程,一旦条件满足就被唤醒,再次获取锁!
?
[java] view plaincopy
- public?final?void?await()?throws?InterruptedException?{??????if?(Thread.interrupted())??
- ????????throw?new?InterruptedException();??????Node?node?=?addConditionWaiter();??
- ????int?savedState?=?fullyRelease(node);??????int?interruptMode?=?0;??
- ????while?(!isOnSyncQueue(node))?{??????????LockSupport.park(this);??
- ????????if?((interruptMode?=?checkInterruptWhileWaiting(node))?!=?0)??????????????break;??
- ????}??????if?(acquireQueued(node,?savedState)?&&?interruptMode?!=?THROW_IE)??
- ????????interruptMode?=?REINTERRUPT;??????if?(node.nextWaiter?!=?null)??
- ????????unlinkCancelledWaiters();??????if?(interruptMode?!=?0)??
- ????????reportInterruptAfterWait(interruptMode);??}??
上面是await()的代码片段。上一节中说过,AQS在获取锁的时候需要有一个CHL的FIFO队列,所以对于一个Condition.await()而言,如果释放了锁,要想再一次获取锁那么就需要进入队列,等待被通知获取锁。完整的await()操作是安装如下步骤进行的:
这里再回头介绍Condition的数据结构。我们知道一个Condition可以在多个地方被await*(),那么就需要一个FIFO的结构将这些Condition串联起来,然后根据需要唤醒一个或者多个(通常是所有)。所以在Condition内部就需要一个FIFO的队列。
?
[java] view plaincopy
- private?transient?Node?firstWaiter;??private?transient?Node?lastWaiter;??
上面的两个节点就是描述一个FIFO的队列。我们再结合前面提到的节点(Node)数据结构。我们就发现Node.nextWaiter就派上用场了!nextWaiter就是将一系列的Condition.await*串联起来组成一个FIFO的队列。
?
2.2 signal/signalAll 操作?await*()清楚了,现在再来看signal/signalAll就容易多了。按照signal/signalAll的需求,就是要将Condition.await*()中FIFO队列中第一个Node唤醒(或者全部Node)唤醒。尽管所有Node可能都被唤醒,但是要知道的是仍然只有一个线程能够拿到锁,其它没有拿到锁的线程仍然需要自旋等待,就上上面提到的第4步(acquireQueued)。
?
[java] view plaincopy
- private?void?doSignal(Node?first)?{??????do?{??
- ????????if?(?(firstWaiter?=?first.nextWaiter)?==?null)??????????????lastWaiter?=?null;??
- ????????first.nextWaiter?=?null;??????}?while?(!transferForSignal(first)?&&??
- ?????????????(first?=?firstWaiter)?!=?null);??}??
- ??private?void?doSignalAll(Node?first)?{??
- ????lastWaiter?=?firstWaiter??=?null;??????do?{??
- ????????Node?next?=?first.nextWaiter;??????????first.nextWaiter?=?null;??
- ????????transferForSignal(first);??????????first?=?next;??
- ????}?while?(first?!=?null);??}??
上面的代码很容易看出来,signal就是唤醒Condition队列中的第一个非CANCELLED节点线程,而signalAll就是唤醒所有非CANCELLED节点线程。当然了遇到CANCELLED线程就需要将其从FIFO队列中剔除。
?
[java] view plaincopy
- final?boolean?transferForSignal(Node?node)?{??????if?(!compareAndSetWaitStatus(node,?Node.CONDITION,?0))??
- ????????return?false;????
- ????Node?p?=?enq(node);??????int?c?=?p.waitStatus;??
- ????if?(c?>?0?||?!compareAndSetWaitStatus(p,?c,?Node.SIGNAL))??????????LockSupport.unpark(node.thread);??
- ????return?true;??}??
上面就是唤醒一个await*()线程的过程,根据前面的小节介绍的,如果要unpark线程,并使线程拿到锁,那么就需要线程节点进入AQS的队列。所以可以看到在LockSupport.unpark之前调用了enq(node)操作,将当前节点加入到AQS队列。
?