`

InputFormat牛逼(9)FileInputFormat实现类之SequenceFileInputFormat

 
阅读更多
一、SequenceFileInputFormat及SequenceFileRecordReader
/**
 * An {@link InputFormat} that reads {@link SequenceFile}s.
 *
 * <p>Records are read through {@link SequenceFileRecordReader}. Input
 * entries that are directories are treated as {@link MapFile}s and are
 * replaced by the data file found inside them.
 */
@InterfaceAudience.Public
@InterfaceStability.Stable
public class SequenceFileInputFormat<K, V> extends FileInputFormat<K, V> {

  /**
   * Creates a new, uninitialized {@link SequenceFileRecordReader}.
   *
   * @param split   the split to be read (not used here)
   * @param context the task attempt context (not used here)
   * @return a fresh record reader
   */
  @Override
  public RecordReader<K, V> createRecordReader(InputSplit split,
                                               TaskAttemptContext context
                                               ) throws IOException {
    RecordReader<K, V> reader = new SequenceFileRecordReader<K,V>();
    return reader;
  }

  /**
   * @return {@link SequenceFile#SYNC_INTERVAL} as the format's minimum
   *         split size
   */
  @Override
  protected long getFormatMinSplitSize() {
    return SequenceFile.SYNC_INTERVAL;
  }

  /**
   * Lists the input files as the superclass does, then swaps every
   * directory entry for the status of the MapFile data file inside it.
   *
   * @param job the job whose inputs are listed
   * @return the list returned by the superclass, modified in place
   * @throws IOException if listing or a file status lookup fails
   */
  @Override
  protected List<FileStatus> listStatus(JobContext job
                                        )throws IOException {
    List<FileStatus> statuses = super.listStatus(job);
    final int count = statuses.size();   // size is fixed: entries are only replaced
    int index = 0;
    while (index < count) {
      FileStatus current = statuses.get(index);
      if (current.isDirectory()) {       // a directory is taken to be a MapFile
        statuses.set(index, mapFileDataStatus(current, job));
      }
      index++;
    }
    return statuses;
  }

  /** Looks up the status of the data file inside a MapFile directory. */
  private FileStatus mapFileDataStatus(FileStatus directory, JobContext job)
      throws IOException {
    Path dirPath = directory.getPath();
    FileSystem fileSystem = dirPath.getFileSystem(job.getConfiguration());
    Path dataFile = new Path(dirPath, MapFile.DATA_FILE_NAME);
    return fileSystem.getFileStatus(dataFile);
  }
}
/**
 * A {@link RecordReader} for {@link SequenceFile}s.
 *
 * <p>The reader covers the byte range of one {@link FileSplit}. Reading
 * stops at end of file, or once a sync marker has been seen while reading
 * a record that started at or beyond the end of the split.
 */
@InterfaceAudience.Public
@InterfaceStability.Stable
public class SequenceFileRecordReader<K, V> extends RecordReader<K, V> {
  /** Underlying reader; {@code null} until {@link #initialize} opens it. */
  private SequenceFile.Reader in;
  /** Reader position after the initial sync; used as the progress origin. */
  private long start;
  /** Split start plus split length. */
  private long end;
  /** {@code false} once no further record will be returned. */
  private boolean more = true;
  private K key = null;
  private V value = null;
  protected Configuration conf;

  /**
   * Opens the file of the given split and positions the reader.
   *
   * @param split   the split to read; must be a {@link FileSplit}
   * @param context supplies the configuration used to open the file
   * @throws IOException if the file cannot be opened or positioned
   */
  @Override
  public void initialize(InputSplit split, 
                         TaskAttemptContext context
                         ) throws IOException, InterruptedException {
    FileSplit fileSplit = (FileSplit) split;
    conf = context.getConfiguration();    
    Path path = fileSplit.getPath();
    FileSystem fs = path.getFileSystem(conf);
    this.in = new SequenceFile.Reader(fs, path, conf);
    this.end = fileSplit.getStart() + fileSplit.getLength();

    // Only reposition when the split begins after the reader's current
    // position (i.e. it is not the first split of the file).
    if (fileSplit.getStart() > in.getPosition()) {
      in.sync(fileSplit.getStart());                  // sync to start
    }

    this.start = in.getPosition();
    // The split is empty if syncing already moved us to or past its end.
    more = start < end;
  }

  /**
   * Reads the next key/value pair.
   *
   * @return {@code true} if a pair was read; {@code false} at end of input,
   *         after which the current key and value are {@code null}
   */
  @Override
  @SuppressWarnings("unchecked")
  public boolean nextKeyValue() throws IOException, InterruptedException {
    if (!more) {
      return false;
    }
    // Position before the read decides whether this record began past 'end'.
    long pos = in.getPosition();
    key = (K) in.next(key);
    if (key == null || (pos >= end && in.syncSeen())) {
      more = false;
      key = null;
      value = null;
    } else {
      value = (V) in.getCurrentValue(value);
    }
    return more;
  }

  /** @return the key read by the last successful {@link #nextKeyValue()} */
  @Override
  public K getCurrentKey() {
    return key;
  }
  
  /** @return the value read by the last successful {@link #nextKeyValue()} */
  @Override
  public V getCurrentValue() {
    return value;
  }
  
  /**
   * Return the progress within the input split
   * @return 0.0 to 1.0 of the input byte range
   */
  public float getProgress() throws IOException {
    if (end == start) {
      return 0.0f;
    } else {
      return Math.min(1.0f, (in.getPosition() - start) / (float)(end - start));
    }
  }
  
  /**
   * Closes the underlying reader. Does nothing if the reader was never
   * opened, so closing an uninitialized instance (for example after a
   * failed {@link #initialize}) does not throw NullPointerException.
   */
  public synchronized void close() throws IOException {
    if (in != null) {
      in.close();
    }
  }
  
}

二、SequenceFileAsBinaryInputFormat及SequenceFileAsBinaryRecordReader
/**
 * InputFormat reading keys, values from SequenceFiles in binary (raw)
 * format.
 */
@InterfaceAudience.Public
@InterfaceStability.Stable
public class SequenceFileAsBinaryInputFormat
    extends SequenceFileInputFormat<BytesWritable,BytesWritable> {

  public SequenceFileAsBinaryInputFormat() {
    super();
  }

  /**
   * Creates a new, uninitialized {@link SequenceFileAsBinaryRecordReader}.
   *
   * @param split   the split to be read (not used here)
   * @param context the task attempt context (not used here)
   * @return a fresh raw-bytes record reader
   */
  public RecordReader<BytesWritable,BytesWritable> createRecordReader(
      InputSplit split, TaskAttemptContext context)
      throws IOException {
    return new SequenceFileAsBinaryRecordReader();
  }

  /**
   * Read records from a SequenceFile as binary (raw) bytes.
   */
  public static class SequenceFileAsBinaryRecordReader
      extends RecordReader<BytesWritable,BytesWritable> {
    /** Underlying reader; {@code null} until {@link #initialize} opens it. */
    private SequenceFile.Reader in;
    /** Reader position after the initial sync; used as the progress origin. */
    private long start;
    /** Split start plus split length. */
    private long end;
    /** {@code true} once no further record will be returned. */
    private boolean done = false;
    /** Scratch buffer reused for the raw key and then the raw value. */
    private DataOutputBuffer buffer = new DataOutputBuffer();
    private SequenceFile.ValueBytes vbytes;
    private BytesWritable key = null;
    private BytesWritable value = null;

    /**
     * Opens the file of the given split and positions the reader.
     *
     * @param split   the split to read; must be a {@link FileSplit}
     * @param context supplies the configuration used to open the file
     * @throws IOException if the file cannot be opened or positioned
     */
    public void initialize(InputSplit split, TaskAttemptContext context) 
        throws IOException, InterruptedException {
      Path path = ((FileSplit)split).getPath();
      Configuration conf = context.getConfiguration();
      FileSystem fs = path.getFileSystem(conf);
      this.in = new SequenceFile.Reader(fs, path, conf);
      this.end = ((FileSplit)split).getStart() + split.getLength();
      // Only reposition when the split begins after the reader's current
      // position.
      if (((FileSplit)split).getStart() > in.getPosition()) {
        in.sync(((FileSplit)split).getStart());    // sync to start
      }
      this.start = in.getPosition();
      vbytes = in.createValueBytes();
      // The split is empty if syncing already moved us to or past its end.
      done = start >= end;
    }
    
    /** @return the raw key bytes stored by the last read */
    @Override
    public BytesWritable getCurrentKey() 
        throws IOException, InterruptedException {
      return key;
    }
    
    /** @return the raw value bytes stored by the last read */
    @Override
    public BytesWritable getCurrentValue() 
        throws IOException, InterruptedException {
      return value;
    }

    /**
     * Retrieve the name of the key class for this SequenceFile.
     * @see org.apache.hadoop.io.SequenceFile.Reader#getKeyClassName
     */
    public String getKeyClassName() {
      return in.getKeyClassName();
    }

    /**
     * Retrieve the name of the value class for this SequenceFile.
     * @see org.apache.hadoop.io.SequenceFile.Reader#getValueClassName
     */
    public String getValueClassName() {
      return in.getValueClassName();
    }

    /**
     * Read raw bytes from a SequenceFile.
     *
     * @return {@code true} if a record within this split was read;
     *         {@code false} at end of file or once a sync marker has been
     *         seen for a record that started at or beyond the split end
     */
    public synchronized boolean nextKeyValue()
        throws IOException, InterruptedException {
      if (done) {
        return false;
      }
      // Position before the read decides whether this record began past 'end'.
      long pos = in.getPosition();
      boolean eof = -1 == in.nextRawKey(buffer);
      if (!eof) {
        if (key == null) {
          key = new BytesWritable();
        }
        if (value == null) {
          value = new BytesWritable();
        }
        key.set(buffer.getData(), 0, buffer.getLength());
        buffer.reset();                   // reuse the same buffer for the value
        in.nextRawValue(vbytes);
        vbytes.writeUncompressedBytes(buffer);
        value.set(buffer.getData(), 0, buffer.getLength());
        buffer.reset();
      }
      done = eof || (pos >= end && in.syncSeen());
      return !done;
    }

    /**
     * Closes the underlying reader. Does nothing if the reader was never
     * opened, so closing an uninitialized instance (for example after a
     * failed {@link #initialize}) does not throw NullPointerException.
     */
    public void close() throws IOException {
      if (in != null) {
        in.close();
      }
    }

    /**
     * Return the progress within the input split
     * @return 0.0 to 1.0 of the input byte range
     */
    public float getProgress() throws IOException, InterruptedException {
      if (end == start) {
        return 0.0f;
      } else {
        return Math.min(1.0f, (float)((in.getPosition() - start) /
                                      (double)(end - start)));
      }
    }
  }
}

三、SequenceFileAsTextInputFormat及SequenceFileAsTextRecordReader
/**
 * A SequenceFileInputFormat variant that hands out
 * SequenceFileAsTextRecordReader instances, so that keys and values are
 * presented as {@link Text}.
 */
@InterfaceAudience.Public
@InterfaceStability.Stable
public class SequenceFileAsTextInputFormat
  extends SequenceFileInputFormat<Text, Text> {

  public SequenceFileAsTextInputFormat() {
  }

  /**
   * Sets the task status to the split's string form, then creates the
   * text record reader.
   *
   * @param split   the split to be read; its toString() becomes the status
   * @param context the task attempt context whose status is updated
   * @return a new SequenceFileAsTextRecordReader
   */
  public RecordReader<Text, Text> createRecordReader(InputSplit split,
      TaskAttemptContext context) throws IOException {
    final String splitDescription = split.toString();
    context.setStatus(splitDescription);
    RecordReader<Text, Text> textReader = new SequenceFileAsTextRecordReader();
    return textReader;
  }
}

/**
 * Record reader that wraps a SequenceFileRecordReader and exposes each key
 * and value as {@link Text} built from the object's toString() result.
 * It is the reader handed out by SequenceFileAsTextInputFormat.
 */
@InterfaceAudience.Public
@InterfaceStability.Stable
public class SequenceFileAsTextRecordReader
  extends RecordReader<Text, Text> {

  /** Wrapped reader that does the actual SequenceFile reading. */
  private final SequenceFileRecordReader<WritableComparable<?>, Writable>
    delegate;

  /** Reused holders for the text forms; created on the first record. */
  private Text key;
  private Text value;

  public SequenceFileAsTextRecordReader()
    throws IOException {
    this.delegate =
      new SequenceFileRecordReader<WritableComparable<?>, Writable>();
  }

  /** Initializes the wrapped reader with the same split and context. */
  public void initialize(InputSplit split, TaskAttemptContext context)
      throws IOException, InterruptedException {
    delegate.initialize(split, context);
  }

  /** @return the text form of the most recently read key */
  @Override
  public Text getCurrentKey()
      throws IOException, InterruptedException {
    return key;
  }

  /** @return the text form of the most recently read value */
  @Override
  public Text getCurrentValue()
      throws IOException, InterruptedException {
    return value;
  }

  /**
   * Advances the wrapped reader and, when a record is available, stores the
   * toString() forms of its key and value.
   *
   * @return {@code true} if a record was read, {@code false} otherwise
   */
  public synchronized boolean nextKeyValue()
      throws IOException, InterruptedException {
    final boolean advanced = delegate.nextKeyValue();
    if (advanced) {
      if (key == null) {
        key = new Text();
      }
      if (value == null) {
        value = new Text();
      }
      String keyText = delegate.getCurrentKey().toString();
      key.set(keyText);
      String valueText = delegate.getCurrentValue().toString();
      value.set(valueText);
    }
    return advanced;
  }

  /** @return the progress reported by the wrapped reader */
  public float getProgress() throws IOException,  InterruptedException {
    float progress = delegate.getProgress();
    return progress;
  }

  /** Closes the wrapped reader. */
  public synchronized void close() throws IOException {
    delegate.close();
  }
}
分享到:
评论

相关推荐

Global site tag (gtag.js) - Google Analytics