activemq 持久化topic处理过程及其消息游标轮转问题的解决方案
public synchronized boolean hasNext() { boolean result = true; if (result) { try { currentCursor = getNextCursor(); //A } catch (Exception e) { LOG.error("Failed to get current cursor ", e); throw new RuntimeException(e); } result = currentCursor != null ? currentCursor.hasNext() : false; } return result; }?
?
代码逻辑分析:得到cursor队列中下一个cursor,然后判断该cursor是否有消息。下面看getNextCursor的代码:
?
?
protected synchronized PendingMessageCursor getNextCursor() throws Exception { if (currentCursor == null || currentCursor.isEmpty()) { // E currentCursor = null; for (PendingMessageCursor tsp : storePrefetches) { if (tsp.hasNext()) { currentCursor = tsp; break; } } // round-robin if (storePrefetches.size()>1) { PendingMessageCursor first = storePrefetches.remove(0); storePrefetches.add(first); } } return currentCursor; }??
代码逻辑:判断cursor是否为空,如果为null或者empty,如果是,则从cursor队列中获得下一个PendingMessageCursor;如果该cursor的hasNext()为true,则跳出循环。下面会对cursor队列做Load balance处理,将第一个cursor放到队列尾部。这样设计的目的是保证所有的cursor都会被访问到。
?
下面看isEmpty()的实现:
?
AbstractStoreCursor
public final synchronized boolean isEmpty() { return size == 0; }?
下面看hasNext()的实现:?
AbstractStoreCursor
public final synchronized boolean hasNext() { if (batchList.isEmpty()) { try { fillBatch(); } catch (Exception e) { LOG.error(this + " - Failed to fill batch", e); throw new RuntimeException(e); } } ensureIterator(); return this.iterator.hasNext(); }?
?从实现来看,isEmpty和hasNext的判断方式不同,在极端情况下两者的返回值可能会不同,可能isEmpty为true而hasNext为false。
?
在激活DurableTopicSubscrition的时候,执行顺序是先检查cursor中消息数量,然后dispatch消息。由于在代码E处可能发现当前cursor不是empty,于是将该cursor返回,这样在代码A处会获得持久cursor,该cursor中可能会没有消息,不会继续dispatch消息。这样就导致在cursor队列中的非持久cursor始终无法获得执行的机会,导致非持久消息不会被dispatch。
?
解决方案,在getNextCursor()中增加hasNext()的判断,保证cursor在null或者empty或者没有元素的情况下会轮转到下一cursor:
?
protected synchronized PendingMessageCursor getNextCursor() throws Exception { if (currentCursor == null || currentCursor.isEmpty() || !currentCursor.hasNext()) { …… } return currentCursor; }?
?
?