首页 诗词 字典 板报 句子 名言 友答 励志 学校 网站地图
当前位置: 首页 > 教程频道 > 其他教程 > 开源软件 >

深入显出Zookeeper之七分布式CREATE事务处理

2013-02-15 
深入浅出Zookeeper之七分布式CREATE事务处理前面几篇文章讲了follower和leader之间如何选举和初始化的,这

深入浅出Zookeeper之七分布式CREATE事务处理

前面几篇文章讲了follower和leader之间如何选举和初始化的,这一篇将以之前描述过的CREATE请求作为例子来描述在集群环境下是如何处理事务的。

关于client和zookeeper server的描述前几篇文章已经涉及了。这里不就不再赘述了。假设client和某一个follower建立了连接,并发送了CREATE请求。在follower端,IO线程拿到请求开始执行处理链,Follower处理链如下

深入显出Zookeeper之七分布式CREATE事务处理

初始化代码:

    protected void setupRequestProcessors() {        RequestProcessor finalProcessor = new FinalRequestProcessor(this);        commitProcessor = new CommitProcessor(finalProcessor,                Long.toString(getServerId()), true);        commitProcessor.start();        firstProcessor = new FollowerRequestProcessor(this, commitProcessor);        ((FollowerRequestProcessor) firstProcessor).start();        syncProcessor = new SyncRequestProcessor(this,                new SendAckRequestProcessor((Learner)getFollower()));        syncProcessor.start();    }

?第一个处理器是FollowerRequestProcessor,处理如下

