The Erlang Way (Was Tim Bray's Erlang Exercise - Round IV)


Playing with Tim's Erlang Exercise is so much fun.

I've been coding in Erlang for about six months as a newbie. In most cases I parse strings (or lists, whatever) without regular expressions, since Erlang's pattern matching usually solves these problems straightforwardly.

Tim's log file is also a good example for applying pattern matching the Erlang way. The file is one continuous stream of data; after splitting it into line-bounded chunks for parallelization, we can match the whole {GET /ongoing/When/\d\d\dx/(\d\d\d\d/\d\d/\d\d/[^ .]+) } pattern directly on each chunk, with no need to split the chunk into lines.

This led to my third solution, which matches the equivalent of the whole regular expression

{GET /ongoing/When/\d\d\dx/(\d\d\d\d/\d\d/\d\d/[^ .]+) } 

directly, using the list pattern:

"GET /ongoing/When/"++[_,_,_,$x,$/,Y1,Y2,Y3,Y4,$/,M1,M2,$/,D1,D2,$/|Rest]

and then fetches

[Y1,Y2,Y3,Y4,$/,M1,M2,$/,D1,D2,$/] ++ match_until_space_newline(Rest, [])

as the matched key, without splitting the chunk into lines.
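To make this concrete, here is a minimal shell session showing the idea (the sample line is made up; the real code runs the same match over a whole chunk):

1> Line = "GET /ongoing/When/200x/2006/09/29/Dynamic-IDE HTTP/1.1".
"GET /ongoing/When/200x/2006/09/29/Dynamic-IDE HTTP/1.1"
2> "GET /ongoing/When/"++[_,_,_,$x,$/,Y1,Y2,Y3,Y4,$/,M1,M2,$/,D1,D2,$/|Rest] = Line.
"GET /ongoing/When/200x/2006/09/29/Dynamic-IDE HTTP/1.1"
3> [Y1,Y2,Y3,Y4,$/,M1,M2,$/,D1,D2,$/].
"2006/09/29/"
4> Rest.
"Dynamic-IDE HTTP/1.1"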

But yes, we still need to split each chunk at its last newline, so that the parallelized results are exactly accurate.
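For example, split_on_last_newline/1 (defined in the code below) splits a chunk so that the trailing partial line is carried over to the next chunk. Assuming the function were exported for a quick shell test:

1> tbray3:split_on_last_newline("line1\nline2\npartial").
{"line1\nline2","partial"}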

On my 2-core 2 GHz MacBook, the best time I've got is 4.483 seconds:

# smp enabled:
$ erlc -smp tbray3.erl
$ time erl -smp +P 60000 -noshell -run tbray3 start o1000k.ap -s erlang halt
8900    : <<"2006/09/29/Dynamic-IDE">>
2000    : <<"2006/07/28/Open-Data">>
1300    : <<"2003/07/25/NotGaming">>
800     : <<"2003/10/16/Debbie">>
800     : <<"2003/09/18/NXML">>
800     : <<"2006/01/31/Data-Protection">>
700     : <<"2003/06/23/SamsPie">>
600     : <<"2006/09/11/Making-Markup">>
600     : <<"2003/02/04/Construction">>
600     : <<"2005/11/03/Cars-and-Office-Suites">>
Time:    4142.83 ms

real    0m4.483s
user    0m5.804s
sys     0m0.615s

# no-smp:
$ erlc tbray3.erl
$ time erl -noshell -run tbray3 start o1000k.ap -s erlang halt

real    0m7.050s
user    0m6.183s
sys     0m0.644s

Enabling SMP speeds things up by about 57% (7.050 s / 4.483 s ≈ 1.57).

On the 2.80 GHz 4-CPU Xeon Debian box that I mentioned in a previous blog, the best result is:

real    0m8.420s
user    0m11.637s
sys     0m0.452s

I've also noticed that adjusting BUFFER_SIZE balances the time consumed by the parallelized and un-parallelized parts. That is, as the number of cores increases, we can also increase BUFFER_SIZE a bit, so the number of chunks decreases (less un-parallelized split_on_last_newline/1 and file:read/2), at the cost of heavier work for the parallelized binary_to_list/1 and scan_chunk/1 on longer lists.

The best BUFFER_SIZE on my machine is 4096 * 5 bytes, with which the un-parallelized split_on_last_newline/1 takes only about 0.226 s in this case.
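One way to measure this is a quick shell session like the following sketch, again assuming split_on_last_newline/1 were exported for the test:

1> {ok, File} = file:open("o1000k.ap", [raw, binary]).
2> {ok, Bin} = file:read(File, 4096 * 5).
3> {Micros, _} = timer:tc(tbray3, split_on_last_newline, [binary_to_list(Bin)]).

timer:tc/3 returns the elapsed time in microseconds; summing it over all chunks gives the total un-parallelized split time.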

The code:

-module(tbray3).

-compile([native]).

-export([start/1]).
  
%% The best Bin Buffer Size is 4096 * 1 - 4096 * 5
-define(BUFFER_SIZE, (4096 * 5)). 

start(FileName) ->
    Start = now(),

    Main = self(),
    %% the collector merges per-chunk dicts and reports back to Main
    Collector = spawn(fun () -> collect_loop(Main) end),
 
    {ok, File} = file:open(FileName, [raw, binary]),
    read_file(File, Collector),
    
    %% don't terminate, wait here, until all tasks done.
    receive
        stop -> io:format("Time: ~10.2f ms~n", [timer:now_diff(now(), Start) / 1000])
    end.

read_file(File, Collector) -> read_file_1(File, [], 0, Collector).
%% Read the file in ?BUFFER_SIZE chunks, carry the partial last line
%% (PrevTail) over to the next chunk, and spawn one scanner per chunk.
read_file_1(File, PrevTail, I, Collector) ->
    case file:read(File, ?BUFFER_SIZE) of
        eof ->
            %% the log is assumed to end with a newline, so PrevTail is
            %% empty here; tell the collector how many chunks to expect
            Collector ! {chunk_num, I},
            file:close(File);
        {ok, Bin} -> 
            {Chunk, NextTail} = split_on_last_newline(PrevTail ++ binary_to_list(Bin)),
            spawn(fun () -> Collector ! {dict, scan_chunk(Chunk)} end),
            read_file_1(File, NextTail, I + 1, Collector)
    end.

split_on_last_newline(List) -> split_on_last_newline_1(lists:reverse(List), []).
split_on_last_newline_1(List, Tail) ->
    case List of
        %% no newline at all: carry the whole chunk over as the tail
        []         -> {[], Tail};
        [$\n|Rest] -> {lists:reverse(Rest), Tail};
        [C|Rest]   -> split_on_last_newline_1(Rest, [C | Tail])
    end.

collect_loop(Main) -> collect_loop_1(Main, dict:new(), undefined, 0).
%% the first clause matches when the processed count reaches the announced
%% chunk count; then print the result and release the main process
collect_loop_1(Main, Dict, ChunkNum, ChunkNum) ->
    print_result(Dict),
    Main ! stop;
collect_loop_1(Main, Dict, ChunkNum, ProcessedNum) ->
    receive
        {chunk_num, ChunkNumX} -> 
            collect_loop_1(Main, Dict, ChunkNumX, ProcessedNum);
        {dict, DictX} -> 
            Dict1 = dict:merge(fun (_, V1, V2) -> V1 + V2 end, Dict, DictX),
            collect_loop_1(Main, Dict1, ChunkNum, ProcessedNum + 1)
    end.
    
print_result(Dict) ->
    SortedList = lists:reverse(lists:keysort(2, dict:to_list(Dict))),
    [io:format("~b\t: ~p~n", [V, K]) || {K, V} <- lists:sublist(SortedList, 10)].

scan_chunk(List) -> scan_chunk_1(List, dict:new()).
%% slide over the chunk one char at a time; whenever the head of the list
%% matches the fixed part of the URI, extract the rest of the key
scan_chunk_1(List, Dict) ->
    case List of
        [] -> Dict;
        "GET /ongoing/When/"++[_,_,_,$x,$/,Y1,Y2,Y3,Y4,$/,M1,M2,$/,D1,D2,$/|Rest] ->
            case match_until_space_newline(Rest, []) of
                {Rest1, []} -> 
                    scan_chunk_1(Rest1, Dict);
                {Rest1, Word} -> 
                    Key = list_to_binary([Y1,Y2,Y3,Y4,$/,M1,M2,$/,D1,D2,$/, Word]),
                    scan_chunk_1(Rest1, dict:update_counter(Key, 1, Dict))
            end;
        [_|Rest] -> scan_chunk_1(Rest, Dict)
    end.
    
%% collect chars up to the next space; hitting '.' or a newline first
%% means the URI is not an article fetch, so return an empty word
match_until_space_newline(List, Word) ->
    case List of
        []     -> {[],   []};
        [10|_] -> {List, []};
        [$.|_] -> {List, []};
        [$ |_] -> {List, lists:reverse(Word)};
        [C|Rest] -> match_until_space_newline(Rest, [C | Word])
    end.

I also wrote a corresponding binary version, which is 2-3 times slower than the list version above on my machine, though the result may vary depending on how your Erlang/OTP was compiled and on the operating system. I will test it again when Erlang/OTP R12B is released, which is claimed to be optimized for binary match performance.
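For a rough apples-to-apples comparison of just the scanning part, a sketch like this can be used, assuming scan_chunk/1 were exported from both modules:

1> {ok, File} = file:open("o1000k.ap", [raw, binary]).
2> {ok, Bin} = file:read(File, 4096 * 100).
3> {TList, _} = timer:tc(tbray3, scan_chunk, [binary_to_list(Bin)]).
4> {TBin, _} = timer:tc(tbray3_bin, scan_chunk, [Bin]).

(Both scans here run over a chunk that was not split at a newline, so treat the numbers as a rough guide only.)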

The code:

-module(tbray3_bin).

-compile([native]).

-export([start/1]).

-define(BUFFER_SIZE, (4096 * 10000)).

start(FileName) ->
    Start = now(),

    Main = self(),
    Collector = spawn(fun () -> collect_loop(Main) end),

    {ok, File} = file:open(FileName, [raw, binary]),    
    read_file(File, Collector),

    %% don't terminate, wait here, until all tasks done.
    receive
        stop -> io:format("Time: ~p ms~n", [timer:now_diff(now(), Start) / 1000])       
    end.
    
collect_loop(Main) -> collect_loop_1(Main, dict:new(), undefined, 0).
collect_loop_1(Main, Dict, ChunkNum, ChunkNum) ->
    print_result(Dict),
    Main ! stop;
collect_loop_1(Main, Dict, ChunkNum, ProcessedNum) ->
    receive
        {chunk_num, ChunkNumX} -> 
            collect_loop_1(Main, Dict, ChunkNumX, ProcessedNum);
        {dict, DictX} ->
            Dict1 = dict:merge(fun (_, V1, V2) -> V1 + V2 end, Dict, DictX),
            collect_loop_1(Main, Dict1, ChunkNum, ProcessedNum + 1)
    end.

print_result(Dict) ->
    SortedList = lists:reverse(lists:keysort(2, dict:to_list(Dict))),
    [io:format("~b\t: ~s~n", [V, K]) || {K, V} <- lists:sublist(SortedList, 10)].
          
read_file(File, Collector) -> read_file(File, <<>>, 0, Collector).            
read_file(File, PrevTail, I, Collector) ->
    case file:read(File, ?BUFFER_SIZE) of
        eof -> 
            file:close(File),
            %% the log is assumed to end with a newline, so PrevTail is empty
            Collector ! {chunk_num, I};
        {ok, Bin} -> 
            {Data, NextTail} = split_on_last_newline(Bin),
            spawn(fun () -> Collector ! {dict, scan_chunk(<<PrevTail/binary, Data/binary>>)} end),
            read_file(File, NextTail, I + 1, Collector)
    end.

split_on_last_newline(Bin) -> split_on_last_newline(Bin, size(Bin)).   
split_on_last_newline(Bin, Offset) ->
    case Bin of
        <<Data:Offset/binary,$\n,Tail/binary>> ->
            {Data, Tail};
        _ when Offset =< 0 -> 
            {Bin, <<>>};
        _ -> 
            split_on_last_newline(Bin, Offset - 1)
    end.
    
scan_chunk(Bin) -> scan_chunk_1(Bin, 0, dict:new()).    
%% 34 is the length of the fixed prefix to match:
%% "GET /ongoing/When/" plus "\d\d\dx/\d\d\d\d/\d\d/\d\d/"
scan_chunk_1(Bin, Offset, Dict) when Offset < size(Bin) - 34 ->
    case Bin of
        <<_:Offset/binary,"GET /ongoing/When/",_,_,_,$x,$/,Y1,Y2,Y3,Y4,$/,M1,M2,$/,D1,D2,$/,Rest/binary>> ->            
            case match_until_space_newline(Rest, 0) of
                {Rest1, <<>>} -> 
                    scan_chunk_1(Rest1, 0, Dict);
                {Rest1, Word} -> 
                    Key = <<Y1,Y2,Y3,Y4,$/,M1,M2,$/,D1,D2,$/, Word/binary>>,
                    scan_chunk_1(Rest1, 0, dict:update_counter(Key, 1, Dict))
            end;
        _ -> scan_chunk_1(Bin, Offset + 1, Dict)
    end;
scan_chunk_1(_, _, Dict) -> Dict.

match_until_space_newline(Bin, Offset) ->
    case Bin of
        <<Word:Offset/binary,$ ,Rest/binary>> ->
            {Rest, Word};
        <<_:Offset/binary,$.,Rest/binary>> ->
            {Rest, <<>>};
        <<_:Offset/binary,10,Rest/binary>> ->
            {Rest, <<>>};
        <<_:Offset/binary,_,_/binary>> ->
            match_until_space_newline(Bin, Offset + 1);
        _ -> 
            {<<>>, <<>>}
    end. 
