`
zdx3578
  • 浏览: 93062 次
  • 性别: Icon_minigender_1
  • 来自: 上海
社区版块
存档分类
最新评论

rabbitmq erlang 源代码读 三 core process启动 gen_server

阅读更多

from http://blog.chinaunix.net/u3/103972/showart.php?id=2130456 这里格式较好
  我原来的博客 zdx3578.cublog.cn

看erlang自己的源代码 proc_lib.erl
proc_info(Pid,Item) when node(Pid) =:= node() ->
    process_info(Pid,Item);
proc_info(Pid,Item) ->
    case lists:member(node(Pid),nodes()) of
    true ->
        check(rpc:call(node(Pid), erlang, process_info, [Pid, Item]));
    _ ->
        hidden
    end.
erlang 把底层远程调用都隐藏了,这就是“显意编程”(intentional programming)吧 http://erlang-china.org/uploads/2007/09/joes-thesis.zip的4.5

gen_server.erl 源代码看看

gen_server.erl  %%从粗体看下去再从斜体看上来
start(Mod, Args, Options) ->
    gen:start(?MODULE, nolink, Mod, Args, Options).
start_link(Mod, Args, Options) ->
    gen:start(?MODULE, link, Mod, Args, Options).
     
gen.erl
start(GenMod, LinkP, Mod, Args, Options) ->
    do_spawn(GenMod, LinkP, Mod, Args, Options).
do_spawn(GenMod, _, Mod, Args, Options) ->
    Time = timeout(Options),
    proc_lib:start(?MODULE, init_it, [GenMod, self(), self, Mod, Args, Options], Time, spawn_opts(Options)). %%MFA
       
proc_lib.erl
-spec start(atom(), atom(), [term()], timeout(), [spawn_option()]) -> term().
start(M, F, A, Timeout, SpawnOpts) when is_atom(M), is_atom(F), is_list(A) ->
    Pid = ?MODULE:spawn_opt(M, F, A, SpawnOpts),
    sync_wait(Pid, Timeout).
spawn_opt(M, F, A, Opts) when is_atom(M), is_atom(F), is_list(A) ->
    Parent = get_my_name(),
    Ancestors = get_ancestors(),
    check_for_monitor(Opts),
    erlang:spawn_opt(?MODULE, init_p, [Parent,Ancestors,M,F,A], Opts).
-spec init_p(pid(), [pid()], atom(), atom(), [term()]) -> term().
init_p(Parent, Ancestors, M, F, A) when is_atom(M), is_atom(F), is_list(A) ->
    put('$ancestors', [Parent|Ancestors]),
    put('$initial_call', trans_init(M, F, A)),
    init_p_do_apply(M, F, A).
init_p_do_apply(M, F, A) ->
    try
    apply(M, F, A)
    catch
    Class:Reason ->
        exit_p(Class, Reason)
    end.

gen.erl的如下语句
    proc_lib:start(?MODULE, init_it, [GenMod, self(), self, Mod, Args, Options], Time, spawn_opts(Options)). %%MFA 就执行gen.erl 的init_it
init_it(GenMod, Starter, Parent, Mod, Args, Options) ->%%FA
    init_it2(GenMod, Starter, Parent, self(), Mod, Args, Options).
  
init_it2(GenMod, Starter, Parent, Name, Mod, Args, Options) ->
    GenMod:init_it(Starter, Parent, Name, Mod, Args, Options).%%A GenMod 就是gen_server,执行gen_server的init

gen_server.erl
init_it(Starter, self, Name, Mod, Args, Options) ->
    init_it(Starter, self(), Name, Mod, Args, Options);
init_it(Starter, Parent, Name0, Mod, Args, Options) ->
    Name = name(Name0),
    Debug = debug_options(Name, Options),
    case catch Mod:init(Args) of                    %%gen_server 在调用Mod模块的init
    {ok, State} ->
        proc_lib:init_ack(Starter, {ok, self()}),       
        loop(Parent, Name, State, Mod, infinity, Debug);%%开始循环
    {ok, State, Timeout} ->
        proc_lib:init_ack(Starter, {ok, self()}),       
        loop(Parent, Name, State, Mod, Timeout, Debug);
。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。
    end.


%%% The MAIN loop.
%%% ---------------------------------------------------
loop(Parent, Name, State, Mod, hibernate, Debug) ->
    proc_lib:hibernate(?MODULE,wake_hib,[Parent, Name, State, Mod, Debug]);
loop(Parent, Name, State, Mod, Time, Debug) ->
    Msg = receive
          Input ->
            Input
      after Time ->
          timeout
      end,
    decode_msg(Msg, Parent, Name, State, Mod, Time, Debug, false).

decode_msg(Msg, Parent, Name, State, Mod, Time, Debug, Hib) ->
    case Msg of
    {system, From, Req} ->
        sys:handle_system_msg(Req, From, Parent, ?MODULE, Debug,
                  [Name, State, Mod, Time], Hib);
    {'EXIT', Parent, Reason} ->
        terminate(Reason, Name, Msg, Mod, State, Debug);
    _Msg when Debug =:= [] ->
        handle_msg(Msg, Parent, Name, State, Mod);
    _Msg ->
        Debug1 = sys:handle_debug(Debug, {?MODULE, print_event},
                      Name, {in, Msg}),
        handle_msg(Msg, Parent, Name, State, Mod, Debug1)
    end.

%%% Message handling functions
%%% ---------------------------------------------------

dispatch({'$gen_cast', Msg}, Mod, State) ->
    Mod:handle_cast(Msg, State);
dispatch(Info, Mod, State) ->
    Mod:handle_info(Info, State).

handle_msg({'$gen_call', From, Msg}, Parent, Name, State, Mod) ->
    case catch Mod:handle_call(Msg, From, State) of    %%回调handle_call
    {reply, Reply, NState} ->
        reply(From, Reply),
        loop(Parent, Name, NState, Mod, infinity, []);   %%继续循环
    {reply, Reply, NState, Time1} ->
        reply(From, Reply),
        loop(Parent, Name, NState, Mod, Time1, []);
    {noreply, NState} ->
        loop(Parent, Name, NState, Mod, infinity, []);
    {noreply, NState, Time1} ->
        loop(Parent, Name, NState, Mod, Time1, []);
    {stop, Reason, Reply, NState} ->
        {'EXIT', R} =
        (catch terminate(Reason, Name, Msg, Mod, NState, [])),
        reply(From, Reply),
        exit(R);
    Other -> handle_common_reply(Other, Parent, Name, Msg, Mod, State)
    end;
handle_msg(Msg, Parent, Name, State, Mod) ->
    Reply = (catch dispatch(Msg, Mod, State)),
    handle_common_reply(Reply, Parent, Name, Msg, Mod, State).









——————————————————————————————————————————————————————————————


rabbit.erl
start(normal, []) ->
    {ok, SupPid} = rabbit_sup:start_link(),

rabbit_sup.erl
-define(SERVER, ?MODULE).
start_link() ->
    supervisor:start_link({local, ?SERVER}, ?MODULE, []).%%注册supervisor name {local,rabbit_sup}

init([]) ->
    {ok, {{one_for_one, 10, 10}, []}}.
  
rabbit.erl
       {"core processes",
        fun () ->
                ok = start_child(rabbit_log),
                ok = rabbit_hooks:start(),

                ok = rabbit_binary_generator:
                    check_empty_content_body_frame_size(),
                {ok, MemoryAlarms} = application:get_env(memory_alarms),
                ok = rabbit_alarm:start(MemoryAlarms),
                ok = rabbit_amqqueue:start(),
                ok = start_child(rabbit_router),
                ok = start_child(rabbit_node_monitor)
        end},


start_child(Mod) ->
    {ok,_} = supervisor:start_child(rabbit_sup,%%这个名的注册就是在start/2启动时的rabbit_sup:start_link() 启动的
                                    {Mod, {Mod, start_link, []},
                                     transient, 100, worker, [Mod]}),
    ok.


supervisor.erl:
-behaviour(gen_server).
start_child(Supervisor, ChildSpec) ->
    call(Supervisor, {start_child, ChildSpec}).
call(Supervisor, Req) ->
    gen_server:call(Supervisor, Req, infinity).
  
gen_server.erl
call(Name, Request, Timeout) ->
    case catch gen:call(Name, '$gen_call', Request, Timeout) of
    {ok,Res} ->
        Res;
    {'EXIT',Reason} ->
        exit({Reason, {?MODULE, call, [Name, Request, Timeout]}})
    end.
  
  
gen.erl
call(Pid, Label, Request, Timeout)
  when is_pid(Pid), Timeout =:= infinity;
       is_pid(Pid), is_integer(Timeout), Timeout >= 0 ->
    do_call(Pid, Label, Request, Timeout);  

do_call(Process, Label, Request, Timeout) ->
    Node = case Process of
            {_S, N} when is_atom(N) ->
            N;
            _ when is_pid(Process) ->
            node(Process)
       end,
    try erlang:monitor(process, Process) of
    Mref ->
        catch erlang:send(Process, {Label, {self(), Mref}, Request},
          [noconnect]),
        wait_resp_mon(Node, Mref, Timeout)
    catch
    error:_ ->
        monitor_node(Node, true),
        receive
        {nodedown, Node} ->
            monitor_node(Node, false),
            exit({nodedown, Node})
        after 0 ->
            Tag = make_ref(),
            Process ! {Label, {self(), Tag}, Request},
            wait_resp(Node, Tag, Timeout)
        end
    end.

wait_resp_mon(Node, Mref, Timeout) ->
..............

wait_resp(Node, Tag, Timeout) ->
..............

gen_server.erl
dispatch({'$gen_cast', Msg}, Mod, State) ->
    Mod:handle_cast(Msg, State);
dispatch(Info, Mod, State) ->
    Mod:handle_info(Info, State).

handle_msg({'$gen_call', From, Msg}, Parent, Name, State, Mod) ->
    case catch Mod:handle_call(Msg, From, State) of
    {reply, Reply, NState} ->
        reply(From, Reply),
        loop(Parent, Name, NState, Mod, infinity, []);
    {reply, Reply, NState, Time1} ->
        reply(From, Reply),
        loop(Parent, Name, NState, Mod, Time1, []);
    {noreply, NState} ->
        loop(Parent, Name, NState, Mod, infinity, []);
    {noreply, NState, Time1} ->
        loop(Parent, Name, NState, Mod, Time1, []);
    {stop, Reason, Reply, NState} ->
        {'EXIT', R} =
        (catch terminate(Reason, Name, Msg, Mod, NState, [])),
        reply(From, Reply),
        exit(R);
    Other -> handle_common_reply(Other, Parent, Name, Msg, Mod, State)
    end;
handle_msg(Msg, Parent, Name, State, Mod) ->
    Reply = (catch dispatch(Msg, Mod, State)),
    handle_common_reply(Reply, Parent, Name, Msg, Mod, State).
  
handle_common_reply(Reply, Parent, Name, Msg, Mod, State) ->
    case Reply of
    {noreply, NState} ->
        loop(Parent, Name, NState, Mod, infinity, []);
    {noreply, NState, Time1} ->
        loop(Parent, Name, NState, Mod, Time1, []);
    {stop, Reason, NState} ->
        terminate(Reason, Name, Msg, Mod, NState, []);
    {'EXIT', What} ->
        terminate(What, Name, Msg, Mod, State, []);
    _ ->
        terminate({bad_return_value, Reply}, Name, Msg, Mod, State, [])
    end.


  
  
supervisor.erl:
handle_call({start_child, ChildSpec}, _From, State) ->
    case check_childspec(ChildSpec) of
    {ok, Child} ->
        {Resp, NState} = handle_start_child(Child, State),
        {reply, Resp, NState};
    What ->
        {reply, {error, What}, State}
    end;

分享到:
评论
2 楼 zdx3578 2010-03-01  
不好意思,博客还在老地方  zdx3578.cublog.cn
1 楼 mryufeng 2010-03-01  
rabbitmq是个很好的code example

相关推荐

Global site tag (gtag.js) - Google Analytics