首页 诗词 字典 板报 句子 名言 友答 励志 学校 网站地图
当前位置: 首页 > 教程频道 > 其他教程 > 开源软件 >

指头上的代码-之ACTIVEMQ(4)

2012-06-27 
指尖上的代码--之ACTIVEMQ(4)??? public void reconnect(boolean rebalance) {??????? synchronized (reco

指尖上的代码--之ACTIVEMQ(4)

??? public void reconnect(boolean rebalance) {

??????? synchronized (reconnectMutex) {

??????????? if (started) {

??????????????? if (rebalance) {

??????????????????? doRebalance = true;

??????????????? }

??????????????? LOG.debug("Waking up reconnect task");

??????????????? try {

??????????????????? reconnectTask.wakeup();

??????????????? } catch (InterruptedException e) {

??????????????????? Thread.currentThread().interrupt();

??????????????? }

??????????? } else {

??????????????? LOG.debug("Reconnect was triggered but transport is not started yet. Wait forstart to connect the transport.");

??????????? }

??????? }

}

??? public final void handleTransportFailure(IOException e) throws InterruptedException {

??????? if (LOG.isTraceEnabled()) {

??????????? LOG.trace(this + "handleTransportFailure: " + e);

??????? }

??????? Transport transport = connectedTransport.getAndSet(null);

??????? if (transport == null) {

??????????? // sync with possible in progress reconnect

??????????? synchronized (reconnectMutex) {

??????????????? transport = connectedTransport.getAndSet(null);

??????????? }

??????? }

??????? if (transport != null) {

?

??????????? disposeTransport(transport);

?

??????????? boolean reconnectOk = false;

??????????? synchronized (reconnectMutex) {

??????????????? if (started) {

??????????????????? LOG.warn("Transport (" +transport.getRemoteAddress() + ") failed to" + connectedTransportURI

??????????????????????????? + " , attempting to automatically reconnect due to:" + e);

??????????????????? LOG.debug("Transport failed with the following exception:", e);

??????????????????? reconnectOk = true;

??????????????? }

??????????????? initialized = false;

??????????????? failedConnectTransportURI = connectedTransportURI;

??????????????? connectedTransportURI = null;

??????????????? connected = false;

?

??????????????? // notify before any reconnect attempt so ack state can be

??????????????? // whacked

??????????????? if (transportListener != null) {

???????? ???????????transportListener.transportInterupted();

??????????????? }

?

??????????????? if (reconnectOk) {

??????????????????? reconnectTask.wakeup();

??????????????? }

??????????? }

??????? }

final boolean doReconnect() {

??? Exception failure = null;

??? synchronized (reconnectMutex){

?

??????? //第一部分是用来处理配置形式uris的,如果有配置,优先读取配置

??? ?? ?//的uri进行重连,这里没有用到配置。

??????? String fileURL = getUpdateURIsURL();

??????? if (fileURL != null) {

??????????? ……

??????? }

?

??????? if (disposed || connectionFailure != null) {

??????????? reconnectMutex.notifyAll();

??????? }

?

??????? if ((connectedTransport.get() != null &&!doRebalance) || disposed || connectionFailure != null) {

??????????? return false;

??????? } else {

??????????? List<URI> connectList =getConnectList();

??????????? if (connectList.isEmpty()) {

??????????????? failure = new IOException("No uris available to connect to.");

??????????? } else {

????????????? //第二部分是做重连负载的,这里也没有用到

??????????????? if (doRebalance) {

?????????????????? if (connectList.get(0).equals(connectedTransportURI)) {

??????????????????????????? // already connected to first in the list, no need torebalance

??????????????????????????? doRebalance = false;

??????????????????????????? return false;

??????????????????????? } else {

??????????????????????????? LOG.debug("Doing rebalance from: " + connectedTransportURI + " to " + connectList);

??????????????????????????? try {

????????????? ??????????????????Transport transport = this.connectedTransport.getAndSet(null);

??????????????????????????????? if (transport != null) {

???????????????????????????????????disposeTransport(transport);

??????????????????????????????? }

?????????????????? ?????????} catch (Exception e) {

??????????????????????????????? LOG.debug("Caught an exception stopping existing transport forrebalance", e);

??????????????????????????? }

??????????????????????? }

??????????????? }

??????????????? if (!useExponentialBackOff || reconnectDelay ==DEFAULT_INITIAL_RECONNECT_DELAY) {

??????????????????? reconnectDelay =initialReconnectDelay;

??????????????? }

??????????????? synchronized (backupMutex) {

????????????????? //第三部分是关于Transport的备份机制的,这里也没有设置备份

??????????????????? if (backup && !backups.isEmpty()) {

??????????????????????? ……

??????????????????? }

??????????????? }

?????????????

????????????? //第四部分生成新的Transport,进行重连。

??????????????? Iterator<URI> iter =connectList.iterator();

??????????????? while (iter.hasNext() &&connectedTransport.get() == null && !disposed) {

??????????????????? URI uri = iter.next();

??????????????????? Transport t = null;

??????????????????? try {

??????????????????????? LOG.debug("Attempting connect to: " + uri);

???????????????????????SslContext.setCurrentSslContext(brokerSslContext);

??????????????????????? t =TransportFactory.compositeConnect(uri);

???????????????????????t.setTransportListener(myTransportListener);

??????????????????????? t.start();

?

??????????????????????? if (started) {

???????????????????????????restoreTransport(t);

??????????????????????? }

?

??????????????????????? LOG.debug("Connection established");

??????????????????????? reconnectDelay =initialReconnectDelay;

????????? ??????????????connectedTransportURI = uri;

???????????????????????connectedTransport.set(t);

???????????????????????reconnectMutex.notifyAll();

??????????????????????? connectFailures = 0;

??????????????????????? // Make sure on initial startup, that the

??????????????????????? // transportListener

??????????????????????? // has been initialized for this instance.

??????????????????????? synchronized(listenerMutex) {

??????????????????????????? if (transportListener== null) {

?????????????????????????? ?????try {

??????????????????????????????????? // if it isn't set after 2secs - it

??????????????????????????????????? // probably never will be

???????????????????????????????????listenerMutex.wait(2000);

??????????????????????????????? } catch (InterruptedExceptionex) {

??????????????????????????????? }

??????????????????????????? }

??????????????????????? }

??????????????????????? ……

??????????????? }

??????????? }

??????? }

??????? ???? ……

??????????? return false;

??????? }

??? }

??? if (!disposed) {

?????? ……

??? }

??? return !disposed;

}

第一部分是用来处理配置形式uris的,如果有配置,优先读取配置的uri,并添加到重连uris中本身不做重连,这里没有用到该配置。

第二部分是做负载重连的,根据重连uris如果第一个已经对应的Transport在工作了则无需重连直接返回,否则去掉当前工作的Transport达到负载的目的。这里也没有用到。

第三部分是关于Transport的备份机制的,如果设置了备份机制,且有Transport已经备份则取出该备份Transport返回。这里也没有设置备份。

可以看到前三部分都没有进行实际的重连工作,第四部分才是在上述三部分都不存在的情况下,进行实际的重连工作。

重连工作其实也很简单,根据uri找到对应的Transport,对该Transport设置独有的myTransportListener并启动。然后设置到FailoverTransport的connectedTransport作为当前连接的Transport。

?

通过对FailoverTransport重连机制的分析,基本上可以回答上面提出的第一个问题,为什么合成模式中不是直接add Transport而是uri,并且是数组形式的uris?因为对于重连而言合成Transport是没有意义的,一旦连接异常,作为被合成的Transport对象自身没有重连机制,注定要被废弃。那么只有把uri添加进来,才对后续的重连有益。而数组形式的uris则增强了重连的功能,使其能够尝试不同的uri进行重连。

热点排行