There is a formatted data file whose columns are separated by \t, with the product name in the second column. The requirement is to split the data file into multiple files, one per product name. How can this be implemented with a MapReduce program?
Original file:
[root@localhost opt]# cat aprData
1 a1 a111
2 a2 a211
3 a1 a112
4 a1 a112
5 a1 a112
6 a1 a112
7 a2 a112
8 a2 a112
9 a2 a112
10 a3 a113
Approach:
1. Use one MapReduce job to find all distinct product names:
1.1 map <k2,v2> is <product name, null>
1.2 reduce <k3,v3> is <product name, null>
For the sample file this job is a pure dedup: map emits ten <product name, null> pairs (a1 five times, a2 four times, a3 once), the shuffle groups them into three keys, and reduce writes each key out once. The counters of the run below confirm this (Map output records=10, Reduce input groups=3, Reduce output records=3).
Implementation: the AprProduces class
[root@localhost opt]# hadoop jar apr-produces.jar /aprData /aprProduce-output
Warning: $HADOOP_HOME is deprecated.
16/05/01 15:00:12 WARN mapred.JobClient: Use GenericOptionsParser for parsing the arguments. Applications should implement Tool for the same.
16/05/01 15:00:12 INFO input.FileInputFormat: Total input paths to process : 1
16/05/01 15:00:12 INFO util.NativeCodeLoader: Loaded the native-hadoop library
16/05/01 15:00:12 WARN snappy.LoadSnappy: Snappy native library not loaded
16/05/01 15:00:13 INFO mapred.JobClient: Running job: job_201605010048_0020
16/05/01 15:00:14 INFO mapred.JobClient: map 0% reduce 0%
16/05/01 15:00:33 INFO mapred.JobClient: map 100% reduce 0%
16/05/01 15:00:45 INFO mapred.JobClient: map 100% reduce 100%
16/05/01 15:00:50 INFO mapred.JobClient: Job complete: job_201605010048_0020
16/05/01 15:00:50 INFO mapred.JobClient: Counters: 29
16/05/01 15:00:50 INFO mapred.JobClient: Map-Reduce Framework
16/05/01 15:00:50 INFO mapred.JobClient: Spilled Records=20
16/05/01 15:00:50 INFO mapred.JobClient: Map output materialized bytes=56
16/05/01 15:00:50 INFO mapred.JobClient: Reduce input records=10
16/05/01 15:00:50 INFO mapred.JobClient: Virtual memory (bytes) snapshot=3868389376
16/05/01 15:00:50 INFO mapred.JobClient: Map input records=10
16/05/01 15:00:50 INFO mapred.JobClient: SPLIT_RAW_BYTES=89
16/05/01 15:00:50 INFO mapred.JobClient: Map output bytes=30
16/05/01 15:00:50 INFO mapred.JobClient: Reduce shuffle bytes=56
16/05/01 15:00:50 INFO mapred.JobClient: Physical memory (bytes) snapshot=240697344
16/05/01 15:00:50 INFO mapred.JobClient: Reduce input groups=3
16/05/01 15:00:50 INFO mapred.JobClient: Combine output records=0
16/05/01 15:00:50 INFO mapred.JobClient: Reduce output records=3
16/05/01 15:00:50 INFO mapred.JobClient: Map output records=10
16/05/01 15:00:50 INFO mapred.JobClient: Combine input records=0
16/05/01 15:00:50 INFO mapred.JobClient: CPU time spent (ms)=1490
16/05/01 15:00:50 INFO mapred.JobClient: Total committed heap usage (bytes)=177016832
16/05/01 15:00:50 INFO mapred.JobClient: File Input Format Counters
16/05/01 15:00:50 INFO mapred.JobClient: Bytes Read=101
16/05/01 15:00:50 INFO mapred.JobClient: FileSystemCounters
16/05/01 15:00:50 INFO mapred.JobClient: HDFS_BYTES_READ=190
16/05/01 15:00:50 INFO mapred.JobClient: FILE_BYTES_WRITTEN=43049
16/05/01 15:00:50 INFO mapred.JobClient: FILE_BYTES_READ=56
16/05/01 15:00:50 INFO mapred.JobClient: HDFS_BYTES_WRITTEN=9
16/05/01 15:00:50 INFO mapred.JobClient: Job Counters
16/05/01 15:00:50 INFO mapred.JobClient: Launched map tasks=1
16/05/01 15:00:50 INFO mapred.JobClient: Launched reduce tasks=1
16/05/01 15:00:50 INFO mapred.JobClient: SLOTS_MILLIS_REDUCES=11002
16/05/01 15:00:50 INFO mapred.JobClient: Total time spent by all reduces waiting after reserving slots (ms)=0
16/05/01 15:00:50 INFO mapred.JobClient: SLOTS_MILLIS_MAPS=13561
16/05/01 15:00:50 INFO mapred.JobClient: Total time spent by all maps waiting after reserving slots (ms)=0
16/05/01 15:00:50 INFO mapred.JobClient: Data-local map tasks=1
16/05/01 15:00:50 INFO mapred.JobClient: File Output Format Counters
16/05/01 15:00:50 INFO mapred.JobClient: Bytes Written=9
[root@localhost opt]# hadoop fs -cat /aprProduce-output/part-r-00000
Warning: $HADOOP_HOME is deprecated.
a1
a2
a3
2. Use a second MapReduce job to split the file:
2.1 map <k2,v2> is <product name, line>
2.2 reduce <k3,v3> is <line, null>
2.3 Write a custom Partitioner: read the output file of the first job and build a Map<product name, index>; in the partitioner, look up the product name and return its index, falling back to partition 0 when the name is not found.
2.4 Set the number of reduce tasks; it must equal the number of partitions the partitioner can return (here taskNum products plus the fallback partition 0).
2.5 Use the MultipleOutputs class to rename the output files, so they come out as e.g. a1-r-00000 instead of the default part-r-00000.
Implementation: the AprClassify class
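One side effect worth noting: MultipleOutputs writes the named a1/a2/a3 files, but the job's default output files are still created empty (the part-r-00000..2 entries in the directory listing further down, which copyProduces later has to skip). If the Hadoop release in use ships org.apache.hadoop.mapreduce.lib.output.LazyOutputFormat — an assumption for the 1.x cluster used here — the empty files can be avoided with one extra line in the driver; a minimal sketch:

    // Requires: org.apache.hadoop.mapreduce.lib.output.LazyOutputFormat
    //           org.apache.hadoop.mapreduce.lib.output.TextOutputFormat
    // In the AprClassify driver, alongside the other job.set* calls: wrap the
    // default output format so part-r-* files are only created when written to.
    LazyOutputFormat.setOutputFormatClass(job, TextOutputFormat.class);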
[root@localhost opt]# hadoop jar apr-classify.jar /aprData /apr-output
Warning: $HADOOP_HOME is deprecated.
16/05/01 14:09:11 WARN mapred.JobClient: Use GenericOptionsParser for parsing the arguments. Applications should implement Tool for the same.
16/05/01 14:09:11 INFO input.FileInputFormat: Total input paths to process : 1
16/05/01 14:09:11 INFO util.NativeCodeLoader: Loaded the native-hadoop library
16/05/01 14:09:11 WARN snappy.LoadSnappy: Snappy native library not loaded
16/05/01 14:09:11 INFO mapred.JobClient: Running job: job_201605010048_0017
16/05/01 14:09:13 INFO mapred.JobClient: map 0% reduce 0%
16/05/01 14:09:29 INFO mapred.JobClient: map 100% reduce 0%
16/05/01 14:09:41 INFO mapred.JobClient: map 100% reduce 33%
16/05/01 14:09:44 INFO mapred.JobClient: map 100% reduce 66%
16/05/01 14:09:56 INFO mapred.JobClient: map 100% reduce 100%
16/05/01 14:10:01 INFO mapred.JobClient: Job complete: job_201605010048_0017
16/05/01 14:10:01 INFO mapred.JobClient: Counters: 29
16/05/01 14:10:01 INFO mapred.JobClient: Map-Reduce Framework
16/05/01 14:10:01 INFO mapred.JobClient: Spilled Records=20
16/05/01 14:10:01 INFO mapred.JobClient: Map output materialized bytes=169
16/05/01 14:10:01 INFO mapred.JobClient: Reduce input records=10
16/05/01 14:10:01 INFO mapred.JobClient: Virtual memory (bytes) snapshot=7754653696
16/05/01 14:10:01 INFO mapred.JobClient: Map input records=10
16/05/01 14:10:01 INFO mapred.JobClient: SPLIT_RAW_BYTES=89
16/05/01 14:10:01 INFO mapred.JobClient: Map output bytes=131
16/05/01 14:10:01 INFO mapred.JobClient: Reduce shuffle bytes=169
16/05/01 14:10:01 INFO mapred.JobClient: Physical memory (bytes) snapshot=387825664
16/05/01 14:10:01 INFO mapred.JobClient: Reduce input groups=3
16/05/01 14:10:01 INFO mapred.JobClient: Combine output records=0
16/05/01 14:10:01 INFO mapred.JobClient: Reduce output records=0
16/05/01 14:10:01 INFO mapred.JobClient: Map output records=10
16/05/01 14:10:01 INFO mapred.JobClient: Combine input records=0
16/05/01 14:10:01 INFO mapred.JobClient: CPU time spent (ms)=3950
16/05/01 14:10:01 INFO mapred.JobClient: Total committed heap usage (bytes)=209522688
16/05/01 14:10:01 INFO mapred.JobClient: File Input Format Counters
16/05/01 14:10:01 INFO mapred.JobClient: Bytes Read=101
16/05/01 14:10:01 INFO mapred.JobClient: FileSystemCounters
16/05/01 14:10:01 INFO mapred.JobClient: HDFS_BYTES_READ=199
16/05/01 14:10:01 INFO mapred.JobClient: FILE_BYTES_WRITTEN=86609
16/05/01 14:10:01 INFO mapred.JobClient: FILE_BYTES_READ=169
16/05/01 14:10:01 INFO mapred.JobClient: HDFS_BYTES_WRITTEN=104
16/05/01 14:10:01 INFO mapred.JobClient: Job Counters
16/05/01 14:10:01 INFO mapred.JobClient: Launched map tasks=1
16/05/01 14:10:01 INFO mapred.JobClient: Launched reduce tasks=3
16/05/01 14:10:01 INFO mapred.JobClient: SLOTS_MILLIS_REDUCES=35295
16/05/01 14:10:01 INFO mapred.JobClient: Total time spent by all reduces waiting after reserving slots (ms)=0
16/05/01 14:10:01 INFO mapred.JobClient: SLOTS_MILLIS_MAPS=13681
16/05/01 14:10:01 INFO mapred.JobClient: Total time spent by all maps waiting after reserving slots (ms)=0
16/05/01 14:10:01 INFO mapred.JobClient: Data-local map tasks=1
16/05/01 14:10:01 INFO mapred.JobClient: File Output Format Counters
16/05/01 14:10:01 INFO mapred.JobClient: Bytes Written=0
[root@localhost opt]# hadoop fs -ls /apr-output/
Warning: $HADOOP_HOME is deprecated.
Found 8 items
-rw-r--r-- 1 root supergroup 0 2016-05-01 14:09 /apr-output/_SUCCESS
drwxr-xr-x - root supergroup 0 2016-05-01 14:09 /apr-output/_logs
-rw-r--r-- 1 root supergroup 51 2016-05-01 14:09 /apr-output/a1-r-00000
-rw-r--r-- 1 root supergroup 41 2016-05-01 14:09 /apr-output/a2-r-00001
-rw-r--r-- 1 root supergroup 12 2016-05-01 14:09 /apr-output/a3-r-00002
-rw-r--r-- 1 root supergroup 0 2016-05-01 14:09 /apr-output/part-r-00000
-rw-r--r-- 1 root supergroup 0 2016-05-01 14:09 /apr-output/part-r-00001
-rw-r--r-- 1 root supergroup 0 2016-05-01 14:09 /apr-output/part-r-00002
[root@localhost opt]# hadoop fs -cat /apr-output/a1-r-00000
Warning: $HADOOP_HOME is deprecated.
1 a1 a111
3 a1 a112
4 a1 a112
5 a1 a112
6 a1 a112
[root@localhost opt]# hadoop fs -cat /apr-output/a2-r-00000
Warning: $HADOOP_HOME is deprecated.
cat: File does not exist: /apr-output/a2-r-00000
[root@localhost opt]# hadoop fs -cat /apr-output/a2-r-00001
Warning: $HADOOP_HOME is deprecated.
2 a2 a211
7 a2 a112
8 a2 a112
9 a2 a112
[root@localhost opt]# hadoop fs -cat /apr-output/a3-r-00002
Warning: $HADOOP_HOME is deprecated.
10 a3 a113
3. Finally, use the HDFS API to batch-copy the split files: rename each per-product data file and move it into the target directory.
Implementation: the RenameApr class
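HdfsUtils.copyProduces (listing 2 in the code section) implements this by streaming each file's bytes into a new file. Since source and destination sit on the same HDFS instance, a plain FileSystem.rename would also do the job — a minimal sketch, reusing the paths from the run below:

    // Move /apr-output/a1-r-00000 to /aprProduces/a1 and so on, without
    // copying bytes; valid because source and target are on the same filesystem.
    FileSystem fs = FileSystem.get(new URI("hdfs://192.168.1.190:9000"), new Configuration());
    fs.mkdirs(new Path("/aprProduces"));
    for (FileStatus status : fs.listStatus(new Path("/apr-output/"))) {
        String name = status.getPath().getName();
        if (!status.isDir() && !"_SUCCESS".equals(name) && !name.startsWith("part-r-")) {
            fs.rename(status.getPath(), new Path("/aprProduces/" + name.split("-")[0]));
        }
    }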
[root@localhost opt]# hadoop fs -ls /aprProduces
Warning: $HADOOP_HOME is deprecated.
Found 3 items
-rw-r--r-- 3 yehao supergroup 51 2016-05-01 14:37 /aprProduces/a1
-rw-r--r-- 3 yehao supergroup 41 2016-05-01 14:37 /aprProduces/a2
-rw-r--r-- 3 yehao supergroup 12 2016-05-01 14:37 /aprProduces/a3
[root@localhost opt]# hadoop fs -cat /aprProduces/a1
Warning: $HADOOP_HOME is deprecated.
1 a1 a111
3 a1 a112
4 a1 a112
5 a1 a112
6 a1 a112
[root@localhost opt]# hadoop fs -cat /aprProduces/a2
Warning: $HADOOP_HOME is deprecated.
2 a2 a211
7 a2 a112
8 a2 a112
9 a2 a112
[root@localhost opt]# hadoop fs -cat /aprProduces/a3
Warning: $HADOOP_HOME is deprecated.
10 a3 a113
Code:
1. com.huawei.AprClassify
package com.huawei;

import java.io.IOException;
import java.util.Map;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Partitioner;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.MultipleOutputs;

public class AprClassify {

    // Number of distinct product names, taken from the first job's output.
    private static int taskNum = HdfsUtils.getMapSize();

    public static void main(String[] args) throws Exception {
        Job job = new Job(new Configuration(), AprClassify.class.getSimpleName());
        job.setJarByClass(AprClassify.class);
        job.setMapperClass(AprClassifyMap.class);
        job.setReducerClass(AprClassifyReducer.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Text.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(NullWritable.class);
        job.setPartitionerClass(AprClassifyPartitioner.class);
        // One reduce task per product, plus partition 0 for unmatched names.
        job.setNumReduceTasks(taskNum + 1);
        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        job.waitForCompletion(true);
    }
}

class AprClassifyMap extends Mapper<LongWritable, Text, Text, Text> {

    private Text k2 = new Text();

    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        // The second tab-separated column is the product name; use it as the
        // key and pass the whole line through as the value.
        String[] splited = value.toString().split("\t");
        k2.set(splited[1]);
        context.write(k2, value);
    }
}

class AprClassifyPartitioner extends Partitioner<Text, Text> {

    // Product name -> partition index, built from the first job's output.
    private static Map<String, Integer> map = HdfsUtils.getMap();

    @Override
    public int getPartition(Text key, Text value, int numPartitions) {
        Integer partition = map.get(key.toString());
        // Product names not present in the table fall back to partition 0.
        return partition == null ? 0 : partition;
    }
}

class AprClassifyReducer extends Reducer<Text, Text, Text, NullWritable> {

    private MultipleOutputs<Text, NullWritable> outputs;

    @Override
    protected void setup(Context context) throws IOException, InterruptedException {
        outputs = new MultipleOutputs<Text, NullWritable>(context);
    }

    @Override
    protected void reduce(Text k2, Iterable<Text> v2s, Context context)
            throws IOException, InterruptedException {
        // Write each original line to an output file named after the product.
        for (Text line : v2s) {
            outputs.write(line, NullWritable.get(), k2.toString());
        }
    }

    @Override
    protected void cleanup(Context context) throws IOException, InterruptedException {
        outputs.close();
    }
}
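Note the third argument to outputs.write(...): the new-API MultipleOutputs uses it as the base name of the output file, which is why the results come out as a1-r-00000, a2-r-00001, and a3-r-00002 instead of the default part-r-NNNNN.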
2. com.huawei.HdfsUtils
package com.huawei;

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.HashMap;
import java.util.Map;

import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;

public class HdfsUtils {

    private static FileSystem fileSystem;
    private static Map<String, Integer> map;

    private static FileSystem getFileSystem() throws URISyntaxException, IOException {
        if (fileSystem == null) {
            Configuration conf = new Configuration();
            URI uri = new URI("hdfs://192.168.1.190:9000");
            fileSystem = FileSystem.get(uri, conf);
        }
        return fileSystem;
    }

    public static int getMapSize() {
        return getMap().size();
    }

    // Builds <product name, partition index> from the product list written by
    // the AprProduces job (see the run above). Indices start at 1 so that
    // partition 0 stays free for unmatched product names.
    public static Map<String, Integer> getMap() {
        if (map == null) {
            map = new HashMap<String, Integer>();
            BufferedReader reader = null;
            try {
                FSDataInputStream in = getFileSystem().open(
                        new Path("hdfs://192.168.1.190:9000/aprProduce-output/part-r-00000"));
                reader = new BufferedReader(new InputStreamReader(in));
                String line;
                int i = 1;
                while ((line = reader.readLine()) != null) {
                    map.put(line, i++);
                }
            } catch (Exception e) {
                e.printStackTrace();
            } finally {
                try {
                    if (reader != null) reader.close();
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
        }
        return map;
    }

    // Copies each per-product output file (e.g. a1-r-00000) to
    // /<outPutDir>/<product name>, skipping the _SUCCESS marker and the
    // empty default part-r-* files.
    public static void copyProduces(String inputPath, String outPutDir) throws Exception {
        FileSystem fs = getFileSystem();
        FileStatus[] listStatus = fs.listStatus(new Path(inputPath));
        for (FileStatus fileStatus : listStatus) {
            String name = fileStatus.getPath().getName();
            if (!fileStatus.isDir() && !StringUtils.equals(name, "_SUCCESS")
                    && !StringUtils.startsWith(name, "part-r-")) {
                FSDataInputStream in = fs.open(fileStatus.getPath());
                FSDataOutputStream out = fs.create(
                        new Path("/" + outPutDir + "/" + name.split("-")[0]));
                // close=true: copyBytes closes both streams; leaving the output
                // stream open could leave the copied file unflushed.
                IOUtils.copyBytes(in, out, 1024, true);
            }
        }
    }
}
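A note on the design: both the product map and the FileSystem handle are cached in static fields, so each task JVM reads the product list from HDFS at most once even though getPartition runs for every record. The table holds one entry per product, so keeping it in memory is cheap.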
3. com.huawei.AprProduces
package com.huawei;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

/**
 * Scans the data file and collects all distinct product names.
 * args[0]: original data file
 * args[1]: output directory (one product name per line)
 */
public class AprProduces {

    public static void main(String[] args) throws Exception {
        Job job = new Job(new Configuration(), AprProduces.class.getSimpleName());
        job.setJarByClass(AprProduces.class);
        job.setMapperClass(AprProducesMap.class);
        job.setReducerClass(AprProducesReducer.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(NullWritable.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(NullWritable.class);
        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        job.waitForCompletion(true);
    }
}

class AprProducesMap extends Mapper<LongWritable, Text, Text, NullWritable> {

    private Text k2 = new Text();

    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        String[] splited = value.toString().split("\t");
        // The column index of the product name differs across input files;
        // adjust as needed.
        k2.set(splited[1]);
        context.write(k2, NullWritable.get());
    }
}

// Duplicate product names arrive grouped by key, so emitting each key once
// yields the distinct product list.
class AprProducesReducer extends Reducer<Text, NullWritable, Text, NullWritable> {

    @Override
    protected void reduce(Text k2, Iterable<NullWritable> v2s, Context context)
            throws IOException, InterruptedException {
        context.write(k2, NullWritable.get());
    }
}
4. com.huawei.RenameApr
package com.huawei;

public class RenameApr {

    public static void main(String[] args) throws Exception {
        // Copy the per-product files from /apr-output/ into /aprProduces/,
        // renaming a1-r-00000 to a1 and so on.
        HdfsUtils.copyProduces("/apr-output/", "aprProduces");
    }
}