`
QING____
  • 浏览: 2232013 次
  • 性别: Icon_minigender_1
  • 来自: 北京
社区版块
存档分类
最新评论

Mongodb中Mapreduce特性与原理

 
阅读更多

    对于每个从事数据技术的人才而言,mapreduce都不陌生,简单而言就是一种大数据计算、分类、分析的一个编程模式,目前很多大数据存储平台都支持mapreduce,比如hadoop、hbase等等,也有很多采用了类似于mapreduce的算法的其他数据计算平台,关于mapreduce的原理不再赘言,我们直接来了解mongodb中如何使用mapreduce。

 

    Mongodb中提供了2种对大数据计算的方式:aggregate和mapreduce,严格意义上说mapreduce也是一种“aggregation”算法,只是这两种方式使用起来差距甚远,设计理念也不同,所以我们分开来介绍,aggregate请参考【aggreate】,本文重点介绍mapreduce。

 

    mapreduce有2个阶段:map和reduce;mapper处理每个document,然后emits一个或者多个objects,object为key-value对;reducer将map操作的结果进行联合操作(combine)。此外mapreduce还可以有一个finalize阶段,这是可选的,它可以调整reducer计算的结果。在进行mapreduce之前,mongodb支持使用query来筛选文档,也支持sort排序和limit。

    mapreduce使用javascript语法编写,其内部也是基于javascript V8引擎解析并执行,javascript语言的灵活性也让mapreduce可以处理更加复杂的业务场景;当然这相对于aggreation pipleine而言,意味着需要书写大量的脚本,而且调试也将更加困难。(调试可以基于javascript调试,成功后再嵌入到mongodb中)我们来展示一个例子:

    我们想统计每个品类下商品的总量,SQL语义为:

SELECT COUNT(1) FROM products group by category_id

 

     原始数据格式:

{_id:1,"categoryId" : 1 , "productId":100010,"amount" : 200}
{_id:1,"categoryId" : 1, "productId": 100011,"amount" : 300}

 

    mapreducey语句范例:

db.products.mapReduce(
	function(){
    	emit(this.categoryId,1);
	},
	function(key,values){
		return Array.sum(values);
	},
	{
		out:"result001",
	}

)

 

    JAVA代码示例:

String mapper = "function() {" +
        "emit(this.categoryId,1);" +
        "}";
String reducer = "function(key,values){" +
        "return Array.sum(values);" +
        "}";
collection.mapReduce(mapper, reducer)
        .databaseName("test")   //结果保存在”test“数据库中,默认为当前数据库
        .collectionName("result") //指定结果保存的collection,默认为inline模式,不保存,直接通过cursor输出
        .action(MapReduceAction.REPLACE) //结果保存方式,如果”result“表存在,则替换。
        .nonAtomic(false) //是否为”非原子性“
        .sharded(false) //结果collection是否为sharded
        .maxTime(180,TimeUnit.SECONDS) //mapreduce执行的最大时间
        .iterator() //触发执行,这句话千万别忘了,否则不会执行
        .close(); //我们不需要cursor,则直接关闭
//此后即可通过result collection获取数据统计的结果

 

    统计结果示例:

{ "_id" : 1000, "value" : 100 }
{ "_id" : 1001, "value" : 100 }
{ "_id" : 1002, "value" : 100 }
{ "_id" : 1003, "value" : 100 }
{ "_id" : 1004, "value" : 100 }

     

    通过此例,我们简单的了解了mapreduce的编程方式,其实也是很简单的,如果你从事过javascript开发,你会发现书写mapper和reducer脚本也并不困难,况且javascript内置的大量函数库也可以提供便利。

    

    一个mapreduce就像aggregation中的一次group,如果你的统计结果需要多次group,那么可能需要多个mapreduce;这和aggregate pipleline相比就没有那么简便了。但是mapreduce可以写的非常复杂,处理一些比较棘手的业务性逻辑,这在aggregate中反而不行,因为aggregate支持的“计算表达式”比较有限,和javascript的方法库相比相差太远。mapreduce可以读取sharding collection中的数据,而且可以将output结果输出到sharding collection中,但是aggregate不支持这个特性。

 

一、编程语法

    在mongodb中,mapreduce除了包含mapper和reducer之外,还包含其他的一些选项,不过整体遵循mapreduce的规则:

    1、map:javascript方法,此方法中可以使用emit(key,value),一次map调用中允许返回调用多次emit(也可以不调用),它不需要返回值;其中key用来分组,value将来会被传递给reducer用于“聚合计算”。每条document都会调用一次map方法。

    mapper中输入的是当前document,可以通过this.<filedName>来获取字段的值。mapper应该是封闭的,它不能访问外部资源,比如collection、database,不能修改外部的值,但允许访问“scope”中的变量。emit的值不能大于16M,即document最大的尺寸,否则mongodb将会抛出错误。

function() {
    this.items.forEach(function(item) {emit(item.sku,1);});//多次emit
}

 

    2、reduce:javascript方法,此方法接收key和values两个参数,经过mapper处理和“归并之后”,一个key将会对应一组values(分组,key:values),此values将会在reduce中进行“聚合计算”,比如:sum、平均数、数据分拣等等。

    reducer和mapper一样是封闭的,它内部不允许访问database、collection等外部资源,不能修改外部值,但可以访问“scope”中的变量;如果一个key只有一个value,那么mongodb就不会调用reduce方法。可能一个key对应的values条数很多,将会调用多次reduce,即前一次reduce的结果可能被包含在values中再次传递给reduce方法,这也要求,reduce返回的结果需要value的结构保持一致。同样,reduce返回的数据尺寸不能大于8M(document最大尺寸的一半,因为reduce的结果可能会作为input再次reduce)。

//mapper
function() {
    emit(this.categoryId,{'count' : 1});
}
//reducer
function(key,values) {
    var current = {'count' : 0};
    values.forEach(function(item) { current.count += item.count;});
    return current;
}

 

    此外reduce内的算法需要是幂等的,且与输入values的顺序无关关的,因为即使相同的input文档,也无法保证map-reduce的每个过程都是逐字节相同的,但应该确保计算的结果是一致的。

    3、out:document结构,包含一些配置选项;用于指定reduce的结果最终如何保存。可以将结果以inline的方式直接输出(cursor),或者写入一个collection中。

out : {
    <action> : <collectionName>
    [,db:<dbName>]
    [,sharded:<boolean>]
    [,nonAtomic:<boolean>]
}

 

    out方式默认为inline,即不保存数据,而是返回一个cursor,客户端直接读取数据即可。

    <action>表示如果保存结果的<collection>已经存在时,将如何处理:1)replace:替换,替换原collection中的内容;先将数据保存在临时collection,此后rename,再将旧collection删除 2)merge:将结果与原有内容合并,如果原有文档中持有相同的key(即_id字段),则直接覆盖原值  3)reduce:将结果与原有内容合并,如果原有稳重中有相同的key,则将新值、旧值合并后再次应用reduce方法,并将得到的值覆盖原值(对于“用户留存”、“数据增量统计”非常有用)。

    db:结果数据保存在哪个database中,默认为当前db;开发者可能为了进一步使用数据,将统计结果统一放在单独的database中。

    sharded:输出结果的collection将使用sharding模式,使用_id作为shard key;不过首先需要开发者对<collection>所在的database开启sharding,否则将无法执行。

    nonAtomic:“非原子性”,仅对“merge”和“replace”有效,控制output collection,默认为false,即“原子性”;即mapreduce在输出阶段将会对output collection所在的数据库加锁,直到输出结束,可能会性能会有影响;如果为true,则不会对db加锁,其他客户端可以读取到output collection的中间状态数据。我们通常将ouput collection单独放在一个db中,和application数据分离开,而且nonAtomic为false,我们也不希望用户读到“中间状态数据”。

 

    可以通过指定“out:{inline : 1}”将输出结果保存在内存中,并返回一个cursor,客户端可以直接读取即可。

 

    4、query:筛选文档,只需要将符合条件的documents传递给mapper。

    5、sort:对刷选之后的文档排序,然后才传递给mapper。如果根据map的key进行排序,则可以减少reduce的操作次数。排序必须能够使用index。

    6、limit:限定输入到map的文档条数。

    7、finalize:终结操作,在输出之前调整reduce的结果。它和map、reduce一样,也是一个javascript方法,接收key和value,其中value为reduce输出结果,finalize方法中可以修改value的值作为最终的输出结果:

function(key,value) {
    var final = {count : 0,key:""};
    final.key = key;
    return final;
}

 

    8、scope:document结构,保存一些global级别的变量值,它们可以在map、reduce、finalize中被访问。

//java代码
Document scope = new Document("off",0.5);
String finalize = "function(key,value) {" +
                "return value.count * off;" +
                "}";

 

    9、jsMode:可选值为true或者false;表示是否将map执行的中间结果数据有javascript对象转换成BSON对象,默认为false。false表示,在mapper中emit最终输出的是javascript对象,因为是javascript引擎处理的,不过mapper 可能产生大量的数据,这些数据将会被保存在临时的存储中(collection),所以需要将javascript对象转换成BSON;在reduce阶段,这些BSON结果在被转换成javascript对象,传递给reduce方法,转换意味着性能消耗和慢速,它解决的问题就是“临时存储”以适应较大数据集的数据分析。如果为true,将不会进行类型转换,数据被暂存在内存中,reduce阶段直接使用mapper的结果即可,但是key的个数不能超过50W个。在production环境中,此值建议为false。

 

    在map、reduce、finalize中,我们自始至终都不要尝试去改变key的值,这会导致错误。

 

二、基本原理

    1、过程与原理

    mongodb 2.4之后,可以多线程运行javascript操作,即允许多个mapreduce同时并行执行;不过在2.4之前,则是单线程运行,即每个mongod上同时只能有一个mapreduce运行,所以并发能力很弱,建议开发者升级到最新的版本。

    mapreuce可以从sharding collection中读取数据,也可以将结果写入到sharding collection中;但是aggregate则不能将结果写入到sharding collection中。

 

    1)mongos接收到mapreduce的操作请求后,根据query条件,将map-reduce任务发给持有数据的shards(sharding collection将会被分裂成多个chunks并分布在多个shards中,shard即为mongod节点)。

    2)每个shards都依次执行mapper和reducer,并将结果写入到本地的临时collection中,结果数据是根据_id(即reducer的key)正序排列。

    3)当所有的shards都reduce完成之后,将各自结果数据中_id的最大值和最小值(即min、max key)返回给mongos。

    4)mongos负责shuffle和partition,将所有shards反馈的min、max key进行汇总,并将整个key区间分成多个partitions,每个partition包含[min,max]区间,此后mongos将partiton信息封装在finalReduce指令中并发给每个shard,最终每个shard都会收到一个特定的partition的任务;partition不会重叠。

    5)此后每个shard将与其他所有的shards建立链接,根据partition信息,从min到max,遍历每个key。对于任何一个key,当前shard都将从其他shards获取此key的所有数据,然后后执行reduce和finalize方法,每个key可能会执行多次reduce,这取决于values的条数,但是finalize只会执行一次,最终将此key的finalize的结果通过本地方式写入sharding collection中。

    6)当所有的shards都处理完毕后,mongos将处理结果返回给客户端(inline)。

 

    在mapper时,mapper的结果首先放在内存中,当内存的数据量达到阀值会将执行一次reduce并写入临时文件。

                   ------------
                   |  mongos  |
                   ------------
                        |
                    mapreduce
                        |
------------                    ------------
|  shard   |                    |   shard  |
------------                    ------------
      |                               |
   map+redue                       map+reduce
      |                               |
------------                    ------------
|  _chunk  |                    |  _chunk  |
------------                    ------------
      |                               |
 [min1~max1]                     [min2~max2]
                        |
                  ------------
                  |  mongos  |
                  ------------
                        |
                    partitions
                        |  
 [min1'~max1']                  [min1'~max2']      
      |                               |
  finalReduce                     finalReduce    
      |                               |
------------                     ------------
|  _chunk  |      <--read-->     |  _chunk  |
------------                     ------------
      |                               |
  finalize                         finalize       -->write once for each key
      |                               |
    write                           write         -->locally write
                        |
                  ------------
                  | sharding |
                  ------------

 

    在mongodb2.0版本中,finalReduce步骤将有mongos执行,即所有的shards执行完mapreduce之后,由mongos负责读取所有shards的结果并在本地进行merge和sort,然后执行finalReduce和finalize过程,这中方式并不会带来太大问题,消耗内存也小,但是会增加mongos的负担,影响应用数据的并发能力。经过2.4版本改进后(上述),性能得到很大的提高。

    在mapreduce执行过程中,将会阻止balancer“迁移”chunks(sharding balancer机制);对于输出结果是sharding collection,在finalReduce期间,其chunks也不会被迁移,直到运行结束。主要是为了避免并发操作带来的问题。

 

    2、并发性

    我们都知道,mongodb中所有的读写操作都会加锁(意向锁),mapreduce也不例外。mapreduce涉及到mapper、reducer,中间过程还会将数据写入临时的collection中,最终将finalize数据写入output collection。read阶段将会使用读锁(读取chunks中的数据),每处理100条documents后重新获取锁(yields)。写入临时collectin使用写锁,这个不会涉及到锁的竞争,因为临时collection只对自己可见。在创建output collection时会对DB加写锁,如果output collection已经存在,且action为“replace”时,则会获取一个global级别的写锁,此时将会阻塞mongod上的所有操作(影响很大),主要是为了让数据结果为atomic;如果action为“merge”或者“reduce”,且“nonAtomic”为true是,只会在每次写入数据时才会获取写锁,这对性能几乎没有影响。

    如果你每次mapreduce的结果都会写入到一个新的collection,则建议将“nonAtomic”设置为true,可以避免这个全局锁的问题。参见源码【mongo/db/commons/mr.cpp】

 

    由上述原理可知,在mapper阶段为了压缩内存数据,在写入临时文件时也会执行一次reducer,在reduce阶段如果某个key对象的values较多,也会多次执行reducer;这也要求reducer返回结果必须和mapper emit的结果类型保持一致,同时保证reducer中的算法是幂等的,即多次运行结果应该一致,不依赖于reducer执行的次数和传递mapper结果的顺序。

 

三、其他

    1、mapreduce脚本是基于javascript,为了便于运行和维护,我们通常将这些脚本保存在db中,比如mysql或者mongodb中,在运行mapreduce是从db中获取即可,然后传递给mapreduce方法。

    2、一个mapreduce,只能执行产生一个“group”结果,如果你的数据分析中,需要多次group,那么可能需要将开发多个mapreduce,并将它们串联起来。(需要开发一个管理任务有向图的工具)。

 

    

 

分享到:
评论
1 楼 kyo472083100 2019-02-25  
写得很好,感谢楼主解释得很清晰

相关推荐

Global site tag (gtag.js) - Google Analytics