HBASE 代码阅读笔记-1 - PUT-1-主流程(基于0.94.12)
最近闲来无事看看hbase源代码,为了加强理解和记忆随便写了一些东西在这里。初次整理,内容和思路都比较凌乱。(本篇已完结)
本代码基于hbase 0.94.12
HTable
/**
 * Queues every Put of the given list in the client-side write buffer.
 * When auto-flush is on, the buffer is flushed once after the whole list
 * has been queued.
 *
 * @param puts the mutations to queue
 * @throws IOException if queuing a Put, or a flush triggered along the way, fails
 */
@Override
public void put(final List<Put> puts) throws IOException {
  for (Put each : puts) {
    doPut(each);
  }
  if (!autoFlush) {
    return;
  }
  flushCommits();
}

/**
 * Validates a single Put, appends it to the write buffer and accounts for
 * its heap size. Crossing the configured buffer limit triggers a flush,
 * regardless of the auto-flush setting.
 *
 * @param put the mutation to queue
 * @throws IOException if validation or the triggered flush fails
 */
private void doPut(Put put) throws IOException {
  validatePut(put);
  writeBuffer.add(put);
  currentWriteBufferSize += put.heapSize();

  final boolean overLimit = currentWriteBufferSize > writeBufferSize;
  if (overLimit) {
    flushCommits();
  }
}
@Override public void flushCommits() throws IOException { try { Object[] results = new Object[writeBuffer.size()]; try { this.connection.processBatch(writeBuffer, tableName, pool, results);//这里是关键的操作 } catch (InterruptedException e) { throw new IOException(e); } finally { // mutate list so that it is empty for complete success, or contains // only failed records results are returned in the same order as the // requests in list walk the list backwards, so we can remove from list // without impacting the indexes of earlier members // 删除已经成功返回的操作 for (int i = results.length - 1; i >= 0; i--) { if (results[i] instanceof Result) { // successful Puts are removed from the list here. writeBuffer.remove(i); } } } } finally { if (clearBufferOnFail) {//如果设置为clearBufferOnFail模式,则不管成功与否都清除操作队列和缓存大小,该值默认=autoFlush writeBuffer.clear(); currentWriteBufferSize = 0; } else { // the write buffer was adjusted by processBatchOfPuts currentWriteBufferSize = 0; for (Put aPut : writeBuffer) { currentWriteBufferSize += aPut.heapSize(); }// 重新计算一遍缓存大小 } } } public void setAutoFlush(boolean autoFlush) { setAutoFlush(autoFlush, autoFlush); } public void setAutoFlush(boolean autoFlush, boolean clearBufferOnFail) { this.autoFlush = autoFlush; this.clearBufferOnFail = autoFlush || clearBufferOnFail; }
public void processBatch(List<? extends Row> list, final byte[] tableName, ExecutorService pool, Object[] results) throws IOException, InterruptedException { // This belongs in HTable!!! Not in here. St.Ack // results must be the same size as list if (results.length != list.size()) { throw new IllegalArgumentException("argument results must be the same size as argument list"); } processBatchCallback(list, tableName, pool, results, null); }
public <R> void processBatchCallback( List<? extends Row> list,//put操作列表 byte[] tableName,//tableName ExecutorService pool,//HTable的executorService成员 Object[] results,//返回结果 Batch.Callback<R> callback)//回调,目测现在为NULL throws IOException, InterruptedException { // This belongs in HTable!!! Not in here. St.Ack // results must be the same size as list if (results.length != list.size()) { throw new IllegalArgumentException( "argument results must be the same size as argument list"); } if (list.isEmpty()) { return; } // Keep track of the most recent servers for any given item for better // exceptional reporting. We keep HRegionLocation to save on parsing. // Later below when we use lastServers, we'll pull what we need from // lastServers. // 记录下最近访问的服务器的信息以便报告异常。 // 保存HRegionLocation来节省解析的成本。 // 然后当我们要用到最新访问的服务器的时候, // 可以直接从lastservers里面获取想要的信息 HRegionLocation[] lastServers = new HRegionLocation[results.length]; List<Row> workingList = new ArrayList<Row>(list); boolean retry = true; // count that helps presize actions array int actionCount = 0; // numRetries,失败重试次数, // 由hbase.client.retries.number配置决定,默认为10 for (int tries = 0; tries < numRetries && retry; ++tries) { // sleep first, if this is a retry if (tries >= 1) { long sleepTime = ConnectionUtils.getPauseTime(this.pause, tries); LOG.debug("Retry " + tries + ", sleep for " + sleepTime + "ms!"); Thread.sleep(sleepTime); } // 以下方法会很长,每一个步骤都单独提取出来阅读吧 // step 1: break up into regionserver-sized chunks and build the data structs // step 2: make the requests // step 3: collect the failures and successes and prepare for retry // step 4: identify failures and prep for a retry (if applicable). 
} // 成功或重试完成后,检查是否还有失败的任务 List<Throwable> exceptions = new ArrayList<Throwable>(actionCount); List<Row> actions = new ArrayList<Row>(actionCount); List<String> addresses = new ArrayList<String>(actionCount); for (int i = 0; i < results.length; i++) { if (results[i] == null || results[i] instanceof Throwable) { exceptions.add((Throwable) results[i]); actions.add(list.get(i)); addresses.add(lastServers[i].getHostnamePort()); } } if (!exceptions.isEmpty()) { throw new RetriesExhaustedWithDetailsException(exceptions, actions, addresses); } }
// MultiAction相当于是一个Action的集合(事实上它确实是一个有序的集合) // 而Action就是指Get,Put,Del等 Map<HRegionLocation, MultiAction<R>> actionsByServer = new HashMap<HRegionLocation, MultiAction<R>>(); for (int i = 0; i < workingList.size(); i++) { Row row = workingList.get(i); if (row != null) { HRegionLocation loc = locateRegion(tableName, row.getRow()); // 定位每一个rowkey对应的region,这里是个大头,详情请看这里 // [url]http://dance-lee-163-com.iteye.com/admin/blogs/1972477[/url] byte[] regionName = loc.getRegionInfo().getRegionName(); MultiAction<R> actions = actionsByServer.get(loc); if (actions == null) { actions = new MultiAction<R>(); actionsByServer.put(loc, actions); }// 按region信息和action列表的对应关系保存 Action<R> action = new Action<R>(row, i); lastServers[i] = loc; actions.add(regionName, action); // 按regionanme和操作添加,其实内部现在只有一个entry } }
// Step 2: turn each per-location MultiAction into a request task and hand it
// to the executor (HTable's thread pool on this path). createCallable is
// covered separately.
Map<HRegionLocation, Future<MultiResponse>> futures =
    new HashMap<HRegionLocation, Future<MultiResponse>>(actionsByServer.size());

for (Entry<HRegionLocation, MultiAction<R>> perServer : actionsByServer.entrySet()) {
  final HRegionLocation target = perServer.getKey();
  final Callable<MultiResponse> task =
      createCallable(target, perServer.getValue(), tableName);
  futures.put(target, pool.submit(task));
}
// Step 3: wait for every per-server request and copy the individual outcomes
// into the results array at the index of the originating operation.
for (Entry<HRegionLocation, Future<MultiResponse>> serverFuture : futures.entrySet()) {
  final HRegionLocation serverLoc = serverFuture.getKey();
  try {
    final MultiResponse response = serverFuture.getValue().get();
    if (response == null) {
      // Entire server failed; its result slots stay untouched.
      LOG.debug("Failed all for server: " + serverLoc.getHostnamePort()
          + ", removing from cache");
      continue;
    }

    for (Entry<byte[], List<Pair<Integer, Object>>> perRegion
        : response.getResults().entrySet()) {
      final byte[] regionName = perRegion.getKey();
      for (Pair<Integer, Object> outcome : perRegion.getValue()) {
        if (outcome == null) {
          // if the first/only record is 'null' the entire region failed.
          LOG.debug("Failures for region: " + Bytes.toStringBinary(regionName)
              + ", removing from cache");
          continue;
        }

        // The value might be an Exception, including DoNotRetryIOException.
        final int index = outcome.getFirst();
        final Object value = outcome.getSecond();
        results[index] = value;

        // Only successful values are reported to the callback.
        if (callback == null || value instanceof Throwable) {
          continue;
        }
        callback.update(regionName, list.get(index).getRow(), (R) value);
      }
    }
  } catch (ExecutionException e) {
    // The request task itself failed; log it and move on to the next server.
    LOG.warn("Failed all from " + serverLoc, e);
  }
}
// Find failures (i.e. null Result), and add them to the workingList (in // order), so they can be retried. retry = false; workingList.clear(); actionCount = 0; for (int i = 0; i < results.length; i++) { // if null (fail) or instanceof Throwable && not instanceof DNRIOE // then retry that row. else dont. if (results[i] == null || (results[i] instanceof Throwable && !(results[i] instanceof DoNotRetryIOException))) { // 如果结果为空,或者结果是非DoNotRetryIOException的异常 // 就把该请求和应该处理该请求返回任务队列, // 然后删除已经缓存的处理该任务的region信息 retry = true; actionCount++; Row row = list.get(i); workingList.add(row); deleteCachedLocation(tableName, row.getRow()); } else { if (results[i] != null && results[i] instanceof Throwable) { actionCount++; } // add null to workingList, so the order remains consistent with the original list argument. workingList.add(null); } }
private <R> Callable<MultiResponse> createCallable( final HRegionLocation loc, final MultiAction<R> multi, final byte[] tableName) { // TODO: This does not belong in here!!! St.Ack HConnections should // not be dealing in Callables; Callables have HConnections, not other // way around. final HConnection connection = this; return new Callable<MultiResponse>() { public MultiResponse call() throws IOException { ServerCallable<MultiResponse> callable = new ServerCallable<MultiResponse>(connection, tableName, null) { // 看这里看这里 public MultiResponse call() throws IOException { return server.multi(multi); } @Override public void connect(boolean reload) throws IOException { server = connection.getHRegionConnection(loc.getHostname(), loc.getPort()); } }; return callable.withoutRetries(); } }; } public T withoutRetries() throws IOException, RuntimeException { globalStartTime = EnvironmentEdgeManager.currentTimeMillis(); try { beforeCall();// 记录和计算时间 connect(false); return call(); } catch (Throwable t) { Throwable t2 = translateException(t); if (t2 instanceof IOException) { throw (IOException) t2; } else { throw new RuntimeException(t2); } } finally { afterCall();记录和计算时间 } }