对于Hadoop来说,是通过在DataNode中启动Map/Reduce java进程的方式来实现分布式计算处理的,那么就从源码层简要分析一下hadoop中启动Map/Reduce任务的过程。
首先,对于Map/Reduce端启动的任务,都是通过一些参数来控制java opts的,mapreduce.map.java.opts,mapreduce.reduce.java.opts,这些参数都在MRJobConfig类中,拿map.java.opts举例来说,org.apache.hadoop.mapred.MapReduceChildJVM类中使用了这个参数,用来构造Map或Reduce端的JVM,在下面方法中拿到了ChildJavaOpts:
private static String getChildJavaOpts(JobConf jobConf, boolean isMapTask)
如果是MapTask,JavaOpts的获得具体方法内容如下(Reduce端逻辑基本一致):
if (isMapTask) { userClasspath = jobConf.get( JobConf.MAPRED_MAP_TASK_JAVA_OPTS, jobConf.get( JobConf.MAPRED_TASK_JAVA_OPTS, JobConf.DEFAULT_MAPRED_TASK_JAVA_OPTS) ); adminClasspath = jobConf.get( MRJobConfig.MAPRED_MAP_ADMIN_JAVA_OPTS, MRJobConfig.DEFAULT_MAPRED_ADMIN_JAVA_OPTS); // Add admin classpath first so it can be overridden by user. return adminClasspath + " " + userClasspath;
userClassPath按照下面的参数顺序获得:
mapreduce.map.java.opts, mapred.child.java.opts, DEFAULT_MAPRED_TASK_JAVA_OPTS = "-Xmx200m”;
adminClassPath要获得的参数的基本顺序:
mapreduce.admin.map.child.java.opts, DEFAULT_MAPRED_ADMIN_JAVA_OPTS ="-Djava.net.preferIPv4Stack=true -Dhadoop.metrics.log.level=WARN ";
最后,由于在JVM启动的参数中,后面能够覆盖掉前面的,因此userClassPath的同个选项的设置是可以覆盖adminClassPath,adminClassPath是不能保证所有参数不能被覆盖。
从源码端向上走,跳转到方法:
public static List<String> getVMCommand(InetSocketAddress taskAttemptListenerAddr, Task task, ID jvmID)
从这个方法中可以看到一个隐藏的设置,如果在选项中使用了@taskid@,是可以被替换成具体的attemptID的。
String javaOpts = getChildJavaOpts(conf, task.isMapTask()); javaOpts = javaOpts.replace("@taskid@", attemptID.toString());
代码中也以GC为例子,在注释中举例说明用法:
// Add child (task) java-vm options. // // The following symbols if present in mapred.{map|reduce}.child.java.opts // value are replaced: // + @taskid@ is interpolated with value of TaskID. // Other occurrences of @ will not be altered. // // Example with multiple arguments and substitutions, showing // jvm GC logging, and start of a passwordless JVM JMX agent so can // connect with jconsole and the likes to watch child memory, threads // and get thread dumps. // // <property> // <name>mapred.map.child.java.opts</name> // <value>-Xmx 512M -verbose:gc -Xloggc:/tmp/@taskid@.gc \ // -Dcom.sun.management.jmxremote.authenticate=false \ // -Dcom.sun.management.jmxremote.ssl=false \ // </value> // </property> // // <property> // <name>mapred.reduce.child.java.opts</name> // <value>-Xmx 1024M -verbose:gc -Xloggc:/tmp/@taskid@.gc \ // -Dcom.sun.management.jmxremote.authenticate=false \ // -Dcom.sun.management.jmxremote.ssl=false \ // </value> // </property> //
一般情况下,我们在打印gc日志时,需要用-Xloggc:指定具体的目录,但是在MapReduce任务中你无法指定,因为可能两个Map会在同一台机器上执行,那样就肯定会发生gc文件覆盖,而实用@taskid@就可以避免这个问题。同样适用于OOM导致的堆栈打印避免文件意外被覆盖。
还会默认增加一个参数,用来设置java的临时文件夹(所有临时文件的建立都在这个文件夹,比如File.createTempFile):
vargs.add("-Djava.io.tmpdir=" + childTmpDir);
主类为:
org.apache.hadoop.mapred.YarnChild
一个YarnChild进程的最终完整命令为:
/usr/java/jdk1.7.0_11//bin/java -Djava.net.preferIPv4Stack=true -Dhadoop.metrics.log.level=WARN -Xmx2048M -Djava.io.tmpdir=/home/data5/hdfsdir/nm-local-dir/usercache/xxx/appcache/application_1413206225298_36914/container_1413206225298_36914_01_000098/tmp -Dlog4j.configuration=container-log4j.properties -Dyarn.app.container.log.dir=/home/workspace/hadoop/logs/userlogs/application_1413206225298_36914/container_1413206225298_36914_01_000098 -Dyarn.app.container.log.filesize=209715200 -Dhadoop.root.logger=INFO,CLA org.apache.hadoop.mapred.YarnChild 192.168.7.26 21298 attempt_1413206225298_36914_r_000001_0 98
从源码端来分析该命令是如何生成的:
- $JAVA_HOME: /usr/java/jdk1.7.0_11/
- 源码中写死: /bin/java
- mapreduce.admin.map.child.java.opts:如果不设置,使用-Djava.net.preferIPv4Stack=true -Dhadoop.metrics.log.level=WARN;
- mapreduce.map.java.opts: Xmx2048M;
- 源码中添加:-Djava.io.tmpdir=/home/data5/hdfsdir/nm-local-dir/usercache/tong/appcache/application_1413206225298_36914/container_1413206225298_36914_01_000098/tmp;
- org.apache.hadoop.mapred.MapReduceChildJVM.setLog4jProperties中设置log4j,包括日志级别,日志大小等:-Dlog4j.configuration=container-log4j.properties -Dyarn.app.container.log.dir=/home/workspace/hadoop/logs/userlogs/application_1413206225298_36914/container_1413206225298_36914_01_000098 -Dyarn.app.container.log.filesize=209715200 -Dhadoop.root.logger=INFO,CLA;
- 主类:org.apache.hadoop.mapred.YarnChild;
- TaskAttempt的主机地址:192.168.7.xx;
- TaskAttempt的主机端口:212xx;
- TaskAttempt ID:attempt_1413206225298_36914_r_000001_0;
- JVMID:98(这是干啥的不太清楚);
最后将进程的正常和异常输出重定向:
// Finally add the jvmID vargs.add("1>" + getTaskLogFile(TaskLog.LogName.STDOUT)); vargs.add("2>" + getTaskLogFile(TaskLog.LogName.STDERR));
相关推荐
在对Map/Reduce算法进行分析的基础上,利用开源Hadoop软件设计出高容错高性能的分布式搜索引擎,以面对搜索引擎对海量数据的处理和存储问题
出了一种基于map/reduce的应用于网络安全事件分析的并行关联方法。一方面,通过对BIRCH 算法的改进,在BIRCH的分层次思想中引入预定义的规则库进行聚类,可以实现多级关联以 及提高聚类的收敛效果,从而达到精简...
本项目为一个Hadoop课程设计,使用Java语言和map/reduce实现贝叶斯文本分类器。项目的具体内容如下:1:用MapReduce算法实现贝叶斯分类器的训练过程,并输出训练模型; 2:用输出的模型对测试集文档进行分类测试。...
【MapReduce篇06】MapReduce之MapJoin和ReduceJoin1
Map/Reduce是一个用于大规模数据处理的分布式计算模型,它最初是由Google工程师设计并实现的,Google已经将它完整的MapReduce论文公开发布了。其中对它的定义是,Map/Reduce是一个编程模型(programming model),是...
利用hadoop的mapreduce把oracle/mysql中的数据导入到hbase和hdfs中的两个java程序
Map-Reduce原理体系架构和工作机制,eclipse与Hadoop集群连接
这是一篇关于分布式计算的毕业设计论文,内容摘要:分布式式计算,同样是一个宽泛的概念,在这里,它狭义的指代,按Google Map/Reduce框架所设计的分布式框架。在Hadoop中,分布式文件系统,很大程度上,是为各种...
文档包含求平均成绩的MapReduce代码 3、平均成绩 3.1实例描述 3.2设计思路 3.3程序代码 4、单表关联 4.1实例描述 4.2设计思路 4.3程序代码
如果你读过Google的那篇大名鼎鼎的论文“MapReduce: Simplified Data Processing on Large Clusters”,你就能大概明白map/reduce的概念。 我们先看map。map()函数接收两个参数,一个是函数,一个是序列,map将传入...
Hadoop, Apache开源的分布式框架。源自Google GFS,BigTable,MapReduce 论文。 == HDFS == HDFS (Hadoop Distributed File System),Hadoop 分布式文件系统。...TaskTracker,启动和管理Map和Reduce子任务的节点。
[2]MapReduce: Simplified Data Processing on Large Clusters [3]The Google File System [4]Large-scale Incremental Processing Using Distributed Transactions and Notifications [5]Dremel: Interactive ...
Type Map=0,Reduce=0 //记录map任务和reduce任务的完成个数。 MapWorkerSTATE[] ReduceWorkerSTATE[] //各个工作机器的忙闲状态 FileSplit(string inputfilename) //输入文件切割 JobAssign() //工作任务分配 2....
Ma-pReduce模型受函数式编程语言的启发,将大规模数据处理作业拆分成若干个可独立运行的Map任务,分配到不同的机器上去执行,生成某种格式的中间文件,再由若干个Reduce任务合并这些中间文件获得最后的输出文件。...
下面详细介绍MapReduce中Map任务Reduce任务以及MapReduce的执行流程。 Map任务: 读取输入文件内容,解析成key,value对。对输入文件的每一行,解析成key,value对。每一个键值对调用一次map函数。 写自己的逻辑,对...
放到eclipse的plugins目录下。重启eclipse ...配置 Map/Reduce Master和DFS Mastrer,Host和Port配置成hdfs-site.xml与core-site.xml的设置一致即可。 如果连接成功,会出现hdfs上面的文件夹
基于MapReduce作业拆分组合机制的并行ETL组件实现.pdf基于MapReduce作业拆分组合机制的并行ETL组件实现.pdf基于MapReduce作业拆分组合机制的并行ETL组件实现.pdf基于MapReduce作业拆分组合机制的并行ETL组件实现.pdf...
Map:俗点说就是直接把数据打散,一份数据把它切分成多份小的数据进行处理,这个过程可以称之为Map。 Reduce:有打散当然要有聚合,把处理完的数据再重新合成一个,这个过程称之为Reduce。 这两个操作实际上就是...
map reduce的全部执行流程,源码分析视图
Map/Reduce是一个用于大规模数据处理的分布式计算模型,它最初是由Google工程师设计并实现的,Google已经将它完整的MapReduce论文公开发布了。其中对它的定义是,Map/Reduce是一个编程模型(programmingmodel),是...