`
langzhe
  • 浏览: 278918 次
  • 性别: Icon_minigender_1
  • 来自: 北京
社区版块
存档分类
最新评论

两个周前挖了一个坑,现在跳了进去<riak mapreduce 分析>,顺便能实现分页功能了

    博客分类:
  • riak
阅读更多

我现在有一个这样的bucket

id    followed_id    followr_id 

1     lxw            jason      

2     jason          lxw

3     langxw         jason

4     jason          langxw

要得到这样一组数据[1](统计粉丝数,当然粉丝数可以预先计算,不必每次都现查),当时看到代码里有现成的接口就直接调用了,当时还感觉这接口有问题,后面可能会用麻烦。果然现在查询数据总是超时。所以说无论如何不要想着偷懒,偷懒的结果只会让自己花更多的时间来弥补。

[1]、

 {name,count}

 {lxw, 1}

 {jason, 2}

 {langxw, 1

 

后来,自己写个查吧。首先想到用mapreduce 计算得出

[2]map是这样写的,只是为了得到一个list({FollowedId, 1})

 

map(Record, undefined, {sub_rank}) ->
    ?DEBUG("~p:map sub_rank ~p ~n", [?MODULE, ?LINE]),
    case riak_kv_util:is_x_deleted(Record) of
        false ->
            {struct, List} = mochijson:decode(riak_object:get_value(Record)),
            FollowedId = get_value(List, "followed_id"), 
            [{FollowedId, 1}];
        _ ->[]
    end; 

 接着写了这样一个reduce

 

reduce[1]: 把相同的followedId的value相加

 

reduce(Records,  {sub_rank}) ->
   FSum = fun({FollowedId, Count}, Acc) ->
                Value = proplists:get_value(FollowedId, Acc, 0),
                [{FollowedId, Value+Count}|proplists:delete(FollowedId, Acc)]
           end,
    lists:foldr(FSum, [], Records); 

 在3000个obj的情况下跑了一边,没问题大公告成。

 

因为还涉及到取出根据count排序前50条。

所以需要再添加reduce[2]

reduce[2]:取出前50条记录

 

reduce(Records,  {sub_rank, Max}) when is_integer(Max) ->
    ?DEBUG("~p:reduce sub_rank ~p Max=~p, Records=~p~n", [?MODULE, ?LINE, Max, Records]),
    lists:sublist(lists:reverse(lists:keysort(2,Records)), Max);

 

 

本以为这样就完事了,当20万个obj情况下,这个mapreduce照样查询不出数据来,一直提示timeout,设置10分钟,15分钟都是timeout。

    Query代码:

Query=[{map,{modfun,trend_riak,map},{sub_rank},false},   
       {reduce,{modfun,trend_riak,reduce},{sub_rank},false},   
       {reduce,{modfun,trend_riak,reduce},{sub_rank, 50},true}],  

 

 1、分析慢的原因

    只用riak_pb_socket:mapred/3执行map,不执行reduce

Query=[{map,{modfun,trend_riak,map},{sub_rank},false},
   数据能查询出来大约花了4s。

 

   我这时只是感觉奇怪,心想为什么返回数据多了还快了。reduce使其返回数据少了,为什么却慢了。

   这下只好仔细看看reduce代码了。

    定位到了proplist操作上。

    

Value = proplists:get_value(FollowedId, Acc, 0),
[{FollowedId, Value+Count}|proplists:delete(FollowedId, Acc)]
    测试了一下,发现两个操作确实比较耗时。

 

 

    这时我想到了用dict才实现

    用dict之前是这样实现的:

    

 

这操作是在shell计算,20万条数据花费 141s
    FSum = fun({FollowedId, Count}, Acc) ->
                Value = proplists:get_value(FollowedId, Acc, 0),
                [{FollowedId, Value+Count}|proplists:delete(FollowedId, Acc)]
           end,
    lists:foldr(FSum, [], Records); 
 

 

     用dict首先想到这样实现:

    

这操作是在shell计算,20万条数据花费 12s
    FSum = fun({FollowedId, Count}, Acc) ->
                case  dict:is_key(FollowedId, Acc) of
                    true ->
                        Value = dict:fetch(FollowedId, Acc),
                        dict:store(FollowedId, Count+Value, Acc);
                    false ->
                        dict:store(FollowedId, Count, Acc)
                end
            end,
    lists:foldr(FSum, dict:new(), Records);    
     缺点:用dict时如果直接用dict:fetch/2函数时,如果K不存在会抛出一个异常错误,这也是我平常不用dict的原因懒的每次都调用dict:is_key/2判断。这里判断一次,取出一次,存储一次总共判断了三或两次。
     为了减少操作就用用了dict:update_counter/3      
   
    这操作是在shell计算,20万条数据花费 10s
    FSum2 = fun({FollowedId, Count}, Acc) ->
                case  dict:is_key(FollowedId, Acc) of
                    true ->
                        dict:update_counter(FollowedId, Count, Acc);
                    false ->
                        dict:store(FollowedId, Count, Acc)
                end
            end,
    lists:foldr(FSum2, dict:new(), Records);
 
 其实可以更简单,省去了判断。
这操作是在shell计算,20万条数据花费 3s
    FSum3 = fun({FollowedId, Count}, Acc) ->
                dict:update_counter(FollowedId, Count, Acc)
            end,
    lists:foldr(FSum3, dict:new(), Records); 

Add Increment to the value associated with Key and store this value. If Key is not present in the dictionary then Incrementwill be stored as the first value.

 

   这时看上去已经省去了不少时间了由141s降到了3s。 
   当把这段代码放到reduce代码里后发现查询依然比较慢,欣慰的是数据查询出来了。但总体花了101s,显然没达到预期结果。
   根据map-reduce原理特点分析了一下reduce代码。猜测可能很多被reduce处理过的数据会多次reduce,具体reduce多少次,目前我无法判断。那只好把reduce结果改成返回一个结果添加个分之判断一下,被reduce过的数据不在参与reduce的遍历处理,还能统计在一起
   实现方法:
     
 返回一个元素的list,[dict()]                      
reduce(Records,  {sub_rank}) ->
    FSum3 = fun({FollowedId, Count}, Acc) ->
                   dict:update_counter(FollowedId, Count, Acc);
               (Dict, Acc) ->
                   dict:merge(fun(_K, V, V1) ->V+V1 end, Dict, Acc)
            end,
    Return = lists:foldr(FSum3, dict:new(), Records),
    [Return];
顺便也得重写第二个reduce
% 取出前Max个
reduce([Records],  {sub_rank, Max}) when is_integer(Max) ->
    lists:sublist(lists:reverse(lists:keysort(2,dict:to_list(Records))), Max); 

   这下终于算是OK ,

  

  Query代码:执行花费12s

Query=[{map,{modfun,trend_riak,map},{sub_rank},false},   
       {reduce,{modfun,trend_riak,reduce},{sub_rank},false},   
       {reduce,{modfun,trend_riak,reduce},{sub_rank, 50},true}],  

  根据这个最后的思路加上第二个reduce也能实现分页功能了

 

 

 

 

 

分享到:
评论

相关推荐

    riak-java-client:Java的Riak客户端

    Riak Java客户端 Riak Java客户端支持与 (开放源代码,分布式数据库)进行通信,该数据库专注于高可用性,水平可伸缩性和可预测的延迟... &lt; groupId&gt;com.basho.riak&lt;/ groupId&gt; &lt; artifactId&gt;riak-client&lt;/ arti

    riak-session-manager.zip

    riak-session-manager 是使用 Riak 来存储 Tomcat session 信息的项目。 配置方法:   &lt;?xml version="1.0" encoding="UTF-8"?&gt;&lt;Context&gt; &lt;Manager className=...

    vclock:矢量时钟的Clojure实现,大致从Riak Core移植

    VClock是矢量时钟的一种实现,大致是从移植过来的。 伪像 VClock工件被 。 如果您使用的是Maven,请将以下存储库定义添加到pom.xml : &lt; repository&gt; &lt; id&gt;clojars.org&lt;/ id&gt; &lt; url&gt;http://clojars.org/repo&lt;/ ...

    riak-ruby-client, 用于 ruby的Riak客户端.zip

    riak-ruby-client, 用于 ruby的Riak客户端 ) 客户端( Riak客户机)riak-client 是一个富 ruby 客户端/工具箱,分布在,数据库中,包含典型操作的基本包装。在 http://basho.github.io/riak-ruby-client/ 可以使用详尽...

    Laravel开发-laravel-riak

    Laravel开发-laravel-riak RIAK连接、缓存和会话的RIAK提供程序

    Riak.Driver.Net:riak c#客户端

    riak.driver.net Riak.Driver.Net &lt;? xml version = " 1.0 " encoding = " utf-8 " ?... &lt; riak&gt; &lt; client&gt; &lt; endpoint name = " riak1 " socketBufferSize = " 8192 " messageBufferSize = " 8192

    Riak 学习文档

    学习使用Riak

    riak_pb:Riak协议缓冲区消息

    Riak协议缓冲区消息 master : develop : 该存储库包含的基于协议缓冲区的接口的消息定义,以及消息类型的各种... &lt;length&gt; &lt;msg&gt; &lt;pbmsg&gt; length是msg_code的长度(1个字节)加上以网络顺序(大端)编码的消息长度

    Laravel开发-laravel-riak-auth

    Laravel开发-laravel-riak-auth Laravel的RIAK身份验证提供程序

    riak-cli:Riak 命令行客户端

    这是一个 Riak 终端查询工具,提供基本的 POST/PUT/DELETE/GET 等方法。安装 $ brew install node$ git clone https://github.com/tim-tang/riak-cli $ cd riak-cli && npm install将RIAK_CLI_HOME和 bin 目录导出...

    Python-Riak是以Erlang编写的一个高度可扩展的分布式数据存储

    Riak是以 Erlang 编写的一个高度可扩展的分布式数据存储,Riak的实现是基于Amazon的Dynamo论文,Riak的设计目标之一就是高可用。Riak支持多节点构建的系统,每次读写请求不需要集群内所有节点参与也能胜任。、

    riak-manage:管理Riak群集的工具

    riak管理工具集是一个管理Riak KV集群的项目。 是一个开放源代码的分布式数据库,专注于高可用性,水平可伸缩性和可预测的延迟。 关于此Beta项目的说明: 此软件不应在生产环境中使用。 该软件不能与Riak 1.4.x...

    liza-riak:liza的riak bucket实现(KV抽象)

    支持合并功能(每个存储桶指定),序列化/反序列化(开箱即用fressian / snappy实现的船舶),指标(使用yammer指标)和riak的CRDT计数器设计/目标liza-riak (不同于liza本身)本来就很容易-该库仅支持某些配置。...

    riak-client:Perl 波纹客户端

    # normal modemy $client = Riak::Client-&gt;new( host =&gt; '127.0.0.1', port =&gt; 8087, r =&gt; 2, w =&gt; 2, dw =&gt; 1, connection_timeout =&gt; 5, read_timeout =&gt; 5, write_timeout =&gt; 5, no_auto_connect =&gt; 0,);...

    server_monitoring_riak:使用Riak作为后端的服务器监视

    使用Riak作为后端的服务器监视 该项目是我的学士学位工作的一部分: “ NoSQL数据库和应用程序的比较分析” 米兰比可卡大学 关联者:安德烈·毛里诺(Andrea Maurino) 联合主持人:Blerina Spahiu 讲解 先决条件 ...

    riak-Erlang.rar

    linux riak erlang,otp_src_20.3

    riak:Riak是Basho Technologies的去中心化数据存储

    欢迎来到Riak。 概述 Riak是分布式,分散式数据存储系统。 在Wiki中,您将找到设置和使用Riak的“快速入门”指导。 有关更多信息,请浏览以下文件: 自述文件:此文件 许可证:Riak的发布许可证 doc / admin.org...

    riak-java-client, Java的Riak客户端.zip

    riak-java-client, Java的Riak客户端 Java客户端客户端 客户端支持与 通信,它是一个开源的分布式数据库,专注于高可用性。水平可伸缩性和可以预测的可以预测的实时延时。 Riak和这段代码都由 Basho维护。Java客户端...

    presto-riak:Riak数据库的presto连接器

    $ mv target/presto-riak-&lt;version&gt;.jar path/to/presto/plugin/presto-riak 配置 使用pb端口。 $ cat riak.properies connector.name=riak riak.pb.host=localhost:8087 riak.erlang.node=riak@127.0.0.1 presto....

    fakeriak:用于测试和没有 Riak 的机器的内存中 Ruby Riak 驱动程序

    特征支持以下 Riak 功能: 服务器信息基本数据对象查找2.0 之前的计数器Bucket/Bucket Type 道具列出键/桶使用 Javascript 映射/减少二级索引搜索索引/模式CRDT 以下 Riak 功能目前尚未实现: 使用 Solr 搜索查询...

Global site tag (gtag.js) - Google Analytics