指尖上的代码--之ACTIVEMQ(4)
??? public void reconnect(boolean rebalance) {
??????? synchronized (reconnectMutex) {
??????????? if (started) {
??????????????? if (rebalance) {
??????????????????? doRebalance = true;
??????????????? }
??????????????? LOG.debug("Waking up reconnect task");
??????????????? try {
??????????????????? reconnectTask.wakeup();
??????????????? } catch (InterruptedException e) {
??????????????????? Thread.currentThread().interrupt();
??????????????? }
??????????? } else {
??????????????? LOG.debug("Reconnect was triggered but transport is not started yet. Wait forstart to connect the transport.");
??????????? }
??????? }
}
??? public final void handleTransportFailure(IOException e) throws InterruptedException {
??????? if (LOG.isTraceEnabled()) {
??????????? LOG.trace(this + "handleTransportFailure: " + e);
??????? }
??????? Transport transport = connectedTransport.getAndSet(null);
??????? if (transport == null) {
??????????? // sync with possible in progress reconnect
??????????? synchronized (reconnectMutex) {
??????????????? transport = connectedTransport.getAndSet(null);
??????????? }
??????? }
??????? if (transport != null) {
?
??????????? disposeTransport(transport);
?
??????????? boolean reconnectOk = false;
??????????? synchronized (reconnectMutex) {
??????????????? if (started) {
??????????????????? LOG.warn("Transport (" +transport.getRemoteAddress() + ") failed to" + connectedTransportURI
??????????????????????????? + " , attempting to automatically reconnect due to:" + e);
??????????????????? LOG.debug("Transport failed with the following exception:", e);
??????????????????? reconnectOk = true;
??????????????? }
??????????????? initialized = false;
??????????????? failedConnectTransportURI = connectedTransportURI;
??????????????? connectedTransportURI = null;
??????????????? connected = false;
?
??????????????? // notify before any reconnect attempt so ack state can be
??????????????? // whacked
??????????????? if (transportListener != null) {
???????? ???????????transportListener.transportInterupted();
??????????????? }
?
??????????????? if (reconnectOk) {
??????????????????? reconnectTask.wakeup();
??????????????? }
??????????? }
??????? }
final boolean doReconnect() {
??? Exception failure = null;
??? synchronized (reconnectMutex){
?
??????? //第一部分是用来处理配置形式uris的,如果有配置,优先读取配置
??? ?? ?//的uri进行重连,这里没有用到配置。
??????? String fileURL = getUpdateURIsURL();
??????? if (fileURL != null) {
??????????? ……
??????? }
?
??????? if (disposed || connectionFailure != null) {
??????????? reconnectMutex.notifyAll();
??????? }
?
??????? if ((connectedTransport.get() != null &&!doRebalance) || disposed || connectionFailure != null) {
??????????? return false;
??????? } else {
??????????? List<URI> connectList =getConnectList();
??????????? if (connectList.isEmpty()) {
??????????????? failure = new IOException("No uris available to connect to.");
??????????? } else {
????????????? //第二部分是做重连负载的,这里也没有用到
??????????????? if (doRebalance) {
?????????????????? if (connectList.get(0).equals(connectedTransportURI)) {
??????????????????????????? // already connected to first in the list, no need torebalance
??????????????????????????? doRebalance = false;
??????????????????????????? return false;
??????????????????????? } else {
??????????????????????????? LOG.debug("Doing rebalance from: " + connectedTransportURI + " to " + connectList);
??????????????????????????? try {
????????????? ??????????????????Transport transport = this.connectedTransport.getAndSet(null);
??????????????????????????????? if (transport != null) {
???????????????????????????????????disposeTransport(transport);
??????????????????????????????? }
?????????????????? ?????????} catch (Exception e) {
??????????????????????????????? LOG.debug("Caught an exception stopping existing transport forrebalance", e);
??????????????????????????? }
??????????????????????? }
??????????????? }
??????????????? if (!useExponentialBackOff || reconnectDelay ==DEFAULT_INITIAL_RECONNECT_DELAY) {
??????????????????? reconnectDelay =initialReconnectDelay;
??????????????? }
??????????????? synchronized (backupMutex) {
????????????????? //第三部分是关于Transport的备份机制的,这里也没有设置备份
??????????????????? if (backup && !backups.isEmpty()) {
??????????????????????? ……
??????????????????? }
??????????????? }
?????????????
????????????? //第四部分生成新的Transport,进行重连。
??????????????? Iterator<URI> iter =connectList.iterator();
??????????????? while (iter.hasNext() &&connectedTransport.get() == null && !disposed) {
??????????????????? URI uri = iter.next();
??????????????????? Transport t = null;
??????????????????? try {
??????????????????????? LOG.debug("Attempting connect to: " + uri);
???????????????????????SslContext.setCurrentSslContext(brokerSslContext);
??????????????????????? t =TransportFactory.compositeConnect(uri);
???????????????????????t.setTransportListener(myTransportListener);
??????????????????????? t.start();
?
??????????????????????? if (started) {
???????????????????????????restoreTransport(t);
??????????????????????? }
?
??????????????????????? LOG.debug("Connection established");
??????????????????????? reconnectDelay =initialReconnectDelay;
????????? ??????????????connectedTransportURI = uri;
???????????????????????connectedTransport.set(t);
???????????????????????reconnectMutex.notifyAll();
??????????????????????? connectFailures = 0;
??????????????????????? // Make sure on initial startup, that the
??????????????????????? // transportListener
??????????????????????? // has been initialized for this instance.
??????????????????????? synchronized(listenerMutex) {
??????????????????????????? if (transportListener== null) {
?????????????????????????? ?????try {
??????????????????????????????????? // if it isn't set after 2secs - it
??????????????????????????????????? // probably never will be
???????????????????????????????????listenerMutex.wait(2000);
??????????????????????????????? } catch (InterruptedExceptionex) {
??????????????????????????????? }
??????????????????????????? }
??????????????????????? }
??????????????????????? ……
??????????????? }
??????????? }
??????? }
??????? ???? ……
??????????? return false;
??????? }
??? }
??? if (!disposed) {
?????? ……
??? }
??? return !disposed;
}
第一部分是用来处理配置形式uris的,如果有配置,优先读取配置的uri,并添加到重连uris中本身不做重连,这里没有用到该配置。
第二部分是做负载重连的,根据重连uris如果第一个已经对应的Transport在工作了则无需重连直接返回,否则去掉当前工作的Transport达到负载的目的。这里也没有用到。
第三部分是关于Transport的备份机制的,如果设置了备份机制,且有Transport已经备份则取出该备份Transport返回。这里也没有设置备份。
可以看到前三部分都没有进行实际的重连工作,第四部分才是在上述三部分都不存在的情况下,进行实际的重连工作。
重连工作其实也很简单,根据uri找到对应的Transport,对该Transport设置独有的myTransportListener并启动。然后设置到FailoverTransport的connectedTransport作为当前连接的Transport。
?
通过对FailoverTransport重连机制的分析,基本上可以回答上面提出的第一个问题,为什么合成模式中不是直接add Transport而是uri,并且是数组形式的uris?因为对于重连而言合成Transport是没有意义的,一旦连接异常,作为被合成的Transport对象自身没有重连机制,注定要被废弃。那么只有把uri添加进来,才对后续的重连有益。而数组形式的uris则增强了重连的功能,使其能够尝试不同的uri进行重连。