Java Concurrent Programming (4)
4 锁
锁是递归的,是基于每线程的。锁提供了两种主要特性:互斥(mutual exclusion)和可见性(visibility)。互斥即一次只允许一个线程持有某个特定的锁,因此可使用该特性实现对共享数据的协调访问协议,这样,一次就只有一个线程能够使用该共享数据。可见性要更加复杂一些,它必须确保释放锁之前对共享数据做出的更改对于随后获得该锁的另一个线程是可见的。Java语言中,使用synchronized关键字可以实现锁机制,但是它有很多限制:
1)无法中断一个正在等候获得锁的线程。
2)没有办法改变锁的语义,例如重入性、获取锁的公平性等。
3)同步只能作用于方法和代码块,因此只能对严格的块结构使用锁。例如:不能在一个方法中获得锁,而在另外一个方法中释放锁。
在这种情况下,Java引入了java.util.concurrent.locks包,它提供了一套高质量、高效率、语义准确的线程控制工具。
4.1 Lock & Condition & ReadWriteLock接口
Lock接口定义如下:
/**
 * Outline of the {@code Lock} interface from {@code java.util.concurrent.locks}.
 *
 * <p>Unlike {@code synchronized}, acquisition and release are explicit method calls,
 * so a lock may be acquired interruptibly, with a timeout, or only if it is free.
 */
public interface Lock {
    /** Acquires the lock. */
    void lock();

    /** Acquires the lock unless the current thread is interrupted. */
    void lockInterruptibly() throws InterruptedException;

    /** Acquires the lock only if it is free at the time of the call. */
    boolean tryLock();

    /** Acquires the lock if it becomes free within the given waiting time. */
    boolean tryLock(long time, TimeUnit unit) throws InterruptedException;

    /** Releases the lock. */
    void unlock();

    /** Returns a new {@code Condition} bound to this lock. */
    Condition newCondition();
}

// Typical tryLock() usage: unlock() sits in a finally block that is entered
// only after the lock has actually been acquired.
Lock lock = new ReentrantLock();
if (lock.tryLock()) {
    try {
        // manipulate protected state
    } finally {
        // the lock was acquired, so it must always be released here
        lock.unlock();
    }
} else {
    // perform alternative actions
}

/**
 * Outline of the {@code Condition} interface from {@code java.util.concurrent.locks}:
 * the explicit-lock counterpart of the wait/notify monitor methods.
 */
public interface Condition {
    void await() throws InterruptedException;

    void awaitUninterruptibly();

    long awaitNanos(long nanosTimeout) throws InterruptedException;

    boolean await(long time, TimeUnit unit) throws InterruptedException;

    boolean awaitUntil(Date deadline) throws InterruptedException;

    void signal();

    void signalAll();
}

// A Condition is always obtained from the Lock it belongs to.
private final Lock lock = new ReentrantLock();
private final Condition condition = lock.newCondition();
/**
 * Outline of the ReadWriteLock interface: it exposes two Lock objects,
 * one returned by readLock() and one returned by writeLock().
 */
public interface ReadWriteLock {
    Lock readLock();
    Lock writeLock();
}

