首页 诗词 字典 板报 句子 名言 友答 励志 学校 网站地图
当前位置: 首页 > 教程频道 > 其他教程 > 开源软件 >

curator-client源码翻阅笔记

2013-01-28 
curator-client源码阅读笔记Zookeeper官方client使用起来有很多不便,比如session expire之后需要使用一个

curator-client源码阅读笔记
Zookeeper官方client使用起来有很多不便,比如session expire之后需要使用一个新的ZooKeeper对象,提供的接口过于底层等等

Curator是对ZooKeeper的一个封装,其中curator-client是最底层的一个封装,主要是提供自动重连的功能


入口类 CuratorZookeeperClient本身是一个很简单的封装,只保存了retryPolicy和ensembleProvider,真正的连接管理都交给了ConnectionState来处理

    public CuratorZookeeperClient(ZookeeperFactory zookeeperFactory, EnsembleProvider ensembleProvider, int sessionTimeoutMs, int connectionTimeoutMs, Watcher watcher, RetryPolicy retryPolicy, boolean canBeReadOnly)    {        retryPolicy = Preconditions.checkNotNull(retryPolicy, "retryPolicy cannot be null");        ensembleProvider = Preconditions.checkNotNull(ensembleProvider, "ensembleProvider cannot be null");        this.connectionTimeoutMs = connectionTimeoutMs;        state = new ConnectionState(zookeeperFactory, ensembleProvider, sessionTimeoutMs, connectionTimeoutMs, watcher, tracer, canBeReadOnly);        setRetryPolicy(retryPolicy);    }



