首页 诗词 字典 板报 句子 名言 友答 励志 学校 网站地图
当前位置: 首页 > 教程频道 > 开发语言 > 编程 >

Hadoop MapReduce中如何避免跨行Block和UnputSplit

2012-10-30 
Hadoop MapReduce中如何处理跨行Block和UnputSplitHadoop的初学者经常会疑惑这样两个问题:1.Hadoop的一个B

Hadoop MapReduce中如何处理跨行Block和UnputSplit

Hadoop的初学者经常会疑惑这样两个问题:1.Hadoop的一个Block默认是64M,那么对于一个记录行形式的文本,会不会造成一行记录被分到两个Block当中?2.在把文件从Block中读取出来进行切分时,会不会造成一行记录被分成两个InputSplit,如果被分成两个InputSplit,这样一个InputSplit里面就有一行不完整的数据,那么处理这个InputSplit的Mapper会不会得出不正确的结果?

对于上面的两个问题,首先要明确两个概念:Block和InputSplit

?? ? ?1. block是hdfs存储文件的单位(默认是64M);
?? ? ?2.?InputSplit是MapReduce对文件进行处理和运算的输入单位,只是一个逻辑概念,每个InputSplit并没有对文件实际的切割,只是记录了要处理的数据的位置(包括文件的path和hosts)和长度(由start和length决定)。

因此,以行记录形式的文本,还真可能存在一行记录被划分到不同的Block,甚至不同的DataNode上去。通过分析FileInputFormat里面的getSplits方法,可以得出,某一行记录同样也可能被划分到不同的InputSplit。

