首页 诗词 字典 板报 句子 名言 友答 励志 学校 网站地图
当前位置: 首页 > 教程频道 > 开发语言 > 编程 >

堵塞Map BlockingMap的实现

2012-12-21 
阻塞Map BlockingMap的实现做socket应用用到了BlockingQueue接口,可用于生产者消费者模式,多个线程阻塞着

阻塞Map BlockingMap的实现
做socket应用用到了BlockingQueue接口,可用于生产者消费者模式,多个线程阻塞着等待queue的数据到来,但是如果是该线程需要等待某个特定的数据该如何处理呢,自己写了个BlockingMap

public interface BlockingMap<V> {public void put(Integer key, V o) throws InterruptedException;public V take(Integer key) throws InterruptedException;public V poll(Integer key, long timeout) throws InterruptedException;}public class HashBlockingMap<V> implements BlockingMap<V> {private ConcurrentMap<Integer, Item<V>> map;private final ReentrantLock lock = new ReentrantLock();public HashBlockingMap() {map = new ConcurrentHashMap<Integer, Item<V>>();}public void put(Integer key, V o) throws InterruptedException {final ReentrantLock lock = this.lock;lock.lockInterruptibly();try {if (map.containsKey(key)) {Item<V> item = map.get(key);item.put(o);} else {Item<V> item = new Item<V>();map.put(key, item);item.put(o);}} finally {            lock.unlock();        }}public V take(Integer key) throws InterruptedException {final ReentrantLock lock = this.lock;lock.lockInterruptibly();try {if (!map.containsKey(key)) {map.put(key, new Item<V>());}} finally {            lock.unlock();        }Item<V> item = map.get(key);V x = item.take();map.remove(key);return x;}public V poll(Integer key, long timeout) throws InterruptedException {final ReentrantLock lock = this.lock;lock.lockInterruptibly();try {if (!map.containsKey(key)) {map.put(key, new Item<V>());}} finally {            lock.unlock();        }Item<V> item = map.get(key);V x = item.poll(timeout);map.remove(key);return x;}private static class Item<E> {private final ReentrantLock lock = new ReentrantLock();private final Condition cond = lock.newCondition();private E obj = null;private void put(E o) throws InterruptedException {if (o == null)throw new NullPointerException();final ReentrantLock lock = this.lock;lock.lockInterruptibly();try {obj = o;cond.signal();} finally {lock.unlock();}}E take() throws InterruptedException {E x;final ReentrantLock lock = this.lock;lock.lockInterruptibly();try {try {while (obj == null) {cond.await();}} catch (InterruptedException ie) {cond.signal();throw ie;}x = obj;} finally {lock.unlock();}return x;}private E poll(long timeout) throws InterruptedException {timeout = TimeUnit.MILLISECONDS.toNanos(timeout);E x;final ReentrantLock lock = this.lock;lock.lockInterruptibly();try {            for (;;) {                if (obj != null) {                    x = obj;                    break;                }                if (timeout <= 0) {                    return null;                }                try {                timeout = cond.awaitNanos(timeout);                } catch (InterruptedException ie) {                cond.signal();                    throw ie;                }            }        } finally {        lock.unlock();        }return x;}}}// 消费者根据sequence取得自己想要的对象Response response = blockingMap.poll(sequence, timeout);// 生产者blockingMap.put(response.getSequence(), response);

热点排行