`
Yinny
  • 浏览: 292531 次
  • 性别: Icon_minigender_2
  • 来自: 杭州
社区版块
存档分类
最新评论

搜索切换dump之MapReduce讲解

阅读更多
分享聚合dump的是评价的数据库,由于数据量超大且经常超时所以进行了数据源的切换,即从数据库dump切换为云梯dump,整个工作就是由一个mr的job去云梯读取数据然后进行一系列转化最后输出数据到文件的过程。

对于MapReduce编程模型,网上有很多文章,自己去学习哈,在这里不再赘述啦~,此处就这次mr的代码进行讲解。
MR的核心一共三个类,Mapper,Reduce和Submitter,其余的都是做一些数据转换的类。
首先来看Mapper类:
Map类中map方法的分析:

public class FullIndexMatrixtaodanMapper extends MapReduceBase implements Mapper<Writable, Text, LongWritable, IndexMatrixtaodanWriter> {

	private final LongWritable auctionId = new LongWritable(0);

	private static final SimpleDateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");

	
	@Override
	public void map(Writable key, Text value, OutputCollector<LongWritable, IndexMatrixtaodanWriter> output, Reporter reporter) throws IOException {
		String line = value.toString();
		// 评价表
		String[] records = this.splitRecords(line);
		long userId = NumberUtils.toLong(records[1], 0l);
		if (userId == 0l) {
			return;
		}
		auctionId.set(userId);
		IndexMatrixtaodanWriter record = this.buildFeedToTaodanWriter(records);
		if (null != record) {
			output.collect(auctionId, record);
		}

	}
}

Map类继承自MapReduceBase,并且它实现了Mapper接口,此接口是一个规范类型,它有4种形式的参数,分别用来指定map的输入key值类型、输入value值类型、输出key值类型和输出value值类型。mapper的输入数据类型由InputFormat控制,默认的输入格式是TextInputFormat,它会以(LongWritable,Text)键值对的方式加载数据。long值表示某一行在文件中的偏移量,Text对象则保存某一行的字符串内容。在本例中map的输入类型为<LongWritable,Text>,(其实map的输入key没啥作用,它表示在一行中的偏移量),在这个日常中需要输出<userId,淘单对象>这样的形式,因此输出的key值类型是LongWritable,输出的value值类型是IndexMatrixtaodanWriter(输出的淘单对象)。

  实现此接口类还需要实现map方法,map方法会具体负责对输入进行操作,在本例中,map方法对输入的行以空格为单位进行切分,然后使用OutputCollect收集输出的<userId,IndexMatrixtaodanWriter>,OutputCollector 由 Hadoop 框架提供, 负责收集 Mapper 和 Reducer 的输出数据,实现 map 函数和 reduce 函数时,只需要简单地将其输出的 <key,value> 对往 OutputCollector 中一丢即可,剩余的事框架自会帮你处理好。


Reduce中reduce方法的分析:

public class FullIndexMatrixtaodanReduce extends MapReduceBase implements Reducer<LongWritable, IndexMatrixtaodanWriter, Text, Text> {

	private final Text outContent = new Text();

	@Override
	public void reduce(LongWritable key, Iterator<IndexMatrixtaodanWriter> values, OutputCollector<Text, Text> output, Reporter reporter) throws IOException {
		String rowIndex = null;

		while (values.hasNext()) {
			IndexMatrixtaodanWriter taodanWriter = values.next();
			MatrixtaodanSearchSchemaDo taodanSchemaDo = adaptoTaodanSchemaDO(taodanWriter);
            try {
                rowIndex = taodanSchemaDo.unmarshall();
                outContent.set(rowIndex);
                output.collect(null, outContent);
            } catch (MarshallRateException e) {
            }
        }

	}
}

Reduce类也是继承自MapReduceBase的,需要实现Reducer接口。Reduce类以map的输出作为输入,因此Reduce的输入类型是<userId,IndexMatrixtaodanWriter>。而Reduce的输出用户id和对应的淘单对象,因此,它的输出类型是<LongWritable,Text>。Reduce类也要实现reduce方法,在此方法中,reduce函数将输入的key值作为输出的key值,然后将获得多个value值加起来,作为输出的值。

提交任务,运行:
public class FullIndexTaodanJobSubmitter {

	public static void main(String[] args) throws Exception {

		String jobName = JOB_NAME_FULLINDEX_MATRIXTAODAN;

		// 任务结束后,时间文件路径放在数据文件路径中,文件名为tsearcher.ok,内容为yyyyMMddHHmmss

		JobConf conf = new JobConf(FullIndexTaodanJobSubmitter.class);
		conf.setJobName(jobName);
		// 中间结果输出格式 默认和output一致
		conf.setMapOutputKeyClass(LongWritable.class);
		conf.setMapOutputValueClass(IndexMatrixtaodanWriter.class);
		conf.setOutputKeyClass(Text.class);
		conf.setOutputValueClass(Text.class);
		conf.setReducerClass(FullIndexMatrixtaodanReduce.class);

		// 重新分配reduce任务数 90
		conf.setNumReduceTasks(FULL_INDEX_REDUCE_COUNT);
		// reduce 最后输出结果格式。
		conf.setOutputFormat(TextOutputFormat.class);
		// 评价数据源mapper
		MultipleInputs.addInputPath(conf, new Path(args[0]), TextInputFormat.class, FullIndexMatrixtaodanMapper.class);

		FileOutputFormat.setOutputPath(conf, new Path(args[1]));

		JobClient.runJob(conf);

	}

}



main方法是整个mr任务的入口,在 Hadoop 可以通过一个 JobConf 对象设置如何运行这个 job,此处定义了job的名字,map输出的 key 的类型是 LongWritable, value 的类型是 IndexMatrixtaodanWriter, 指定FullIndexMatrixtaodanReduce作为 Reducer 类, 任务的输入路径和输出路径由命令行参数指定,这样 job 运行时会处理输入路径下的所有文件,并将计算结果写到输出路径下。
然后将 JobConf 对象作为参数,调用 JobClient 的 runJob, 开始执行这个计算任务。

如果作业中涉及到对不同的数据集进行连接(join),即定义多个不同的数据输入源时,可用MultipleInputs.addInputPath(JobConf conf, InputPath, TextInputFormat.class, Mapper.class) 这个方法原理是多个数据源就采用多个map来处理,以上任务中只涉及一个数据源,没有多表join,故可用 FileInputFormat.addInputPath(conf, new Path(inputPath));
分享到:
评论

相关推荐

Global site tag (gtag.js) - Google Analytics