首页 诗词 字典 板报 句子 名言 友答 励志 学校 网站地图
当前位置: 首页 > 教程频道 > 其他教程 > 开源软件 >

activemq 持久化topic处理过程及其消息游标轮转有关问题的解决方案

2013-12-09 
activemq 持久化topic处理过程及其消息游标轮转问题的解决方案public synchronized boolean hasNext() {bo

activemq 持久化topic处理过程及其消息游标轮转问题的解决方案
public synchronized boolean hasNext() { boolean result = true; if (result) { try { currentCursor = getNextCursor(); //A } catch (Exception e) { LOG.error("Failed to get current cursor ", e); throw new RuntimeException(e); } result = currentCursor != null ? currentCursor.hasNext() : false; } return result; }?

?

代码逻辑分析:得到cursor队列中下一个cursor,然后判断该cursor是否有消息。下面看getNextCursor的代码:

?

?

protected synchronized PendingMessageCursor getNextCursor() throws Exception {        if (currentCursor == null || currentCursor.isEmpty()) {  // E            currentCursor = null;            for (PendingMessageCursor tsp : storePrefetches) {                if (tsp.hasNext()) {                                                  currentCursor = tsp;                    break;                }            }            // round-robin            if (storePrefetches.size()>1) {                PendingMessageCursor first = storePrefetches.remove(0);                storePrefetches.add(first);            }        }        return currentCursor;    }
?

?

代码逻辑:判断cursor是否为空,如果为null或者empty,如果是,则从cursor队列中获得下一个PendingMessageCursor;如果该cursor的hasNext()为true,则跳出循环。下面会对cursor队列做Load balance处理,将第一个cursor放到队列尾部。这样设计的目的是保证所有的cursor都会被访问到。

?

下面看isEmpty()的实现:

?

AbstractStoreCursor

 public final synchronized boolean isEmpty() {        return size == 0;    }

?

下面看hasNext()的实现:?

AbstractStoreCursor

public final synchronized boolean hasNext() {        if (batchList.isEmpty()) {            try {                fillBatch();            } catch (Exception e) {                LOG.error(this + " - Failed to fill batch", e);                throw new RuntimeException(e);            }        }        ensureIterator();        return this.iterator.hasNext();    }

?

?从实现来看,isEmpty和hasNext的判断方式不同,在极端情况下两者的返回值可能会不同,可能isEmpty为true而hasNext为false。

?

在激活DurableTopicSubscrition的时候,执行顺序是先检查cursor中消息数量,然后dispatch消息。由于在代码E处可能发现当前cursor不是empty,于是将该cursor返回,这样在代码A处会获得持久cursor,该cursor中可能会没有消息,不会继续dispatch消息。这样就导致在cursor队列中的非持久cursor始终无法获得执行的机会,导致非持久消息不会被dispatch。

?

解决方案,在getNextCursor()中增加hasNext()的判断,保证cursor在null或者empty或者没有元素的情况下会轮转到下一cursor:

?

protected synchronized PendingMessageCursor getNextCursor() throws Exception {        if (currentCursor == null || currentCursor.isEmpty() || !currentCursor.hasNext()) {            ……        }        return currentCursor;    }

?

?

?

热点排行