论坛首页 综合技术论坛

求助erlang进程通信效率问题

浏览 15512 次
精华帖 (0) :: 良好帖 (0) :: 新手帖 (0) :: 隐藏帖 (0)
作者 正文
   发表时间:2008-02-15  
大家好!小弟最近在学习erlang,熟悉代码做了一个消息转发的server,但是在测试性能的时候遇到一个问题,请教各位,是哪里的问题,谢谢!

我开发的消息转发server的功能:
Aclient Bclient都连入Server,都并加入一个组C,Aclient向C组发一条消息,Server负责将消息发给C组内除了A外的其他Client,也就是B。开发没用OTP这些高级东西。SOCKET使用二进制,{active true},{packet 2}.

消息Server的设计:
有一个独立的进程负责监听client连入,当有新的client连入会为client申请独立的进程(client_recv process)负责接受这个client发来的消息,有一个独立的进程(group_manager process)维护分组和组的client成员列表,每个组(group process)有一个独立进程负责向组内的client转发消息。

当Aclient连入server,就会为A启动(client_recv process)负责接受A发来的消息并通过进程通信告诉给group_manager来建立一个组group。Bclient连入操作和A一样,都会加入同一个组。
当A发消息给SERVER要求把消息给组的其他成员时,client_recv通过进程通讯将消息给group进程,group进程便利组的成员列表将消息转发。

在测试性能时遇到的问题:
测试的时候A向SERVER发送100W的消息,用了8秒就发完了,并自动退出了,消息内容都一样,在内网做的测试。
而SERVER通过tcpdump监听,确实也是在8秒把所有消息都收完了,不在有来自A的消息进入,但是SERVER向B转发消息用了很长时间,超过10分钟,最后我结束进程了。

在程序内做了打印来判断程序的运行流程,发现SERVER一直执行从client_recv接受A的消息并给group来转发。

请问是我设计的有问题造成消息转发很慢吗?还是由于进程间通讯造成的?有什么办法可以调优?

谢谢各位帮助!
   发表时间:2008-02-15  
server代码,写的比较烂 请大家见谅

-module(emssserver).
-export([init/0]).
-define(TCP_OPTIONS,[binary,{packet, 2}, {active, true}, {reuseaddr, true}]).
-define(PORT,7000).
-define(Debug,"YES").

-ifdef(Debug).
-define(DEBUG(Fmt, Args), io:format(Fmt, Args)).
-else.
-define(DEBUG(Fmt, Args), no_debug).
-endif.
-record(group,{name,gpid}).
-record(client, {name,pid,socket}).

init()->
	{ok, Listen} = gen_tcp:listen(?PORT, ?TCP_OPTIONS),
	register(accept_connection, spawn(fun() -> accept_connection(Listen) end)),
	register(group_manage, spawn(fun() -> group_manage([]) end)).

accept_connection(Listen) ->%%监听进程
    {ok, Socket} = gen_tcp:accept(Listen),
    Pid = spawn(fun() -> client_recv(Socket,"","") end),
    gen_tcp:controlling_process(Socket,Pid),
    accept_connection(Listen).

client_recv(Socket,ClientName,GroupPid)->%%client接受消息进程
	receive
		{tcp, Socket, Bin} ->
			?DEBUG("Server recv Data\r\n",""),
			case binary_to_term(Bin) of
				{join,JoinName,ClientNameTmp}->%%client join 加入组
					?DEBUG("client_recv Join:~s ~s\r\n",[JoinName,ClientNameTmp]),
					G=#group{name=JoinName},
					group_manage ! {join,G,self()},%%get group pid
						receive
							{grouppid,GPid}->%%recv group pid取得组的Pid
								?DEBUG("Client_recv grouppid ~w ~s\r\n",[GPid,ClientNameTmp]),
								GPid!{join,Socket,ClientNameTmp,self()} %% send "join" msg to group
						end,
					client_recv(Socket,ClientNameTmp,GPid);
				{send,GroupName,Msg}->%%群发消息
					?DEBUG("client_recv Send:~s ~s\r\n",[GroupName,Msg]),
					GroupPid!{send,ClientName,Msg},%%给组 消息
					client_recv(Socket,ClientName,GroupPid)
			end;
		{tcp_closed, Socket} ->
			?DEBUG("client_recv close\r\n",[])
	end.

