- 浏览: 274806 次
文章分类
最新评论
-
feargod:
...
ActivityGroup的子activity响应back事件的顺序问题 -
hoarhoar:
谢谢你,终于解决了,我真是受够了,总是45秒钟,真是疯了。
youku 的广告必须要屏蔽 -
lilai:
...
youku 的广告必须要屏蔽 -
aijuans2:
...
youku 的广告必须要屏蔽 -
weiwo1978:
说的非常好,mark
SELECT语句执行的顺序
package org.myorg; import java.io.DataOutputStream; import java.io.IOException; import java.io.UnsupportedEncodingException; import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.RecordWriter; import org.apache.hadoop.mapreduce.TaskAttemptContext; import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat; /**摘{@link TextOutputFormat}碌ineRecordWriter隆拢 */ public class LineRecordWriter<K, V> extends RecordWriter<K, V> { private static final String utf8 = "UTF-8"; private static final byte[] newline; static { try { newline = "\n".getBytes(utf8); } catch (UnsupportedEncodingException uee) { throw new IllegalArgumentException("can't find " + utf8 + " encoding"); } } protected DataOutputStream out; private final byte[] keyValueSeparator; public LineRecordWriter(DataOutputStream out, String keyValueSeparator) { this.out = out; try { this.keyValueSeparator = keyValueSeparator.getBytes(utf8); } catch (UnsupportedEncodingException uee) { throw new IllegalArgumentException("can't find " + utf8 + " encoding"); } } public LineRecordWriter(DataOutputStream out) { this(out, "\t"); } private void writeObject(Object o) throws IOException { if (o instanceof Text) { Text to = (Text) o; out.write(to.getBytes(), 0, to.getLength()); } else { out.write(o.toString().getBytes(utf8)); } } public synchronized void write(K key, V value) throws IOException { boolean nullKey = key == null || key instanceof NullWritable; boolean nullValue = value == null || value instanceof NullWritable; if (nullKey && nullValue) { return; } if (!nullKey) { writeObject(key); } if (!(nullKey || nullValue)) { out.write(keyValueSeparator); } if (!nullValue) { writeObject(value); } out.write(newline); } public synchronized void close(TaskAttemptContext context) throws IOException { out.close(); } }
public static class myOutput extends MultipleOutputFormat<Text, Text> { @Override protected String generateFileNameForKeyValue(Text key, Text value, TaskAttemptContext taskID) { String mykey = key.toString(); String myValue = value.toString(); String tasknum = taskID.getTaskAttemptID().getTaskID().toString(); String fileNum = tasknum.substring(tasknum.length()-3); String newname = mykey.substring(0,3); return fileNum+newname; } }
目的:
根据输出数据的某些特征,分类输出到不同的文件夹下以便管理,而不是放在同一个文件夹下。
实现:
1、重写MultipleOutputFormat的某些方法,参考org.apache.hadoop.mapred.lib.MultipleOutputFormat,需要在程序中实现的子类方法是:
protected String generateFileNameForKeyValue(K key, V value, TaskAttemptContext job),即通过key和value及conf配置信息决定文件名
(含扩展名)。其中,需要改写的方法是最后一个方法:private RecordWriter<K, V> getBaseRecordWriter(TaskAttemptContext job,
String baseName),baseName 即为 在程序中重写的子类 generateFileNameForKeyValue 的返回值。
代码:
package org.myorg; import java.io.DataOutputStream; import java.io.IOException; import java.util.HashMap; import java.util.Iterator; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.Writable; import org.apache.hadoop.io.WritableComparable; import org.apache.hadoop.io.compress.CompressionCodec; import org.apache.hadoop.io.compress.GzipCodec; import org.apache.hadoop.mapreduce.OutputCommitter; import org.apache.hadoop.mapreduce.RecordWriter; import org.apache.hadoop.mapreduce.TaskAttemptContext; import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import org.apache.hadoop.util.ReflectionUtils; public abstract class MultipleOutputFormat<K extends WritableComparable, V extends Writable> extends FileOutputFormat<K, V> { private MultiRecordWriter writer = null; public RecordWriter<K, V> getRecordWriter(TaskAttemptContext job) throws IOException, InterruptedException { if (writer == null) { writer = new MultiRecordWriter(job, getTaskOutputPath(job)); } return writer; } private Path getTaskOutputPath(TaskAttemptContext conf) throws IOException { Path workPath = null; OutputCommitter committer = super.getOutputCommitter(conf); if (committer instanceof FileOutputCommitter) { workPath = ((FileOutputCommitter) committer).getWorkPath(); } else { Path outputPath = super.getOutputPath(conf); if (outputPath == null) { throw new IOException("Undefined job output-path"); } workPath = outputPath; } return workPath; } protected abstract String generateFileNameForKeyValue(K key, V value, TaskAttemptContext job);//Configuration conf); public class MultiRecordWriter extends RecordWriter<K, V> { private HashMap<String, RecordWriter<K, V>> recordWriters = null; private TaskAttemptContext job = null; private Path workPath = null; public MultiRecordWriter(TaskAttemptContext job, Path workPath) { super(); this.job = job; this.workPath = workPath; recordWriters = new HashMap<String, RecordWriter<K, V>>(); } @Override public void close(TaskAttemptContext context) throws IOException, InterruptedException { Iterator<RecordWriter<K, V>> values = this.recordWriters.values().iterator(); while (values.hasNext()) { values.next().close(context); } this.recordWriters.clear(); } @Override public void write(K key, V value) throws IOException, InterruptedException { String baseName = generateFileNameForKeyValue(key, value, job);//job.getConfiguration()); RecordWriter<K, V> rw = this.recordWriters.get(baseName); if (rw == null) { rw = getBaseRecordWriter(job, baseName); this.recordWriters.put(baseName, rw); } //LongWritable keys=(LongWritable)key; //long ret=keys.get()>>1; //keys.set(ret); rw.write(key, value);//change } private RecordWriter<K, V> getBaseRecordWriter(TaskAttemptContext job, String baseName) throws IOException, InterruptedException { Configuration conf = job.getConfiguration(); boolean isCompressed = getCompressOutput(job); String keyValueSeparator = "\t"; //change String pathname=baseName.substring(12); //change RecordWriter<K, V> recordWriter = null; if (isCompressed) { Class<? extends CompressionCodec> codecClass = getOutputCompressorClass(job, GzipCodec.class); //String pathname=baseName.substring(12); CompressionCodec codec = ReflectionUtils.newInstance(codecClass, conf); Path file = new Path(workPath+"/"+pathname, baseName.substring(0,11) + codec.getDefaultExtension()); //change FSDataOutputStream fileOut = file.getFileSystem(conf).create(file, false); recordWriter = new LineRecordWriter<K, V>(new DataOutputStream(codec .createOutputStream(fileOut)), keyValueSeparator); } else { Path file = new Path(workPath+"/"+pathname, baseName.substring(0,11)); //change FSDataOutputStream fileOut = file.getFileSystem(conf).create(file, false); recordWriter = new LineRecordWriter<K, V>(fileOut, keyValueSeparator); } return recordWriter; } } }
2、把LineRecordWriter从TextOutputFormat抽取出来,作为一个独立的公共类使用。RecordWriter的一个实现,用于把<Key, Value>转化为一行文本。在Hadoop中,这个类作为TextOutputFormat的一个子类存在,protected访问权限,因此普通程序无法访问。
代码如下:
3、在主程序中加载generateFileNameForKeyValue方法:
在main函数中需添加 job.setOutputFormatClass(myOutput.class);
更多信息请查看 java进阶网 http://www.javady.com
发表评论
-
hadoop FSNamesystem中的recentInvalidateSets
2012-04-20 20:28 964今天早就回来了,然后偷懒了2个小时,现在才开始分析代码, ... -
hadoop namenode后台jetty web
2012-04-20 20:28 1648现在开始分析namenode启动时开启的第2类线程, ... -
hadoop namenode format做了什么?
2012-04-18 20:58 1055一看到format就和磁盘格式化联想到一起,然后这个fo ... -
hadoop分布式配置(服务器系统为centos5,配置时使用的用户是root)
2012-04-14 21:19 1004目前我们使 ... -
Hadoop 安装问题和解决方案
2012-04-10 13:21 1182前几天在Window和Linux主机安装了Hadoop, ... -
运行Hadoop遇到的问题
2012-04-10 13:19 1523运行Hadoop遇到的问题 1, 伪分布式模式 ... -
运行Hadoop遇到的问题
2012-04-10 13:19 0运行Hadoop遇到的问题 1, 伪分布式模式 ... -
hadoop使用过程中的一些小技巧
2012-04-09 10:16 1099hadoop使用过程中的一些小技巧 ------------- ... -
运行hadoop时的一些技巧
2012-04-09 10:14 726//用来给key分区的,需要实现Partitioner接口 ... -
hive相关操作文档收集
2012-04-08 10:51 0How to load data into Hive ... -
hive sql doc
2012-04-08 10:51 0记录2个常用的hive sql语法查询地 官方 ht ... -
hive Required table missing : "`DBS`" in Catalog "" Schema "
2012-04-08 10:51 0最近需要提取一些数据,故开始使用hive,本机搭建了一个hiv ... -
HDFS数据兼容拷贝
2012-04-08 10:50 0系统中使用了hadoop 19.2 20.2 2个版本,为啥有 ... -
hdfs 简单的api 读写文件
2012-04-08 10:50 0Java代码 import ... -
hbase之htable线程安全性
2012-04-22 15:22 1102在单线程环境下使用hbase的htable是没有问题,但是突然 ... -
hbase之scan的rowkey问题
2012-04-22 15:22 1686最近使用到hbase做存储,发现使用scan的时候,返回的ro ... -
datanode启动开启了那些任务线程
2012-04-22 15:22 1024今天开始分析datanode,首先看看datanode开启了哪 ... -
namenode这个类的主要功能
2012-04-22 15:22 1405今天来总看下namenode这个类的主要功能 首先看下这个类 ... -
hadoop监控
2012-04-22 15:21 1560通过从hadoop的 hadoop-metrics文件中就可以 ... -
zookeeper集群配置注意项
2012-04-21 21:32 1088项目中需要使用hbase,故准备在本机搭建hbase,考虑到h ...
相关推荐
hadoop2.7汇总:新增功能最新编译64位安装、源码包、API、eclipse插件下载
Hadoop技术内幕:深入解析YARN架构设计与实现原理 高清完整中文版PDF下载
Hadoop 技术内幕:深入解析Hadoop Common 和HDFS 架构设计与实现原理
Hadoop技术内幕:深入解析YARN架构设计与实现原理.pdf
Hadoop技术内幕:深入解析Hadoop Common 和HDFS 架构设计与实现原理 (大数据技术丛书) 原版书籍,非扫描版,使用kindle可以打开,也可以转换为epub使用ibooks打开
Hadoop硬实战:Hadoop in Practice
Hadoop技术内幕:深入解析YARN架构设计与实现原理.pdf
这个是课本上的项目,很早以前做的,数据文件(没放在代码里)得自己下载,你们得在自己电脑上做修改才能不报错,至于怎么修改,我也不知道。或许是改ip地址,版本信息,数据库配置之类的......这个主要是一个参考...
《Hadoop技术内幕:深入解析Hadoop Common和HDFS架构设计与实现原理》还从源代码实现中对分布式技术的精髓、分布式系统设计的优秀思想和方法,以及Java语言的编码技巧、编程规范和对设计模式的精妙运用进行了总结和...
《Hadoop技术内幕:深入解析Hadoop Common和HDFS架构设计与实现原理》由腾讯数据平台的资深Hadoop专家、X-RIME的作者亲自执笔,对Common和HDFS的源代码进行了分析,旨在为Hadoop的优化、定制和扩展提供原理性的指导。...
最后从实际应用的角度深入讲解了Hadoop的性能优化、安全机制、多用户作业调度器和下一代MapReduce框架等高级主题和内容。《Hadoop技术内幕:深入解析MapReduce架构设计与实现原理》适合Hadoop的二次开发人员、应用...
Hadoop的MapReduce中多文件输出.pdf
Hadoop技术内幕:深入解析MapReduce架构设计i与实现原理Hadoop技术内幕:深入解析MapReduce架构设计i与实现原理Hadoop技术内幕:深入解析MapReduce架构设计i与实现原理Hadoop技术内幕:深入解析MapReduce架构设计i与...
Hadoop和Kerberos简介。
Apache Hadoop十周岁:展望前方.pdf
Hadoop技术内幕:深入解析MapReduce架构设计与实现原理 文字版Hadoop技术内幕:深入解析MapReduce架构设计与实现原理 文字版Hadoop技术内幕:深入解析MapReduce架构设计与实现原理 文字版
Hadoop开发基础 : Google三大论文: MapReduce超大机群上的简单数据处理.doc Hadoop开发基础 : Google三大论文: MapReduce超大机群上的简单数据处理.doc Hadoop开发基础 : Google三大论文: MapReduce超大机群上的简单...
Hadoop技术内幕:深入解析YARN架构设计与实现原理 Hadoop技术内幕:深入解析YARN架构设计与实现原理
Hadoop技术内幕:深入解析YARN架构设计与实现原理(扫描版)Hadoop技术内幕:深入解析YARN架构设计与实现原理(扫描版)Hadoop技术内幕:深入解析YARN架构设计与实现原理(扫描版)