FileOutputFormatを継承した自作OutputFormatでの圧縮

FileOutputFormatを継承して自作のOutputFormatを作る場合、圧縮関連の処理はFileOutputFormatには含まれないので、自分で処理を書く必要がある。

その際、TextInputFormat の getRecordWriterが、シンプルで参考になる。
http://svn.apache.org/viewvc/hadoop/common/branches/branch-1.0/src/mapred/org/apache/hadoop/mapreduce/lib/output/TextOutputFormat.java?view=markup

 public RecordWriter<K, V> 
       getRecordWriter(TaskAttemptContext job
                       ) throws IOException, InterruptedException {
  Configuration conf = job.getConfiguration();
  boolean isCompressed = getCompressOutput(job);
  String keyValueSeparator= conf.get("mapred.textoutputformat.separator",
                                     "\t");
  CompressionCodec codec = null;
  String extension = "";
  if (isCompressed) {
    Class<? extends CompressionCodec> codecClass = 
      getOutputCompressorClass(job, GzipCodec.class);
    codec = (CompressionCodec) ReflectionUtils.newInstance(codecClass, conf);
    extension = codec.getDefaultExtension();
  }
  Path file = getDefaultWorkFile(job, extension);
  FileSystem fs = file.getFileSystem(conf);
  if (!isCompressed) {
    FSDataOutputStream fileOut = fs.create(file, false);
    return new LineRecordWriter<K, V>(fileOut, keyValueSeparator);
  } else {
    FSDataOutputStream fileOut = fs.create(file, false);
    return new LineRecordWriter<K, V>(new DataOutputStream
                                      (codec.createOutputStream(fileOut)),
                                      keyValueSeparator);
  }
}

たいした話ではないが、メモメモ。