hadoop0.20.2中的API进行了大幅度的重构,导致很多API变成了@Deprecated,而新的API又不全,所以新手学起来感觉很变扭,社区开发者还是建议用hadoop0.20.2的可以沿用老版的API,本着学习的态度,用新的API添加了KeyValueTextInputFormat这个在老版API中存在的类,实际上是对TextInputFormat进行了小幅的修改。
package cn.fnst.cspf.output;
import java.io.IOException;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.CompressionCodecFactory;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.util.LineReader;
public class KeyValueInputFormat extends FileInputFormat<Text, Text>{
protected static class KeyVRecordReader extends RecordReader<Text,Text>{
private static final Log LOG = LogFactory.getLog(KeyVRecordReader.class);
private CompressionCodecFactory compressionCodecs = null;
private long start;
private long pos;
private long end;
private LineReader in;
private int maxLineLength;
private Text key = null;
private Text value = null;
private String separator = "\t";
@Override
public void initialize(InputSplit genericSplit, TaskAttemptContext context)
throws IOException, InterruptedException {
FileSplit split = (FileSplit) genericSplit;
Configuration job = context.getConfiguration();
this.maxLineLength = job.getInt("mapred.linerecordreader.maxlength",
Integer.MAX_VALUE);
this.separator = job.get("key.value.separator.in.input.line", "\t");
start = split.getStart();
end = start + split.getLength();
final Path file = split.getPath();
compressionCodecs = new CompressionCodecFactory(job);
final CompressionCodec codec = compressionCodecs.getCodec(file);
FileSystem fs = file.getFileSystem(job);
FSDataInputStream fileIn = fs.open(split.getPath());
boolean skipFirstLine = false;
if (codec != null) {
in = new LineReader(codec.createInputStream(fileIn), job);
end = Long.MAX_VALUE;
} else {
if (start != 0) {
skipFirstLine = true;
--start;
fileIn.seek(start);
}
in = new LineReader(fileIn, job);
}
if (skipFirstLine) {
start += in.readLine(new Text(), 0,
(int)Math.min((long)Integer.MAX_VALUE, end - start));
}
this.pos = start;
}
@Override
public synchronized void close() throws IOException {
if(in!=null){
in.close();
}
}
@Override
public Text getCurrentKey() throws IOException, InterruptedException {
return key;
}
@Override
public Text getCurrentValue() throws IOException, InterruptedException {
return value;
}
@Override
public float getProgress() throws IOException, InterruptedException {
if (start == end) {
return 0.0f;
} else {
return Math.min(1.0f, (pos - start) / (float)(end - start));
}
}
@Override
public boolean nextKeyValue() throws IOException, InterruptedException {
Text line = new Text();
if (key == null) {
key = new Text();
}
if (value == null) {
value = new Text();
}
int newSize = 0;
while (pos < end) {
newSize = in.readLine(line, maxLineLength,
Math.max((int)Math.min(Integer.MAX_VALUE, end-pos),
maxLineLength));
//此处添加额外处理即可,其他地方与TextInputFormat一样。
if(null!=line){
String[] kv = line.toString().split(this.separator);
if(kv.length==2){
key.set("key:"+kv[0]);
value.set("value:"+kv[1]);
}else{
LOG.info("Skipped line has no separator");
key.set(line.toString());
value.set("");
}
}
if (newSize == 0) {
break;
}
pos += newSize;
if (newSize < maxLineLength) {
break;
}
LOG.info("Skipped line of size " + newSize + " at pos " +
(pos - newSize));
}
if (newSize == 0) {
key = null;
value = null;
return false;
} else {
return true;
}
}
}
@Override
public RecordReader<Text, Text> createRecordReader(InputSplit split,
TaskAttemptContext context) throws IOException,
InterruptedException {
return new KeyVRecordReader();
}
@Override
protected boolean isSplitable(JobContext context, Path file) {
CompressionCodec codec =
new CompressionCodecFactory(context.getConfiguration()).getCodec(file);
return codec == null;
}
}
在看老版的API时,发现旧的KeyValueTextInputFormat的作者基本上都是拿算法自己写,hadoop源码的很多地方都是不会拿现成的api来用,都是自己定义,这样做对性能的可控性是很强,这也折射出国外程序员跟国内程序员的差异,国内提倡拿来主义,国外可能更强调创新精神吧。
分享到:
相关推荐
C# 自定义控件 自定义ComboBox。其他控件的自定义与此类似。
QT自定义窗口 自由拖动 自定义标题
tablayout+viewpager自定义tab和自定义指示器。完美解决滑动卡顿。
用户自定义控件,用户自定义控件,用户自定义控件,用户自定义控件,用户自定义控件,用户自定义控件,用户自定义控件,用户自定义控件,用户自定义控件用户自定义控件,用户自定义控件,用户自定义控件,用户自定义...
springboot工程通过自定义response注解、java反射机制、自定义java拦截器、自定义功能类实现WebMvcConfigurer接口等功能,实现自定义规范化返回数据结构。
vb.net 自定义控件 自定义属性 UITypeEditor UI 类型编辑器 实例 提供一个示例 UITypeEditor,它使用 IWindowsFormsEditorService 显示用于用户输入的 Form。 IWindowsFormsEditorService 只能通过 PropertyGrid ...
自定义按钮自定义按钮自定义按钮自定义按钮自定义按钮
自定义导航模板
1.包含自定义消息 2.包含Panel类型的自定义插件 3.包含Display类型的自定义插件
这是我用C#写的自定义组件和控件,内附一个testForm来演示如何使用。 其中,自定义的组件有:速选组件、窗口靠边停靠组件、窗口抖动组件。 自定义控件有:黑白的菜单栏、带进度显示的进度条等。
自定义文件格式 自定义文件格式 自定义文件格式 自定义文件格式 自定义文件格式
Android自定义中国象棋,原创-------转载注明出处; Android自定义中国象棋,原创-------转载注明出处; Android自定义中国象棋,原创-------转载注明出处; Android自定义中国象棋,原创-------转载注明出处; ...
FreeSwitch完整的自定义模块定义和改善自定义事件的例子,详细介绍见博客:https://blog.csdn.net/xxm524/article/details/126211171
java自定义标签java自定义标签java自定义标签java自定义标签java自定义标签java自定义标签
自定义分页标签自定义分页标签自定义分页标签自定义分页标签自定义分页标签自定义分页标签自定义分页标签自定义分页标签自定义分页标签自定义分页标签自定义分页标签自定义分页标签自定义分页标签自定义分页标签...
介绍了mybatis自定义标签,添加自定义标签的实现代码,通过实例代码展示给大家
uni-app自定义弹窗组件指令部分,支持自定义图片,文本、按钮等功能。
sql自定义函数 sql自定义函数 sql自定义函数
JAVA动态表单设计,自定义表单,自定义数据,在线设计,数据库存储
微信小程序源码(含截图)自定义tabbar微信小程序源码(含截图)自定义tabbar微信小程序源码(含截图)自定义tabbar微信小程序源码(含截图)自定义tabbar微信小程序源码(含截图)自定义tabbar微信小程序源码(含...