
HBASE Code Reading Notes-1 - PUT-1 - Main Flow (based on 0.94.12)

2013-11-12 

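I had some free time recently and started reading the HBase source code. These notes are meant to help me understand and remember it. This is a first pass, so the content and the line of thought are somewhat loose. (This post is complete.)
The code below is based on HBase 0.94.12.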
HTable

@Override
public void put(final List<Put> puts) throws IOException {
    for (Put put : puts) {
        doPut(put);
    }
    if (autoFlush) {
        flushCommits();
    }
}

private void doPut(Put put) throws IOException {
    validatePut(put);
    writeBuffer.add(put);
    currentWriteBufferSize += put.heapSize();
    if (currentWriteBufferSize > writeBufferSize) {
        flushCommits();
    }
}

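This is where the put is executed. A batch put simply loops over the list and performs a single put for each element. If autoFlush is enabled, it then calls flushCommits() once at the end.
A single put first validates the Put. That check is simple, so I won't go into it here. It then appends the Put to a pending queue (writeBuffer) and adds the Put's heap size to the running client-side buffer size (currentWriteBufferSize). When the buffer size exceeds the limit, the buffered operations are flushed to the server. The limit is 2 MB by default and is configured by hbase.client.write.buffer.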
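The following minimal Java sketch reproduces the buffering rule outside HBase so it can be seen in isolation. FakePut and BufferedWriterSketch are hypothetical names. heapSize() stands in for Put.heapSize(), validation is skipped, and a "flush" only moves the buffered puts into a sent list instead of calling processBatch. The threshold is a constructor argument that plays the role of hbase.client.write.buffer.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical stand-in for an HBase Put: only the row and an estimated heap size. */
class FakePut {
    final String row;
    private final long heapSize;

    FakePut(String row, long heapSize) {
        this.row = row;
        this.heapSize = heapSize;
    }

    long heapSize() {
        return heapSize;
    }
}

/** Sketch of HTable-style client-side write buffering; not the real HTable. */
class BufferedWriterSketch {
    private final List<FakePut> writeBuffer = new ArrayList<>();
    private final long writeBufferSize;      // flush threshold (cf. hbase.client.write.buffer)
    private final boolean autoFlush;
    private long currentWriteBufferSize = 0; // running total of buffered heap sizes
    int flushCount = 0;                      // how many times flushCommits() ran
    final List<FakePut> sent = new ArrayList<>(); // what a real flush would have sent

    BufferedWriterSketch(long writeBufferSize, boolean autoFlush) {
        this.writeBufferSize = writeBufferSize;
        this.autoFlush = autoFlush;
    }

    /** Batch put = loop over single puts, then flush once if autoFlush is on. */
    void put(List<FakePut> puts) {
        for (FakePut p : puts) {
            doPut(p);
        }
        if (autoFlush) {
            flushCommits();
        }
    }

    private void doPut(FakePut p) {
        writeBuffer.add(p);
        currentWriteBufferSize += p.heapSize();
        if (currentWriteBufferSize > writeBufferSize) { // strictly greater, as in doPut
            flushCommits();
        }
    }

    /** Pretends every buffered put succeeded and empties the buffer. */
    void flushCommits() {
        flushCount++;
        sent.addAll(writeBuffer);
        writeBuffer.clear();
        currentWriteBufferSize = 0;
    }

    long bufferedBytes() { return currentWriteBufferSize; }

    int bufferedCount() { return writeBuffer.size(); }
}
```

Take a 100-byte threshold with autoFlush off. Two 40-byte puts stay in the buffer (80 bytes). A third put raises the total to 120 bytes and triggers exactly one flush.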

@Override
public void flushCommits() throws IOException {
    try {
        Object[] results = new Object[writeBuffer.size()];
        try {
            this.connection.processBatch(writeBuffer, tableName, pool, results); // this is the key operation
        } catch (InterruptedException e) {
            throw new IOException(e);
        } finally {
            // mutate list so that it is empty for complete success, or contains
            // only failed records results are returned in the same order as the
            // requests in list walk the list backwards, so we can remove from list
            // without impacting the indexes of earlier members
            // Remove the operations that have already succeeded.
            for (int i = results.length - 1; i >= 0; i--) {
                if (results[i] instanceof Result) {
                    // successful Puts are removed from the list here.
                    writeBuffer.remove(i);
                }
            }
        }
    } finally {
        if (clearBufferOnFail) {
            // In clearBufferOnFail mode, clear the queue and reset the buffer size whether or
            // not the flush succeeded. By default clearBufferOnFail equals autoFlush.
            writeBuffer.clear();
            currentWriteBufferSize = 0;
        } else {
            // the write buffer was adjusted by processBatchOfPuts
            currentWriteBufferSize = 0;
            for (Put aPut : writeBuffer) {
                currentWriteBufferSize += aPut.heapSize();
            } // recompute the buffer size from the Puts that remain
        }
    }
}

public void setAutoFlush(boolean autoFlush) {
    setAutoFlush(autoFlush, autoFlush);
}

public void setAutoFlush(boolean autoFlush, boolean clearBufferOnFail) {
    this.autoFlush = autoFlush;
    this.clearBufferOnFail = autoFlush || clearBufferOnFail;
}
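The backwards walk in the inner finally block is what keeps the indexes valid. Removing element i only shifts the elements after i, and those have already been visited. The Java sketch below isolates that cleanup step, the clearBufferOnFail branch, and the setAutoFlush rule. FlushCleanupSketch, SizedOp and the SUCCESS marker object are hypothetical. In HBase the success marker is a Result instance.

```java
import java.util.List;

/** Sketch of the cleanup done at the end of HTable.flushCommits(); all names are hypothetical. */
class FlushCleanupSketch {
    /** Stand-in for a Put: a row key plus an estimated heap size. */
    static final class SizedOp {
        final String row;
        final long heapSize;

        SizedOp(String row, long heapSize) {
            this.row = row;
            this.heapSize = heapSize;
        }
    }

    /** Marker object standing in for a successful Result. */
    static final Object SUCCESS = new Object();

    /**
     * Removes successful operations from the buffer and returns the new buffer size.
     * results[i] belongs to buffer.get(i); SUCCESS means success, anything else is a failure.
     */
    static long cleanup(List<SizedOp> buffer, Object[] results, boolean clearBufferOnFail) {
        // Walk backwards so remove(i) never shifts an element we still have to visit.
        for (int i = results.length - 1; i >= 0; i--) {
            if (results[i] == SUCCESS) {
                buffer.remove(i);
            }
        }
        if (clearBufferOnFail) {
            buffer.clear(); // drop the failed operations too
            return 0;
        }
        long size = 0; // recompute from the failed operations left in the buffer
        for (SizedOp op : buffer) {
            size += op.heapSize;
        }
        return size;
    }

    /** Mirrors setAutoFlush(autoFlush, clearBufferOnFail): autoFlush forces clearing. */
    static boolean effectiveClearBufferOnFail(boolean autoFlush, boolean clearBufferOnFail) {
        return autoFlush || clearBufferOnFail;
    }
}
```

Suppose the buffer is [a(10), b(20), c(30)] and only b failed. The backwards walk leaves [b], and with clearBufferOnFail off the recomputed size is 20.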


   
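HConnectionManager.HConnectionImplementation is the class that actually implements HTable's internal HConnection member. Below is the code for the key operation mentioned above (processBatch).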
public void processBatch(List<? extends Row> list,
                         final byte[] tableName,
                         ExecutorService pool,
                         Object[] results) throws IOException, InterruptedException {
    // This belongs in HTable!!! Not in here.  St.Ack

    // results must be the same size as list
    if (results.length != list.size()) {
        throw new IllegalArgumentException("argument results must be the same size as argument list");
    }
    processBatchCallback(list, tableName, pool, results, null);
}

   
public <R> void processBatchCallback(
        List<? extends Row> list,       // list of Put operations
        byte[] tableName,               // table name
        ExecutorService pool,           // HTable's ExecutorService member
        Object[] results,               // results (output)
        Batch.Callback<R> callback)     // callback; apparently null here (processBatch passes null)
        throws IOException, InterruptedException {
    // This belongs in HTable!!! Not in here.  St.Ack

    // results must be the same size as list
    if (results.length != list.size()) {
        throw new IllegalArgumentException(
                "argument results must be the same size as argument list");
    }
    if (list.isEmpty()) {
        return;
    }

    // Keep track of the most recent servers for any given item for better
    // exceptional reporting.  We keep HRegionLocation to save on parsing.
    // Later below when we use lastServers, we'll pull what we need from
    // lastServers.
    HRegionLocation[] lastServers = new HRegionLocation[results.length];
    List<Row> workingList = new ArrayList<Row>(list);
    boolean retry = true;
    // count that helps presize actions array
    int actionCount = 0;

    // numRetries: number of attempts on failure,
    // configured by hbase.client.retries.number (default 10)
    for (int tries = 0; tries < numRetries && retry; ++tries) {

        // sleep first, if this is a retry
        if (tries >= 1) {
            long sleepTime = ConnectionUtils.getPauseTime(this.pause, tries);
            LOG.debug("Retry " + tries + ", sleep for " + sleepTime + "ms!");
            Thread.sleep(sleepTime);
        }

        // The loop body is long, so each step is pulled out and read separately below.
        // step 1: break up into regionserver-sized chunks and build the data structs

        // step 2: make the requests

        // step 3: collect the failures and successes and prepare for retry

        // step 4: identify failures and prep for a retry (if applicable).
    }

    // After success, or once the retries are used up, check whether any actions still failed
    List<Throwable> exceptions = new ArrayList<Throwable>(actionCount);
    List<Row> actions = new ArrayList<Row>(actionCount);
    List<String> addresses = new ArrayList<String>(actionCount);

    for (int i = 0; i < results.length; i++) {
        if (results[i] == null || results[i] instanceof Throwable) {
            exceptions.add((Throwable) results[i]);
            actions.add(list.get(i));
            addresses.add(lastServers[i].getHostnamePort());
        }
    }

    if (!exceptions.isEmpty()) {
        throw new RetriesExhaustedWithDetailsException(exceptions,
                actions,
                addresses);
    }
}
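Without the HBase types, the outer loop is a bounded retry with a growing pause. It makes at most numRetries attempts, sleeps before every attempt except the first, and stops as soon as nothing needs retrying. The Java sketch below shows that shape. The BACKOFF multiplier table is invented for illustration. The real pause comes from ConnectionUtils.getPauseTime(this.pause, tries) and may use different values. The sleep is passed in as a LongConsumer so the example runs instantly.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.IntPredicate;
import java.util.function.LongConsumer;

/** Bounded retry loop with backoff, shaped like the outer loop of processBatchCallback. */
class RetryLoopSketch {
    // Hypothetical backoff multipliers, invented for this sketch (not HBase's table).
    static final int[] BACKOFF = {1, 1, 2, 4, 8};

    /** Pause before retry number `tries` (tries >= 1); the multiplier is capped at the last entry. */
    static long pauseTime(long pause, int tries) {
        int idx = Math.min(tries, BACKOFF.length - 1);
        return pause * BACKOFF[idx];
    }

    /**
     * Calls attempt.test(tries) until it returns false ("nothing left to retry")
     * or numRetries attempts have been made. Returns the pauses that were slept.
     */
    static List<Long> run(int numRetries, long pause, IntPredicate attempt, LongConsumer sleeper) {
        List<Long> sleeps = new ArrayList<>();
        boolean retry = true;
        for (int tries = 0; tries < numRetries && retry; ++tries) {
            if (tries >= 1) {                   // sleep first, if this is a retry
                long sleepTime = pauseTime(pause, tries);
                sleeps.add(sleepTime);
                sleeper.accept(sleepTime);
            }
            retry = attempt.test(tries);        // steps 1 to 4 would run here
        }
        return sleeps;
    }

    public static void main(String[] args) {
        // Pretend the batch fully succeeds on the third attempt (tries == 2); no real sleeping.
        List<Long> sleeps = run(10, 100, tries -> tries < 2, ms -> { });
        System.out.println(sleeps); // [100, 200]
    }
}
```

A real caller would pass a sleeper that calls Thread.sleep. processBatchCallback itself simply lets the resulting InterruptedException propagate.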

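Step 1: group the requests by region and assign them. This step contains one very important operation, which is locating the region and RegionServer (RS) that each rowkey belongs to. For details see http://dennis-lee-gammy.iteye.com/admin/blogs/1972477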
// A MultiAction is essentially a collection of Actions (in fact an ordered one),
// and an Action is a Get, Put, Delete, etc.
Map<HRegionLocation, MultiAction<R>> actionsByServer =
        new HashMap<HRegionLocation, MultiAction<R>>();

for (int i = 0; i < workingList.size(); i++) {
    Row row = workingList.get(i);
    if (row != null) {
        HRegionLocation loc = locateRegion(tableName, row.getRow());
        // Locate the region for each rowkey. This is a big topic of its own; for details see
        // http://dance-lee-163-com.iteye.com/admin/blogs/1972477
        byte[] regionName = loc.getRegionInfo().getRegionName();

        MultiAction<R> actions = actionsByServer.get(loc);
        if (actions == null) {
            actions = new MultiAction<R>();
            actionsByServer.put(loc, actions);
        } // keep the mapping from region location to its action list

        Action<R> action = new Action<R>(row, i);
        lastServers[i] = loc;
        actions.add(regionName, action);
        // add the action under its region name; in practice there is currently only one entry inside
    }
}
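At its core, step 1 is a "group by server, remember the original index" pass. The Java sketch below models a table's regions as a sorted map from region start key to server. Locating a row is then a floorEntry lookup, which picks the region with the greatest start key that is less than or equal to the row. This region model is hypothetical and simplified. The real locateRegion is considerably more involved, with a client-side region cache and lookups in the .META. table. RegionGroupingSketch and IndexedAction are invented names, and servers are keyed by a "host:port" string rather than HRegionLocation.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/** Sketch of step 1: group rows by the server hosting their region, keeping the original index. */
class RegionGroupingSketch {
    /** A row plus its index in the caller's list, like Action<R>(row, i). */
    static final class IndexedAction {
        final String row;
        final int originalIndex;

        IndexedAction(String row, int originalIndex) {
            this.row = row;
            this.originalIndex = originalIndex;
        }
    }

    /** Hypothetical region table: region start key -> "host:port" of the server holding it. */
    private final TreeMap<String, String> regionStartKeyToServer = new TreeMap<>();

    void addRegion(String startKey, String server) {
        regionStartKeyToServer.put(startKey, server);
    }

    /** The region containing `row` is the one with the greatest start key <= row. */
    String locate(String row) {
        Map.Entry<String, String> e = regionStartKeyToServer.floorEntry(row);
        if (e == null) {
            throw new IllegalStateException("no region for row " + row);
        }
        return e.getValue();
    }

    /** Null entries (rows that need no retry) are skipped, as in the original loop. */
    Map<String, List<IndexedAction>> groupByServer(List<String> workingList) {
        Map<String, List<IndexedAction>> actionsByServer = new HashMap<>();
        for (int i = 0; i < workingList.size(); i++) {
            String row = workingList.get(i);
            if (row != null) {
                String server = locate(row);
                actionsByServer.computeIfAbsent(server, k -> new ArrayList<>())
                        .add(new IndexedAction(row, i));
            }
        }
        return actionsByServer;
    }
}
```

Recording the original index with each action is the important detail, because it is what later lets results from different servers be written back into the right slots.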

Step 2: make the requests.
Map<HRegionLocation, Future<MultiResponse>> futures =
        new HashMap<HRegionLocation, Future<MultiResponse>>(
                actionsByServer.size());

for (Entry<HRegionLocation, MultiAction<R>> e : actionsByServer.entrySet()) {
    futures.put(e.getKey(), pool.submit(createCallable(e.getKey(), e.getValue(), tableName)));
    // This part is straightforward: create one request task per server and submit it
    // to HTable's thread pool. createCallable is covered later.
}
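Step 2 is a plain fan-out over an ExecutorService. There is one task per server, and each Future is stored under the same key so that step 3 can match responses to servers. In the Java sketch below, the hypothetical sendToServer function replaces createCallable(...) and server.multi(...). FanOutSketch is also an invented name.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.function.Function;

/** Sketch of step 2: submit one task per server to a pool and keep the futures by server. */
class FanOutSketch {
    static <A, R> Map<String, Future<R>> submitAll(Map<String, A> actionsByServer,
                                                   Function<A, R> sendToServer,
                                                   ExecutorService pool) {
        Map<String, Future<R>> futures = new HashMap<>(actionsByServer.size());
        for (Map.Entry<String, A> e : actionsByServer.entrySet()) {
            A batch = e.getValue();
            // Each server's batch runs as its own task on the shared pool.
            futures.put(e.getKey(), pool.submit(() -> sendToServer.apply(batch)));
        }
        return futures;
    }

    public static void main(String[] args) throws InterruptedException, ExecutionException {
        ExecutorService pool = Executors.newFixedThreadPool(2);
        try {
            Map<String, List<String>> batches = new HashMap<>();
            batches.put("rs1", Arrays.asList("a", "b"));
            batches.put("rs2", Arrays.asList("z"));
            // The "server" here just reports how many actions it received.
            Map<String, Future<Integer>> futures = submitAll(batches, List::size, pool);
            System.out.println(futures.get("rs1").get() + " " + futures.get("rs2").get()); // 2 1
        } finally {
            pool.shutdown();
        }
    }
}
```

Submitting every task before calling get() on any of them is what lets the per-server requests run in parallel.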

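Step 3: this step mainly collects the execution results.
MultiResponse maintains a map from regionName to a response list. Each element of that list is a Pair. Its first field is the index of the corresponding action in the original action list, and its second field is the actual result.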
for (Entry<HRegionLocation, Future<MultiResponse>> responsePerServer
        : futures.entrySet()) {
    HRegionLocation loc = responsePerServer.getKey();

    try {
        Future<MultiResponse> future = responsePerServer.getValue();
        MultiResponse resp = future.get();

        if (resp == null) {
            // Entire server failed
            LOG.debug("Failed all for server: " + loc.getHostnamePort() +
                    ", removing from cache");
            continue;
        }

        for (Entry<byte[], List<Pair<Integer, Object>>> e : resp.getResults().entrySet()) {
            byte[] regionName = e.getKey();
            List<Pair<Integer, Object>> regionResults = e.getValue();
            for (Pair<Integer, Object> regionResult : regionResults) {
                if (regionResult == null) {
                    // if the first/only record is 'null' the entire region failed.
                    LOG.debug("Failures for region: " +
                            Bytes.toStringBinary(regionName) +
                            ", removing from cache");
                } else {
                    // Result might be an Exception, including DNRIOE
                    results[regionResult.getFirst()] = regionResult.getSecond();
                    if (callback != null && !(regionResult.getSecond() instanceof Throwable)) {
                        callback.update(e.getKey(),
                                list.get(regionResult.getFirst()).getRow(),
                                (R) regionResult.getSecond());
                    }
                }
            }
        }
    } catch (ExecutionException e) {
        LOG.warn("Failed all from " + loc, e);
    }
}
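Every response entry carries the index of its action in the original list. Because of that, step 3 can write results straight into results[] regardless of which server or region answered, or in what order. The Java sketch below models MultiResponse as a map from region name to a list of (index, result) entries. AbstractMap.SimpleEntry stands in for HBase's Pair, and ScatterResultsSketch is a hypothetical name. A null pair, or a null response for a whole server, leaves the affected slots null, just as in the code above.

```java
import java.util.AbstractMap;
import java.util.List;
import java.util.Map;

/** Sketch of step 3: copy per-region (index, result) pairs back into the caller's results array. */
class ScatterResultsSketch {
    static void scatter(Map<String, List<Map.Entry<Integer, Object>>> resultsByRegion,
                        Object[] results) {
        if (resultsByRegion == null) {
            return; // whole server failed: its slots stay null, so step 4 will retry them
        }
        for (Map.Entry<String, List<Map.Entry<Integer, Object>>> e : resultsByRegion.entrySet()) {
            for (Map.Entry<Integer, Object> pair : e.getValue()) {
                if (pair == null) {
                    continue; // whole region failed: slots stay null
                }
                // The value may be a success or an exception; both are recorded by index.
                results[pair.getKey()] = pair.getValue();
            }
        }
    }

    /** Convenience constructor for an (index, result) pair. */
    static Map.Entry<Integer, Object> pair(int index, Object value) {
        return new AbstractMap.SimpleEntry<>(index, value);
    }
}
```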

Step 4: this step mainly handles the failures in one place and prepares the retry.
// Find failures (i.e. null Result), and add them to the workingList (in
// order), so they can be retried.
retry = false;
workingList.clear();
actionCount = 0;
for (int i = 0; i < results.length; i++) {
    // if null (fail) or instanceof Throwable && not instanceof DNRIOE
    // then retry that row. else dont.
    if (results[i] == null ||
            (results[i] instanceof Throwable &&
                    !(results[i] instanceof DoNotRetryIOException))) {
        // If the result is null, or is an exception other than DoNotRetryIOException,
        // put the request back into the work queue and evict the cached location
        // of the region that was supposed to handle it.
        retry = true;
        actionCount++;
        Row row = list.get(i);
        workingList.add(row);
        deleteCachedLocation(tableName, row.getRow());
    } else {
        if (results[i] != null && results[i] instanceof Throwable) {
            actionCount++;
        }
        // add null to workingList, so the order remains consistent with the original list argument.
        workingList.add(null);
    }
}
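The decision rule in step 4 fits in one small function. A slot is retried if it has no result, or if it holds an exception that is not a DoNotRetryIOException. Every other slot becomes a null placeholder, so the working list stays index-aligned with the original list. In the Java sketch below, NonRetryableException is a hypothetical stand-in for DoNotRetryIOException. The methods return new values instead of mutating fields. failedIndexes mirrors the final check after the retry loop, which treats any remaining null or Throwable as a failure.

```java
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

/** Sketch of step 4: decide which rows to retry while keeping indexes aligned. */
class RetryClassifierSketch {
    /** Hypothetical stand-in for HBase's DoNotRetryIOException. */
    static class NonRetryableException extends IOException {
        NonRetryableException(String msg) {
            super(msg);
        }
    }

    /** Retry when there is no result, or a retryable exception. */
    static boolean shouldRetry(Object result) {
        return result == null
                || (result instanceof Throwable && !(result instanceof NonRetryableException));
    }

    /** Next working list: the row for slots to retry, null placeholders for the rest. */
    static List<String> nextWorkingList(List<String> original, Object[] results) {
        List<String> working = new ArrayList<>(original.size());
        for (int i = 0; i < results.length; i++) {
            working.add(shouldRetry(results[i]) ? original.get(i) : null);
        }
        return working;
    }

    /** After the loop: slots still holding no result or any exception count as failures. */
    static List<Integer> failedIndexes(Object[] results) {
        List<Integer> failed = new ArrayList<>();
        for (int i = 0; i < results.length; i++) {
            if (results[i] == null || results[i] instanceof Throwable) {
                failed.add(i);
            }
        }
        return failed;
    }
}
```

Consider results of {success, null, IOException, NonRetryableException}. The next working list is [null, r1, r2, null], so only the two retryable slots are sent again. If nothing changes by the last attempt, failedIndexes reports 1, 2 and 3. These are the same slots the real code would report in RetriesExhaustedWithDetailsException.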


Finally, here is the method that creates the request. The outer call method only creates a ServerCallable object and then invokes that object's withoutRetries method.
private <R> Callable<MultiResponse> createCallable(
        final HRegionLocation loc,
        final MultiAction<R> multi, final byte[] tableName) {
    // TODO: This does not belong in here!!! St.Ack  HConnections should
    // not be dealing in Callables; Callables have HConnections, not other
    // way around.
    final HConnection connection = this;
    return new Callable<MultiResponse>() {
        public MultiResponse call() throws IOException {
            ServerCallable<MultiResponse> callable =
                    new ServerCallable<MultiResponse>(connection, tableName, null) {
                        // Look here: this is where the whole MultiAction is sent to the RS
                        public MultiResponse call() throws IOException {
                            return server.multi(multi);
                        }

                        @Override
                        public void connect(boolean reload) throws IOException {
                            server = connection.getHRegionConnection(loc.getHostname(), loc.getPort());
                        }
                    };
            return callable.withoutRetries();
        }
    };
}

// ServerCallable.withoutRetries()
public T withoutRetries()
        throws IOException, RuntimeException {
    globalStartTime = EnvironmentEdgeManager.currentTimeMillis();
    try {
        beforeCall(); // record and compute timing
        connect(false);
        return call();
    } catch (Throwable t) {
        Throwable t2 = translateException(t);
        if (t2 instanceof IOException) {
            throw (IOException) t2;
        } else {
            throw new RuntimeException(t2);
        }
    } finally {
        afterCall(); // record and compute timing
    }
}
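withoutRetries is a small template method. Timing hooks wrap connect and call, and any Throwable is translated and then rethrown, either as an IOException or wrapped in a RuntimeException. The Java sketch below reproduces only that control flow, using hypothetical hooks: OneShotCallSketch, an events log, and a translateException that merely unwraps an UncheckedIOException. That translation is an assumption made for illustration, not HBase's actual logic, and the real timing bookkeeping is omitted.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.List;

/** Sketch of the ServerCallable.withoutRetries() control flow; hooks are hypothetical. */
abstract class OneShotCallSketch<T> {
    final List<String> events = new ArrayList<>(); // records hook order for inspection

    void beforeCall() { events.add("before"); }
    void afterCall()  { events.add("after"); }

    abstract void connect(boolean reload) throws IOException;
    abstract T call() throws IOException;

    /** Hypothetical translation: unwrap UncheckedIOException, otherwise pass through. */
    Throwable translateException(Throwable t) {
        return (t instanceof UncheckedIOException) ? t.getCause() : t;
    }

    /** One attempt: hooks around connect + call, exceptions normalized to IOException or RuntimeException. */
    T withoutRetries() throws IOException {
        try {
            beforeCall();
            connect(false);
            return call();
        } catch (Throwable t) {
            Throwable t2 = translateException(t);
            if (t2 instanceof IOException) {
                throw (IOException) t2;
            } else {
                throw new RuntimeException(t2);
            }
        } finally {
            afterCall(); // runs on success and on failure
        }
    }
}
```

Because afterCall() sits in finally, the timing hook runs even when connect or call throws. The caller in step 3 relies on that only indirectly: a failure surfaces as an ExecutionException from future.get().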
