首页 诗词 字典 板报 句子 名言 友答 励志 学校 网站地图
当前位置: 首页 > 教程频道 > 软件管理 > 软件架构设计 >

tomcat session复制(2)

2012-09-12 
tomcat session复制(二)???? 上文http://nod0620.iteye.com/admin/blogs/1030398写了session复制的发送部

tomcat session复制(二)

???? 上文http://nod0620.iteye.com/admin/blogs/1030398写了session复制的发送部分,继续接收部分:

当接收方tomcat接收到需要session的消息时,最终调用了GroupChannel的messageReceived()方法

    public void messageReceived(ChannelMessage msg) {        if ( msg == null ) return;        try {            if ( Logs.MESSAGES.isTraceEnabled() ) {                Logs.MESSAGES.trace("GroupChannel - Received msg:" + new UniqueId(msg.getUniqueId()) + " at " +new java.sql.Timestamp(System.currentTimeMillis())+ " from "+msg.getAddress().getName());            }            Serializable fwd = null;            if ( (msg.getOptions() & SEND_OPTIONS_BYTE_MESSAGE) == SEND_OPTIONS_BYTE_MESSAGE ) {                fwd = new ByteMessage(msg.getMessage().getBytes());            } else {                try {                    fwd = XByteBuffer.deserialize(msg.getMessage().getBytesDirect(), 0, msg.getMessage().getLength());                }catch (Exception sx) {                    log.error("Unable to deserialize message:"+msg,sx);                    return;                }            }            if ( Logs.MESSAGES.isTraceEnabled() ) {                Logs.MESSAGES.trace("GroupChannel - Receive Message:" + new UniqueId(msg.getUniqueId()) + " is " +fwd);            }            //get the actual member with the correct alive time            Member source = msg.getAddress();            boolean rx = false;            boolean delivered = false;            for ( int i=0; i<channelListeners.size(); i++ ) {                ChannelListener channelListener = (ChannelListener)channelListeners.get(i);                if (channelListener != null && channelListener.accept(fwd, source)) {                    channelListener.messageReceived(fwd, source);                    delivered = true;                    //if the message was accepted by an RPC channel, that channel                    //is responsible for returning the reply, otherwise we send an absence reply                    if ( channelListener instanceof RpcChannel ) rx = true;                }            }//for            if ((!rx) && (fwd instanceof RpcMessage)) {                //if we have a message that requires a response,                //but none was given, send back an immediate one                sendNoRpcChannelReply((RpcMessage)fwd,source);            }            if ( Logs.MESSAGES.isTraceEnabled() ) {                Logs.MESSAGES.trace("GroupChannel delivered["+delivered+"] id:"+new UniqueId(msg.getUniqueId()));            }        } catch ( Exception x ) {            //this could be the channel listener throwing an exception, we should log it             //as a warning.            if ( log.isWarnEnabled() ) log.warn("Error receiving message:",x);            throw new RemoteProcessException("Exception:"+x.getMessage(),x);        }    }

??? 可以看到,这里主要是调用ChannelListener的messageReceived方法,显然这个实现类就是SimpleTcpCluster:

?

 public void messageReceived(Serializable message, Member sender) {        ClusterMessage fwd = (ClusterMessage)message;        fwd.setAddress(sender);        messageReceived(fwd);    }    public void messageReceived(ClusterMessage message) {        long start = 0;        if (log.isDebugEnabled() && message != null)            log.debug("Assuming clocks are synched: Replication for "                    + message.getUniqueId() + " took="                    + (System.currentTimeMillis() - (message).getTimestamp())                    + " ms.");        //invoke all the listeners        boolean accepted = false;        if (message != null) {            for (Iterator iter = clusterListeners.iterator(); iter.hasNext();) {                ClusterListener listener = (ClusterListener) iter.next();                if (listener.accept(message)) {                    accepted = true;                    listener.messageReceived(message);                }            }        }        if (!accepted && log.isDebugEnabled()) {            if (notifyLifecycleListenerOnFailure) {                Member dest = message.getAddress();                // Notify our interested LifecycleListeners                lifecycle.fireLifecycleEvent(RECEIVE_MESSAGE_FAILURE_EVENT,                        new SendMessageData(message, dest, null));            }            log.debug("Message " + message.toString() + " from type "                    + message.getClass().getName()                    + " transfered but no listener registered");        }        return;    }

?? 其实这里调用的是ClusterListener的messageReceived()方法,默认的两个ClusterListener是ClusterSessionListener和JvmRouteSessionIDBinderListener,JvmRouteSessionIDBinderListener只有在发送的消息是关于session? id改变的消息时才起作用,重点看ClusterSessionListener.messageReceived()方法,其实是调用到了DeltaManager的messageDataReceived()方法:

    public void messageDataReceived(ClusterMessage cmsg) {        if (cmsg != null && cmsg instanceof SessionMessage) {            SessionMessage msg = (SessionMessage) cmsg;            switch (msg.getEventType()) {                case SessionMessage.EVT_GET_ALL_SESSIONS:                case SessionMessage.EVT_SESSION_CREATED:                 case SessionMessage.EVT_SESSION_EXPIRED:                 case SessionMessage.EVT_SESSION_ACCESSED:                case SessionMessage.EVT_SESSION_DELTA:                case SessionMessage.EVT_CHANGE_SESSION_ID: {                    synchronized(receivedMessageQueue) {                        if(receiverQueue) {                            receivedMessageQueue.add(msg);                            return ;                        }                    }                   break;                }                default: {                    //we didn't queue, do nothing                    break;                }            } //switch                        messageReceived(msg, msg.getAddress() != null ? (Member) msg.getAddress() : null);        }    }
?

???? 这里的事件是EVT_GET_ALL_SESSIONS,在messageReceived()方法中有这个分支的处理代码,最后面调用

handleGET_ALL_SESSIONS()方法:

 protected void handleGET_ALL_SESSIONS(SessionMessage msg, Member sender) throws IOException {        counterReceive_EVT_GET_ALL_SESSIONS++;        //get a list of all the session from this manager        if (log.isDebugEnabled()) log.debug(sm.getString("deltaManager.receiveMessage.unloadingBegin", getName()));        // Write the number of active sessions, followed by the details        // get all sessions and serialize without sync        Session[] currentSessions = findSessions();        long findSessionTimestamp = System.currentTimeMillis() ;        if (isSendAllSessions()) {            sendSessions(sender, currentSessions, findSessionTimestamp);        } else {            // send session at blocks            for (int i = 0; i < currentSessions.length; i += getSendAllSessionsSize()) {                int len = i + getSendAllSessionsSize() > currentSessions.length ? currentSessions.length - i : getSendAllSessionsSize();                Session[] sendSessions = new Session[len];                System.arraycopy(currentSessions, i, sendSessions, 0, len);                sendSessions(sender, sendSessions,findSessionTimestamp);                if (getSendAllSessionsWaitTime() > 0) {                    try {                        Thread.sleep(getSendAllSessionsWaitTime());                    } catch (Exception sleep) {                    }                }//end if            }//for        }//end if                SessionMessage newmsg = new SessionMessageImpl(name,SessionMessage.EVT_ALL_SESSION_TRANSFERCOMPLETE, null,"SESSION-STATE-TRANSFERED", "SESSION-STATE-TRANSFERED"+ getName());        newmsg.setTimestamp(findSessionTimestamp);        if (log.isDebugEnabled()) log.debug(sm.getString("deltaManager.createMessage.allSessionTransfered",getName()));        counterSend_EVT_ALL_SESSION_TRANSFERCOMPLETE++;        cluster.send(newmsg, sender);    }

?? 主要是先找到所有的session,然后发送所有的session,然后发送一个标志结束的消息表示发送完成。这样第一次tomcat的启动加入集群是拿所用的session就完成了,主要的流程是这样的:

?

发送方:

StandardContext.start()--->SimpleTcpCluster.createManager()--->StandardContext.setManager()--->

DeltaManager.start()--->DeltaManager.getAllClusterSessions()--->SimpleTcpCluster.send()

---->GroupChannel.send()---->ParallelNioSender.sendMessage()--->NioSender.process()

?

接收方:

NioReceiver.listen()--->NioReceiver.readDataFromSocket()---->NioReplicationTask.drainChannel()--->

ListenCallback.messageDataReceived()---->ChannelCoordinator.messageReceived()--->

GroupChannel.messageReceived()--->SimpleTcpCluster.messageReceived()--->

DeltaManager.messageDataReceived()---->DeltaManager.handleGET_ALL_SESSIONS()

?

接收方准备好数据又要发送给发送方,简单来说通信是这样:

发送方发送请求--->接收方接受请求,发送数据,一个是session的数据,一个是session发送完毕的数据--->

发送方收到接收方过来的两种数据

?

?

?

?

?

?

?

热点排行