首页 诗词 字典 板报 句子 名言 友答 励志 学校 网站地图
当前位置: 首页 > 教程频道 > 其他教程 > 开源软件 >

HBase Memstore flush代码翻阅笔记-2-由单个 memstore大小超过限制触发的 flush

2014-05-12 
HBase Memstore flush代码阅读笔记-2-由单个 memstore大小超过限制触发的 flush本代码基于0.96.1.1:http:/

HBase Memstore flush代码阅读笔记-2-由单个 memstore大小超过限制触发的 flush
本代码基于0.96.1.1:http://svn.apache.org/repos/asf/hbase/tags/0.96.1.1

    默认情况下,当某个 region 的 memstore 大小达到hbase.hregion.memstore.flush.size * hbase.hregion.memstore.block.multiplier时,会出发 memstore 的 flush 操作,并reject 客户端写请求。

OperationStatus[] batchMutate(Mutation[] mutations, boolean isReplay)      throws IOException {    BatchOperationInProgress<Mutation> batchOp =      new BatchOperationInProgress<Mutation>(mutations);    boolean initialized = false;    while (!batchOp.isDone()) {      if (!isReplay) {        checkReadOnly();      }      checkResources();      long newSize;      if (isReplay) {//这里只是做一些简单的工作前置检查,不再码代码了        startRegionOperation(Operation.REPLAY_BATCH_MUTATE);      } else {        startRegionOperation(Operation.BATCH_MUTATE);      }      try {        if (!initialized) {          if (!isReplay) {            this.writeRequestsCount.increment();            doPreMutationHook(batchOp);//检查并调用前置coprocessor          }          initialized = true;        }        long addedSize = doMiniBatchMutation(batchOp, isReplay);//这里是一个长达300多行的方法。。逼人放弃的节奏啊        newSize = this.addAndGetGlobalMemstoreSize(addedSize);      } finally {        closeRegionOperation();      }      if (isFlushSize(newSize)) {        requestFlush();      }    }    return batchOp.retCodeDetails;  }  private void checkResources() throws RegionTooBusyException {    // If catalog region, do not impose resource constraints or block updates.    if (this.getRegionInfo().isMetaRegion()) return;    // 超过设定则请求 flush,并且以异常 reject 写操作。    if (this.memstoreSize.get() > this.blockingMemStoreSize) {      requestFlush();      throw new RegionTooBusyException("Above memstore limit, " +          "regionName=" + (this.getRegionInfo() == null ? "unknown" :          this.getRegionInfo().getRegionNameAsString()) +          ", server=" + (this.getRegionServerServices() == null ? "unknown" :          this.getRegionServerServices().getServerName()) +          ", memstoreSize=" + memstoreSize.get() +          ", blockingMemStoreSize=" + blockingMemStoreSize);    }  }


之所以说是默认情况,是因为建表时指定的flush 大小,优先级高于该设置
  long flushSize = this.htableDescriptor.getMemStoreFlushSize();    if (flushSize <= 0) {      flushSize = conf.getLong(HConstants.HREGION_MEMSTORE_FLUSH_SIZE,        HTableDescriptor.DEFAULT_MEMSTORE_FLUSH_SIZE);    }    this.memstoreFlushSize = flushSize;    this.blockingMemStoreSize = this.memstoreFlushSize *        conf.getLong("hbase.hregion.memstore.block.multiplier", 2);

   其中,某个表的memory store flush size 可以通过在建表或改表如下语句实现:
   htableDescriptor.setMemStoreFlushSize(256 * 1024 * 1024);

   或者 hbase shell:
 
  create|alter 'test', {MEMSTORE_FLUSHSIZE => '268435456'},....  


接着看“请求 flush 操作”的方法。这里之所以说是请求,是因为该方法只是将region 加入到一个请求列表里面,并未真正的执行了flush,真正的 flush 请求是由其他的线程异步执行的。所以,过多的 flush 任务,会使新的 flush请求阻塞,最终导致整个 RS 都无法响应写请求。
 
 private void requestFlush() {    if (this.rsServices == null) {      return;    }    synchronized (writestate) {      if (this.writestate.isFlushRequested()) {        return;      }      writestate.flushRequested = true;    }    // Make request outside of synchronize block; HBASE-818.    this.rsServices.getFlushRequester().requestFlush(this);//如果当前 R 已经在请求队列中,则放弃请求,否则请求 flush    if (LOG.isDebugEnabled()) {      LOG.debug("Flush requested on " + this);    }  } public void requestFlush(HRegion r) {    synchronized (regionsInQueue) {      if (!regionsInQueue.containsKey(r)) {        // This entry has no delay so it will be added at the top of the flush        // queue.  It'll come out near immediately.        FlushRegionEntry fqe = new FlushRegionEntry(r);        this.regionsInQueue.put(r, fqe);        this.flushQueue.add(fqe);      }    }  }


通过jmap 命令查看,MemStoreFlusher每个 RS 只有一个实例,而 flush 操作的实际执行者为MemStoreFlusher的一个内部类FlushHandler。每个 MemStoreFlusher 默认启用1个 FlushHandler 实例。当然,这个实例可可配置的,通过hbase.hstore.flusher.count 指定.

热点排行