
Tomcat Session Replication (1)

2012-10-29 

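Tomcat session replication comes in roughly two modes: all-to-all and backup. This post looks at all-to-all first. It is mainly a record of what I learned while reading the source code, together with the code flow.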


Tomcat cluster configuration is skipped for now.

During Tomcat startup, the entry point for session replication is in the start() method of StandardContext:


    Manager contextManager = null;
    if (manager == null) {
        if ( (getCluster() != null) && distributable) {
            try {
                contextManager = getCluster().createManager(getName());
            } catch (Exception ex) {
                log.error("standardContext.clusterFail", ex);
                ok = false;
            }
        } else {
            contextManager = new StandardManager();
        }
    }

    // Configure default manager if none was specified
    if (contextManager != null) {
        setManager(contextManager);
    }

    if (manager!=null && (getCluster() != null) && distributable) {
        //let the cluster know that there is a context that is distributable
        //and that it has its own manager
        getCluster().registerManager(manager);
    }
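The branch at the top of that snippet can be condensed into a small stand-alone sketch. Everything here is hypothetical: SketchManager, SketchCluster and ManagerSelection are illustrative stand-ins, not Tomcat's Manager or Cluster types, and the sketch assumes no manager has been set explicitly (the `manager == null` case).

```java
import java.util.Optional;

// Hypothetical stand-ins; these are NOT Tomcat's real Manager/Cluster types.
interface SketchManager {
    String kind();
}

interface SketchCluster {
    SketchManager createManager(String contextName) throws Exception;
}

final class ManagerSelection {
    /**
     * Mirrors the branch in the snippet: the cluster creates the manager only
     * when a cluster exists AND the web application is distributable.
     * Returns empty when the cluster fails to create a manager.
     */
    static Optional<SketchManager> choose(SketchCluster cluster, boolean distributable, String name) {
        if (cluster != null && distributable) {
            try {
                return Optional.of(cluster.createManager(name));
            } catch (Exception ex) {
                // The quoted code logs the error and sets ok = false here.
                return Optional.empty();
            }
        }
        // No cluster, or not distributable: fall back to a plain local manager.
        return Optional.of(() -> "standard");
    }
}
```

The point of the sketch is that both conditions must hold: a cluster without a distributable web application still ends up with the plain manager.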

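If a Tomcat cluster is configured (and, as the condition shows, the web application is distributable), contextManager = getCluster().createManager(getName()) is called, and the Manager it returns is a DeltaManager. Next, setManager(contextManager) is called: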


    public synchronized void setManager(Manager manager) {

        // Change components if necessary (this is the manager that manages sessions)
        Manager oldManager = this.manager;
        if (oldManager == manager)
            return;
        this.manager = manager;

        // Stop the old component if necessary
        if (started && (oldManager != null) &&
            (oldManager instanceof Lifecycle)) {
            try {
                ((Lifecycle) oldManager).stop();
            } catch (LifecycleException e) {
                log.error("ContainerBase.setManager: stop: ", e);
            }
        }

        // Start the new component if necessary
        if (manager != null)
            manager.setContainer(this);
        if (started && (manager != null) &&
            (manager instanceof Lifecycle)) {
            try {
                ((Lifecycle) manager).start();
            } catch (LifecycleException e) {
                log.error("ContainerBase.setManager: start: ", e);
            }
        }

        // Report this property change to interested listeners
        support.firePropertyChange("manager", oldManager, this.manager);
    }

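Note the ((Lifecycle) manager).start() call (highlighted in red in the original post). It invokes DeltaManager's start() method, which starts the DeltaManager. The code: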

    public void start() throws LifecycleException {
        if (!initialized) init();

        // Validate and update our current component state
        if (started) {
            return;
        }
        started = true;
        lifecycle.fireLifecycleEvent(START_EVENT, null);

        // Force initialization of the random number generator
        generateSessionId();

        // Load unloaded sessions, if any
        try {
            //the channel is already running
            Cluster cluster = getCluster() ;
            // stop remove cluster binding
            //wow, how many nested levels of if statements can we have ;)
            if(cluster == null) {
                Container context = getContainer() ;
                if(context != null && context instanceof Context) {
                    Container host = context.getParent() ;
                    if(host != null && host instanceof Host) {
                        cluster = host.getCluster();
                        if(cluster != null && cluster instanceof CatalinaCluster) {
                            setCluster((CatalinaCluster) cluster) ;
                        } else {
                            Container engine = host.getParent() ;
                            if(engine != null && engine instanceof Engine) {
                                cluster = engine.getCluster();
                                if(cluster != null && cluster instanceof CatalinaCluster) {
                                    setCluster((CatalinaCluster) cluster) ;
                                }
                            } else {
                                cluster = null ;
                            }
                        }
                    }
                }
            }
            if (cluster == null) {
                log.error(sm.getString("deltaManager.noCluster", getName()));
                return;
            } else {
                if (log.isInfoEnabled()) {
                    String type = "unknown" ;
                    if( cluster.getContainer() instanceof Host){
                        type = "Host" ;
                    } else if( cluster.getContainer() instanceof Engine){
                        type = "Engine" ;
                    }
                    log.info(sm.getString("deltaManager.registerCluster", getName(), type, cluster.getClusterName()));
                }
            }
            if (log.isInfoEnabled()) log.info(sm.getString("deltaManager.startClustering", getName()));
            //to survive context reloads, as only a stop/start is called, not
            // createManager
            cluster.registerManager(this);

            // pick one node and copy all of its sessions over
            getAllClusterSessions();

        } catch (Throwable t) {
            log.error(sm.getString("deltaManager.managerLoad"), t);
        }
    }

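First, if the manager has not been initialized, it is initialized. Next the Cluster is set, and then the Cluster's registerManager() method is called to register the Manager. I did not fully understand this step at first, because StandardContext's start() method also calls registerManager(), so it looks redundant. The comment in the code suggests the reason: a context reload only calls stop() and start(), not createManager(), so the manager registers itself again here.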

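The most important step is the call to getAllClusterSessions(). This method takes the first Tomcat member found in the cluster and sends it a message requesting all of its session data. The peer receives the message, processes it, and sends back all the data. This Tomcat receives the data and then sends a message confirming receipt, which ends the exchange. The code: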


    public synchronized void getAllClusterSessions() {
        if (cluster != null && cluster.getMembers().length > 0) {
            long beforeSendTime = System.currentTimeMillis();
            Member mbr = findSessionMasterMember();
            if(mbr == null) { // No domain member found
                return;
            }
            SessionMessage msg = new SessionMessageImpl(this.getName(),SessionMessage.EVT_GET_ALL_SESSIONS, null, "GET-ALL","GET-ALL-" + getName());
            // set reference time
            stateTransferCreateSendTime = beforeSendTime ;
            // request session state
            counterSend_EVT_GET_ALL_SESSIONS++;
            stateTransfered = false ;
            // FIXME This send call block the deploy thread, when sender waitForAck is enabled
            try {
                synchronized(receivedMessageQueue) {
                    receiverQueue = true ;
                }
                cluster.send(msg, mbr);
                if (log.isWarnEnabled()) log.warn(sm.getString("deltaManager.waitForSessionState",getName(), mbr,getStateTransferTimeout()));
                // FIXME At sender ack mode this method check only the state transfer and resend is a problem!
                waitForSendAllSessions(beforeSendTime);
            } finally {
                synchronized(receivedMessageQueue) {
                    for (Iterator iter = receivedMessageQueue.iterator(); iter.hasNext();) {
                        SessionMessage smsg = (SessionMessage) iter.next();
                        if (!stateTimestampDrop) {
                            messageReceived(smsg, smsg.getAddress() != null ? (Member) smsg.getAddress() : null);
                        } else {
                            if (smsg.getEventType() != SessionMessage.EVT_GET_ALL_SESSIONS && smsg.getTimestamp() >= stateTransferCreateSendTime) {
                                // FIXME handle EVT_GET_ALL_SESSIONS later
                                messageReceived(smsg,smsg.getAddress() != null ? (Member) smsg.getAddress() : null);
                            } else {
                                if (log.isWarnEnabled()) {
                                    log.warn(sm.getString("deltaManager.dropMessage",getName(), smsg.getEventTypeString(),new Date(stateTransferCreateSendTime), new Date(smsg.getTimestamp())));
                                }
                            }
                        }
                    }
                    receivedMessageQueue.clear();
                    receiverQueue = false ;
                }
            }
        } else {
            if (log.isInfoEnabled()) log.info(sm.getString("deltaManager.noMembers", getName()));
        }
    }

First, one Tomcat in the cluster has to be found. Because all Tomcat instances are peers in all-to-all replication, the first one will do.

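Next, an EVT_GET_ALL_SESSIONS message is created, and send() is called on the Manager's cluster property to ask that Tomcat for all of its sessions. The send is asynchronous, which is why waitForSendAllSessions() is called right after it. If the peer does not answer within 60 seconds, the wait times out; DeltaManager logs an error and this Tomcat starts without the transferred session state.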
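The pattern described here (send a request asynchronously, then block until the reply arrives or a timeout expires) can be illustrated with a small stand-alone sketch. All names are hypothetical, the "network" is just an executor with a simulated delay, and a CountDownLatch is swapped in for the wait; this is not how DeltaManager implements waitForSendAllSessions().

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;

// Hypothetical model of the GET-ALL-SESSIONS exchange; not Tomcat's API.
final class StateTransferSketch {
    private final Map<String, String> sessions = new ConcurrentHashMap<>();
    private final CountDownLatch transferred = new CountDownLatch(1);

    /** Called on the receiving thread when the peer's reply arrives. */
    void onAllSessionData(Map<String, String> data) {
        sessions.putAll(data);
        transferred.countDown(); // plays the role of "state transferred = true"
    }

    /**
     * Sends the request asynchronously, then blocks until the reply arrives
     * or the timeout expires. Returns true only if the state arrived in time.
     */
    boolean requestAllSessions(ExecutorService network, Map<String, String> peerSessions,
                               long peerDelayMillis, long timeoutMillis) {
        network.execute(() -> {
            try {
                Thread.sleep(peerDelayMillis);   // simulated network and peer latency
                onAllSessionData(peerSessions);  // simulated reply from the peer
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        try {
            return transferred.await(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    int sessionCount() {
        return sessions.size();
    }
}
```

With a single-thread executor as the network, a peer delay shorter than the timeout makes requestAllSessions() return true with the sessions copied in; a longer delay makes it return false and the caller carries on with an empty session map.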


Let's first look at the send() method of SimpleTcpCluster:


    public void send(ClusterMessage msg, Member dest) {
        try {
            msg.setAddress(getLocalMember());
            if (dest != null) {
                if (!getLocalMember().equals(dest)) {
                    channel.send(new Member[] {dest}, msg,channelSendOptions);
                } else
                    log.error("Unable to send message to local member " + msg);
            } else {
                if (channel.getMembers().length>0)
                    channel.send(channel.getMembers(),msg,channelSendOptions);
                else if (log.isDebugEnabled())
                    log.debug("No members in cluster, ignoring message:"+msg);
            }
        } catch (Exception x) {
            log.error("Unable to send message through cluster sender.", x);
        }
    }

What actually gets called is GroupChannel's send() method. The send option used is asynchronous (SEND_OPTIONS_ASYNCHRONOUS).

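Next, the sendMessage() method of each ChannelInterceptor is called. This is an application of the chain-of-responsibility pattern. By default there are only two ChannelInterceptors: MessageDispatch15Interceptor and TcpFailureDetector. MessageDispatch15Interceptor simply starts a separate thread and, under certain conditions, performs the sendMessage() call on that thread. TcpFailureDetector deals with the affected cluster Member when sendMessage() throws an exception. The last call in the chain is ChannelCoordinator's sendMessage() method: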

    public void sendMessage(Member[] destination, ChannelMessage msg, InterceptorPayload payload) throws ChannelException {
        if ( destination == null ) destination = membershipService.getMembers();
        clusterSender.sendMessage(msg,destination);
        if ( Logs.MESSAGES.isTraceEnabled() ) {
            Logs.MESSAGES.trace("ChannelCoordinator - Sent msg:" + new UniqueId(msg.getUniqueId()) + " at " +new java.sql.Timestamp(System.currentTimeMillis())+ " to "+Arrays.toNameString(destination));
        }
    }

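This in turn calls ParallelNioSender's sendMessage() method, and the low-level work is ultimately done by the NioSender class. At this point the request from the newly started Tomcat is complete. One Tomcat in the cluster now receives the message and processes it; the class that receives messages is NioReceiver.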
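The chain-of-responsibility structure on the send path can be sketched in a few lines. This is a minimal illustration with hypothetical class names, not the Tribes classes: each link may do its own work and then delegates to the next link, and the last link stands in for the coordinator that hands the message to the sender. The order of the two middle links is an assumption made for the example.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical interceptor chain; names are illustrative, not Tribes' real classes.
abstract class SketchInterceptor {
    private SketchInterceptor next;

    void setNext(SketchInterceptor next) {
        this.next = next;
    }

    /** Default behaviour: pass the message to the next link unchanged. */
    void sendMessage(String msg, List<String> trace) {
        if (next != null) next.sendMessage(msg, trace);
    }
}

/** Stands in for an interceptor that does some work before delegating. */
final class TracingInterceptor extends SketchInterceptor {
    private final String name;

    TracingInterceptor(String name) {
        this.name = name;
    }

    @Override
    void sendMessage(String msg, List<String> trace) {
        trace.add(name);               // this link's own work
        super.sendMessage(msg, trace); // then hand over to the next link
    }
}

/** Last link: stands in for the coordinator that gives the message to the sender. */
final class CoordinatorSketch extends SketchInterceptor {
    @Override
    void sendMessage(String msg, List<String> trace) {
        trace.add("wire:" + msg);
    }
}

final class ChainDemo {
    /** Builds dispatch -> failure-detector -> coordinator and sends one message. */
    static List<String> send(String msg) {
        SketchInterceptor dispatch = new TracingInterceptor("dispatch");
        SketchInterceptor failureDetector = new TracingInterceptor("failure-detector");
        dispatch.setNext(failureDetector);
        failureDetector.setNext(new CoordinatorSketch());

        List<String> trace = new ArrayList<>();
        dispatch.sendMessage(msg, trace);
        return trace;
    }
}
```

Calling ChainDemo.send("GET-ALL") records each link in the order it was visited, which makes the delegation order easy to check.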


Now let's switch perspective and look at the receiving side:


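NioReceiver was covered in detail in an earlier post. It is essentially a background thread that runs continuously. When messages come in, each received message produces a Runnable task that is handed to a thread pool. The Runnable task is NioReplicationTask, and its drainChannel() method contains this code: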

    //process the message  ReceiverBase.messageDataReceived()
    getCallback().messageDataReceived(msgs[i]);

This calls the ListenCallback method, shown below:

    public void messageDataReceived(ChannelMessage data) {
        if ( this.listener != null ) {
            if ( listener.accept(data) ) listener.messageReceived(data);
        }
    }
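The receive path up to this point (one pool task per incoming message, and a callback that checks accept() before calling messageReceived()) can be sketched as follows. The types are hypothetical and messages are plain strings; the pool size of 2 is an arbitrary choice for the example, not a Tomcat default.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Hypothetical listener contract modelled on the accept()/messageReceived() pair.
interface SketchListener {
    boolean accept(String msg);
    void messageReceived(String msg);
}

final class ReceiverSketch {
    private final ExecutorService pool = Executors.newFixedThreadPool(2);
    private final SketchListener listener;

    ReceiverSketch(SketchListener listener) {
        this.listener = listener;
    }

    /** Each incoming message becomes one task on the pool. */
    void onIncoming(String msg) {
        pool.execute(() -> {
            // Same guard as the callback: deliver only if the listener accepts.
            if (listener != null && listener.accept(msg)) {
                listener.messageReceived(msg);
            }
        });
    }

    /** Stops accepting work and waits for queued tasks; true if all finished. */
    boolean shutdownAndWait(long timeoutMillis) {
        pool.shutdown();
        try {
            return pool.awaitTermination(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }
}
```

Because tasks run on pool threads, a listener used with this sketch has to be thread-safe, and messages may be delivered in a different order than they arrived.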

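The listener in this callback is implemented by ChannelCoordinator. Reading the code, the method that actually runs is the one inherited from its parent class, ChannelInterceptorBase: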


    public void messageReceived(ChannelMessage msg) {
        if (getPrevious() != null) getPrevious().messageReceived(msg);
    }

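The ChannelInterceptors on this path are the same two mentioned earlier and do not matter here. The call finally reaches GroupChannel's messageReceived() method. The details are left for the next post.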
