HBASE 代码阅读笔记-1 - PUT操作客户端主流程(基于0.96.0-hadoop2)
又回来了,还是看put,不过版本号变了,希望看0.94的童靴移驾到http://dennis-lee-gammy.iteye.com/admin/blogs/1972269
put和doput方法变化不大,唯一就是原来的缓存队列名字里面加了一个async,然后类型由ArrayList变成了LinkedList。
flushCommit方法
正常流程几乎完全找不到以前的影子!这里多出来一个处理类org.apache.hadoop.hbase.client.AsyncProcess,即ap成员。这个类是0.94版的代码里面完全没有的。难怪变化那么大。
首先这里有一个参数,指定为同步执行还是异步执行。从上面的doput方法和flushcommit方法可以看出,如果在doput的过程中,也就是调用htable.put(Put)的时候,如果缓存大小超过了客户端写缓存大小的限制,调用这个方法是异步的;而在flushcommit方法中,这个方法是同步的。这里也暴露出来一个与原有流程不同的地方,0.94中doput如果超过大小限制,是委托flushcommit方法提交的,而这里采用了一种更加柔和的方式。另外,那个htable的线程池成员在方法中也找不到它的影子了,以前可是带着到处跑的。
主流程差不多就完成了。重要的两个流程:请求和处理响应,应该是在protected boolean canTakeOperation(HRegionLocation loc, Map<String, Boolean> regionsIncluded, Map<ServerName, Boolean> serversIncluded) { String encodedRegionName = loc.getRegionInfo().getEncodedName(); Boolean regionPrevious = regionsIncluded.get(encodedRegionName); //之前已经有这个region信息,则直接返回以保存的结果,这里有个问题,如果region信息有更新呢?估计在后面的代码里面。 if (regionPrevious != null) { // We already know what to do with this region. return regionPrevious; } //没有的话看看RS的信息,如果RS已经挂了,那么他对应的所有region都挂,不用看了,记录一下告诉上层吧 Boolean serverPrevious = serversIncluded.get(loc.getServerName()); if (Boolean.FALSE.equals(serverPrevious)) { // It's a new region, on a region server that we have already excluded. regionsIncluded.put(encodedRegionName, Boolean.FALSE); return false; } AtomicInteger regionCnt = taskCounterPerRegion.get(encodedRegionName); if (regionCnt != null && regionCnt.get() >= maxConcurrentTasksPerRegion) { // Too many tasks on this region already.hbase.client.max.perregion.tasks设置,默认为1哦,配置文件没有哦亲,每次只能运行一个任务?这个设置MS有点坑,后续看看改大了会不会有影响 regionsIncluded.put(encodedRegionName, Boolean.FALSE); return false; } if (serverPrevious == null) { // The region is ok, but we need to decide for this region server. int newServers = 0; // number of servers we're going to contact so far for (Map.Entry<ServerName, Boolean> kv : serversIncluded.entrySet()) { if (kv.getValue()) { newServers++; } } // Do we have too many total tasks already? 如果server的数量与等待完成的任务之和小于最大任务数(之前说过,默认100) boolean ok = (newServers + getCurrentTasksCount()) < maxTotalConcurrentTasks; if (ok) { //在检查是否每个server能承受的最大任务数hbase.client.max.perserver.tasks=5,怎么都那么小呢,还不能在配置文件里面找到,坑死了啊 // If the total is fine, is it ok for this individual server? AtomicInteger serverCnt = taskCounterPerServer.get(loc.getServerName()); ok = (serverCnt == null || serverCnt.get() < maxConcurrentTasksPerServer); } // 如果检查失败,RS和Region都设置为false if (!ok) { regionsIncluded.put(encodedRegionName, Boolean.FALSE); serversIncluded.put(loc.getServerName(), Boolean.FALSE); return false; } serversIncluded.put(loc.getServerName(), Boolean.TRUE); } else { assert serverPrevious.equals(Boolean.TRUE); } regionsIncluded.put(encodedRegionName, Boolean.TRUE); return true; }
备注【3】,定位Resion,大操作,详见http://dennis-lee-gammy.iteye.com/admin/blogs/1973255
备注【5】,发送请求,大头,这里先放放