首页 诗词 字典 板报 句子 名言 友答 励志 学校 网站地图
当前位置: 首页 > 教程频道 > 其他教程 > 开源软件 >

hbase源码分析(1):客户端数据入库

2012-07-08 
hbase源码分析(一):客户端数据入库?Hbase插入数据的过程大致是:客户端提交请求给region server(这中间会有

hbase源码分析(一):客户端数据入库

?

Hbase插入数据的过程大致是:

  1. 客户端提交请求给region server(这中间会有作一些缓存)
  2. region server接收到请求,判断如果是put请求,将其put到memstore每次memstore的操作,都会检查memstore是否操作一个阈值,如果超过,就开始执行flush(),这个flush其实就是从内存中的KeyValue对持久化到HStore(也就是HFile)上面

下面我们来看一条数据时怎么进入到hbase的吧:

客户端:

?

HTable.java 执行put操作
  public void put(final Put put) throws IOException {    doPut(Arrays.asList(put));  }
?在put方法里执行doPut操作

验证put的合法性,然后检查keyvalue的大小是否越界,这个值可以如过配置i参数hbase.client.keyvalue.maxsize参数来配置,默认这个值是无限大的,然后调用writeBuffer.add(put);将数据写入到本地缓存,当数据超过本地缓存writeBufferSize(默认是2097152)的大小或者设置了自动提交autoFlush (默认是打开的尾true)或者你手动调用了flushCommits()操作,这些缓存将被flush

  private void doPut(final List<Put> puts) throws IOException {    int n = 0;    for (Put put : puts) {      validatePut(put);      writeBuffer.add(put);//将数据写入到本地缓存      currentWriteBufferSize += put.heapSize();           // we need to periodically see if the writebuffer is full instead of waiting until the end of the List      n++;      if (n % DOPUT_WB_CHECK == 0 && currentWriteBufferSize > writeBufferSize) {        flushCommits();      }    }    if (autoFlush || currentWriteBufferSize > writeBufferSize) {      flushCommits();    }  }
?flushCommits()操作代码:
  public void flushCommits() throws IOException {    try {      Object[] results = new Object[writeBuffer.size()];      try {    //在这里连接远程的region server提交请求        this.connection.processBatch(writeBuffer, tableName, pool, results);      } catch (InterruptedException e) {        throw new IOException(e);      } finally {        // mutate list so that it is empty for complete success, or contains        // only failed records results are returned in the same order as the        // requests in list walk the list backwards, so we can remove from list        // without impacting the indexes of earlier members        for (int i = results.length - 1; i>=0; i--) {          if (results[i] instanceof Result) {            // successful Puts are removed from the list here.            writeBuffer.remove(i);          }        }      }    } finally {      if (clearBufferOnFail) {        writeBuffer.clear();        currentWriteBufferSize = 0;      } else {        // the write buffer was adjusted by processBatchOfPuts        currentWriteBufferSize = 0;        for (Put aPut : writeBuffer) {          currentWriteBufferSize += aPut.heapSize();        }      }    }  }

我们来看看HConnection.java的实现类HConnectionImplementation是怎么实现processBatch操作的:

?

    public void processBatch(List<? extends Row> list,        final byte[] tableName,        ExecutorService pool,        Object[] results) throws IOException, InterruptedException {      // results must be the same size as list      if (results.length != list.size()) {        throw new IllegalArgumentException("argument results must be the same size as argument list");      }      processBatchCallback(list, tableName, pool, results, null);    }    public <R> void processBatchCallback(        List<? extends Row> list,        byte[] tableName,        ExecutorService pool,        Object[] results,        Batch.Callback<R> callback)    throws IOException, InterruptedException {      // results must be the same size as list      if (results.length != list.size()) {        throw new IllegalArgumentException(            "argument results must be the same size as argument list");      }      if (list.isEmpty()) {        return;      }      // Keep track of the most recent servers for any given item for better      // exceptional reporting.  We keep HRegionLocation to save on parsing.      // Later below when we use lastServers, we'll pull what we need from      // lastServers.      HRegionLocation [] lastServers = new HRegionLocation[results.length];      List<Row> workingList = new ArrayList<Row>(list);      boolean retry = true;      // count that helps presize actions array      int actionCount = 0;      Throwable singleRowCause = null;      for (int tries = 0; tries < numRetries && retry; ++tries) {        // sleep first, if this is a retry        if (tries >= 1) {          long sleepTime = getPauseTime(tries);          LOG.debug("Retry " +tries+ ", sleep for " +sleepTime+ "ms!");          Thread.sleep(sleepTime);        }        // step 1: break up into regionserver-sized chunks and build the data structs        Map<HRegionLocation, MultiAction<R>> actionsByServer =          new HashMap<HRegionLocation, MultiAction<R>>();        for (int i = 0; i < workingList.size(); i++) {          Row row = workingList.get(i);          if (row != null) {            HRegionLocation loc = locateRegion(tableName, row.getRow(), true);            byte[] regionName = loc.getRegionInfo().getRegionName();            MultiAction<R> actions = actionsByServer.get(loc);            if (actions == null) {              actions = new MultiAction<R>();              actionsByServer.put(loc, actions);            }            Action<R> action = new Action<R>(row, i);            lastServers[i] = loc;            actions.add(regionName, action);          }        }        // step 2: make the requests        Map<HRegionLocation, Future<MultiResponse>> futures =            new HashMap<HRegionLocation, Future<MultiResponse>>(                actionsByServer.size());        for (Entry<HRegionLocation, MultiAction<R>> e: actionsByServer.entrySet()) {          futures.put(e.getKey(), pool.submit(createCallable(e.getKey(), e.getValue(), tableName)));        }        // step 3: collect the failures and successes and prepare for retry        for (Entry<HRegionLocation, Future<MultiResponse>> responsePerServer             : futures.entrySet()) {          HRegionLocation loc = responsePerServer.getKey();          try {            Future<MultiResponse> future = responsePerServer.getValue();            MultiResponse resp = future.get();            if (resp == null) {              // Entire server failed              LOG.debug("Failed all for server: " + loc.getHostnamePort() +                ", removing from cache");              continue;            }            for (Entry<byte[], List<Pair<Integer,Object>>> e : resp.getResults().entrySet()) {              byte[] regionName = e.getKey();              List<Pair<Integer, Object>> regionResults = e.getValue();              for (Pair<Integer, Object> regionResult : regionResults) {                if (regionResult == null) {                  // if the first/only record is 'null' the entire region failed.                  LOG.debug("Failures for region: " +                      Bytes.toStringBinary(regionName) +                      ", removing from cache");                } else {                  // Result might be an Exception, including DNRIOE                  results[regionResult.getFirst()] = regionResult.getSecond();                  if (callback != null && !(regionResult.getSecond() instanceof Throwable)) {                    callback.update(e.getKey(),                        list.get(regionResult.getFirst()).getRow(),                        (R)regionResult.getSecond());                  }                }              }            }          } catch (ExecutionException e) {            LOG.warn("Failed all from " + loc, e);          }        }        // step 4: identify failures and prep for a retry (if applicable).        // Find failures (i.e. null Result), and add them to the workingList (in        // order), so they can be retried.        retry = false;        workingList.clear();        actionCount = 0;        for (int i = 0; i < results.length; i++) {          // if null (fail) or instanceof Throwable && not instanceof DNRIOE          // then retry that row. else dont.          if (results[i] == null ||              (results[i] instanceof Throwable &&                  !(results[i] instanceof DoNotRetryIOException))) {            retry = true;            actionCount++;            Row row = list.get(i);            workingList.add(row);            deleteCachedLocation(tableName, row.getRow());          } else {            if (results[i] != null && results[i] instanceof Throwable) {              actionCount++;            }            // add null to workingList, so the order remains consistent with the original list argument.            workingList.add(null);          }        }      }      if (retry) {        // Simple little check for 1 item failures.        if (singleRowCause != null) {          throw new IOException(singleRowCause);        }      }      List<Throwable> exceptions = new ArrayList<Throwable>(actionCount);      List<Row> actions = new ArrayList<Row>(actionCount);      List<String> addresses = new ArrayList<String>(actionCount);      for (int i = 0 ; i < results.length; i++) {        if (results[i] == null || results[i] instanceof Throwable) {          exceptions.add((Throwable)results[i]);          actions.add(list.get(i));          addresses.add(lastServers[i].getHostnamePort());        }      }      if (!exceptions.isEmpty()) {        throw new RetriesExhaustedWithDetailsException(exceptions,            actions,            addresses);      }    }?

?

通过RPC向Region Server提交数据,

?

    private <R> Callable<MultiResponse> createCallable(final HRegionLocation loc,        final MultiAction<R> multi, final byte [] tableName) {      final HConnection connection = this;      return new Callable<MultiResponse>() {       public MultiResponse call() throws IOException {         return getRegionServerWithoutRetries(             new ServerCallable<MultiResponse>(connection, tableName, null) {               public MultiResponse call() throws IOException {                 return server.multi(multi);               }               @Override               public void connect(boolean reload) throws IOException {                 server =                   connection.getHRegionConnection(loc.getHostname(), loc.getPort());               }             }         );       }     };   }
?

?

获取RPC实例的操作:

?

 HRegionInterface getHRegionConnection(final String hostname, final int port,        final InetSocketAddress isa, final boolean master)    throws IOException {      if (master) getMaster();      HRegionInterface server;      String rsName = null;      if (isa != null) {        rsName = Addressing.createHostAndPortStr(isa.getHostName(),            isa.getPort());      } else {        rsName = Addressing.createHostAndPortStr(hostname, port);      }      // See if we already have a connection (common case)      server = this.servers.get(rsName);      if (server == null) {        // create a unique lock for this RS (if necessary)        this.connectionLock.putIfAbsent(rsName, rsName);        // get the RS lock        synchronized (this.connectionLock.get(rsName)) {          // do one more lookup in case we were stalled above          server = this.servers.get(rsName);          if (server == null) {            try {              if (clusterId.hasId()) {                conf.set(HConstants.CLUSTER_ID, clusterId.getId());              }              // Only create isa when we need to.              InetSocketAddress address = isa != null? isa:                new InetSocketAddress(hostname, port);              // definitely a cache miss. establish an RPC for this RS              server = (HRegionInterface) HBaseRPC.waitForProxy(                  serverInterfaceClass, HRegionInterface.VERSION,                  address, this.conf,                  this.maxRPCAttempts, this.rpcTimeout, this.rpcTimeout);              this.servers.put(Addressing.createHostAndPortStr(                  address.getHostName(), address.getPort()), server);            } catch (RemoteException e) {              LOG.warn("RemoteException connecting to RS", e);              // Throw what the RemoteException was carrying.              throw e.unwrapRemoteException();            }          }        }      }      return server;    }

?

hbase client在执行插入的时候,会对最近使用的region server做缓存,如果缓存中保存了相应的region server信息,就直接使用这个region信息,连接这个region server,否则会对master进行一次rpc操作,获得region server信息,客户端的操作put、get、delete等操作每次都是封装在一个Action对象中进行提交操作的,都是一系列的的action一起提交,这就是MultiActionServer端操作:
客户端通过RPC提交过来的操作会进入到HRegionServer.multi(MultiAction<R> multi)中处理插入请求。出去每一个action对象,判断属于哪一个实例(put/get/delete),来执行相应的操作给每个put分配一个lock执行HRgion.put,进行数据写入操作HRegionServer.java
  @SuppressWarnings("unchecked")  @Override  public <R> MultiResponse multi(MultiAction<R> multi) throws IOException {    checkOpen();    MultiResponse response = new MultiResponse();    for (Map.Entry<byte[], List<Action<R>>> e : multi.actions.entrySet()) {      byte[] regionName = e.getKey();      List<Action<R>> actionsForRegion = e.getValue();      // sort based on the row id - this helps in the case where we reach the      // end of a region, so that we don't have to try the rest of the      // actions in the list.      Collections.sort(actionsForRegion);      Row action;      List<Action<R>> puts = new ArrayList<Action<R>>();      for (Action<R> a : actionsForRegion) {        action = a.getAction();        int originalIndex = a.getOriginalIndex();        try {          //判断action是哪种操作          if (action instanceof Delete) {            delete(regionName, (Delete) action);            response.add(regionName, originalIndex, new Result());          } else if (action instanceof Get) {            response.add(regionName, originalIndex, get(regionName, (Get) action));          } else if (action instanceof Put) {            puts.add(a);  // wont throw.          } else if (action instanceof Exec) {            ExecResult result = execCoprocessor(regionName, (Exec)action);            response.add(regionName, new Pair<Integer, Object>(                a.getOriginalIndex(), result.getValue()            ));          } else {            LOG.debug("Error: invalid Action, row must be a Get, Delete, " +                "Put or Exec.");            throw new DoNotRetryIOException("Invalid Action, row must be a " +                "Get, Delete or Put.");          }        } catch (IOException ex) {          response.add(regionName, originalIndex, ex);        }      }      // We do the puts with result.put so we can get the batching efficiency      // we so need. All this data munging doesn't seem great, but at least      // we arent copying bytes or anything.      if (!puts.isEmpty()) {        try {          HRegion region = getRegion(regionName);          if (!region.getRegionInfo().isMetaTable()) {            this.cacheFlusher.reclaimMemStoreMemory();          }          List<Pair<Put,Integer>> putsWithLocks =              Lists.newArrayListWithCapacity(puts.size());          for (Action<R> a : puts) {            Put p = (Put) a.getAction();            Integer lock;            try {              //获取lock              lock = getLockFromId(p.getLockId());            } catch (UnknownRowLockException ex) {              response.add(regionName, a.getOriginalIndex(), ex);              continue;            }            putsWithLocks.add(new Pair<Put, Integer>(p, lock));          }          this.requestCount.addAndGet(puts.size());          //调用将数据写入到region中          OperationStatus[] codes =              region.put(putsWithLocks.toArray(new Pair[]{}));          for( int i = 0 ; i < codes.length ; i++) {            OperationStatus code = codes[i];            Action<R> theAction = puts.get(i);            Object result = null;            if (code.getOperationStatusCode() == OperationStatusCode.SUCCESS) {              result = new Result();            } else if (code.getOperationStatusCode()                == OperationStatusCode.BAD_FAMILY) {              result = new NoSuchColumnFamilyException(code.getExceptionMsg());            }            // FAILURE && NOT_RUN becomes null, aka: need to run again.            response.add(regionName, theAction.getOriginalIndex(), result);          }        } catch (IOException ioe) {          // fail all the puts with the ioe in question.          for (Action<R> a: puts) {            response.add(regionName, a.getOriginalIndex(), ioe);          }        }      }    }    return response;  }
?HRegion.java的put操作:
  /**   * @param put   * @param lockid   * @param writeToWAL   * @throws IOException   */  public void put(Put put, Integer lockid, boolean writeToWAL)  throws IOException {//检查region是否只读,如果只读,就会抛出异常    checkReadOnly();    // Do a rough check that we have resources to accept a write.  The check is    // 'rough' in that between the resource check and the call to obtain a    // read lock, resources may run out.  For now, the thought is that this    // will be extremely rare; we'll deal with it when it happens.    checkResources();    //获取的lock    startRegionOperation();    this.writeRequestsCount.increment();    try {      // We obtain a per-row lock, so other clients will block while one client      // performs an update. The read lock is released by the client calling      // #commit or #abort or if the HRegionServer lease on the lock expires.      // See HRegionServer#RegionListener for how the expire on HRegionServer      // invokes a HRegion#abort.      byte [] row = put.getRow();      // If we did not pass an existing row lock, obtain a new one      Integer lid = getLock(lockid, row, true);      try {        // All edits for the given row (across all column families) must happen atomically.        internalPut(put, put.getClusterId(), writeToWAL);      } finally {        if(lockid == null) releaseRowLock(lid);      }    } finally {      closeRegionOperation();    }  }
?

checkResource()操作:

在实际执行put执行,先要进行必要的检查操作,我们来看看checkResource()方法。

?

  private void checkResources() {    // If catalog region, do not impose resource constraints or block updates.    if (this.getRegionInfo().isMetaRegion()) return;    boolean blocked = false;    while (this.memstoreSize.get() > this.blockingMemStoreSize) {      requestFlush();      if (!blocked) {        LOG.info("Blocking updates for '" + Thread.currentThread().getName() +          "' on region " + Bytes.toStringBinary(getRegionName()) +          ": memstore size " +          StringUtils.humanReadableInt(this.memstoreSize.get()) +          " is >= than blocking " +          StringUtils.humanReadableInt(this.blockingMemStoreSize) + " size");      }      blocked = true;      synchronized(this) {        try {          wait(threadWakeFrequency);        } catch (InterruptedException e) {          // continue;        }      }    }    if (blocked) {      LOG.info("Unblocking updates for region " + this + " '"          + Thread.currentThread().getName() + "'");    }  }

?可以看出当Hregion的Memstore总大小超过blockingMemStoreSize,则会进入flush操作,线程会进入到阻塞状态,直到memstoresize的值降到合适的范围内。

?

?

internalPut这个操作包括:

?

checkFamilies 检查列族updateKVTimestamps 更新KeyValue的时间戳addFamilyMapToWALEdit 预写日志applyFamilyMapToMemstore 将数据写入到memstore中isFlushSize 判断是否将文件flush到HFile中释放锁将memstore的数据flush到HFile中

本文仅是个人理解,有什么不正确的地方肯定指正

?

热点排行