锁定老帖子 主题:求助erlang进程通信效率问题
精华帖 (0) :: 良好帖 (0) :: 新手帖 (0) :: 隐藏帖 (0)
|
|
---|---|
作者 | 正文 |
发表时间:2008-02-15
我开发的消息转发server的功能: Aclient Bclient都连入Server,都并加入一个组C,Aclient向C组发一条消息,Server负责将消息发给C组内除了A外的其他Client,也就是B。开发没用OTP这些高级东西。SOCKET使用二进制,{active true},{packet 2}. 消息Server的设计: 有一个独立的进程负责监听client连入,当有新的client连入会为client申请独立的进程(client_recv process)负责接受这个client发来的消息,有一个独立的进程(group_manager process)维护分组和组的client成员列表,每个组(group process)有一个独立进程负责向组内的client转发消息。 当Aclient连入server,就会为A启动(client_recv process)负责接受A发来的消息并通过进程通信告诉给group_manager来建立一个组group。Bclient连入操作和A一样,都会加入同一个组。 当A发消息给SERVER要求把消息给组的其他成员时,client_recv通过进程通讯将消息给group进程,group进程便利组的成员列表将消息转发。 在测试性能时遇到的问题: 测试的时候A向SERVER发送100W的消息,用了8秒就发完了,并自动退出了,消息内容都一样,在内网做的测试。 而SERVER通过tcpdump监听,确实也是在8秒把所有消息都收完了,不在有来自A的消息进入,但是SERVER向B转发消息用了很长时间,超过10分钟,最后我结束进程了。 在程序内做了打印来判断程序的运行流程,发现SERVER一直执行从client_recv接受A的消息并给group来转发。 请问是我设计的有问题造成消息转发很慢吗?还是由于进程间通讯造成的?有什么办法可以调优? 谢谢各位帮助! 声明:ITeye文章版权属于作者,受法律保护。没有作者书面许可不得转载。
推荐链接
|
|
返回顶楼 | |
发表时间:2008-02-15
server代码,写的比较烂 请大家见谅
-module(emssserver). -export([init/0]). -define(TCP_OPTIONS,[binary,{packet, 2}, {active, true}, {reuseaddr, true}]). -define(PORT,7000). -define(Debug,"YES"). -ifdef(Debug). -define(DEBUG(Fmt, Args), io:format(Fmt, Args)). -else. -define(DEBUG(Fmt, Args), no_debug). -endif. -record(group,{name,gpid}). -record(client, {name,pid,socket}). init()-> {ok, Listen} = gen_tcp:listen(?PORT, ?TCP_OPTIONS), register(accept_connection, spawn(fun() -> accept_connection(Listen) end)), register(group_manage, spawn(fun() -> group_manage([]) end)). accept_connection(Listen) ->%%监听进程 {ok, Socket} = gen_tcp:accept(Listen), Pid = spawn(fun() -> client_recv(Socket,"","") end), gen_tcp:controlling_process(Socket,Pid), accept_connection(Listen). client_recv(Socket,ClientName,GroupPid)->%%client接受消息进程 receive {tcp, Socket, Bin} -> ?DEBUG("Server recv Data\r\n",""), case binary_to_term(Bin) of {join,JoinName,ClientNameTmp}->%%client join 加入组 ?DEBUG("client_recv Join:~s ~s\r\n",[JoinName,ClientNameTmp]), G=#group{name=JoinName}, group_manage ! {join,G,self()},%%get group pid receive {grouppid,GPid}->%%recv group pid取得组的Pid ?DEBUG("Client_recv grouppid ~w ~s\r\n",[GPid,ClientNameTmp]), GPid!{join,Socket,ClientNameTmp,self()} %% send "join" msg to group end, client_recv(Socket,ClientNameTmp,GPid); {send,GroupName,Msg}->%%群发消息 ?DEBUG("client_recv Send:~s ~s\r\n",[GroupName,Msg]), GroupPid!{send,ClientName,Msg},%%给组 消息 client_recv(Socket,ClientName,GroupPid) end; {tcp_closed, Socket} -> ?DEBUG("client_recv close\r\n",[]) end. group_manage(Group)->%%维护所有组和组内成员 ?DEBUG("group_manage start\r\n",[]), receive {join,JoinGroup,ClientPid}-> ?DEBUG("group_manage Join:~s\r\n",[JoinGroup#group.name]), case lists:keysearch(JoinGroup#group.name, #group.name, Group) of false -> Pid=spawn(fun() -> group([]) end), ?DEBUG("group_manage group pid~w\r\n",[Pid]), ClientPid!{grouppid,Pid},%%return client_recv group pid JoinGroupp = #group{name=JoinGroup#group.name,gpid=Pid}, Gs = [JoinGroupp|Group], ?DEBUG("group_manage group:~w\r\n",[Gs]), group_manage(Gs); {value,SingleGroup} -> ?DEBUG("group_manage group is set ~s\r\n",[SingleGroup#group.name]), ClientPid!{grouppid,SingleGroup#group.gpid}, group_manage(Group) end end. group(Client)->%%负责组的消息群发的进程 ?DEBUG("group \r\n",[]), receive {join,Socket,ClientName,CPid} -> ?DEBUG("group ClientName ~s\r\n",[ClientName]), ?DEBUG("group Client set ~w\r\n",[Client]), case lists:keysearch(ClientName,#client.name,Client) of false-> ?DEBUG("group new client\r\n",[]), ClientNew = #client{name=ClientName,pid=CPid,socket=Socket}, ClientTmp = [ClientNew|Client], group(ClientTmp); {value,SingleClient}-> ?DEBUG("group client is set ~s\r\n",[SingleClient#client.name]), group(Client) end; {send,ClientName,Msg}-> ?DEBUG("group recv client send ~s\r\n",[Msg]), lists:foreach(fun(T)->send(ClientName,T,Msg) end,Client), group(Client) end, ok. send(FClientName,TClientName,Msg)-> if FClientName /= TClientName#client.name -> ?DEBUG("send f:~s t:~s ~s\r\n",[FClientName,TClientName#client.name,Msg]), M = {FClientName,Msg}, gen_tcp:send(TClientName#client.socket,term_to_binary(M)); true-> ?DEBUG("send from eq to\r\n",[]) end. client端代码就是简单的迭代 gen_tcp:send |
|
返回顶楼 | |
发表时间:2008-02-15
你的send不是很高效,gen_tcp:send是会阻塞的,如果客户很慢,缓冲区已满就会阻塞,把TCP_OPTIONS加上{delay_send, true}测试一下有没有提高。
|
|
返回顶楼 | |
发表时间:2008-02-15
谢谢楼上 我试试这个参数。
|
|
返回顶楼 | |
发表时间:2008-02-15
加上这个参数也没明显变化,我觉得不是程序有逻辑错误,就是某些地方有问题,Aclient发100W的消息给SERVER特快,但是server给Bclient非常慢....估计1小时也发不完,慢的比其他脚本语言写的程序都慢了...即使Aclient和Bclient在同一机器上也一样,网络问题应该没有了。
|
|
返回顶楼 | |
发表时间:2008-02-15
该不会是B客户根本没有接收吧?如果是在linux环境下,建议strace客户看看
|
|
返回顶楼 | |
发表时间:2008-02-15
我开始也怀疑B没收到,但是BClient打印接受消息是有输出的,而且tcpdump查看也有SERVER的消息发过来。
erlang-gtalk上有人建议用netstat -nt看send和recv的值,我看server在发送时会突然出现send到2w左右,然后瞬间就0了,刷新多次偶尔会有几十Send,Bclient端多次刷偶尔会有recv几十的情况....奇怪。 我想是不是应该写个简单的 C->S->C的代码对比下是不是代码问题 |
|
返回顶楼 | |
发表时间:2008-02-15
观察一下运行时的CPU占用情况吧,另外使用strace,判断是瓶颈出现在IO还是CPU上,出现在任意一个上面都可能是编写逻辑不当造成的。
|
|
返回顶楼 | |
发表时间:2008-02-16
server运行时cpu占用99%,内存很少。
在erl的控制台用i()看进程。 Heapstack Reds Msgs <0.40.0>erlang:apply/2 832040 1194744 2616 lists:foreach/2 442992 是不是erlang的进程消息已经满了?同时不能处理这么多,我看都是经过foreach后在进行send。 |
|
返回顶楼 | |
发表时间:2008-02-16
既然瓶颈在CPU上,还是使用fprof分析一下吧,测试可以减少到100-1000次
最容易出现的问题是receive匹配顺序,或者是没有匹配到,建议加一个任意匹配检查一下是否有没有匹配的消息 |
|
返回顶楼 | |