- 浏览: 300772 次
- 性别:
- 来自: 北京
文章分类
- 全部博客 (165)
- hadoop (47)
- linux (11)
- nutch (7)
- hbase (7)
- solr (4)
- zookeeper (4)
- J2EE (1)
- jquery (3)
- java (17)
- mysql (14)
- perl (2)
- compass (4)
- suse (2)
- memcache (1)
- as (1)
- roller (1)
- web (7)
- MongoDB (8)
- struts2 (3)
- lucene (2)
- 算法 (4)
- 中文分词 (3)
- hive (17)
- noIT (1)
- 中间件 (2)
- maven (2)
- sd (0)
- php (2)
- asdf (0)
- kerberos 安装 (1)
- git (1)
- osgi (1)
- impala (1)
- book (1)
- python 安装 科学计算包 (1)
最新评论
-
dandongsoft:
你写的不好用啊
solr 同义词搜索 -
黎明lm:
meifangzi 写道楼主真厉害 都分析源码了 用了很久. ...
hadoop 源码分析(二) jobClient 通过RPC 代理提交作业到JobTracker -
meifangzi:
楼主真厉害 都分析源码了
hadoop 源码分析(二) jobClient 通过RPC 代理提交作业到JobTracker -
zhdkn:
顶一个,最近也在学习设计模式,发现一个问题,如果老是看别人的博 ...
Java观察者模式(Observer)详解及应用 -
lvwenwen:
木南飘香 写道
高并发网站的架构
hadoop mapreduce 之所有能够实现job的运行,以及将job分配到不同datanode 上的map和reduce task 是由TaskSchduler 完成的.
TaskScheduler mapreduce的任务调度器类,当jobClient 提交一个job 给JobTracker 的时候.JobTracker 接受taskTracker 的心跳.心跳信息含有空闲的slot信息等.JobTracker 则通过调用TaskScheduler 的assignTasks()方法类给报告心跳信息中含有空闲的slots信息的taskTracker 分布任务、
TaskScheduler 类为hadoop的 调度器的抽象类。默认继承它作为hadoop调度器的方式为FIFO,当然也有Capacity 和Fair等其他调度器,也可以自己编写符合特定场景所需要的调度器.通过继承TaskScheduler 类即可完成该功能、
下面就 FIFO 调度器进行简单的说明:
JobQueueTaskScheduler 类为FIFO 调度器的实现类.
1. 首先JobQueueTaskSchduler 注册两个监听器类:
JobQueueJobInProgressListener jobQueueJobInProgressListener;
EagerTaskInitializationListener eagerTaskInitializationListener;
JobQueueJobInProgressListener 维护一个job的queue ,其中JobSchedulingInfo 中包含job调度的信息:priority,startTime,id.以及 jobAdd update 等操作jobqueue的方法
EagerTaskInitializationListener 初始化job的listener ,这里所谓的初始化不是初始化job的属性信息,而是针对已经存在jobqueue中 即将被执行job的初始化,
resortInitQueue 按照priority 和starttime 来排序
jobRemoved()
jobUpdated()
jobStateChanged()当priority或是starttime被改变的时候则重新调用resortInitQueue()重新排序
在JobTracker 启动的时候 创建 mapred.jobinit.threads 改数量的线程去监控jobqueue.当jobqueue 中含有job的时候 则initjob
调度其中核心逻辑在assignTasks()方法中
下面分析分析 FIFO模式下的 assignTasks()
上面方法中真正执行task的方法为:
obtainNewNodeOrRackLocalMapTask 和obtainNewNonLocalMapTask
下一张详细的分析这两个方法
TaskScheduler mapreduce的任务调度器类,当jobClient 提交一个job 给JobTracker 的时候.JobTracker 接受taskTracker 的心跳.心跳信息含有空闲的slot信息等.JobTracker 则通过调用TaskScheduler 的assignTasks()方法类给报告心跳信息中含有空闲的slots信息的taskTracker 分布任务、
TaskScheduler 类为hadoop的 调度器的抽象类。默认继承它作为hadoop调度器的方式为FIFO,当然也有Capacity 和Fair等其他调度器,也可以自己编写符合特定场景所需要的调度器.通过继承TaskScheduler 类即可完成该功能、
下面就 FIFO 调度器进行简单的说明:
JobQueueTaskScheduler 类为FIFO 调度器的实现类.
1. 首先JobQueueTaskSchduler 注册两个监听器类:
JobQueueJobInProgressListener jobQueueJobInProgressListener;
EagerTaskInitializationListener eagerTaskInitializationListener;
JobQueueJobInProgressListener 维护一个job的queue ,其中JobSchedulingInfo 中包含job调度的信息:priority,startTime,id.以及 jobAdd update 等操作jobqueue的方法
EagerTaskInitializationListener 初始化job的listener ,这里所谓的初始化不是初始化job的属性信息,而是针对已经存在jobqueue中 即将被执行job的初始化,
class JobInitManager implements Runnable { public void run() { JobInProgress job = null; while (true) { try { synchronized (jobInitQueue) { while (jobInitQueue.isEmpty()) { jobInitQueue.wait(); } job = jobInitQueue.remove(0); } threadPool.execute(new InitJob(job)); } catch (InterruptedException t) { LOG.info("JobInitManagerThread interrupted."); break; } } LOG.info("Shutting down thread pool"); threadPool.shutdownNow(); } }
resortInitQueue 按照priority 和starttime 来排序
jobRemoved()
jobUpdated()
jobStateChanged()当priority或是starttime被改变的时候则重新调用resortInitQueue()重新排序
public EagerTaskInitializationListener(Configuration conf) { numThreads = conf.getInt("mapred.jobinit.threads", DEFAULT_NUM_THREADS); threadPool = Executors.newFixedThreadPool(numThreads); }
在JobTracker 启动的时候 创建 mapred.jobinit.threads 改数量的线程去监控jobqueue.当jobqueue 中含有job的时候 则initjob
class InitJob implements Runnable { private JobInProgress job; public InitJob(JobInProgress job) { this.job = job; } //调用run方法 回调TaskTrackerManager public void run() { ttm.initJob(job); } }
调度其中核心逻辑在assignTasks()方法中
下面分析分析 FIFO模式下的 assignTasks()
@Override public synchronized List<Task> assignTasks(TaskTracker taskTracker) throws IOException { TaskTrackerStatus taskTrackerStatus = taskTracker.getStatus(); ClusterStatus clusterStatus = taskTrackerManager.getClusterStatus(); //获取集群中TaskTracker 总数 final int numTaskTrackers = clusterStatus.getTaskTrackers(); //集群中map slot总数 final int clusterMapCapacity = clusterStatus.getMaxMapTasks(); //集群中reduce slot 总数 final int clusterReduceCapacity = clusterStatus.getMaxReduceTasks(); Collection<JobInProgress> jobQueue = jobQueueJobInProgressListener.getJobQueue(); // // Get map + reduce counts for the current tracker. // //当前的taskTracker 上map slot 总数 final int trackerMapCapacity = taskTrackerStatus.getMaxMapSlots(); //当前的taskTracker 上reduce slot 总数 final int trackerReduceCapacity = taskTrackerStatus.getMaxReduceSlots(); //当前的taskTracker上正在运行的 map数目 final int trackerRunningMaps = taskTrackerStatus.countMapTasks(); //当前的taskTracker上正在运行的 reduce数目 final int trackerRunningReduces = taskTrackerStatus.countReduceTasks(); // Assigned tasks List<Task> assignedTasks = new ArrayList<Task>(); // // Compute (running + pending) map and reduce task numbers across pool // //该taskTracker上剩余的reduce数 int remainingReduceLoad = 0; //该taskTracker 剩余的map数 int remainingMapLoad = 0; synchronized (jobQueue) { for (JobInProgress job : jobQueue) { if (job.getStatus().getRunState() == JobStatus.RUNNING) { remainingMapLoad += (job.desiredMaps() - job.finishedMaps()); if (job.scheduleReduces()) { remainingReduceLoad += (job.desiredReduces() - job.finishedReduces()); } } } } // Compute the 'load factor' for maps and reduces //map因子 double mapLoadFactor = 0.0; if (clusterMapCapacity > 0) { mapLoadFactor = (double)remainingMapLoad / clusterMapCapacity; } double reduceLoadFactor = 0.0; if (clusterReduceCapacity > 0) { reduceLoadFactor = (double)remainingReduceLoad / clusterReduceCapacity; } // // In the below steps, we allocate first map tasks (if appropriate), // and then reduce tasks if appropriate. We go through all jobs // in order of job arrival; jobs only get serviced if their // predecessors are serviced, too. // // // We assign tasks to the current taskTracker if the given machine // has a workload that's less than the maximum load of that kind of // task. // However, if the cluster is close to getting loaded i.e. we don't // have enough _padding_ for speculative executions etc., we only // schedule the "highest priority" task i.e. the task from the job // with the highest priority. // final int trackerCurrentMapCapacity = Math.min((int)Math.ceil(mapLoadFactor * trackerMapCapacity), trackerMapCapacity); int availableMapSlots = trackerCurrentMapCapacity - trackerRunningMaps; boolean exceededMapPadding = false; if (availableMapSlots > 0) { exceededMapPadding = exceededPadding(true, clusterStatus, trackerMapCapacity); } int numLocalMaps = 0; int numNonLocalMaps = 0; scheduleMaps: for (int i=0; i < availableMapSlots; ++i) { synchronized (jobQueue) { for (JobInProgress job : jobQueue) { if (job.getStatus().getRunState() != JobStatus.RUNNING) { continue; } Task t = null; // Try to schedule a node-local or rack-local Map task t = job.obtainNewNodeOrRackLocalMapTask(taskTrackerStatus, numTaskTrackers, taskTrackerManager.getNumberOfUniqueHosts()); if (t != null) { assignedTasks.add(t); ++numLocalMaps; // Don't assign map tasks to the hilt! // Leave some free slots in the cluster for future task-failures, // speculative tasks etc. beyond the highest priority job if (exceededMapPadding) { break scheduleMaps; } // Try all jobs again for the next Map task break; } // Try to schedule a node-local or rack-local Map task t = job.obtainNewNonLocalMapTask(taskTrackerStatus, numTaskTrackers, taskTrackerManager.getNumberOfUniqueHosts()); if (t != null) { assignedTasks.add(t); ++numNonLocalMaps; // We assign at most 1 off-switch or speculative task // This is to prevent TaskTrackers from stealing local-tasks // from other TaskTrackers. break scheduleMaps; } } } } int assignedMaps = assignedTasks.size(); // // Same thing, but for reduce tasks // However we _never_ assign more than 1 reduce task per heartbeat // final int trackerCurrentReduceCapacity = Math.min((int)Math.ceil(reduceLoadFactor * trackerReduceCapacity), trackerReduceCapacity); final int availableReduceSlots = Math.min((trackerCurrentReduceCapacity - trackerRunningReduces), 1); boolean exceededReducePadding = false; if (availableReduceSlots > 0) { exceededReducePadding = exceededPadding(false, clusterStatus, trackerReduceCapacity); synchronized (jobQueue) { for (JobInProgress job : jobQueue) { if (job.getStatus().getRunState() != JobStatus.RUNNING || job.numReduceTasks == 0) { continue; } Task t = job.obtainNewReduceTask(taskTrackerStatus, numTaskTrackers, taskTrackerManager.getNumberOfUniqueHosts() ); if (t != null) { assignedTasks.add(t); break; } // Don't assign reduce tasks to the hilt! // Leave some free slots in the cluster for future task-failures, // speculative tasks etc. beyond the highest priority job if (exceededReducePadding) { break; } } } } if (LOG.isDebugEnabled()) { LOG.debug("Task assignments for " + taskTrackerStatus.getTrackerName() + " --> " + "[" + mapLoadFactor + ", " + trackerMapCapacity + ", " + trackerCurrentMapCapacity + ", " + trackerRunningMaps + "] -> [" + (trackerCurrentMapCapacity - trackerRunningMaps) + ", " + assignedMaps + " (" + numLocalMaps + ", " + numNonLocalMaps + ")] [" + reduceLoadFactor + ", " + trackerReduceCapacity + ", " + trackerCurrentReduceCapacity + "," + trackerRunningReduces + "] -> [" + (trackerCurrentReduceCapacity - trackerRunningReduces) + ", " + (assignedTasks.size()-assignedMaps) + "]"); } return assignedTasks; }
上面方法中真正执行task的方法为:
obtainNewNodeOrRackLocalMapTask 和obtainNewNonLocalMapTask
下一张详细的分析这两个方法
发表评论
-
博客地址变更
2013-08-16 10:29 1168all the guys of visiting the bl ... -
hadoop 源码分析(六)hadoop taskTracker 生成map 和reduce任务流程
2013-04-09 17:36 2718taskTracker 生成map reduce ... -
hadoop 源码分析(六)hadoop taskTracker 生成map 和reduce任务流程
2013-04-09 17:33 0taskTracker 生成map reduce ... -
hadoop 源码分析(四)JobTracker 添加job 到schduler 队列中
2013-03-29 18:37 2855启动 JobTracker 1. 进入main方法: ... -
hadoop 源码分析(三) hadoop RPC 机制
2013-03-28 15:13 2386Hadoop 通信机制采用自己编写的RPC. 相比于 ... -
hadoop 源码分析(二) jobClient 通过RPC 代理提交作业到JobTracker
2013-03-27 12:57 37021.JobClient 客户端类 通过 ... -
hadoop 源码分析(一) jobClient 提交到JobTracker
2013-03-26 13:41 3585Hadoop 用了2年多了.从最初一起创业的 ... -
RHadoop 安装教程
2013-02-01 17:18 1592RHadoop 环境安装 硬件: centos6 ... -
pig
2012-11-16 19:28 1183转自:http://www.hadoopor.c ... -
hadoop与hive的映射
2012-11-15 10:21 2346hadoop与hive的映射 ... -
hadoop distcp
2012-07-31 10:00 2793hadoop distcp 使用:distcp ... -
MapReduce中Mapper类和Reducer类4函数解析
2012-07-20 18:05 2100MapReduce中Mapper类和Reducer类4函数解析 ... -
hadoop metrics 各参数解释
2012-07-17 18:59 1487hadoop metrics 各参数解释 研究使用hadoo ... -
Hbase几种数据入库(load)方式比较
2012-07-17 14:52 13541. 预先生成HFile入库 这个地址有详细的说明http:/ ... -
Hadoop客户端环境配置
2012-05-11 14:59 1747Hadoop客户端环境配置 1. 安装客户端(通过端用户可以 ... -
hadoop 通过distcp进行并行复制
2012-05-02 15:25 2413通过distcp进行并行复制 前面的HDFS访问模型都集中于 ... -
linux crontab 执行hadoop脚本 关于hadoop环境变量引入
2012-04-10 12:11 0crontab问题 crontab的特点:PATH不全和无终 ... -
hadoop fs 命令封装
2012-04-09 09:39 0hadoop fs 命令封装 #!/usr/bin/env ... -
map-reduce编程核心问题
2012-02-22 13:38 12461-How do we break up a large p ... -
Hadoop Archives
2012-02-17 14:25 0Hadoop Archives 什么是Hadoop arch ...
相关推荐
Hadoop源码分析(完整版),详细分析了Hadoop源码程序,为学习Hadoop的人提供很好的入门指导
Hadoop源码分析视频下载
caibinbupt的Hadoop源码分析完整版,包括 HDFS 和 MapReduce。 HDFS: 41章 MapReduce: 14章
Hadoop源码分析完整版..
Hadoop任务调度器 基础知识 • Hadoop调度流程 • Hadoop自带调度器介绍 • 编写自己的Hadoop调度器 • 总结
hadoop 源码分析 文档
Hadoop源码分析,Map-Reduce作业提交运行入手分析这个过程中涉及到的Hadoop源码架构,此外本文基于的Hadoop版本是2.6.4
Hadoop源代码分析(完整版).pdf
学习Hadoop源码过程中做的源码分析,共享一下,PPT中有我的邮箱,可以互相探讨。Hadoop源码分析(client端提交job到rm端)
大数据处理系统 hadoop源码分析 基于hadoop2.6
hadoop源码分析hadoop源码分析
自己写的PPT,详解Hadoop源码及其相关流程
Hadoop源码分析.rar 有助于hadoop学习者进一步学习!! 非常好的资源!!
一个比较详细的hadoop源码分析文档,内容很详实,包括hadoop、hdfs、hive等等等
Hadoop的源代码分析总共55章,包括HDFS: 41章、MapReduce: 14章。
hadoop源码,官方,放心下载,完整版,阅读hadoop源代码的目的不一定非是工作的需要,你可以把他看成一种修养,通过阅读hadoop源代码,加深自己对分布式系统的理解,培养自己踏实做事的心态。
NULL 博文链接:https://zqhxuyuan.iteye.com/blog/1879292
该压缩包包含Hadoop实战、Hadoop权威指南(第二版)、Hadoop源码分析(完整版)的内容,有助于初、中、高级工程师的理解和提升