ConnectionState
class ConnectionState implements Watcher, Closeable{    private volatile long connectionStartMs = 0;    private final Logger                        log = LoggerFactory.getLogger(getClass());    //负责管理Zookeeper连接    private final HandleHolder                  zooKeeper;    private final AtomicBoolean                 isConnected = new AtomicBoolean(false);        //zookeeper连接地址的provider    private final EnsembleProvider              ensembleProvider;    private final int                           connectionTimeoutMs;    private final AtomicReference<TracerDriver> tracer;    private final Queue<Exception>              backgroundExceptions = new ConcurrentLinkedQueue<Exception>();    用户自定义的watcher    private final Queue<Watcher>                parentWatchers = new ConcurrentLinkedQueue<Watcher>();        ConnectionState(ZookeeperFactory zookeeperFactory, EnsembleProvider ensembleProvider, int sessionTimeoutMs, int connectionTimeoutMs, Watcher parentWatcher, AtomicReference<TracerDriver> tracer, boolean canBeReadOnly)    {        this.ensembleProvider = ensembleProvider;        this.connectionTimeoutMs = connectionTimeoutMs;        this.tracer = tracer;        if ( parentWatcher != null )        {            parentWatchers.offer(parentWatcher);        }                //ZooKeeper真正的连接还是由HandleHolder来管理,注意到第二个参数watcher,使用的是this        zooKeeper = new HandleHolder(zookeeperFactory, this, ensembleProvider, sessionTimeoutMs, canBeReadOnly);    }}


启动时调用CuratorZookeeperClient.start(),该方法会再调用ConnectionState.start()
    void        start() throws Exception    {        log.debug("Starting");        ensembleProvider.start();        reset();    }    private void reset() throws Exception    {        isConnected.set(false);        connectionStartMs = System.currentTimeMillis();        zooKeeper.closeAndReset();        zooKeeper.getZooKeeper();   // initiate connection    }

reset里调用了HandleHolder的closeAndRest()以及getZooKeeper()方法来进行初始化连接

class HandleHolder{    private final ZookeeperFactory zookeeperFactory;    private final Watcher watcher;    private final EnsembleProvider ensembleProvider;    private final int sessionTimeout;    private final boolean canBeReadOnly;    private volatile Helper helper;void closeAndReset() throws Exception    {        internalClose();        // first helper is synchronized when getZooKeeper is called. Subsequent calls        // are not synchronized.        helper = new Helper()        {            private volatile ZooKeeper zooKeeperHandle = null;            private volatile String connectionString = null;            @Override            public ZooKeeper getZooKeeper() throws Exception            {                //这个锁是加在Helper对象上                synchronized(this)                {                    if ( zooKeeperHandle == null )                    {                        connectionString = ensembleProvider.getConnectionString();                        //真正的创建Zookeeper对象                        zooKeeperHandle = zookeeperFactory.newZooKeeper(connectionString, sessionTimeout, watcher, canBeReadOnly);                    }                    //等待连接建立完成以后,替换掉helper实例以返回一个之前创建好的zooKeeperHandle                    helper = new Helper()                    {                        @Override                        public ZooKeeper getZooKeeper() throws Exception                        {                            return zooKeeperHandle;                        }                        @Override                        public String getConnectionString()                        {                            return connectionString;                        }                    };                    return zooKeeperHandle;                }            }            @Override            public String getConnectionString()            {                return connectionString;            }        };    }        ZooKeeper getZooKeeper() throws Exception    {        return helper.getZooKeeper();    }}

可以看到closeAndReset()调用是,创建了一个新的helper对象,但是此时zookeeper连接并没有创建出来
当调用getZooKeeper()时,在helper对象上加锁,并检查是否为null,以避免重复创建新的zookeeper对象
ZooKeeper对象创建完成以后,helper的引用会指向到一个新的匿名内部类对象,这个对象引用了之前创建的Zookeeper对象,这样当下次再调用getZooKeeper()时,就会直接返回

再回到ConnectionState上来,创建HandleHolder时,传入的Watcher是ConnectionState自己,我们来看看这段代码
    @Override    public void process(WatchedEvent event)    {        if ( LOG_EVENTS )        {            log.debug("ConnectState watcher: " + event);        }        for ( Watcher parentWatcher : parentWatchers )        {            TimeTrace timeTrace = new TimeTrace("connection-state-parent-process", tracer.get());            parentWatcher.process(event);            timeTrace.commit();        }        boolean wasConnected = isConnected.get();        boolean newIsConnected = wasConnected;        if ( event.getType() == Watcher.Event.EventType.None )        {            newIsConnected = checkState(event.getState(), wasConnected);        }        if ( newIsConnected != wasConnected )        {            isConnected.set(newIsConnected);            connectionStartMs = System.currentTimeMillis();        }    }    private boolean checkState(Event.KeeperState state, boolean wasConnected)    {        boolean     isConnected = wasConnected;        boolean     checkNewConnectionString = true;        switch ( state )        {            default:            case Disconnected:            {                isConnected = false;                break;            }            case SyncConnected:            case ConnectedReadOnly:            {                isConnected = true;                break;            }            case AuthFailed:            {                isConnected = false;                log.error("Authentication failed");                break;            }            case Expired:            {                isConnected = false;                checkNewConnectionString = false;                handleExpiredSession();                break;            }            case SaslAuthenticated:            {                // NOP                break;            }        }        if ( checkNewConnectionString && zooKeeper.hasNewConnectionString() )        {            handleNewConnectionString();        }        return isConnected;    }

这段代码先将WatchEvent发送给之前注册的parentWatcher处理,然后再检查KeeperState
在checkState里可以看到,对于Disconnected和SyncConnected,只是处理当前连接的标志位,当Session Expired之后,应该就是对Zookeeper连接重新替换
private void handleExpiredSession()    {        log.warn("Session expired event received");        tracer.get().addCount("session-expired", 1);        try        {            reset();        }        catch ( Exception e )        {            queueBackgroundException(e);        }    }


这里再次调用了reset()方法,再进入HandleHolder对象,关闭当前ZooKeeper,创建新的ZooKeeper并对外返回

最后是ConnectionState的getZooKeeper方法
ZooKeeper getZooKeeper() throws Exception    {        if ( SessionFailRetryLoop.sessionForThreadHasFailed() )        {            throw new SessionFailRetryLoop.SessionFailedException();        }        Exception exception = backgroundExceptions.poll();        if ( exception != null )        {            log.error("Background exception caught", exception);            tracer.get().addCount("background-exceptions", 1);            throw exception;        }        boolean localIsConnected = isConnected.get();        if ( !localIsConnected )        {            long        elapsed = System.currentTimeMillis() - connectionStartMs;            if ( elapsed >= connectionTimeoutMs )            {                if ( zooKeeper.hasNewConnectionString() )                {                    handleNewConnectionString();                }                else                {                    KeeperException.ConnectionLossException connectionLossException = new KeeperException.ConnectionLossException();                    if ( !Boolean.getBoolean(DebugUtils.PROPERTY_DONT_LOG_CONNECTION_ISSUES) )                    {                        log.error(String.format("Connection timed out for connection string (%s) and timeout (%d) / elapsed (%d)", zooKeeper.getConnectionString(), connectionTimeoutMs, elapsed), connectionLossException);                    }                    tracer.get().addCount("connections-timed-out", 1);                    throw connectionLossException;                }            }        }        return zooKeeper.getZooKeeper();    }

主要是检测当前的状态,如果Session超时或者连接中断,则抛出异常,否则返回HandlerHolder持有的连接

至于RetryPolicy是干嘛的,没有发现,猜测应该是在CuratorFramework里会使用到

总结
整个curator-client的核心代码就是这些了
curator-client把连接管理交给HandleHolder来处理,HandleHolder负责关闭已有连接并创建新连接,返回已创建的连接
而对于连接本身的管理是由ConnectionState在Watcher的回调里操作的,当Session Expired,让HandlerHolder重置并返回新连接

热点排行