curator-client源码阅读笔记
ZooKeeper官方client使用起来有很多不便:比如session过期(expire)之后必须重新创建一个新的ZooKeeper对象,提供的接口也过于底层,等等。
Curator是对ZooKeeper官方client的一层封装,其中curator-client是最底层的模块,主要提供自动重连的功能。
入口类:CuratorZookeeperClient本身是一个很简单的封装,只保存了retryPolicy和ensembleProvider,真正的连接管理都交给了ConnectionState来处理。
public CuratorZookeeperClient(ZookeeperFactory zookeeperFactory, EnsembleProvider ensembleProvider, int sessionTimeoutMs, int connectionTimeoutMs, Watcher watcher, RetryPolicy retryPolicy, boolean canBeReadOnly) { retryPolicy = Preconditions.checkNotNull(retryPolicy, "retryPolicy cannot be null"); ensembleProvider = Preconditions.checkNotNull(ensembleProvider, "ensembleProvider cannot be null"); this.connectionTimeoutMs = connectionTimeoutMs; state = new ConnectionState(zookeeperFactory, ensembleProvider, sessionTimeoutMs, connectionTimeoutMs, watcher, tracer, canBeReadOnly); setRetryPolicy(retryPolicy); }class ConnectionState implements Watcher, Closeable{ private volatile long connectionStartMs = 0; private final Logger log = LoggerFactory.getLogger(getClass()); //负责管理Zookeeper连接 private final HandleHolder zooKeeper; private final AtomicBoolean isConnected = new AtomicBoolean(false); //zookeeper连接地址的provider private final EnsembleProvider ensembleProvider; private final int connectionTimeoutMs; private final AtomicReference<TracerDriver> tracer; private final Queue<Exception> backgroundExceptions = new ConcurrentLinkedQueue<Exception>(); 用户自定义的watcher private final Queue<Watcher> parentWatchers = new ConcurrentLinkedQueue<Watcher>(); ConnectionState(ZookeeperFactory zookeeperFactory, EnsembleProvider ensembleProvider, int sessionTimeoutMs, int connectionTimeoutMs, Watcher parentWatcher, AtomicReference<TracerDriver> tracer, boolean canBeReadOnly) { this.ensembleProvider = ensembleProvider; this.connectionTimeoutMs = connectionTimeoutMs; this.tracer = tracer; if ( parentWatcher != null ) { parentWatchers.offer(parentWatcher); } //ZooKeeper真正的连接还是由HandleHolder来管理,注意到第二个参数watcher,使用的是this zooKeeper = new HandleHolder(zookeeperFactory, this, ensembleProvider, sessionTimeoutMs, canBeReadOnly); }} void start() throws Exception { log.debug("Starting"); ensembleProvider.start(); reset(); } private void reset() throws Exception { isConnected.set(false); connectionStartMs 
= System.currentTimeMillis(); zooKeeper.closeAndReset(); zooKeeper.getZooKeeper(); // initiate connection }class HandleHolder{ private final ZookeeperFactory zookeeperFactory; private final Watcher watcher; private final EnsembleProvider ensembleProvider; private final int sessionTimeout; private final boolean canBeReadOnly; private volatile Helper helper;void closeAndReset() throws Exception { internalClose(); // first helper is synchronized when getZooKeeper is called. Subsequent calls // are not synchronized. helper = new Helper() { private volatile ZooKeeper zooKeeperHandle = null; private volatile String connectionString = null; @Override public ZooKeeper getZooKeeper() throws Exception { //这个锁是加在Helper对象上 synchronized(this) { if ( zooKeeperHandle == null ) { connectionString = ensembleProvider.getConnectionString(); //真正的创建Zookeeper对象 zooKeeperHandle = zookeeperFactory.newZooKeeper(connectionString, sessionTimeout, watcher, canBeReadOnly); } //等待连接建立完成以后,替换掉helper实例以返回一个之前创建好的zooKeeperHandle helper = new Helper() { @Override public ZooKeeper getZooKeeper() throws Exception { return zooKeeperHandle; } @Override public String getConnectionString() { return connectionString; } }; return zooKeeperHandle; } } @Override public String getConnectionString() { return connectionString; } }; } ZooKeeper getZooKeeper() throws Exception { return helper.getZooKeeper(); }} @Override public void process(WatchedEvent event) { if ( LOG_EVENTS ) { log.debug("ConnectState watcher: " + event); } for ( Watcher parentWatcher : parentWatchers ) { TimeTrace timeTrace = new TimeTrace("connection-state-parent-process", tracer.get()); parentWatcher.process(event); timeTrace.commit(); } boolean wasConnected = isConnected.get(); boolean newIsConnected = wasConnected; if ( event.getType() == Watcher.Event.EventType.None ) { newIsConnected = checkState(event.getState(), wasConnected); } if ( newIsConnected != wasConnected ) { isConnected.set(newIsConnected); connectionStartMs = 
System.currentTimeMillis(); } } private boolean checkState(Event.KeeperState state, boolean wasConnected) { boolean isConnected = wasConnected; boolean checkNewConnectionString = true; switch ( state ) { default: case Disconnected: { isConnected = false; break; } case SyncConnected: case ConnectedReadOnly: { isConnected = true; break; } case AuthFailed: { isConnected = false; log.error("Authentication failed"); break; } case Expired: { isConnected = false; checkNewConnectionString = false; handleExpiredSession(); break; } case SaslAuthenticated: { // NOP break; } } if ( checkNewConnectionString && zooKeeper.hasNewConnectionString() ) { handleNewConnectionString(); } return isConnected; }private void handleExpiredSession() { log.warn("Session expired event received"); tracer.get().addCount("session-expired", 1); try { reset(); } catch ( Exception e ) { queueBackgroundException(e); } }ZooKeeper getZooKeeper() throws Exception { if ( SessionFailRetryLoop.sessionForThreadHasFailed() ) { throw new SessionFailRetryLoop.SessionFailedException(); } Exception exception = backgroundExceptions.poll(); if ( exception != null ) { log.error("Background exception caught", exception); tracer.get().addCount("background-exceptions", 1); throw exception; } boolean localIsConnected = isConnected.get(); if ( !localIsConnected ) { long elapsed = System.currentTimeMillis() - connectionStartMs; if ( elapsed >= connectionTimeoutMs ) { if ( zooKeeper.hasNewConnectionString() ) { handleNewConnectionString(); } else { KeeperException.ConnectionLossException connectionLossException = new KeeperException.ConnectionLossException(); if ( !Boolean.getBoolean(DebugUtils.PROPERTY_DONT_LOG_CONNECTION_ISSUES) ) { log.error(String.format("Connection timed out for connection string (%s) and timeout (%d) / elapsed (%d)", zooKeeper.getConnectionString(), connectionTimeoutMs, elapsed), connectionLossException); } tracer.get().addCount("connections-timed-out", 1); throw connectionLossException; } } } return 
zooKeeper.getZooKeeper(); }