1. An example from Hadoop in Action, summarized here. The input file contains one record per line, a timestamp and a URL separated by a tab:
17:16:20	http://blackproof.iteye.com/blog/1806263
17:16:21	http://blackproof.iteye.com/blog/1806264
17:16:56	http://blackproof.iteye.com/blog/1806265
17:16:30	http://blackproof.iteye.com/blog/1806266
17:16:45	http://blackproof.iteye.com/blog/1806267
17:16:23	http://blackproof.iteye.com/blog/1806268
The requirement: wrap the URL string after each timestamp in a URL-typed custom Writable. The code follows:
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.net.MalformedURLException;
import java.net.URL;

import org.apache.hadoop.io.Writable;

public class URLWritable implements Writable {

    protected URL url;

    public URLWritable() {
    }

    public URLWritable(URL url) {
        this.url = url;
    }

    @Override
    public void write(DataOutput out) throws IOException {
        out.writeUTF(url.toString());
    }

    @Override
    public void readFields(DataInput in) throws IOException {
        this.url = new URL(in.readUTF());
    }

    public void set(String string) {
        try {
            this.url = new URL(string);
        } catch (MalformedURLException e) {
            throw new RuntimeException("Should not have happened " + e.toString());
        }
    }

    // TextOutputFormat renders values via toString(); without this override the
    // job output would show object hash codes instead of URLs.
    @Override
    public String toString() {
        return url.toString();
    }
}
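Before wiring the type into a job, it is worth checking that write() and readFields() are symmetric. The following is a minimal round-trip sketch of my own (not from the original post); it assumes the class sits in the same package as URLWritable so the protected url field is visible:

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.net.URL;

public class URLWritableRoundTrip {
    public static void main(String[] args) throws Exception {
        URLWritable original = new URLWritable(new URL("http://blackproof.iteye.com/blog/1806263"));

        // Serialize with write(), exactly as the MapReduce framework would.
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        original.write(new DataOutputStream(bytes));

        // Deserialize into a fresh instance with readFields().
        URLWritable copy = new URLWritable();
        copy.readFields(new DataInputStream(new ByteArrayInputStream(bytes.toByteArray())));

        System.out.println(copy.url);  // expected: http://blackproof.iteye.com/blog/1806263
    }
}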
Next comes the custom RecordReader, which splits each line into a timestamp key and a URLWritable value:

import java.io.IOException;

import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.LineRecordReader;

public class TimeUrlLineRecordReader extends RecordReader<Text, URLWritable> {

    public static final String Time_URL_SEPERATOR =
            "mapreduce.input.keyvaluelinerecordreader.key.value.separator";

    private final LineRecordReader lineRecordReader;
    private byte separator = (byte) '\t';
    private Text innerValue;
    private Text key;
    private URLWritable value;

    // Returns the index of the first occurrence of sep in utf[start, start + length), or -1.
    public static int findSeparator(byte[] utf, int start, int length, byte sep) {
        for (int i = start; i < (start + length); i++) {
            if (utf[i] == sep) {
                return i;
            }
        }
        return -1;
    }

    public static void setKeyValue(Text key, URLWritable value, byte[] line,
            int lineLen, int pos) {
        if (pos == -1) {
            // No separator: the whole line becomes the key. Note that an empty
            // string is not a valid URL, so URLWritable.set() will throw here.
            key.set(line, 0, lineLen);
            value.set(StringUtils.EMPTY);
        } else {
            key.set(line, 0, pos);
            // Decode the URL bytes directly from the line buffer.
            String url = new String(line, pos + 1, lineLen - pos - 1);
            value.set(url);
        }
    }

    public TimeUrlLineRecordReader(Configuration conf) throws IOException {
        lineRecordReader = new LineRecordReader();
        String sepStr = conf.get(Time_URL_SEPERATOR, "\t");
        this.separator = (byte) sepStr.charAt(0);
    }

    @Override
    public void initialize(InputSplit split, TaskAttemptContext context)
            throws IOException, InterruptedException {
        lineRecordReader.initialize(split, context);
    }

    @Override
    public boolean nextKeyValue() throws IOException, InterruptedException {
        byte[] line = null;
        int lineLen = -1;
        if (lineRecordReader.nextKeyValue()) {
            innerValue = lineRecordReader.getCurrentValue();
            line = innerValue.getBytes();
            lineLen = innerValue.getLength();
        } else {
            return false;
        }
        if (line == null) {
            return false;
        }
        if (key == null) {
            key = new Text();
        }
        if (value == null) {
            value = new URLWritable();
        }
        int pos = findSeparator(line, 0, lineLen, this.separator);
        setKeyValue(key, value, line, lineLen, pos);
        return true;
    }

    @Override
    public Text getCurrentKey() throws IOException, InterruptedException {
        return key;
    }

    @Override
    public URLWritable getCurrentValue() throws IOException, InterruptedException {
        return value;
    }

    @Override
    public float getProgress() throws IOException, InterruptedException {
        return lineRecordReader.getProgress();
    }

    @Override
    public void close() throws IOException {
        lineRecordReader.close();
    }
}
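A quick way to sanity-check the parsing helpers is to call findSeparator() and setKeyValue() directly on one sample line. This small sketch is my addition, using the tab-separated record format shown at the top:

import org.apache.hadoop.io.Text;

public class ParsingDemo {
    public static void main(String[] args) {
        byte[] line = "17:16:20\thttp://blackproof.iteye.com/blog/1806263".getBytes();

        // Locate the tab, then split the buffer into key and value.
        int pos = TimeUrlLineRecordReader.findSeparator(line, 0, line.length, (byte) '\t');
        Text key = new Text();
        URLWritable value = new URLWritable();
        TimeUrlLineRecordReader.setKeyValue(key, value, line, line.length, pos);

        System.out.println("key   = " + key);    // 17:16:20
        System.out.println("value = " + value);  // http://blackproof.iteye.com/blog/1806263
    }
}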
The input format ties the reader in:

import java.io.IOException;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.CompressionCodecFactory;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;

public class TimeUrlTextInputFormat extends FileInputFormat<Text, URLWritable> {

    // Compressed files cannot be split at arbitrary offsets, so only allow
    // splitting when no codec matches the file.
    @Override
    protected boolean isSplitable(JobContext context, Path file) {
        final CompressionCodec codec = new CompressionCodecFactory(
                context.getConfiguration()).getCodec(file);
        return codec == null;
    }

    @Override
    public RecordReader<Text, URLWritable> createRecordReader(InputSplit split,
            TaskAttemptContext context) throws IOException, InterruptedException {
        context.setStatus(split.toString());
        return new TimeUrlLineRecordReader(context.getConfiguration());
    }
}
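The input format can be exercised without a cluster by handing it a single FileSplit. The harness below is a sketch of mine, assuming a hypothetical local file named timeurl.txt and Hadoop 2.x, where TaskAttemptContextImpl lives in org.apache.hadoop.mapreduce.task:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptID;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl;

public class LocalSplitDemo {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Path file = new Path("timeurl.txt");  // hypothetical local input file
        long len = file.getFileSystem(conf).getFileStatus(file).getLen();

        // One split covering the whole file; no preferred hosts.
        FileSplit split = new FileSplit(file, 0, len, null);
        TaskAttemptContextImpl ctx = new TaskAttemptContextImpl(conf, new TaskAttemptID());

        TimeUrlTextInputFormat format = new TimeUrlTextInputFormat();
        RecordReader<Text, URLWritable> reader = format.createRecordReader(split, ctx);
        reader.initialize(split, ctx);
        while (reader.nextKeyValue()) {
            System.out.println(reader.getCurrentKey() + " -> " + reader.getCurrentValue());
        }
        reader.close();
    }
}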
Finally, the driver, with an identity mapper and reducer:

import java.io.IOException;

import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class CustomTimeUrl extends Configured implements Tool {

    // Identity mapper: the custom input format already delivers (time, URL) pairs.
    public static class CustomTimeUrlMapper extends Mapper<Text, URLWritable, Text, URLWritable> {
        @Override
        protected void map(Text key, URLWritable value, Context context)
                throws IOException, InterruptedException {
            context.write(key, value);
        }
    }

    // Identity reducer: writes every URL grouped under its timestamp.
    public static class CustomTimeUrlReducer extends Reducer<Text, URLWritable, Text, URLWritable> {
        @Override
        protected void reduce(Text key, Iterable<URLWritable> values, Context context)
                throws IOException, InterruptedException {
            for (URLWritable value : values) {
                context.write(key, value);
            }
        }
    }

    @Override
    public int run(String[] args) throws Exception {
        Job job = Job.getInstance(getConf());
        job.setJarByClass(getClass());
        job.setJobName("CustomTimeUrl");

        job.setInputFormatClass(TimeUrlTextInputFormat.class);
        job.setOutputFormatClass(TextOutputFormat.class);

        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(URLWritable.class);

        job.setMapperClass(CustomTimeUrlMapper.class);
        job.setReducerClass(CustomTimeUrlReducer.class);

        FileInputFormat.setInputPaths(job, new Path("/timeurl/input/"));
        FileOutputFormat.setOutputPath(job, new Path("/timeurl/output"));

        boolean success = job.waitForCompletion(true);
        return success ? 0 : 1;
    }

    public static void main(String[] args) throws Exception {
        int result = ToolRunner.run(new CustomTimeUrl(), args);
        System.exit(result);
    }
}
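To run the job, package the classes into a jar (the name below is illustrative) and submit it with hadoop jar timeurl.jar CustomTimeUrl. Because the driver goes through ToolRunner, a different separator can be passed as a generic option, e.g. -Dmapreduce.input.keyvaluelinerecordreader.key.value.separator=','. Since both mapper and reducer are identity functions, each output line should mirror an input record: the timestamp, a tab, then the URL, e.g. 17:16:20 followed by http://blackproof.iteye.com/blog/1806263.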