首页 诗词 字典 板报 句子 名言 友答 励志 学校 网站地图
当前位置: 首页 > 教程频道 > 企业软件 > 行业软件 >

HBASE 代码阅览笔记-1 - PUT-3-提交任务1 (0.96-HADOOP2)

2013-11-12 
HBASE 代码阅读笔记-1 - PUT-3-提交任务1 (0.96-HADOOP2)看看提交任务的代码吧。对应http://dennis-lee-gam

HBASE 代码阅读笔记-1 - PUT-3-提交任务1 (0.96-HADOOP2)
看看提交任务的代码吧。对应http://dennis-lee-gammy.iteye.com/admin/blogs/1973249 TODO 【4】

public void sendMultiAction(final List<Action<Row>> initialActions,                              Map<HRegionLocation, MultiAction<Row>> actionsByServer,                              final int numAttempt,                              final HConnectionManager.ServerErrorTracker errorsByServer) {    // Send the queries and add them to the inProgress list    // This iteration is by server (the HRegionLocation comparator is by server portion only).    for (Map.Entry<HRegionLocation, MultiAction<Row>> e : actionsByServer.entrySet()) {      final HRegionLocation loc = e.getKey();      final MultiAction<Row> multiAction = e.getValue();      incTaskCounters(multiAction.getRegions(), loc.getServerName());      //这里简单,就是将taskSent,taskPerRegion(默认最大值1),taskPerServer(默认最大值5)都进行了+1操作,以便之前的判断      Runnable runnable = Trace.wrap("AsyncProcess.sendMultiAction", new Runnable() {        @Override        public void run() {          MultiResponse res;          try {            MultiServerCallable<Row> callable = createCallable(loc, multiAction);            //原先的Callable没有了哦亲。MultiServerCallable,全新的Api哦亲            try {              res = createCaller(callable).callWithoutRetries(callable);              //这里是创建了一个RpcRetryingCaller,然后再调用callable,这里很奇怪,在构造的时候传入了callable,但是却没有使用,RpcRetryingCaller有一个估计MultiServerCallable的成员变量,后来因为某些原因干掉了。createCall方法很简单,主要是callWithoutRetries。备注【1】            } catch (IOException e) {              LOG.warn("Call to " + loc.getServerName() + " failed numAttempt=" + numAttempt +                ", resubmitting all since not sure where we are at", e);              resubmitAll(initialActions, multiAction, loc, numAttempt + 1, e, errorsByServer);//失败重试,备注【2】              return;            }            receiveMultiAction(initialActions, multiAction, loc, res, numAttempt, errorsByServer);//提交任务,备注【3】          } finally {            decTaskCounters(multiAction.getRegions(), loc.getServerName());            // taskDone+1,taskPerRegion,taskPerServer-1          }        }      });      try {        this.pool.submit(runnable);//HTable的pool跑到这里来了      } catch (RejectedExecutionException ree) {        // This should never happen. But as the pool is provided by the end user, let's secure        //  this a little.        decTaskCounters(multiAction.getRegions(), loc.getServerName());        LOG.warn("The task was rejected by the pool. This is unexpected." +            " Server is " + loc.getServerName(), ree);        // We're likely to fail again, but this will increment the attempt counter, so it will        //  finish.        resubmitAll(initialActions, multiAction, loc, numAttempt + 1, ree, errorsByServer);//同上,失败重试      }    }  }


先看看失败重试吧,对应备注【2】

private void resubmitAll(List<Action<Row>> initialActions, MultiAction<Row> rsActions,                           HRegionLocation location, int numAttempt, Throwable t,                           HConnectionManager.ServerErrorTracker errorsByServer) {    // Do not use the exception for updating cache because it might be coming from    // any of the regions in the MultiAction.    // 先更新cache的region信息    hConnection.updateCachedLocations(tableName,      rsActions.actions.values().iterator().next().get(0).getAction().getRow(), null, location);    errorsByServer.reportServerError(location);//保存异常    List<Action<Row>> toReplay = new ArrayList<Action<Row>>(initialActions.size());    for (Map.Entry<byte[], List<Action<Row>>> e : rsActions.actions.entrySet()) {      for (Action<Row> action : e.getValue()) {        if (manageError(numAttempt, action.getOriginalIndex(), action.getAction(),            true, t, location)) {//检查是否可以重试。这里备注下【4】          toReplay.add(action);        }      }    }    if (toReplay.isEmpty()) {//没有可重试的就88      LOG.warn("Attempt #" + numAttempt + "/" + numTries + " failed for all " +        initialActions.size() + " ops, NOT resubmitting, " + location.getServerName());    } else {      submit(initialActions, toReplay, numAttempt, errorsByServer);//重试    }  }


public void updateCachedLocations(final TableName tableName, byte[] rowkey,      final Object exception, final HRegionLocation source) {      if (rowkey == null || tableName == null) {        LOG.warn("Coding error, see method javadoc. row=" + (rowkey == null ? "null" : rowkey) +            ", tableName=" + (tableName == null ? "null" : tableName));        return;      }      // Is it something we have already updated?      final HRegionLocation oldLocation = getCachedLocation(tableName, rowkey);      if (oldLocation == null) {        // There is no such location in the cache => it's been removed already => nothing to do        return;      }      HRegionInfo regionInfo = oldLocation.getRegionInfo();      final RegionMovedException rme = RegionMovedException.find(exception);      if (rme != null) {        if (LOG.isTraceEnabled()){          LOG.trace("Region " + regionInfo.getRegionNameAsString() + " moved to " +            rme.getHostname() + ":" + rme.getPort() + " according to " + source.getHostnamePort());        }        //这里如果是region已经移动的信息,则异常会将新的region地址返回,然后更新缓存        updateCachedLocation(            regionInfo, source, rme.getServerName(), rme.getLocationSeqNum());      } else if (RegionOpeningException.find(exception) != null) {        //意思大概是region如果已经由别的RS打开,则client可以直接请求新的region,所以这里不用做处理        //不是很明白,求高手解答        if (LOG.isTraceEnabled()) {          LOG.trace("Region " + regionInfo.getRegionNameAsString() + " is being opened on "              + source.getHostnamePort() + "; not deleting the cache entry");        }      } else {        deleteCachedLocation(regionInfo, source);//真是不可恢复的错误就把缓存删了吧      }    }


  void reportServerError(HRegionLocation server) {      ServerErrors errors = errorsByServer.get(server);      if (errors != null) {        errors.addError();      } else {        errorsByServer.put(server, new ServerErrors());      }    }


//老三样,按region归并action,然后提交。少了很多判断private void submit(List<Action<Row>> initialActions,                      List<Action<Row>> currentActions, int numAttempt,                      final HConnectionManager.ServerErrorTracker errorsByServer) {    // group per location => regions server    final Map<HRegionLocation, MultiAction<Row>> actionsByServer =        new HashMap<HRegionLocation, MultiAction<Row>>();    for (Action<Row> action : currentActions) {      HRegionLocation loc = findDestLocation(action.getAction(), 1, action.getOriginalIndex());      if (loc != null) {        addAction(loc, action, actionsByServer);      }    }    if (!actionsByServer.isEmpty()) {      sendMultiAction(initialActions, actionsByServer, numAttempt, errorsByServer);    }  }



这样重试的代码就理清楚了
看看RpcRetryingCaller.callWithoutRetries方法吧,主要是调用了prepare和call方法,
public T callWithoutRetries(RetryingCallable<T> callable)  throws IOException, RuntimeException {    // The code of this method should be shared with withRetries.    this.globalStartTime = EnvironmentEdgeManager.currentTimeMillis();    try {      beforeCall();      callable.prepare(false);//这里是个坑哦亲,不管传入什么都一样哦,没有的参数啊亲,主要是返回一个stub,已经在之前的region定位篇中说过,这依赖protobuf库。      return callable.call();    } catch (Throwable t) {      Throwable t2 = translateException(t);      // It would be nice to clear the location cache here.      if (t2 instanceof IOException) {        throw (IOException)t2;      } else {        throw new RuntimeException(t2);      }    } finally {      afterCall();    }  }


  public void prepare(boolean reload) throws IOException {    // Use the location we were given in the constructor rather than go look it up.    setStub(getConnection().getClient(getLocation().getServerName()));  }


call方法是核心,明天再看吧

热点排行