`
cocoIT
  • 浏览: 48504 次
  • 性别: Icon_minigender_1
  • 来自: 福建
文章分类
社区版块
存档分类
最新评论

Hadoop MapReduce操作MySQL

 
阅读更多
"不知你是否想过,计算一下在城市中的每台电脑里的文件数加起来的共数有多少?似乎是一个非常不可思议的问题,将会是一个非常庞大的数字,如果用计算机去远程挨个统计一遍数以万计的计算机,首先要去扫描磁盘,然后再去做统计,最后把结果累加,很难在容许的时间范围内得出一个结果。

但如果在每台机器上有一个程序让他去计算(统计),在分组里合并计算结果,最后返回给中心服务器,貌似这样能在一定的短时间内得出结果。并非是我那么无聊的要去统计全市计算机中的文件数量,只是做了一个比方,我想说利用分布式并行计算可以更快的得到超大型数据的计算结果。 目前中国 很多互联网公司 已经采用了MapReduce框架便捷的去实现这样超大型数据的计算,例如:百度、淘宝、中国雅虎、金山 这样的互联网企业都在使用MapReduce,将计算的任务交给每台计算机,进行并行计算提高计算的能力。"

在今天越来越多的人在讨论云计算相关的话题,越来越多的人知道云计算是有很多的计算机组成,但人们不会是关心云计算中的运用的具体技术,但是对于今天的程序员和工程师们来说会慢慢的会接触到云计算相关的需求,所以自然的就会接触到云计算相关的技术,云计算技术中包含分布式并行计算,分布式运算模型就是能够把任务分布到多台服务器上面执行,最后把任务归并回来获得计算的结果,而 MapReduce 就是一个为并行处理大量数据而设计的编程模型,它将工作划分为一个独立任务组成的集合。它是一种并行编程,现有的产品包括,如: Google 的 BigTable、Hadoop 、HBase 都用到了MapReduce的计算框架。

Hadoop项目中Mapreduce的分布式计算框架需要依托于Hadoop的集群环境,在集群环境中 MapReduce 程序用于以并行方式计算大量数据,并且可以分配计算机之间的计算负荷,带来很高的运算效率,当计算时需要输入一组键/值对,生成一组输出键/值对。计算涉及的两个基本操作:Map 和 Reduce。用户编写的 Map 操作需要输入并生成一组中间键/值对。MapReduce 库将所有与同一中间键相关联的中间值组合到一起,并且将它们传递给 Reduce 功能。Hadoop 提供了MapReduce这种方法来实现并行计算程序设计的框架,当客户端发出MapReduce计算的请求转发到 Hadoop NameNode。然后Hadoop NameNode 负责具体的操作,它将启动大量 Map 和 Reduce 进程 执行具体的计算任务。当 MapReduce 完成操作之后,主节点 NameNode 将输出值返回到服务器并交付客户端。

在Hadoop集群环境的节点状态被分类为3类:高利用、平均利用 和未充分利用。根据每个节点的利用率,将负载在节点之间转移以平衡集群。首选需要获取邻近节点详细信息:1. 当 DataNode 的负载增加到阈值级别时,它将向该 NameNode 发送一个请求。2. NameNode 获得特定 DataNode 最邻近节点的负载级别信息。3. NameNode 比较负载,然后将有关最空闲相邻节点的详细信息发送到特定的 DataNode。其次DataNodes 开始工作:1. 每个 DataNode 将自己负载量与其最近节点的负载量之和进行比较。2. 如果 DataNode 的负载级别大于其邻近节点,将随机选择那么负载目标节点(直接相邻的节点及其他节点) 。3. 然后将节点请求发送到目标节点。最后接收请求:1. 每个节点将维护一个缓冲区接收负载请求。2. 消息传递接口(MPI)管理此缓冲区。3. 主线程会侦听缓冲队列,并服务其接收的请求。4. 节点进入负载平衡执行阶段。

具体到MapReduce框架 读/写数据库,有2个主要的程序分别是 DBInputFormat和DBOutputFormat,DBInputFormat 对应的是SQL语句select,而DBOutputFormat 对应的是 Inster/update,使用DBInputFormat和DBOutputForma时候需要实现InputFormat这个抽象类,这个抽象类含有getSplits()和createRecordReader()抽象方法,在DBInputFormat类中由 protected String getCountQuery() 方法传入结果集的个数,getSplits()方法再确定输入的切分原则,利用SQL中的 LIMIT 和 OFFSET 进行切分获得数据集的范围 ,请参考DBInputFormat源码中public InputSplit[] getSplits(JobConf job, int chunks) throws IOException的方法,在DBInputFormat源码中createRecordReader()则可以按一定格式读取相应数据。
我的废话:好比开启多线程对数据库查询,你先查询出数据结果集的个数,再根据你分配的LIMIT 和 OFFSET 策略分配给每个线程对查询数据库,最后合并在一个结果集里面。

注意:当集群中有大量的节点将会导致mapper的数目上升,此时读取数据库会给数据库造成很大的压力,需要考虑数据库的负载能。

简单的讲述一下 我利用MapReduce对MySQL和分布式文件系统HDFS的操作过程,MapReduce客户端先从MySQL中读取数据写在HDFS上,在示例程序中调用DBAccessReader.java就可以实现运行结果,第二个示例DBOutputFormat.java程序再从MySQL中读取数据,再次写入MySQL中,这样做的目的是想表达:
1.可以通过MapReduce可以对结构化和非结构化的数据进行操作,
2.可以通过MapReduce可以对结构化和结构化的数据进行操作,
3.至于通过MapReduce对非结构化和非结构化的示例就不提供了,网上一大把都是。

JobTracker是一个master服务, JobTracker负责调度job的每一个子任务task运行于TaskTracker上,并监控它们,如果发现有失败的task就重新运行它。一般情况应该把JobTracker部署在单独的机器上。
TaskTracker是运行于多个节点上的slaver服务。TaskTracker则负责直接执行每一个task,TaskTracker需要运行在DataNode上。
Client端通过JobClient类提交到JobTracker,然后由JobTracker创建每一个Task(即MapTask和ReduceTask)并将它们分发到各个TaskTracker服务中去执行。

整个环境的架构如图所示:


以下是我在项目组测试环境中建立的机器清单如下:
1.NameNode / HDFS 192.168.20.212 (主节点/linux)
2.DataNode / HDFS 192.168.20.214/192.168.20.215 (从节点/linux)
3.MySQL 192.168.20.237 (单机/linux)
4.MapRereduce Client 192.168.20.240 (Eclipse/windows xp)

注意:将代码示例打成jar,将这个jar文件和mysql的jdbc驱动上传到所有hadoop节点的lib目录下,另外本示例中的blog.sql是建表语句。

分享到:
评论

相关推荐

Global site tag (gtag.js) - Google Analytics