group_manage(Group)->%%维护所有组和组内成员
	?DEBUG("group_manage start\r\n",[]),
	receive
		{join,JoinGroup,ClientPid}->
			?DEBUG("group_manage Join:~s\r\n",[JoinGroup#group.name]),
			case lists:keysearch(JoinGroup#group.name, #group.name, Group) of
				false ->
					Pid=spawn(fun() -> group([]) end),
					?DEBUG("group_manage group pid~w\r\n",[Pid]),
					ClientPid!{grouppid,Pid},%%return client_recv group pid
					JoinGroupp = #group{name=JoinGroup#group.name,gpid=Pid},
					Gs = [JoinGroupp|Group],
					?DEBUG("group_manage group:~w\r\n",[Gs]),
					group_manage(Gs);
				{value,SingleGroup} ->
					?DEBUG("group_manage group is set ~s\r\n",[SingleGroup#group.name]),
					ClientPid!{grouppid,SingleGroup#group.gpid},
					group_manage(Group)
			end

	end.

group(Client)->%%负责组的消息群发的进程
	?DEBUG("group \r\n",[]),
	receive
		{join,Socket,ClientName,CPid} ->
			?DEBUG("group ClientName ~s\r\n",[ClientName]),
			?DEBUG("group Client set ~w\r\n",[Client]),
			case lists:keysearch(ClientName,#client.name,Client) of
				false->
					?DEBUG("group new client\r\n",[]),
					ClientNew = #client{name=ClientName,pid=CPid,socket=Socket},
					ClientTmp = [ClientNew|Client],
					group(ClientTmp);
				{value,SingleClient}->
					?DEBUG("group client is set ~s\r\n",[SingleClient#client.name]),
					group(Client)
			end;
		{send,ClientName,Msg}->
			?DEBUG("group recv client send ~s\r\n",[Msg]),
			lists:foreach(fun(T)->send(ClientName,T,Msg) end,Client),
			group(Client)
	end,
	ok.
	
send(FClientName,TClientName,Msg)->
	if
		FClientName /= TClientName#client.name ->
			?DEBUG("send f:~s t:~s ~s\r\n",[FClientName,TClientName#client.name,Msg]),
			M = {FClientName,Msg},
			gen_tcp:send(TClientName#client.socket,term_to_binary(M));
		true->
			?DEBUG("send from eq to\r\n",[])
	end.
	


client端代码就是简单的迭代 gen_tcp:send
0 请登录后投票
   发表时间:2008-02-15  
你的send不是很高效,gen_tcp:send是会阻塞的,如果客户很慢,缓冲区已满就会阻塞,把TCP_OPTIONS加上{delay_send, true}测试一下有没有提高。
0 请登录后投票
   发表时间:2008-02-15  
谢谢楼上 我试试这个参数。
0 请登录后投票
   发表时间:2008-02-15  
加上这个参数也没明显变化,我觉得不是程序有逻辑错误,就是某些地方有问题,Aclient发100W的消息给SERVER特快,但是server给Bclient非常慢....估计1小时也发不完,慢的比其他脚本语言写的程序都慢了...即使Aclient和Bclient在同一机器上也一样,网络问题应该没有了。
0 请登录后投票
   发表时间:2008-02-15  
该不会是B客户根本没有接收吧?如果是在linux环境下,建议strace客户看看
0 请登录后投票
   发表时间:2008-02-15  
我开始也怀疑B没收到,但是BClient打印接受消息是有输出的,而且tcpdump查看也有SERVER的消息发过来。

erlang-gtalk上有人建议用netstat -nt看send和recv的值,我看server在发送时会突然出现send到2w左右,然后瞬间就0了,刷新多次偶尔会有几十Send,Bclient端多次刷偶尔会有recv几十的情况....奇怪。
我想是不是应该写个简单的 C->S->C的代码对比下是不是代码问题
0 请登录后投票
   发表时间:2008-02-15  
观察一下运行时的CPU占用情况吧,另外使用strace,判断是瓶颈出现在IO还是CPU上,出现在任意一个上面都可能是编写逻辑不当造成的。
0 请登录后投票
   发表时间:2008-02-16  
server运行时cpu占用99%,内存很少。
在erl的控制台用i()看进程。
                         Heapstack     Reds Msgs
<0.40.0>erlang:apply/2   832040  1194744 2616
        lists:foreach/2  442992  
是不是erlang的进程消息已经满了?同时不能处理这么多,我看都是经过foreach后在进行send。
0 请登录后投票
   发表时间:2008-02-16  
既然瓶颈在CPU上,还是使用fprof分析一下吧,测试可以减少到100-1000次

最容易出现的问题是receive匹配顺序,或者是没有匹配到,建议加一个任意匹配检查一下是否有没有匹配的消息
0 请登录后投票
论坛首页 综合技术版

跳转论坛:
Global site tag (gtag.js) - Google Analytics