// Constructor signatures of ReentrantLock, listed for reference only (no bodies).
// The boolean parameter selects the fairness policy of the lock.
public ReentrantLock();
public ReentrantLock(boolean fair);
public class ReentrantLockDemo{//private int count;private final int CAPACITY = 1;private final Lock lock = new ReentrantLock();private final Condition isEmpty = lock.newCondition();private final Condition isFull = lock.newCondition();private final Object[] cache = new Object[CAPACITY];public ReentrantLockDemo(int count){this.count = count;}public Object get() throws InterruptedException{lock.lock();try {while(isEmpty()){isEmpty.await();}count--;isFull.signal();return cache[count];} finally{lock.unlock();}}public void put() throws InterruptedException{lock.lock();try {while(isFull()){isFull.await();}count++;cache[0] = Thread.currentThread().getName() + "put a message";System.out.println(cache[0]);isEmpty.signal();} finally{lock.unlock();}}public boolean isFull(){return count == cache.length;}public boolean isEmpty(){return count == 0;}}public class ReentrantReadWriteLockDemo {//private final ReentrantReadWriteLock rwl = new ReentrantReadWriteLock();private final Lock read = rwl.readLock();private final Lock write = rwl.writeLock();private final Map<String,Object> map = new HashMap<String, Object>();public Object get(String key) {read.lock();try {return map.get(key);} finally{read.unlock();}}public void set(String key, Object value){write.lock();try {map.put(key, value);} finally{write.unlock();}}public void clear(){write.lock();try {map.clear();} finally{write.unlock();}}}public static void main(String[] args) {ExecutorService service = Executors.newCachedThreadPool();final Semaphore semaphore = new Semaphore(5);for (int index = 0; index < 20; index++) {final int NO = index;Runnable run = new Runnable() {public void run() {try {semaphore.acquire();System.out.println("acquire thread" + NO + "***********");Thread.sleep(3000);semaphore.release();System.out.println("release thread" + NO + "###########");} catch (Exception e) {}}};service.execute(run);}service.shutdown();}public class Mutex implemets Sync { public void acquire() throws InterruptedException; public void 
release(); public boolean attempt(long msec) throws InterruptedException;}public class CountDownLatchDemo {//private static final int PLAY_AMOUNT = 10;public static void main(String[] args) {CountDownLatch begin = new CountDownLatch(1);CountDownLatch end = new CountDownLatch(PLAY_AMOUNT);Player[] players = new Player[PLAY_AMOUNT];for(int i = 0; i < players.length; i ++){players[i] = new Player(i+1, begin, end);}ExecutorService executor = Executors.newFixedThreadPool(PLAY_AMOUNT);for(Player player : players){executor.execute(player);}System.out.println("game begin ");begin.countDown();try {end.await();} catch (InterruptedException e) {e.printStackTrace();}System.out.println("game over ");executor.shutdown();}}class Player implements Runnable{//private final int id;private final CountDownLatch begin;private final CountDownLatch end;public Player(int id, CountDownLatch begin, CountDownLatch end){this.id = id;this.begin = begin;this.end = end;}public void run(){try {begin.await();Thread.sleep((long)(Math.random() * 100));System.out.println("Player " + id + " has arrived");} catch (InterruptedException e) {e.printStackTrace();}finally{end.countDown();}}}public CyclicBarrier(int parties, Runnable barrierAction);public CyclicBarrier(int parties);
public int await() throws InterruptedException, BrokenBarrierExceptionpublic int await(long timeout, TimeUnit unit) throws InterruptedException,BrokenBarrierException,TimeoutException;public void reset();
/**
 * CyclicBarrier example: five tasks each sleep for a random time below 100 ms,
 * announce their arrival and then wait at a shared barrier. When the fifth task
 * arrives, the barrier action prints a message and all tasks are released.
 */
public class CyclicBarrierDemo {
    public static void main(String[] args) {
        final ExecutorService executor = Executors.newCachedThreadPool();
        final CyclicBarrier barrier = new CyclicBarrier(5, new Runnable() {
            @Override
            public void run() {
                System.out.println("all people arrived , let's go");
            }
        });
        for (int i = 0; i < 5; i++) {
            executor.execute(new Runnable() {
                @Override
                public void run() {
                    try {
                        Thread.sleep((long) (Math.random() * 100));
                    } catch (InterruptedException e) {
                        // Flag deliberately not restored here: the task should still
                        // reach the barrier so the other parties are not left waiting.
                        e.printStackTrace();
                    }
                    System.out.println(Thread.currentThread().getId() + " has arrived");
                    try {
                        // wait for the other parties to arrive
                        barrier.await();
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                        Thread.currentThread().interrupt();
                    } catch (BrokenBarrierException e) {
                        e.printStackTrace();
                    }
                }
            });
        }
        // Already-submitted tasks keep running; main does not wait for them.
        executor.shutdown();
    }
}