- 浏览: 396938 次
- 性别:
- 来自: 南京
最新评论
-
x981114171:
不知大家有没发现,国产书,国人写的书就没几本是给力的,以后国人 ...
《Spring技术内幕》的读者问题交流 -
x981114171:
买了这本书,感觉很不值。来吐槽下,也许你自己是专家,不过写的书 ...
《Spring技术内幕》的读者问题交流 -
851228082:
作者,写的书很好,我觉得幸亏有源码,所以我才能看懂。一边看,一 ...
《Spring技术内幕》的读者问题交流 -
yueshang520:
说的真不错。。学习了
Spring技术内幕——深入解析Spring架构与设计原理(一)IOC实现原理 -
faith789510:
TransactionProxyFactoryBean 什么情 ...
Spring源代码解析(六):Spring声明式事务处理
上一节看到TaskTracker启动新任务的过程,这里接着看看在JobTracker中是怎样响应和调度的,在hadoop中,我们看到采用的是pull的方式拿到任务。
这里是TaskTracker想JobTracker发送heartbeat的地方 - 使用的是RPC,这样我们你就来到JobTracker了:
这个taskScheduler采用的是默认的
这是在配置文件中指定的,"mapred.jobtracker.taskScheduler",常常是JobQueueTaskScheduler是hadoop的实现,FIFO类型的调度器,让我们看看这个调度器是怎样assignTasks的:
task的取得就要到JobInProgress中去obtainNewReduceTask了,需要对集群的状态进行查询处理了。
HeartbeatResponse heartbeatResponse = jobClient.heartbeat(status, justStarted, askForNewTask, heartbeatResponseId);
这里是TaskTracker想JobTracker发送heartbeat的地方 - 使用的是RPC,这样我们你就来到JobTracker了:
public synchronized HeartbeatResponse heartbeat(TaskTrackerStatus status, boolean initialContact, boolean acceptNewTasks, short responseId) throws IOException { ............. //如果是接受新任务的话,让JotTracker去进行调度,这里会调用taskScheduler的assignTasks if (acceptNewTasks) { TaskTrackerStatus taskTrackerStatus = getTaskTracker(trackerName); if (taskTrackerStatus == null) { LOG.warn("Unknown task tracker polling; ignoring: " + trackerName); } else { List<Task> tasks = getSetupAndCleanupTasks(taskTrackerStatus); //这里是准备assignTask的地方,由配置的调度器来决定怎样调度 if (tasks == null ) { tasks = taskScheduler.assignTasks(taskTrackerStatus); } if (tasks != null) { for (Task task : tasks) { expireLaunchingTasks.addNewTask(task.getTaskID()); LOG.debug(trackerName + " -> LaunchTask: " + task.getTaskID()); actions.add(new LaunchTaskAction(task)); } } } }
这个taskScheduler采用的是默认的
taskScheduler = (TaskScheduler) ReflectionUtils.newInstance(schedulerClass,conf);
这是在配置文件中指定的,"mapred.jobtracker.taskScheduler",常常是JobQueueTaskScheduler是hadoop的实现,FIFO类型的调度器,让我们看看这个调度器是怎样assignTasks的:
public synchronized List<Task> assignTasks(TaskTrackerStatus taskTracker) throws IOException { ClusterStatus clusterStatus = taskTrackerManager.getClusterStatus(); int numTaskTrackers = clusterStatus.getTaskTrackers(); Collection<JobInProgress> jobQueue = jobQueueJobInProgressListener.getJobQueue(); // // Get map + reduce counts for the current tracker. // int maxCurrentMapTasks = taskTracker.getMaxMapTasks(); int maxCurrentReduceTasks = taskTracker.getMaxReduceTasks(); int numMaps = taskTracker.countMapTasks(); int numReduces = taskTracker.countReduceTasks(); // // Compute average map and reduce task numbers across pool // int remainingReduceLoad = 0; int remainingMapLoad = 0; synchronized (jobQueue) { for (JobInProgress job : jobQueue) { if (job.getStatus().getRunState() == JobStatus.RUNNING) { int totalMapTasks = job.desiredMaps(); int totalReduceTasks = job.desiredReduces(); remainingMapLoad += (totalMapTasks - job.finishedMaps()); remainingReduceLoad += (totalReduceTasks - job.finishedReduces()); } } } // find out the maximum number of maps or reduces that we are willing // to run on any node. int maxMapLoad = 0; int maxReduceLoad = 0; if (numTaskTrackers > 0) { maxMapLoad = Math.min(maxCurrentMapTasks, (int) Math.ceil((double) remainingMapLoad / numTaskTrackers)); maxReduceLoad = Math.min(maxCurrentReduceTasks, (int) Math.ceil((double) remainingReduceLoad / numTaskTrackers)); } int totalMaps = clusterStatus.getMapTasks(); int totalMapTaskCapacity = clusterStatus.getMaxMapTasks(); int totalReduces = clusterStatus.getReduceTasks(); int totalReduceTaskCapacity = clusterStatus.getMaxReduceTasks(); // // In the below steps, we allocate first a map task (if appropriate), // and then a reduce task if appropriate. We go through all jobs // in order of job arrival; jobs only get serviced if their // predecessors are serviced, too. // // // We hand a task to the current taskTracker if the given machine // has a workload that's less than the maximum load of that kind of // task. // if (numMaps < maxMapLoad) { int totalNeededMaps = 0; synchronized (jobQueue) { for (JobInProgress job : jobQueue) { if (job.getStatus().getRunState() != JobStatus.RUNNING) { continue; } //这里是取得Task的地方,需要到job中去取 Task t = job.obtainNewMapTask(taskTracker, numTaskTrackers, taskTrackerManager.getNumberOfUniqueHosts()); if (t != null) { return Collections.singletonList(t); } // // Beyond the highest-priority task, reserve a little // room for failures and speculative executions; don't // schedule tasks to the hilt. // totalNeededMaps += job.desiredMaps(); int padding = 0; if (numTaskTrackers > MIN_CLUSTER_SIZE_FOR_PADDING) { padding = Math.min(maxCurrentMapTasks, (int)(totalNeededMaps * padFraction)); } if (totalMaps + padding >= totalMapTaskCapacity) { break; } } } } // // Same thing, but for reduce tasks // if (numReduces < maxReduceLoad) { int totalNeededReduces = 0; synchronized (jobQueue) { for (JobInProgress job : jobQueue) { if (job.getStatus().getRunState() != JobStatus.RUNNING || job.numReduceTasks == 0) { continue; } Task t = job.obtainNewReduceTask(taskTracker, numTaskTrackers, taskTrackerManager.getNumberOfUniqueHosts()); if (t != null) { return Collections.singletonList(t); } // // Beyond the highest-priority task, reserve a little // room for failures and speculative executions; don't // schedule tasks to the hilt. // totalNeededReduces += job.desiredReduces(); int padding = 0; if (numTaskTrackers > MIN_CLUSTER_SIZE_FOR_PADDING) { padding = Math.min(maxCurrentReduceTasks, (int) (totalNeededReduces * padFraction)); } if (totalReduces + padding >= totalReduceTaskCapacity) { break; } } } } return null; }
task的取得就要到JobInProgress中去obtainNewReduceTask了,需要对集群的状态进行查询处理了。
发表评论
-
开放注册了!我们的任务推客 - 群组任务和流程协同SaaS工具
2010-06-30 20:31 621经过这几天团队的努力,为软件增加了开放注册功能,大家可以自己动 ... -
我们团队研发的SaaS软件产品:应用于群组任务协同和流程管理
2010-06-21 15:39 846任务协同及流程管理SaaS软件:任务推客上线,欢迎大家免费体验 ... -
随笔:Spring与云计算(六)
2009-12-17 18:49 3650这样,就说到国内了, ... -
随笔:Spring与云计算(五)
2009-12-16 19:58 3251那其他呢,我们看看还 ... -
随笔:Spring与云计算(四)
2009-12-15 16:26 4013前面我们提到,Spring被VMWare收购而进入云计算领域, ... -
随笔:Spring与云计算(三)
2009-12-14 14:05 3601在前面的那张图中,可以看到SpringSource产品和云计算 ... -
随笔:Spring与云计算(二)
2009-12-11 14:57 4185这么大的范围的模式转 ... -
随笔:Spring与云计算(一)
2009-12-10 15:19 6790对Spring和云计算的关注 ... -
《Spring技术内幕 - 深入解析Spring架构与设计原理》上市了!
2009-12-09 15:17 8958详细的书本目录和章节节选请见附件!欢迎下载指正。 可以购买到 ... -
Spring技术内幕——深入解析Spring架构与设计原理(六)Spring ACEGI
2009-11-20 12:27 14391Spring ACEGI 作为Spring丰富生态系统中的一个 ... -
Spring技术内幕——深入解析Spring架构与设计原理(五)Spring与远端调用
2009-11-16 20:23 11023在应用开发中,常常涉及服务器系统中各种不同进程之间的通信与计算 ... -
Spring技术内幕——深入解析Spring架构与设计原理(四)Web MVC的实现
2009-11-08 08:55 21091以前的欠账,现在补上,欢迎指正和讨论。 Spring Web ... -
Spring技术内幕——深入解析Spring架构与设计原理(三)数据库的操作实现
2009-11-02 17:34 13647最近事情实在是比较多,没有及时更新帖子,还望大家见谅啊。今天, ... -
Spring技术内幕——深入解析Spring架构与设计原理(二)AOP
2009-10-20 08:30 22914关于AOP的个人理解 AOP联盟定义的AOP体系结构把与AO ... -
Spring技术内幕——深入解析Spring架构与设计原理(一)IOC实现原理
2009-10-19 10:47 98711内容较多,新开一贴, ... -
Spring技术内幕——深入解析Spring架构与设计原理(一)引子
2009-10-17 20:31 11022缘起 已经很久没有写 ... -
Hadoop的mapred TaskTracker端源码概览
2009-02-17 14:39 4049花了许多功夫把Hadoop的mapreduce实现过了一遍,基 ... -
Hadoop的mapred TaskTracker端源码概览
2009-02-17 14:37 1600这篇文章和博客的另一篇重复,删掉了。对不起,请参阅博客的另一篇 ... -
发布用javaeye生成的博客版本 - Spring源代码解析
2008-11-20 08:49 3564呵呵,试试javaeye的新功能。帮助大家阅读,文中的很多错误 ... -
Spring源代码解析(十):Spring Acegi框架授权的实现
2007-08-17 11:01 4832我们从FilterSecurityInterceptor我们从 ...
相关推荐
Hadoop源码 包含mapred
基于Hadoop图书推荐系统源码+数据库.zip基于Hadoop图书推荐系统源码+数据库.zip基于Hadoop图书推荐系统源码+数据库.zip基于Hadoop图书推荐系统源码+数据库.zip基于Hadoop图书推荐系统源码+数据库.zip基于Hadoop图书...
hadoop mapred_tutorial官方文档
hadoop权威指南4和源码hadoop权威指南4和源码hadoop权威指南4和源码hadoop权威指南4和源码
Java操作Hadoop Mapreduce基本实践源码.
使用Eclipse的export功能把所有源码打包,然后把打包后的jar文件拷贝到hadoop集群的$HADOOP_HOME/share/hadoop/mapreduce/lib目录下面。这一步相当重要,否则项目将无法找到相关类。注意,如果搭建的是全分布集群,...
hadoop源码,官方,放心下载,完整版,阅读hadoop源代码的目的不一定非是工作的需要,你可以把他看成一种修养,通过阅读hadoop源代码,加深自己对分布式系统的理解,培养自己踏实做事的心态。
Hadoop学习总结合源码分析,网上收集整理
hadoop 2.9.0 mapred-site.xml 全部属性集,默认值及属性描述,其中关键属性已使用pdf注释
实战hadoop,源码,刘鹏,开启通向云计算的捷径
Hadoop及源码 Hadoop及源码 Hadoop及源码 Hadoop及源码
java 关联hadoop源码 查看底层实现,mapReduce实现 HDFS实现
hadoop 框架下 mapreduce源码例子 wordcount ,eclipse下,hadoop 2.2 可以运行
hadoop 源码解析_yarn源码解析
hadoop简单开发例子源码(含jar包),适用于初学者!
hadoop2.9.x源码编译工具包(maven、protocolBuffer、findbugs)
hadoop2.7.3的源码包,hadoop关联源码的时候直接选择就可以查看源码。是自己通过mvn下载的源码之后压缩的。
win 7 64上编译 Hadoop 2.7.3 源码 的真实经历。
hadoop 2.7.2 的底层源码包。Welcome to Apache™ Hadoop®!