while (!finished) {                Request request = queuedRequests.take();                if (LOG.isTraceEnabled()) {                    ZooTrace.logRequest(LOG, ZooTrace.CLIENT_REQUEST_TRACE_MASK,                            'F', request, "");                }                if (request == Request.requestOfDeath) {                    break;                }                // We want to queue the request to be processed before we submit                // the request to the leader so that we are ready to receive                // the response//先交给CommitProcessor,最终投票通过后,会通过CommitProcessor的commit方法最终提交事务                nextProcessor.processRequest(request);                                // We now ship the request to the leader. As with all                // other quorum operations, sync also follows this code                // path, but different from others, we need to keep track                // of the sync operations this follower has pending, so we                // add it to pendingSyncs.//只有事务请求才转发给leader,进行投票                switch (request.type) {                case OpCode.sync:                    zks.pendingSyncs.add(request);                    zks.getFollower().request(request);                    break;                case OpCode.create:                case OpCode.delete:                case OpCode.setData:                case OpCode.setACL:                case OpCode.createSession:                case OpCode.closeSession:                case OpCode.multi:                    zks.getFollower().request(request);                    break;                }

?转发事务请求给leader

   void request(Request request) throws IOException {//反序列化        ByteArrayOutputStream baos = new ByteArrayOutputStream();        DataOutputStream oa = new DataOutputStream(baos);        oa.writeLong(request.sessionId);        oa.writeInt(request.cxid);        oa.writeInt(request.type);        if (request.request != null) {            request.request.rewind();            int len = request.request.remaining();            byte b[] = new byte[len];            request.request.get(b);            request.request.rewind();            oa.write(b);        }        oa.close();        QuorumPacket qp = new QuorumPacket(Leader.REQUEST, -1, baos                .toByteArray(), request.authInfo);        writePacket(qp, true);    }

?在CommitProcessor中主要是等待缓存请求,并等待该请求被commit

 while (!finished) {                int len = toProcess.size();//最终的请求处理交给FinalRequestProcessor                for (int i = 0; i < len; i++) {                    nextProcessor.processRequest(toProcess.get(i));                }                toProcess.clear();                synchronized (this) {//如果没有commit请求,则wait,直到commit请求的时候唤醒                    if ((queuedRequests.size() == 0 || nextPending != null)                            && committedRequests.size() == 0) {                        wait();                        continue;                    }                    // First check and see if the commit came in for the pending                    // request    //有commit请求,则添加到最终队列,下一轮处理                    if ((queuedRequests.size() == 0 || nextPending != null)                            && committedRequests.size() > 0) {                        Request r = committedRequests.remove();                        /*                         * We match with nextPending so that we can move to the                         * next request when it is committed. We also want to                         * use nextPending because it has the cnxn member set                         * properly.                         */ //如果是自己的请求,则使用之前的Request,以为之前的Request带client的连接信息,可以写回响应                        if (nextPending != null                                && nextPending.sessionId == r.sessionId                                && nextPending.cxid == r.cxid) {                            // we want to send our version of the request.                            // the pointer to the connection in the request                            nextPending.hdr = r.hdr;                            nextPending.txn = r.txn;                            nextPending.zxid = r.zxid;                            toProcess.add(nextPending);                            nextPending = null;                        }//如果是别人的请求,则使用新的Request,不带连接信息,无法发送响应else {                            // this request came from someone else so just                            // send the commit packet                            toProcess.add(r);                        }                    }                }                // We haven't matched the pending requests, so go back to                // waiting//有pending请求,但是该请求还未commit,则继续                if (nextPending != null) {                    continue;                }//从队列中拿待处理请求                synchronized (this) {                    // Process the next requests in the queuedRequests                    while (nextPending == null && queuedRequests.size() > 0) {                        Request request = queuedRequests.remove();                        switch (request.type) {                        case OpCode.create:                        case OpCode.delete:                        case OpCode.setData:                        case OpCode.multi:                        case OpCode.setACL:                        case OpCode.createSession:                        case OpCode.closeSession:                            nextPending = request;                            break;                        case OpCode.sync:                            if (matchSyncs) {                                nextPending = request;                            } else {                                toProcess.add(request);                            }                            break;                        default:                            toProcess.add(request);                        }                    }                }

?在这个场景中,CREATE请求先到了queuedRequests中,然后nextPending会指向这个请求,但是此时还未commit,所以CommitProcessor会wait,直到该请求投票被通过,然后被commit。

此时leader收到了转发的请求,在LearnerHandler中

case Leader.REQUEST:                    //反序列化                    bb = ByteBuffer.wrap(qp.getData());                    sessionId = bb.getLong();                    cxid = bb.getInt();                    type = bb.getInt();                    bb = bb.slice();                    Request si;                    if(type == OpCode.sync){                        si = new LearnerSyncRequest(this, sessionId, cxid, type, bb, qp.getAuthinfo());                    } else {                        si = new Request(null, sessionId, cxid, type, bb, qp.getAuthinfo());                    }                    si.setOwner(this);    //提交给执行链处理                    leader.zk.submitRequest(si);                    break;

?Leader端的执行链如下

深入显出Zookeeper之七分布式CREATE事务处理

PrepRequestProcessor在之前的文章已经分析过了,主要是根据请求类型,拼装不同的Request,这里是CreateRequest

接下来ProposalRequestProcessor执行,ProposalRequestProcessor主要是发起投票

public void processRequest(Request request) throws RequestProcessorException {......                                /* In the following IF-THEN-ELSE block, we process syncs on the leader.          * If the sync is coming from a follower, then the follower         * handler adds it to syncHandler. Otherwise, if it is a client of         * the leader that issued the sync command, then syncHandler won't          * contain the handler. In this case, we add it to syncHandler, and          * call processRequest on the next processor.         */                if(request instanceof LearnerSyncRequest){            zks.getLeader().processSync((LearnerSyncRequest)request);        } else {//先交给CommitProcessor处理下,此时还未提交                nextProcessor.processRequest(request);            if (request.hdr != null) {                // We need to sync and get consensus on any transactions                try {//发起一个投票                    zks.getLeader().propose(request);                } catch (XidRolloverException e) {                    throw new RequestProcessorException(e.getMessage(), e);                }//先写日志                syncProcessor.processRequest(request);            }        }    }

?leader发起投票

 public Proposal propose(Request request) throws XidRolloverException {        .......        ByteArrayOutputStream baos = new ByteArrayOutputStream();        BinaryOutputArchive boa = BinaryOutputArchive.getArchive(baos);        try {            request.hdr.serialize(boa, "hdr");            if (request.txn != null) {                request.txn.serialize(boa, "txn");            }            baos.close();        } catch (IOException e) {            LOG.warn("This really should be impossible", e);        }//投票包        QuorumPacket pp = new QuorumPacket(Leader.PROPOSAL, request.zxid,                 baos.toByteArray(), null);                Proposal p = new Proposal();        p.packet = pp;        p.request = request;        synchronized (this) {            if (LOG.isDebugEnabled()) {                LOG.debug("Proposing:: " + request);            }            lastProposed = p.packet.getZxid();    //添加到投票箱,后续leader收到选票时会检查这个投票箱里的投票是否满足条件            outstandingProposals.put(lastProposed, p);    //给每个follower发一个投票包,让他们投票            sendPacket(pp);        }        return p;    }

?leader发完投票后,通过SyncRequestProcessor将事务写入日志文件,本地写成功后,投票成功。

SyncRequestProcessor之前文章已经分析过了,主要是将事务顺序写入日志文件。主要看之后的AckRequestProcessor

    public void processRequest(Request request) {        QuorumPeer self = leader.self;        if(self != null)//本地日志写成功后,认为自己成功了            leader.processAck(self.getId(), request.zxid, null);        else            LOG.error("Null QuorumPeer");    }

?leader的processAck方法比较关键,之前也有分析,这里再强调下

   synchronized public void processAck(long sid, long zxid, SocketAddress followerAddr) {       .......       //当有选票进来时,先看看是哪个投票的        Proposal p = outstandingProposals.get(zxid);        if (p == null) {            LOG.warn("Trying to commit future proposal: zxid 0x{} from {}",                    Long.toHexString(zxid), followerAddr);            return;        }        //把票投上        p.ackSet.add(sid);        if (LOG.isDebugEnabled()) {            LOG.debug("Count for zxid: 0x{} is {}",                    Long.toHexString(zxid), p.ackSet.size());        }//如果满足投票结束条件,默认是半数server统一,则提交事务        if (self.getQuorumVerifier().containsQuorum(p.ackSet)){                         if (zxid != lastCommitted+1) {                LOG.warn("Commiting zxid 0x{} from {} not first!",                        Long.toHexString(zxid), followerAddr);                LOG.warn("First is 0x{}", Long.toHexString(lastCommitted + 1));            }            outstandingProposals.remove(zxid);            if (p.request != null) {//先添加到带提交队列                toBeApplied.add(p);            }            // We don't commit the new leader proposal            if ((zxid & 0xffffffffL) != 0) {                if (p.request == null) {                    LOG.warn("Going to commmit null request for proposal: {}", p);                }//事务提交,通知follower提交事务                commit(zxid);//通知Observer                inform(p);//leader commit事务zk.commitProcessor.commit(p.request);            ......        }    }

?通知follower提交事务

    public void commit(long zxid) {        synchronized(this){            lastCommitted = zxid;        }//发送COMMIT包        QuorumPacket qp = new QuorumPacket(Leader.COMMIT, zxid, null, null);        sendPacket(qp);    }

?此时Follower收到proposal包,follower中处理投票

case Leader.PROPOSAL:                        TxnHeader hdr = new TxnHeader();            Record txn = SerializeUtils.deserializeTxn(qp.getData(), hdr);            if (hdr.getZxid() != lastQueued + 1) {                LOG.warn("Got zxid 0x"                        + Long.toHexString(hdr.getZxid())                        + " expected 0x"                        + Long.toHexString(lastQueued + 1));            }            lastQueued = hdr.getZxid();    //记录事务日志,成功后发送ACK包            fzk.logRequest(hdr, txn);            break;

?

    public void logRequest(TxnHeader hdr, Record txn) {        Request request = new Request(null, hdr.getClientId(), hdr.getCxid(),                hdr.getType(), null, null);        request.hdr = hdr;        request.txn = txn;        request.zxid = hdr.getZxid();        if ((request.zxid & 0xffffffffL) != 0) {            pendingTxns.add(request);        }//还是通过SyncRequestProcessor将事务写入本地文件,再发送ack包        syncProcessor.processRequest(request);    }

?日志写成功后,SendAckRequestProcessor发送ACK包

    public void processRequest(Request si) {        if(si.type != OpCode.sync){//ACK包            QuorumPacket qp = new QuorumPacket(Leader.ACK, si.hdr.getZxid(), null,                null);            try {//发送                learner.writePacket(qp, false);            } catch (IOException e) {                LOG.warn("Closing connection to leader, exception during packet send", e);                try {                    if (!learner.sock.isClosed()) {                        learner.sock.close();                    }                } catch (IOException e1) {                    // Nothing to do, we are shutting things down, so an exception here is irrelevant                    LOG.debug("Ignoring error closing the connection", e1);                }            }        }    }

?此时,leader收到ack包,LearnerHandler线程中

   case Leader.ACK:                    if (this.learnerType == LearnerType.OBSERVER) {                        if (LOG.isDebugEnabled()) {                            LOG.debug("Received ACK from Observer  " + this.sid);                        }                    }                    leader.processAck(this.sid, qp.getZxid(), sock.getLocalSocketAddress());                    break;

?还是调用了processAck方法,由于之前已经有了leader自己的投票,此时follower再投一票,3台机器的集群即认为投票成功,leader开始发送commit操作,也就是发送commit包给follower。

follower收到commit包

case Leader.COMMIT:            fzk.commit(qp.getZxid());            break;    public void commit(long zxid) {        if (pendingTxns.size() == 0) {            LOG.warn("Committing " + Long.toHexString(zxid)                    + " without seeing txn");            return;        }        long firstElementZxid = pendingTxns.element().zxid;        if (firstElementZxid != zxid) {            LOG.error("Committing zxid 0x" + Long.toHexString(zxid)                    + " but next pending txn 0x"                    + Long.toHexString(firstElementZxid));            System.exit(12);        }//从Pending队列中拿到待commit请求        Request request = pendingTxns.remove();//commit这个请求,这个请求将交给FinalRequestProcessor处理        commitProcessor.commit(request);    }

?Commit之后请求将交给FinalRequestProcessor处理,修改最后的内存db结构,如果是本机请求则写回响应,如果不是则不用写回响应

热点排行