- 浏览: 187882 次
文章分类
最新评论
自定义InputFormat 新版API 把真个文件当成一条输入 主要参考 源代码LineRecordReader里面的内容 有些细节还没有理解 WholeFileInputFormat import java.io.IOException; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.BytesWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.InputSplit; import org.apache.hadoop.mapreduce.JobContext; import org.apache.hadoop.mapreduce.RecordReader; import org.apache.hadoop.mapreduce.TaskAttemptContext; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; public class WholeFileInputFormat extends FileInputFormat<Text, BytesWritable> { @Override public RecordReader<Text, BytesWritable> createRecordReader( InputSplit arg0, TaskAttemptContext arg1) throws IOException, InterruptedException { // TODO Auto-generated method stub return new WholeFileRecordReader(); } @Override protected boolean isSplitable(JobContext context, Path filename) { // TODO Auto-generated method stub return false; } } WholeFileRecordReader import java.io.IOException; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.BytesWritable; import org.apache.hadoop.io.IOUtils; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.InputSplit; import org.apache.hadoop.mapreduce.RecordReader; import org.apache.hadoop.mapreduce.TaskAttemptContext; import org.apache.hadoop.mapreduce.lib.input.FileSplit; import org.apache.hadoop.mapreduce.lib.input.LineRecordReader; public class WholeFileRecordReader extends RecordReader<Text, BytesWritable> { private FileSplit fileSplit; private FSDataInputStream fis; private Text key = null; private BytesWritable value = null; private boolean processed = false; @Override public void close() throws IOException { // TODO Auto-generated method stub //fis.close(); } @Override public Text getCurrentKey() throws IOException, InterruptedException { // TODO Auto-generated method stub return this.key; } @Override public BytesWritable getCurrentValue() throws IOException, InterruptedException { // TODO Auto-generated method stub return this.value; } @Override public void initialize(InputSplit inputSplit, TaskAttemptContext tacontext) throws IOException, InterruptedException { fileSplit = (FileSplit) inputSplit; Configuration job = tacontext.getConfiguration(); Path file = fileSplit.getPath(); FileSystem fs = file.getFileSystem(job); fis = fs.open(file); } @Override public boolean nextKeyValue() { if(key == null){ key = new Text(); } if(value == null){ value = new BytesWritable(); } if(!processed){ byte[] content = new byte[(int) fileSplit.getLength()]; Path file = fileSplit.getPath(); System.out.println(file.getName()); key.set(file.getName()); try { IOUtils.readFully(fis, content, 0, content.length); //value.set(content, 0, content.length); value.set(new BytesWritable(content)); } catch (IOException e) { // TODO Auto-generated catch block e.printStackTrace(); } finally{ IOUtils.closeStream(fis); } processed = true; return true; } return false; } @Override public float getProgress() throws IOException, InterruptedException { // TODO Auto-generated method stub return processed? fileSplit.getLength():0; } } 验证 public static class mapper extends Mapper<Text, BytesWritable, Text, Text>{ @Override protected void map(Text key, BytesWritable value, Context context) throws IOException, InterruptedException { // TODO Auto-generated method stub context.write(key, new Text(value.getBytes())); } } note:value是BytesWritable类型,显示为十六进制数, new Text(value.getBytes()) 变成字符串形式
发表评论
-
多表join的一个优化思路
2012-11-20 11:24 1419big table:streamed small table: ... -
好的网站
2012-09-20 22:17 7481. http://www.cnblogs.com/luche ... -
Hadoop 任务流程
2012-09-07 16:18 785简单的来说分为四个阶段:InputFormat/MapTask ... -
Hadoop关于最大map reducer数目
2012-08-14 20:53 929mapred-site.xml文件: <prop ... -
java.io.IOException:Typemismatch in key from map:expected org.apache.hadoop.io
2012-08-14 20:53 1421解决办法: jo ... -
HDFS 输入文件避免切分
2012-08-14 20:52 1088自定义InputFormat的子类,并把重载方法 ... -
Hadoop 开启debug信息
2012-08-14 20:51 3959运行hadoop程序时,有时候你会使用一些System. ... -
Hadoop 关于0.95/1.75 * (number of nodes)误解
2012-08-14 20:51 943reduce任务槽,即集群能够同时运行的redu ... -
MapReduce ReadingList
2012-08-09 12:22 6681. http://www.aicit.org/jcit/gl ... -
"hadoop fs 和hadoop dfs的区别"
2012-05-30 15:27 1880粗略的讲,fs是个比较抽象的层面,在分布式环境中,fs就是df ... -
Hadoop 自动清除日志
2012-05-29 18:02 896hadoop集群跑了很多的任务后 在hadoop.log ... -
DistributedCache FileNotFoundException
2012-05-26 18:02 944此时注意两种文件路径表示形式,一个在HDFS中。一一个是本地文 ... -
Cygwin 不支持native lib 不支持使用native lib 提供的压缩
2012-05-25 13:33 1109弄了一个上午hadoop的压缩,一直报错NullPointer ... -
Hadoop 在Window下搭建 守护进程启动问题
2012-05-23 15:27 781hadoop version “0.20.2” java ... -
Cygwin ssh Connection closed by ::1
2012-05-17 21:09 1109在Win7下Cygwin中,使用sshlocalhost命令, ... -
Eclipse:Run on Hadoop 没有反应
2012-05-10 20:11 850hadoop-0.20.2下自带的eclise插件没有用,需要 ... -
Hadoop SequcenceFile 处理多个小文件
2012-04-29 11:04 3839利用sequenceFile打包多个小文件,MapFile是s ... -
Hadoop 自定义计数器
2012-04-22 09:04 1455public static class mapper e ... -
MapReduce : Combiner的使用(以平均数为例) 并结合in-mapper design pattern 实例
2012-04-10 18:51 4294没有使用Combiner 和 in-mapper des ... -
Hadoop NameNode backup
2012-03-24 18:12 822NameNode: <property> ...
相关推荐
自定义MapReduce的InputFormat,实现提取指定开始与结束限定符的内容。
这是谷歌三大论文之一的 MapReduce: Simplified Data Processing on Large Clusters 英文原文。我的翻译可以见https://blog.csdn.net/m0_37809890/article/details/87830686
MapReduce: Simplified Data Processing on Large Clusters from google.
来自于GOOGLE的mapreduce的开山之作,此文是原英文的中文版本,希望能互相参照,加深理解
用户自定义的map函数,接受一个输入对,然后产生一个中间key/value对集.MapReduce库把所有具有相同中间key I的中间value聚合在一起,然后把它们传递给reduce函数. 用户自定义的reduce函数,接受一个中间key I和相关的一...
Google并行计算,分布式处理模型MapReduce: Simplified Data Processing on Large Clusters
MapReduce: Simplified Data Processing on Large Clusters翻译
mapreduce创建代码项目mvn原型:generate -DarchetypeGroupId = org.apache.maven.archetypes -DgroupId = org.conan.mymahout -DartifactId = myPro -DpackageName = org.conan.mymahout -Dversion = 1.0-SNAPSHOT ...
MapReduce:Nkeys,Nfiles终极解决方案.docx
Google的MapReduce并行计算原始论文详解。
一篇文章是Google的Jeffrey Dean、Sanjay Ghemawat发表的标题为《MapReduce:一个灵活的数据库处理工具》,另一篇文章是Michael Stonebraker、Daniel Abadi、David J. DeWitt、Sam Madden、Erik Paulson、Andrew ...
MapReduce的翻译,我只是个搬运工qwq
实现mr的wordcount功能和自定义分区的功能、自定义排序功能;com.ellis.mr1为类似wc功能,com.ellis.mr2为自定义分区功能,com.ellis.mr3为自定义排序功能
MapReduce之自定义 OutPutFormat,通过一个案例,实现自定义的一个OutPutFormat,来更加的深刻的理解MR的过程
Google那篇著名的论文的ppt,MapReduce开山之作,介绍了Google对MapReduce的实现。
对于两个输入文件,即文件 A 和文件 B,请编写 MapReduce 程序,对两个文件进行合并, 并剔除其中重复的内容,得到一个新的输出文件 C。下面是输入文件和输出文件的一个样例 供参考。 输入文件 A 的样例如下:
使用MultiLineJsonInputFormat类作为您的 Mapper InputFormat 假设您有一些类似于这样的 JSON: {"menu": { "header": "SVG Viewer", "items": [ {"id": "Open"}, {"id": "OpenNew", "label": "Open New"}, ...
NULL 博文链接:https://langyu.iteye.com/blog/962529
该 repo 包含自定义 I/O 格式、文件格式、日志处理、ipLookup、二级排序和自定义 patitioner 的集合。 打包:从项目的根运行: mvn package该软件包包含以下程序的列表: 1.平均字数程序使用map输出记录计数器来实现...
MapReduce原始论文