Splitting a Data File with MapReduce


 

There is a formatted data file whose columns are separated by \t, with the product name in the second column. The requirement is to split this file into multiple files, one per product name. How can this be implemented with MapReduce?

Original file:

[root@localhost opt]# cat aprData

1       a1      a111

2       a2      a211

3       a1      a112

4       a1      a112

5       a1      a112

6       a1      a112

7       a2      a112

8       a2      a112

9       a2      a112

10      a3      a113

 

Approach:

1. Use one MapReduce job to find all product names:

1.1 map <k2,v2> is <product name, null>

1.2 reduce <k3,v3> is <product name, null>

   Implementation: the AprProduces class
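What step 1 computes can be checked locally with plain Java before running the job. The sketch below (a hypothetical `DistinctProducts` helper, not part of the actual job) simulates the map/shuffle/reduce behavior: the mapper extracts column 2, and the shuffle collapses duplicate keys and sorts them, which a TreeSet models exactly:

```java
import java.util.Arrays;
import java.util.List;
import java.util.Set;
import java.util.TreeSet;

public class DistinctProducts {
    // Local simulation of the AprProduces job: map emits <product name, null>,
    // and the shuffle/reduce phase deduplicates and sorts the keys --
    // modeled here by a TreeSet.
    static Set<String> distinctProducts(List<String> lines) {
        Set<String> names = new TreeSet<String>();
        for (String line : lines) {
            String[] cols = line.split("\t");
            names.add(cols[1]); // column 2 holds the product name
        }
        return names;
    }

    public static void main(String[] args) {
        List<String> lines = Arrays.asList("1\ta1\ta111", "2\ta2\ta211", "3\ta1\ta112");
        System.out.println(distinctProducts(lines)); // prints [a1, a2]
    }
}
```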

[root@localhost opt]# hadoop jar apr-produces.jar /aprData /aprProduce-output

Warning: $HADOOP_HOME is deprecated.

 

16/05/01 15:00:12 WARN mapred.JobClient: Use GenericOptionsParser for parsing the arguments. Applications should implement Tool for the same.

16/05/01 15:00:12 INFO input.FileInputFormat: Total input paths to process : 1

16/05/01 15:00:12 INFO util.NativeCodeLoader: Loaded the native-hadoop library

16/05/01 15:00:12 WARN snappy.LoadSnappy: Snappy native library not loaded

16/05/01 15:00:13 INFO mapred.JobClient: Running job: job_201605010048_0020

16/05/01 15:00:14 INFO mapred.JobClient:  map 0% reduce 0%

16/05/01 15:00:33 INFO mapred.JobClient:  map 100% reduce 0%

16/05/01 15:00:45 INFO mapred.JobClient:  map 100% reduce 100%

16/05/01 15:00:50 INFO mapred.JobClient: Job complete: job_201605010048_0020

16/05/01 15:00:50 INFO mapred.JobClient: Counters: 29

16/05/01 15:00:50 INFO mapred.JobClient:   Map-Reduce Framework

16/05/01 15:00:50 INFO mapred.JobClient:     Spilled Records=20

16/05/01 15:00:50 INFO mapred.JobClient:     Map output materialized bytes=56

16/05/01 15:00:50 INFO mapred.JobClient:     Reduce input records=10

16/05/01 15:00:50 INFO mapred.JobClient:     Virtual memory (bytes) snapshot=3868389376

16/05/01 15:00:50 INFO mapred.JobClient:     Map input records=10

16/05/01 15:00:50 INFO mapred.JobClient:     SPLIT_RAW_BYTES=89

16/05/01 15:00:50 INFO mapred.JobClient:     Map output bytes=30

16/05/01 15:00:50 INFO mapred.JobClient:     Reduce shuffle bytes=56

16/05/01 15:00:50 INFO mapred.JobClient:     Physical memory (bytes) snapshot=240697344

16/05/01 15:00:50 INFO mapred.JobClient:     Reduce input groups=3

16/05/01 15:00:50 INFO mapred.JobClient:     Combine output records=0

16/05/01 15:00:50 INFO mapred.JobClient:     Reduce output records=3

16/05/01 15:00:50 INFO mapred.JobClient:     Map output records=10

16/05/01 15:00:50 INFO mapred.JobClient:     Combine input records=0

16/05/01 15:00:50 INFO mapred.JobClient:     CPU time spent (ms)=1490

16/05/01 15:00:50 INFO mapred.JobClient:     Total committed heap usage (bytes)=177016832

16/05/01 15:00:50 INFO mapred.JobClient:   File Input Format Counters

16/05/01 15:00:50 INFO mapred.JobClient:     Bytes Read=101

16/05/01 15:00:50 INFO mapred.JobClient:   FileSystemCounters

16/05/01 15:00:50 INFO mapred.JobClient:     HDFS_BYTES_READ=190

16/05/01 15:00:50 INFO mapred.JobClient:     FILE_BYTES_WRITTEN=43049

16/05/01 15:00:50 INFO mapred.JobClient:     FILE_BYTES_READ=56

16/05/01 15:00:50 INFO mapred.JobClient:     HDFS_BYTES_WRITTEN=9

16/05/01 15:00:50 INFO mapred.JobClient:   Job Counters

16/05/01 15:00:50 INFO mapred.JobClient:     Launched map tasks=1

16/05/01 15:00:50 INFO mapred.JobClient:     Launched reduce tasks=1

16/05/01 15:00:50 INFO mapred.JobClient:     SLOTS_MILLIS_REDUCES=11002

16/05/01 15:00:50 INFO mapred.JobClient:     Total time spent by all reduces waiting after reserving slots (ms)=0

16/05/01 15:00:50 INFO mapred.JobClient:     SLOTS_MILLIS_MAPS=13561

16/05/01 15:00:50 INFO mapred.JobClient:     Total time spent by all maps waiting after reserving slots (ms)=0

16/05/01 15:00:50 INFO mapred.JobClient:     Data-local map tasks=1

16/05/01 15:00:50 INFO mapred.JobClient:   File Output Format Counters

16/05/01 15:00:50 INFO mapred.JobClient:     Bytes Written=9

[root@localhost opt]# hadoop fs -cat /aprProduce-output/part-r-00000

Warning: $HADOOP_HOME is deprecated.

 

a1

a2

a3

 

   

2. Use a second MapReduce job to split the file:

2.1 map <k2,v2> is <product name, line>

2.2 reduce <k3,v3> is <line, null>

2.3 Write a custom Partitioner that reads the output file of the first job and builds a Map<product name, index>; getPartition looks up the product name and returns its index, and names that are not found go to partition 0.

2.4 Set taskNum (the number of reducers); taskNum should match the number of partitions.

2.5 Use the MultipleOutputs class to rename the output files, producing files such as xxx-r-00001.

Implementation: the AprClassify class
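The core of step 2.3 is a dictionary lookup with a fallback. A minimal local sketch of that logic follows (the real partitioner loads the map from HDFS; here it is built by hand, and `PartitionSketch` is a hypothetical name):

```java
import java.util.HashMap;
import java.util.Map;

public class PartitionSketch {
    // Mirrors the partitioner in step 2.3: known products get their own
    // partition index (1..n); unknown names fall back to partition 0.
    static int getPartition(Map<String, Integer> index, String product) {
        Integer p = index.get(product);
        return p == null ? 0 : p;
    }

    public static void main(String[] args) {
        Map<String, Integer> index = new HashMap<String, Integer>();
        index.put("a1", 1);
        index.put("a2", 2);
        index.put("a3", 3);
        System.out.println(getPartition(index, "a2"));  // prints 2
        System.out.println(getPartition(index, "zzz")); // prints 0 (unknown product)
    }
}
```

With three products this yields four partitions (0..3), which is why the job below sets the reducer count to taskNum + 1.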

 

[root@localhost opt]# hadoop jar apr-classify.jar /aprData /apr-output

Warning: $HADOOP_HOME is deprecated.

 

16/05/01 14:09:11 WARN mapred.JobClient: Use GenericOptionsParser for parsing the arguments. Applications should implement Tool for the same.

16/05/01 14:09:11 INFO input.FileInputFormat: Total input paths to process : 1

16/05/01 14:09:11 INFO util.NativeCodeLoader: Loaded the native-hadoop library

16/05/01 14:09:11 WARN snappy.LoadSnappy: Snappy native library not loaded

16/05/01 14:09:11 INFO mapred.JobClient: Running job: job_201605010048_0017

16/05/01 14:09:13 INFO mapred.JobClient:  map 0% reduce 0%

16/05/01 14:09:29 INFO mapred.JobClient:  map 100% reduce 0%

16/05/01 14:09:41 INFO mapred.JobClient:  map 100% reduce 33%

16/05/01 14:09:44 INFO mapred.JobClient:  map 100% reduce 66%

16/05/01 14:09:56 INFO mapred.JobClient:  map 100% reduce 100%

16/05/01 14:10:01 INFO mapred.JobClient: Job complete: job_201605010048_0017

16/05/01 14:10:01 INFO mapred.JobClient: Counters: 29

16/05/01 14:10:01 INFO mapred.JobClient:   Map-Reduce Framework

16/05/01 14:10:01 INFO mapred.JobClient:     Spilled Records=20

16/05/01 14:10:01 INFO mapred.JobClient:     Map output materialized bytes=169

16/05/01 14:10:01 INFO mapred.JobClient:     Reduce input records=10

16/05/01 14:10:01 INFO mapred.JobClient:     Virtual memory (bytes) snapshot=7754653696

16/05/01 14:10:01 INFO mapred.JobClient:     Map input records=10

16/05/01 14:10:01 INFO mapred.JobClient:     SPLIT_RAW_BYTES=89

16/05/01 14:10:01 INFO mapred.JobClient:     Map output bytes=131

16/05/01 14:10:01 INFO mapred.JobClient:     Reduce shuffle bytes=169

16/05/01 14:10:01 INFO mapred.JobClient:     Physical memory (bytes) snapshot=387825664

16/05/01 14:10:01 INFO mapred.JobClient:     Reduce input groups=3

16/05/01 14:10:01 INFO mapred.JobClient:     Combine output records=0

16/05/01 14:10:01 INFO mapred.JobClient:     Reduce output records=0

16/05/01 14:10:01 INFO mapred.JobClient:     Map output records=10

16/05/01 14:10:01 INFO mapred.JobClient:     Combine input records=0

16/05/01 14:10:01 INFO mapred.JobClient:     CPU time spent (ms)=3950

16/05/01 14:10:01 INFO mapred.JobClient:     Total committed heap usage (bytes)=209522688

16/05/01 14:10:01 INFO mapred.JobClient:   File Input Format Counters

16/05/01 14:10:01 INFO mapred.JobClient:     Bytes Read=101

16/05/01 14:10:01 INFO mapred.JobClient:   FileSystemCounters

16/05/01 14:10:01 INFO mapred.JobClient:     HDFS_BYTES_READ=199

16/05/01 14:10:01 INFO mapred.JobClient:     FILE_BYTES_WRITTEN=86609

16/05/01 14:10:01 INFO mapred.JobClient:     FILE_BYTES_READ=169

16/05/01 14:10:01 INFO mapred.JobClient:     HDFS_BYTES_WRITTEN=104

16/05/01 14:10:01 INFO mapred.JobClient:   Job Counters

16/05/01 14:10:01 INFO mapred.JobClient:     Launched map tasks=1

16/05/01 14:10:01 INFO mapred.JobClient:     Launched reduce tasks=3

16/05/01 14:10:01 INFO mapred.JobClient:     SLOTS_MILLIS_REDUCES=35295

16/05/01 14:10:01 INFO mapred.JobClient:     Total time spent by all reduces waiting after reserving slots (ms)=0

16/05/01 14:10:01 INFO mapred.JobClient:     SLOTS_MILLIS_MAPS=13681

16/05/01 14:10:01 INFO mapred.JobClient:     Total time spent by all maps waiting after reserving slots (ms)=0

16/05/01 14:10:01 INFO mapred.JobClient:     Data-local map tasks=1

16/05/01 14:10:01 INFO mapred.JobClient:   File Output Format Counters

16/05/01 14:10:01 INFO mapred.JobClient:     Bytes Written=0

[root@localhost opt]# hadoop fs -ls /apr-output/

Warning: $HADOOP_HOME is deprecated.

 

Found 8 items

-rw-r--r--   1 root supergroup          0 2016-05-01 14:09 /apr-output/_SUCCESS

drwxr-xr-x   - root supergroup          0 2016-05-01 14:09 /apr-output/_logs

-rw-r--r--   1 root supergroup         51 2016-05-01 14:09 /apr-output/a1-r-00000

-rw-r--r--   1 root supergroup         41 2016-05-01 14:09 /apr-output/a2-r-00001

-rw-r--r--   1 root supergroup         12 2016-05-01 14:09 /apr-output/a3-r-00002

-rw-r--r--   1 root supergroup          0 2016-05-01 14:09 /apr-output/part-r-00000

-rw-r--r--   1 root supergroup          0 2016-05-01 14:09 /apr-output/part-r-00001

-rw-r--r--   1 root supergroup          0 2016-05-01 14:09 /apr-output/part-r-00002

[root@localhost opt]# hadoop fs -cat /apr-output/a1-r-00000

Warning: $HADOOP_HOME is deprecated.

 

1       a1      a111

3       a1      a112

4       a1      a112

5       a1      a112

6       a1      a112

 

[root@localhost opt]# hadoop fs -cat /apr-output/a2-r-00000

Warning: $HADOOP_HOME is deprecated.

 

cat: File does not exist: /apr-output/a2-r-00000

[root@localhost opt]# hadoop fs -cat /apr-output/a2-r-00001

Warning: $HADOOP_HOME is deprecated.

 

2       a2      a211

7       a2      a112

8       a2      a112

9       a2      a112

 

[root@localhost opt]# hadoop fs -cat /apr-output/a3-r-00002

Warning: $HADOOP_HOME is deprecated.

 

10      a3      a113

 

 

 

3. Finally, use the HDFS API to batch-copy the per-product files, renaming them and moving them into a target directory.

Implementation: the RenameApr class

 

[root@localhost opt]# hadoop fs -ls /aprProduces

Warning: $HADOOP_HOME is deprecated.

 

Found 3 items

-rw-r--r--   3 yehao supergroup         51 2016-05-01 14:37 /aprProduces/a1

-rw-r--r--   3 yehao supergroup         41 2016-05-01 14:37 /aprProduces/a2

-rw-r--r--   3 yehao supergroup         12 2016-05-01 14:37 /aprProduces/a3

[root@localhost opt]# hadoop fs -cat /aprProduces/a1

Warning: $HADOOP_HOME is deprecated.

 

1       a1      a111

3       a1      a112

4       a1      a112

5       a1      a112

6       a1      a112

 

[root@localhost opt]# hadoop fs -cat /aprProduces/a2

Warning: $HADOOP_HOME is deprecated.

 

2       a2      a211

7       a2      a112

8       a2      a112

9       a2      a112

 

[root@localhost opt]# hadoop fs -cat /aprProduces/a3

Warning: $HADOOP_HOME is deprecated.

 

10      a3      a113

 

Code:

1. com.huawei.AprClassify

 

package com.huawei;

import java.io.IOException;
import java.util.Map;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Partitioner;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.MultipleOutputs;


public class AprClassify {
	private static int taskNum = HdfsUtils.getMapSize();
	
	public static void main(String[] args)  throws Exception {
		Job job = new Job(new Configuration(), AprClassify.class.getSimpleName());
		job.setJarByClass(AprClassify.class);
		
		job.setMapperClass(AprClassifyMap.class);
		job.setReducerClass(AprClassifyReducer.class);
		
		job.setMapOutputKeyClass(Text.class);
		job.setMapOutputValueClass(Text.class);
		job.setOutputKeyClass(Text.class);
		job.setOutputValueClass(NullWritable.class);
		job.setPartitionerClass(AprClassifyPartitioner.class);
		// one partition per product, plus partition 0 for unmatched names
		job.setNumReduceTasks(taskNum + 1);

		FileInputFormat.addInputPath(job, new Path(args[0]));
		FileOutputFormat.setOutputPath(job, new Path(args[1]));
		
		job.waitForCompletion(true);
	}
}


class AprClassifyReducer extends Reducer<Text, Text, Text, NullWritable>{
	private MultipleOutputs<Text, NullWritable> outputs; 
	
	protected void setup(Context context) throws IOException, InterruptedException {  
		outputs = new MultipleOutputs<Text, NullWritable>(context);  
	}
	
	@Override
	protected void reduce(Text k2, Iterable<Text> v2s,
			Reducer<Text, Text, Text, NullWritable>.Context context)
			throws IOException, InterruptedException {
		// Concatenate all lines for this product into one text block
		StringBuilder sb = new StringBuilder();
		for (Text text : v2s) {
			sb.append(text.toString()).append("\n");
		}

		// The third argument names the output file after the product, e.g. a1-r-00000
		Text k3 = new Text(sb.toString());
		outputs.write(k3, NullWritable.get(), k2.toString());
	}
	
	protected void cleanup(Context context) throws IOException,  
	    InterruptedException {  
		outputs.close();  
	}
}

class AprClassifyMap extends Mapper<LongWritable, Text, Text, Text>{
	Text k2 = new Text();
	
	@Override
	protected void map(LongWritable key, Text value,
			Mapper<LongWritable, Text, Text, Text>.Context context)
			throws IOException, InterruptedException {
		
		String line = value.toString();
		String[] splited = line.split("\t");
		k2.set(splited[1]); // column 2 is the product name
		context.write(k2, value);
	}
}

class AprClassifyPartitioner extends Partitioner<Text, Text> {

	private static Map<String, Integer> map = HdfsUtils.getMap();
	@Override
	public int getPartition(Text key, Text value, int numPartitions) {
		Integer partition = map.get(key.toString());
		// product names missing from the map fall back to partition 0
		return partition == null ? 0 : partition;
	}
}

 

 

2. com.huawei.HdfsUtils

package com.huawei;

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.HashMap;
import java.util.Map;

import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;

public class HdfsUtils {
	
	private static FileSystem fileSystem;
	private static Map<String, Integer> map;
	
	private static FileSystem getFileSystem() throws URISyntaxException, IOException {
		if(fileSystem == null){
			Configuration conf = new Configuration();
			URI uri = new URI("hdfs://192.168.1.190:9000");
			fileSystem = FileSystem.get(uri, conf);
		}
		
		return fileSystem;
	}
	
	public static int getMapSize(){
		return getMap().size();
	}
	
	public static Map<String, Integer> getMap(){
		if(map == null){
			map = new HashMap<String, Integer>();
			FSDataInputStream in;
			BufferedReader reader = null;
			try{
				fileSystem = getFileSystem();
				// Read the product-name list written by the AprProduces job
				in = fileSystem.open(new Path("hdfs://192.168.1.190:9000/aprProduce-output/part-r-00000"));
				reader = new BufferedReader(new InputStreamReader(in));
				String line = null;
				int i = 1;
				while((line = reader.readLine()) != null) {
					map.put(line, i++);
				}
			}catch(Exception e){
				 e.printStackTrace();
			}finally{
				try {
				      if(reader != null) reader.close();
				 } catch (IOException e) {
				      e.printStackTrace();
				 }
			}
		}
		
		return map;
	}
	
	public static void copyProduces(String inputPath, String outPutDir)  throws Exception{
		FileStatus[] listStatus = getFileSystem().listStatus(new Path(inputPath));
		for (FileStatus fileStatus : listStatus) {
			String name = fileStatus.getPath().getName();
			if(!fileStatus.isDir() && !StringUtils.equals(name, "_SUCCESS") && !StringUtils.startsWith(name, "part-r-")){
				FSDataInputStream openStream = fileSystem.open(fileStatus.getPath());
				IOUtils.copyBytes(openStream, fileSystem.create(new Path("/"+outPutDir+"/"+name.split("-")[0])), 1024, false);
				IOUtils.closeStream(openStream);
			}
		}
	}
}

 

3. com.huawei.AprProduces

package com.huawei;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

/**
 * Scans the source file and collects all distinct product names.
 * args[0] source file
 * args[1] output file: all product names
 *
 */
public class AprProduces {

	public static void main(String[] args) throws Exception {
		Job job = new Job(new Configuration(), AprProduces.class.getSimpleName());
		job.setJarByClass(AprProduces.class);
		
		job.setMapperClass(AprProducesMap.class);
		job.setReducerClass(AprProducesReducer.class);
		
		job.setMapOutputKeyClass(Text.class);
		job.setMapOutputValueClass(NullWritable.class);
		job.setOutputKeyClass(Text.class);
		job.setOutputValueClass(NullWritable.class);

		FileInputFormat.addInputPath(job, new Path(args[0]));
		FileOutputFormat.setOutputPath(job, new Path(args[1]));
		
		job.waitForCompletion(true);
	}

}


class AprProducesMap extends Mapper<LongWritable, Text, Text, NullWritable>{
	Text k2 = new Text();
	
	@Override
	protected void map(LongWritable key, Text value,
			Mapper<LongWritable, Text, Text, NullWritable>.Context context)
			throws IOException, InterruptedException {
		
		String line = value.toString();
		String[] splited = line.split("\t");
		k2.set(splited[1]); // the product-name column index differs between source files; adjust accordingly
		context.write(k2, NullWritable.get());
	}
}

class AprProducesReducer extends Reducer<Text, NullWritable, Text, NullWritable>{
	@Override
	protected void reduce(Text k2, Iterable<NullWritable> v2s,
			Reducer<Text, NullWritable, Text, NullWritable>.Context context)
			throws IOException, InterruptedException {
		context.write(k2, NullWritable.get());
	}
}

 

4. com.huawei.RenameApr

package com.huawei;

public class RenameApr {
	public static void main(String[] args) throws Exception{
		// copy the per-product output files out of /apr-output and rename them
		HdfsUtils.copyProduces("/apr-output/", "aprProduces");
	}
}
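The renaming inside copyProduces relies on stripping the `-r-0000x` suffix that MultipleOutputs appends to each file. That string rule can be checked in isolation (`RenameRule` is a hypothetical helper, not part of the job):

```java
public class RenameRule {
    // "a1-r-00000" -> "a1": keep only the product-name prefix,
    // exactly as copyProduces does with name.split("-")[0]
    static String productName(String outputFileName) {
        return outputFileName.split("-")[0];
    }

    public static void main(String[] args) {
        System.out.println(productName("a1-r-00000")); // prints a1
        System.out.println(productName("a3-r-00002")); // prints a3
    }
}
```

Note this rule only works while product names themselves contain no hyphen; a name like "a-1" would be truncated.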

 
