首页 诗词 字典 板报 句子 名言 友答 励志 学校 网站地图
当前位置: 首页 > 教程频道 > 企业软件 > 行业软件 >

HBASE 代码阅览笔记-1 - PUT操作客户端主流程(基于0.96.0-hadoop2)

2013-11-15 
HBASE 代码阅读笔记-1 - PUT操作客户端主流程(基于0.96.0-hadoop2)又回来了,还是看put,不过版本号变了,希

HBASE 代码阅读笔记-1 - PUT操作客户端主流程(基于0.96.0-hadoop2)
又回来了,还是看put,不过版本号变了,希望看0.94的童靴移驾到http://dennis-lee-gammy.iteye.com/admin/blogs/1972269
put和doput方法变化不大,唯一就是原来的缓存队列名字里面加了一个async,然后类型由ArrayList变成了LinkedList。

flushCommit方法



正常流程几乎完全找不到以前的影子!这里多出来一个处理类org.apache.hadoop.hbase.client.AsyncProcess,即ap成员。这个类是0.94版的代码里面完全没有的。难怪变化那么大。

首先这里有一个参数,指定为同步执行还是异步执行。从上面的doput方法和flushcommit方法可以看出,如果在doput的过程中,也就是调用htable.put(Put)的时候,如果缓存大小超过了客户端写缓存大小的限制,调用这个方法是异步的;而在flushcommit方法中,这个方法是同步的。这里也暴露出来一个与原有流程不同的地方,0.94中doput如果超过大小限制,是委托flushcommit方法提交的,而这里采用了一种更加柔和的方式。另外,那个htable的线程池成员在方法中也找不到它的影子了,以前可是带着到处跑的。


主流程差不多就完成了。重要的两个流程:请求和处理响应,应该是在
protected boolean canTakeOperation(HRegionLocation loc,                                     Map<String, Boolean> regionsIncluded,                                     Map<ServerName, Boolean> serversIncluded) {    String encodedRegionName = loc.getRegionInfo().getEncodedName();    Boolean regionPrevious = regionsIncluded.get(encodedRegionName);    //之前已经有这个region信息,则直接返回以保存的结果,这里有个问题,如果region信息有更新呢?估计在后面的代码里面。    if (regionPrevious != null) {      // We already know what to do with this region.      return regionPrevious;    }    //没有的话看看RS的信息,如果RS已经挂了,那么他对应的所有region都挂,不用看了,记录一下告诉上层吧    Boolean serverPrevious = serversIncluded.get(loc.getServerName());    if (Boolean.FALSE.equals(serverPrevious)) {      // It's a new region, on a region server that we have already excluded.      regionsIncluded.put(encodedRegionName, Boolean.FALSE);      return false;    }    AtomicInteger regionCnt = taskCounterPerRegion.get(encodedRegionName);    if (regionCnt != null && regionCnt.get() >= maxConcurrentTasksPerRegion) {      // Too many tasks on this region already.hbase.client.max.perregion.tasks设置,默认为1哦,配置文件没有哦亲,每次只能运行一个任务?这个设置MS有点坑,后续看看改大了会不会有影响      regionsIncluded.put(encodedRegionName, Boolean.FALSE);      return false;    }        if (serverPrevious == null) {      // The region is ok, but we need to decide for this region server.      int newServers = 0; // number of servers we're going to contact so far      for (Map.Entry<ServerName, Boolean> kv : serversIncluded.entrySet()) {        if (kv.getValue()) {          newServers++;        }      }      // Do we have too many total tasks already? 如果server的数量与等待完成的任务之和小于最大任务数(之前说过,默认100)      boolean ok = (newServers + getCurrentTasksCount()) < maxTotalConcurrentTasks;      if (ok) {        //在检查是否每个server能承受的最大任务数hbase.client.max.perserver.tasks=5,怎么都那么小呢,还不能在配置文件里面找到,坑死了啊        // If the total is fine, is it ok for this individual server?        AtomicInteger serverCnt = taskCounterPerServer.get(loc.getServerName());        ok = (serverCnt == null || serverCnt.get() < maxConcurrentTasksPerServer);      }      // 如果检查失败,RS和Region都设置为false      if (!ok) {        regionsIncluded.put(encodedRegionName, Boolean.FALSE);        serversIncluded.put(loc.getServerName(), Boolean.FALSE);        return false;      }      serversIncluded.put(loc.getServerName(), Boolean.TRUE);    } else {      assert serverPrevious.equals(Boolean.TRUE);    }    regionsIncluded.put(encodedRegionName, Boolean.TRUE);    return true;  }


备注【3】,定位Resion,大操作,详见http://dennis-lee-gammy.iteye.com/admin/blogs/1973255

备注【5】,发送请求,大头,这里先放放

热点排行