?

  public List<InputSplit> getSplits(JobContext job) throws IOException {    long minSize = Math.max(getFormatMinSplitSize(), getMinSplitSize(job));    long maxSize = getMaxSplitSize(job);    // generate splits    List<InputSplit> splits = new ArrayList<InputSplit>();    List<FileStatus> files = listStatus(job);          for (FileStatus file: files) {      Path path = file.getPath();      long length = file.getLen();      if (length != 0) {        FileSystem fs = path.getFileSystem(job.getConfiguration());        BlockLocation[] blkLocations = fs.getFileBlockLocations(file, 0, length);        if (isSplitable(job, path)) {          long blockSize = file.getBlockSize();          long splitSize = computeSplitSize(blockSize, minSize, maxSize);          long bytesRemaining = length;          while (((double) bytesRemaining)/splitSize > SPLIT_SLOP) {            int blkIndex = getBlockIndex(blkLocations, length-bytesRemaining);            splits.add(makeSplit(path, length-bytesRemaining, splitSize,                                     blkLocations[blkIndex].getHosts()));            bytesRemaining -= splitSize;          }          if (bytesRemaining != 0) {            splits.add(makeSplit(path, length-bytesRemaining, bytesRemaining,                       blkLocations[blkLocations.length-1].getHosts()));          }        } else { // not splitable          splits.add(makeSplit(path, 0, length, blkLocations[0].getHosts()));        }      } else {         //Create empty hosts array for zero length files        splits.add(makeSplit(path, 0, length, new String[0]));      }    }    // Save the number of input files for metrics/loadgen    job.getConfiguration().setLong(NUM_INPUT_FILES, files.size());    LOG.debug("Total # of splits: " + splits.size());    return splits;  }

?从上面的代码可以看出,对文件进行切分其实很简单:获取文件在HDFS上的路径和Block信息,然后根据splitSize

对文件进行切分,splitSize = computeSplitSize(blockSize, minSize, maxSize);blockSize,minSize,maxSize都可以配置,默认splitSize 就等于blockSize的默认值(64m)。

FileInputFormat对文件的切分是严格按照偏移量来的,因此一行记录比较长的话,其可能被切分到不同的InputSplit。但这并不会对Map造成影响,尽管一行记录可能被拆分到不同的InputSplit,但是与FileInputFormat关联的RecordReader被设计的足够健壮,当一行记录跨InputSplit时,其能够到读取不同的InputSplit,直到把这一行记录读取完成,在Hadoop里,记录行形式的文本,通常采用默认的TextInputFormat,TextInputFormat关联的是LineRecordReader,下面我们来看看LineRecordReader的的nextKeyValue方法里读取文件的代码:

?

    while (getFilePosition() <= end) {      newSize = in.readLine(value, maxLineLength,          Math.max(maxBytesToConsume(pos), maxLineLength));      if (newSize == 0) {        break;      }

?

?其读取文件是通过LineReader(in就是一个LineReader实例)的readLine方法完成的:

?

  public int readLine(Text str, int maxLineLength,                      int maxBytesToConsume) throws IOException {    if (this.recordDelimiterBytes != null) {      return readCustomLine(str, maxLineLength, maxBytesToConsume);    } else {      return readDefaultLine(str, maxLineLength, maxBytesToConsume);    }  }  /**   * Read a line terminated by one of CR, LF, or CRLF.   */  private int readDefaultLine(Text str, int maxLineLength, int maxBytesToConsume)  throws IOException {    str.clear();    int txtLength = 0; //tracks str.getLength(), as an optimization    int newlineLength = 0; //length of terminating newline    boolean prevCharCR = false; //true of prev char was CR    long bytesConsumed = 0;    do {      int startPosn = bufferPosn; //starting from where we left off the last time      if (bufferPosn >= bufferLength) {        startPosn = bufferPosn = 0;        if (prevCharCR)          ++bytesConsumed; //account for CR from previous read        bufferLength = in.read(buffer);        if (bufferLength <= 0)          break; // EOF      }      for (; bufferPosn < bufferLength; ++bufferPosn) { //search for newline        if (buffer[bufferPosn] == LF) {          newlineLength = (prevCharCR) ? 2 : 1;          ++bufferPosn; // at next invocation proceed from following byte          break;        }        if (prevCharCR) { //CR + notLF, we are at notLF          newlineLength = 1;          break;        }        prevCharCR = (buffer[bufferPosn] == CR);      }      int readLength = bufferPosn - startPosn;      if (prevCharCR && newlineLength == 0)        --readLength; //CR at the end of the buffer      bytesConsumed += readLength;      int appendLength = readLength - newlineLength;      if (appendLength > maxLineLength - txtLength) {        appendLength = maxLineLength - txtLength;      }      if (appendLength > 0) {        str.append(buffer, startPosn, appendLength);        txtLength += appendLength;      }    } while (newlineLength == 0 && bytesConsumed < maxBytesToConsume);   //①    if (bytesConsumed > (long)Integer.MAX_VALUE)      throw new IOException("Too many bytes before newline: " + bytesConsumed);        return (int)bytesConsumed;  }

?我们分析下readDefaultLine方法,do-while循环体主要是读取文件,然后遍历读取的内容,找到默认的换行符就终止循环。前面说,对于跨InputSplit的行,LineRecordReader会自动跨InputSplit去读取。这就体现在上述代码的While循环的终止条件上:

while (newlineLength == 0 && bytesConsumed < maxBytesToConsume);

newlineLength==0则以为一次do-while循环中读取的内容中没有遇到换行符,因maxBytesToConsume的默认值为Integer.MAX_VALUE,所以如果读取的内容没有遇到换行符,则会一直读取下去,知道读取的内容超过maxBytesToConsume。这样的出来方式,解决了一行记录跨InputSplit的读取问题,同样也会造成下面两个疑问:

1.既然在LineReader读取方法里面没有对考虑InputSplit的end进行处理,难道读取一个InputSplit的时候,会这样无限的读取下去么?

2.如果一行记录L跨越了A,B两个InputSplit,读A的时候已经读取了跨越A,B的这条记录L,那么对B这个InputSplit读取的时候,如果做到不读取L这条记录在B中的部分呢?

为了解决这两个问题,Hadoop通过下面的代码来做到:LineRecordReader的nextKeyValue方法。

?

  public boolean nextKeyValue() throws IOException {    if (key == null) {      key = new LongWritable();    }    key.set(pos);    if (value == null) {      value = new Text();    }    int newSize = 0;    // We always read one extra line, which lies outside the upper    // split limit i.e. (end - 1)    while (getFilePosition() <= end) {         //②      newSize = in.readLine(value, maxLineLength,          Math.max(maxBytesToConsume(pos), maxLineLength));      if (newSize == 0) {        break;      }      pos += newSize;      inputByteCounter.increment(newSize);      if (newSize < maxLineLength) {        break;      }      // line too long. try again      LOG.info("Skipped line of size " + newSize + " at pos " +                (pos - newSize));    }    if (newSize == 0) {      key = null;      value = null;      return false;    } else {      return true;    }  }

?

?? ?通过代码②处得While条件,就保证了InputSplit读取边界的问题,如果存在跨InputSplit的记录,也只好跨InputSplit读取一次。

?? ? 再来看LineRecordReader的initialize方法:

    // If this is not the first split, we always throw away first record    // because we always (except the last split) read one extra line in    // next() method.    if (start != 0) {      start += in.readLine(new Text(), 0, maxBytesToConsume(start));    }    this.pos = start;

?? ?如果不是第一InputSplit,则在读取的时候,LineRecordReader会自动忽略掉第一个换行符之前的所有内容,这样就不存在重读读取的问题。

?

此次,前面提到的两个问题就回到完了。。。。。

?

?

?

?

?

?

?

?

    return splits;
  }
分成的InputSplit里应该是记录了, [Path, 起始位置,split的长度,所在的Host].
但如果我的文件是100M , splitsize是 50M, 当前Block是64M, 那么1M-50M的文件是在一个block A里,host A, 50M-64M也是在block A里, host A里,64M-100M在block B, host B里。。。那么按如上代码FileSplit记录的应该是[path, 0, 50M, hostA][path, 50M, 50M, hostA]

3 楼 fibers 2012-08-01   上面问题有点错误,更正一下
BlockLocation :
private String[] hosts; //hostnames of datanodes
private String[] names; //hostname:portNumber of datanodes
private String[] topologyPaths; // full path name in network topology
private long offset;  //offset of the of the block in the file
private long length;

假如文件:100M, splitsize:50M, block:64M,
那么刚开始BlockLocation[]的数据应该是(names,topologypaths就不列了)
[hostA, 0, 64M],[hostB, 64M, 34M]
最后计算完的inputsplit里的数据应该是
[path, 0, 50M, hostA],[path, 50M, 50M, hostB]

可是50M-64M应该是在A里的。。

这是为什么?谢谢

热点排行