首页 诗词 字典 板报 句子 名言 友答 励志 学校 网站地图
当前位置: 首页 > 教程频道 > 数据库 > 其他数据库 >

HBase splitlog 历程

2012-07-16 
HBase splitlog 过程上一篇Blog提到了HBase在regionserver挂掉以后,master会处理,其中很重要的一步是就是s

HBase splitlog 过程

上一篇Blog提到了HBase在regionserver挂掉以后,master会处理,其中很重要的一步是就是splitlog,把.logs目录下的该rs的文件夹里的HLog文件,按照region进行分配。splitlog的代码如下所示:

private List<Path> splitLog(final FileStatus[] logfiles) throws IOException {    List<Path> processedLogs = new ArrayList<Path>();//成功处理以后的文件放入这个目录下    List<Path> corruptedLogs = new ArrayList<Path>();//读取文件出错的放入这个目录下    List<Path> splits = null;    boolean skipErrors = conf.getBoolean("hbase.hlog.split.skip.errors", true);    long totalBytesToSplit = countTotalBytes(logfiles);    splitSize = 0;    outputSink.startWriterThreads(entryBuffers);//启动三个写线程,将内存中的数据按照region分别写入region下的recover.edits目录下    try {      int i = 0;      for (FileStatus log : logfiles) {       Path logPath = log.getPath();        long logLength = log.getLen();        splitSize += logLength;        LOG.debug("Splitting hlog " + (i++ + 1) + " of " + logfiles.length            + ": " + logPath + ", length=" + logLength);        Reader in;        try {          in = getReader(fs, log, conf, skipErrors);          if (in != null) {            parseHLog(in, logPath, entryBuffers, fs, conf, skipErrors);//读取文件写入内存entryBuffers            try {              in.close();            } catch (IOException e) {              LOG.warn("Close log reader threw exception -- continuing",                  e);            }          }          processedLogs.add(logPath);        } catch (CorruptedLogFileException e) {          LOG.info("Got while parsing hlog " + logPath +              ". Marking as corrupted", e);          corruptedLogs.add(logPath);          continue;        }      }      if (fs.listStatus(srcDir).length > processedLogs.size()          + corruptedLogs.size()) {        throw new OrphanHLogAfterSplitException(            "Discovered orphan hlog after split. Maybe the "            + "HRegionServer was not dead when we started");      }      archiveLogs(srcDir, corruptedLogs, processedLogs, oldLogDir, fs, conf);//把corrutedLogs里的path放入到.corruptedlogs上,把processedLogs上的path移到oldlog上,并删除HLog    } finally {      LOG.info("Finishing writing output logs and closing down.");      splits = outputSink.finishWritingAndClose();    }    return splits;  }

?ParseHLog过程很简单从文件中读取数据写入到内存中,一次最多128M

  private void parseHLog(final Reader in, Path path,EntryBuffers entryBuffers, final FileSystem fs,    final Configuration conf, boolean skipErrors)throws IOException, CorruptedLogFileException {    int editsCount = 0;    try {      Entry entry;      while ((entry = getNextLogLine(in, path, skipErrors)) != null) {        entryBuffers.appendEntry(entry);        editsCount++;      }    } catch (InterruptedException ie) {      IOException t = new InterruptedIOException();      t.initCause(ie);      throw t;    } finally {      LOG.debug("Pushed=" + editsCount + " entries from " + path);    }  }

?写线程也比较简单,每个线程从entryBuffer中获取一个region的一块数据,在一个entrBuffer中,一个region只能由一个线程来handler,不然会有多个写线程同时对一个文件进行操作。

 private void doRun() throws IOException {      LOG.debug("Writer thread " + this + ": starting");      while (true) {        RegionEntryBuffer buffer = entryBuffers.getChunkToWrite();        if (buffer == null) {          // No data currently available, wait on some more to show up          synchronized (dataAvailable) {            if (shouldStop) return;            try {              dataAvailable.wait(1000);            } catch (InterruptedException ie) {              if (!shouldStop) {                throw new RuntimeException(ie);              }            }          }          continue;        }        assert buffer != null;        try {          writeBuffer(buffer);        } finally {          entryBuffers.doneWriting(buffer);        }      }    }

?

?

热点排行