
Hadoop Series A: Multiple File Output


 

package org.myorg;

import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UnsupportedEncodingException;

import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

/** Extracted from the LineRecordWriter inner class of {@link TextOutputFormat}. */
public class LineRecordWriter<K, V> extends RecordWriter<K, V> {

    private static final String utf8 = "UTF-8";
    private static final byte[] newline;
    static {
        try {
            newline = "\n".getBytes(utf8);
        } catch (UnsupportedEncodingException uee) {
            throw new IllegalArgumentException("can't find " + utf8 + " encoding");
        }
    }

    protected DataOutputStream out;
    private final byte[] keyValueSeparator;

    public LineRecordWriter(DataOutputStream out, String keyValueSeparator) {
        this.out = out;
        try {
            this.keyValueSeparator = keyValueSeparator.getBytes(utf8);
        } catch (UnsupportedEncodingException uee) {
            throw new IllegalArgumentException("can't find " + utf8 + " encoding");
        }
    }

    public LineRecordWriter(DataOutputStream out) {
        this(out, "\t");
    }

    // Writes a Text object's raw bytes directly; anything else goes through toString().
    private void writeObject(Object o) throws IOException {
        if (o instanceof Text) {
            Text to = (Text) o;
            out.write(to.getBytes(), 0, to.getLength());
        } else {
            out.write(o.toString().getBytes(utf8));
        }
    }

    public synchronized void write(K key, V value) throws IOException {
        boolean nullKey = key == null || key instanceof NullWritable;
        boolean nullValue = value == null || value instanceof NullWritable;
        if (nullKey && nullValue) {
            return;
        }
        if (!nullKey) {
            writeObject(key);
        }
        if (!(nullKey || nullValue)) {
            out.write(keyValueSeparator);
        }
        if (!nullValue) {
            writeObject(value);
        }
        out.write(newline);
    }

    public synchronized void close(TaskAttemptContext context) throws IOException {
        out.close();
    }
}
 
public static class myOutput extends MultipleOutputFormat<Text, Text> {
    @Override
    protected String generateFileNameForKeyValue(Text key, Text value, TaskAttemptContext taskID) {

        String mykey = key.toString();
        String myValue = value.toString();

        // Last three characters of the task id, e.g. "003" from "task_..._r_000003".
        String tasknum = taskID.getTaskAttemptID().getTaskID().toString();
        String fileNum = tasknum.substring(tasknum.length() - 3);

        // First three characters of the key.
        String newname = mykey.substring(0, 3);
        // NOTE: whatever is returned here must stay consistent with the
        // substring(0, 11) / substring(12) slicing done in getBaseRecordWriter below.
        return fileNum + newname;
    }
}
 

Purpose:

Classify the output into different directories according to some characteristic of the output data, so that it is easier to manage, instead of dumping everything into a single directory.

Implementation:

1. Subclass MultipleOutputFormat (modeled on org.apache.hadoop.mapred.lib.MultipleOutputFormat) and override a few of its methods. The method your subclass must implement is:

protected String generateFileNameForKeyValue(K key, V value, TaskAttemptContext job)

which decides the output file name (including any extension) from the key, the value and the job configuration. The other method that needs adapting is the last one in the listing below:

private RecordWriter<K, V> getBaseRecordWriter(TaskAttemptContext job, String baseName)

where baseName is exactly the value returned by the overridden generateFileNameForKeyValue (a sketch of a compatible name layout follows).
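The baseName returned by the override has to line up with how getBaseRecordWriter slices it apart: the listing below uses substring(0, 11) for the file name and substring(12) for the subdirectory. What follows is only a minimal sketch, not part of the original post, of an override with a compatible layout; the class name KeyPrefixOutputFormat, the "part-" prefix and the three-character key prefix are hypothetical choices.

package org.myorg;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.TaskAttemptContext;

// Hypothetical sketch: a name layout matching the substring(0, 11) / substring(12)
// slicing in getBaseRecordWriter (characters 0-10 = file name, 11 = separator,
// 12 onward = subdirectory).
public class KeyPrefixOutputFormat extends MultipleOutputFormat<Text, Text> {
    @Override
    protected String generateFileNameForKeyValue(Text key, Text value, TaskAttemptContext job) {
        String task = job.getTaskAttemptID().getTaskID().toString();
        String fileName = "part-" + task.substring(task.length() - 6); // 11 characters, e.g. "part-000003"
        String subDir = key.toString().substring(0, 3);                // subdirectory taken from the key
        return fileName + "-" + subDir;                                // e.g. "part-000003-abc"
    }
}

With that layout, a record whose key starts with "abc", handled by reduce task 3, would end up in <output>/abc/part-000003.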

Code:

package org.myorg;

import java.io.DataOutputStream;
import java.io.IOException;
import java.util.HashMap;
import java.util.Iterator;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.GzipCodec;
import org.apache.hadoop.mapreduce.OutputCommitter;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.ReflectionUtils;

public abstract class MultipleOutputFormat<K extends WritableComparable, V extends Writable>
        extends FileOutputFormat<K, V> {

    private MultiRecordWriter writer = null;

    public RecordWriter<K, V> getRecordWriter(TaskAttemptContext job) throws IOException,
            InterruptedException {
        if (writer == null) {
            writer = new MultiRecordWriter(job, getTaskOutputPath(job));
        }
        return writer;
    }

    private Path getTaskOutputPath(TaskAttemptContext conf) throws IOException {
        Path workPath = null;
        OutputCommitter committer = super.getOutputCommitter(conf);
        if (committer instanceof FileOutputCommitter) {
            workPath = ((FileOutputCommitter) committer).getWorkPath();
        } else {
            Path outputPath = super.getOutputPath(conf);
            if (outputPath == null) {
                throw new IOException("Undefined job output-path");
            }
            workPath = outputPath;
        }
        return workPath;
    }

    /** Derives the output file name (the baseName) from the key, the value and the task context. */
    protected abstract String generateFileNameForKeyValue(K key, V value, TaskAttemptContext job); //Configuration conf);

    public class MultiRecordWriter extends RecordWriter<K, V> {

        /** One underlying RecordWriter per distinct baseName. */
        private HashMap<String, RecordWriter<K, V>> recordWriters = null;
        private TaskAttemptContext job = null;
        private Path workPath = null;

        public MultiRecordWriter(TaskAttemptContext job, Path workPath) {
            super();
            this.job = job;
            this.workPath = workPath;
            recordWriters = new HashMap<String, RecordWriter<K, V>>();
        }

        @Override
        public void close(TaskAttemptContext context) throws IOException, InterruptedException {
            Iterator<RecordWriter<K, V>> values = this.recordWriters.values().iterator();
            while (values.hasNext()) {
                values.next().close(context);
            }
            this.recordWriters.clear();
        }

        @Override
        public void write(K key, V value) throws IOException, InterruptedException {
            String baseName = generateFileNameForKeyValue(key, value, job); //job.getConfiguration());
            RecordWriter<K, V> rw = this.recordWriters.get(baseName);
            if (rw == null) {
                rw = getBaseRecordWriter(job, baseName);
                this.recordWriters.put(baseName, rw);
            }
            //LongWritable keys=(LongWritable)key;
            //long ret=keys.get()>>1;
            //keys.set(ret);
            rw.write(key, value); //change
        }

        private RecordWriter<K, V> getBaseRecordWriter(TaskAttemptContext job, String baseName)
                throws IOException, InterruptedException {
            Configuration conf = job.getConfiguration();
            boolean isCompressed = getCompressOutput(job);
            String keyValueSeparator = "\t"; //change
            // NOTE: the slicing below assumes a fixed baseName layout: characters 0-10 become
            // the file name and everything from index 12 on becomes the subdirectory; adjust
            // these indices to match whatever generateFileNameForKeyValue returns.
            String pathname = baseName.substring(12); //change
            RecordWriter<K, V> recordWriter = null;
            if (isCompressed) {
                Class<? extends CompressionCodec> codecClass = getOutputCompressorClass(job,
                        GzipCodec.class);
                //String pathname=baseName.substring(12);
                CompressionCodec codec = ReflectionUtils.newInstance(codecClass, conf);
                Path file = new Path(workPath + "/" + pathname, baseName.substring(0, 11)
                        + codec.getDefaultExtension()); //change
                FSDataOutputStream fileOut = file.getFileSystem(conf).create(file, false);
                recordWriter = new LineRecordWriter<K, V>(new DataOutputStream(codec
                        .createOutputStream(fileOut)), keyValueSeparator);
            } else {
                Path file = new Path(workPath + "/" + pathname, baseName.substring(0, 11)); //change
                FSDataOutputStream fileOut = file.getFileSystem(conf).create(file, false);
                recordWriter = new LineRecordWriter<K, V>(fileOut, keyValueSeparator);
            }
            return recordWriter;
        }
    }
}
 

 

2. Extract LineRecordWriter out of TextOutputFormat and use it as a standalone public class. It is an implementation of RecordWriter that turns a <Key, Value> pair into a single line of text. In Hadoop this class exists as a nested class of TextOutputFormat with protected access, so ordinary programs cannot use it directly.

The code is given in the LineRecordWriter listing at the top of this post.

 

3. Wire the generateFileNameForKeyValue override into the main program; the myOutput subclass shown near the top of this post is that override.

 

In the main function you need to add job.setOutputFormatClass(myOutput.class); a minimal driver sketch is given below.
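For completeness, here is a driver sketch showing where that call goes. It is not part of the original post: the class name MultiFileDriver, the map-only SplitMapper and the command-line input/output paths are all hypothetical, and the myOutput subclass from step 3 is repeated as a nested class so that setOutputFormatClass can refer to it. It assumes the MultipleOutputFormat and LineRecordWriter classes above are compiled in the same org.myorg package.

package org.myorg;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class MultiFileDriver {

    // Hypothetical map-only mapper: splits each input line at the first tab into <key, value>.
    public static class SplitMapper extends Mapper<LongWritable, Text, Text, Text> {
        @Override
        protected void map(LongWritable offset, Text line, Context context)
                throws IOException, InterruptedException {
            String[] parts = line.toString().split("\t", 2);
            if (parts.length == 2) {
                context.write(new Text(parts[0]), new Text(parts[1]));
            }
        }
    }

    // The myOutput subclass from step 3, repeated here as a nested class.
    public static class myOutput extends MultipleOutputFormat<Text, Text> {
        @Override
        protected String generateFileNameForKeyValue(Text key, Text value, TaskAttemptContext taskID) {
            String tasknum = taskID.getTaskAttemptID().getTaskID().toString();
            String fileNum = tasknum.substring(tasknum.length() - 3);
            // See the note in getBaseRecordWriter about keeping this return value
            // consistent with its substring indices.
            return fileNum + key.toString().substring(0, 3);
        }
    }

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Job job = new Job(conf, "multi-file output"); // Job.getInstance(conf, ...) on newer releases
        job.setJarByClass(MultiFileDriver.class);

        job.setMapperClass(SplitMapper.class);
        job.setNumReduceTasks(0); // map-only: map output goes straight to the OutputFormat
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);

        // The line the post asks for: plug in the MultipleOutputFormat subclass.
        job.setOutputFormatClass(myOutput.class);

        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}

Because the job is map-only, the mapper's <Text, Text> pairs are handed directly to myOutput; with a reduce phase the reducer's output types would have to be <Text, Text> instead.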

