`
wqtn22
  • 浏览: 99804 次
  • 性别: Icon_minigender_1
  • 来自: 杭州
社区版块
存档分类
最新评论

mnesia的普通transaction写过程(五)事务提交

 
阅读更多

上一篇博文介绍了mnesia的事务提交准备过程,为每个事务参与结点构造了其提交结构commit,下面将进入到提交过程中,此后将继续分析。

 

mnesia_tm.erl

 

multi_commit(sym_trans, _Maj = [], Tid, CR, Store) ->

    {DiscNs, RamNs} = commit_nodes(CR, [], []),

    Pending = mnesia_checkpoint:tm_enter_pending(Tid, DiscNs, RamNs),

    ?ets_insert(Store, Pending),

 

    {WaitFor, Local} = ask_commit(sym_trans, Tid, CR, DiscNs, RamNs),

    {Outcome, []} = rec_all(WaitFor, Tid, do_commit, []),

    ?eval_debug_fun({?MODULE, multi_commit_sym},

   [{tid, Tid}, {outcome, Outcome}]),

    rpc:abcast(DiscNs -- [node()], ?MODULE, {Tid, Outcome}),

    rpc:abcast(RamNs -- [node()], ?MODULE, {Tid, Outcome}),

    case Outcome of

do_commit ->

   mnesia_recover:note_decision(Tid, committed),

   do_dirty(Tid, Local),

   mnesia_locker:release_tid(Tid),

   ?MODULE ! {delete_transaction, Tid};

{do_abort, _Reason} ->

   mnesia_recover:note_decision(Tid, aborted)

    end,

    ?eval_debug_fun({?MODULE, multi_commit_sym, post},

   [{tid, Tid}, {outcome, Outcome}]),

Outcome;

commit_nodes([C | Tail], AccD, AccR)

        when C#commit.disc_copies == [],

             C#commit.disc_only_copies  == [],

             C#commit.schema_ops == [] ->

    commit_nodes(Tail, AccD, [C#commit.node | AccR]);

commit_nodes([C | Tail], AccD, AccR) ->

    commit_nodes(Tail, [C#commit.node | AccD], AccR);

commit_nodes([], AccD, AccR) ->

    {AccD, AccR}.

取出所有参与事务的结点,这实质上等同于where_to_commit属性记录的所有结点。

ask_commit(Protocol, Tid, CR, DiscNs, RamNs) ->
    ask_commit(Protocol, Tid, CR, DiscNs, RamNs, [], no_local).
ask_commit(Protocol, Tid, [Head | Tail], DiscNs, RamNs, WaitFor, Local) ->
    Node = Head#commit.node,
    if
Node == node() ->
   ask_commit(Protocol, Tid, Tail, DiscNs, RamNs, WaitFor, Head);
true ->
   Bin = opt_term_to_binary(Protocol, Head, DiscNs++RamNs),
   Msg = {ask_commit, Protocol, Tid, Bin, DiscNs, RamNs},
   {?MODULE, Node} ! {self(), Msg},
   ask_commit(Protocol, Tid, Tail, DiscNs, RamNs, [Node | WaitFor], Local)
    end;
ask_commit(_Protocol, _Tid, [], _DiscNs, _RamNs, WaitFor, Local) ->
    {WaitFor, Local}.
第一阶段提交,这次是同步过程,由ask_commit和rec_all组成,向每个结点的事务管理器通知事务属性,包括事务tid和事务协议等,及该结点的commit结构,此处使用的事务协议是sym_trans,为异步同构协议,异步表示该事务一旦在所有参与结点完成提交,则立即完成,不必等待每个结点都将事务日志落盘;同构表示所有事务参与结点的表结构都相同。
同时观察各个结点的事务管理器的执行过程:
doit_loop(#state{coordinators=Coordinators,participants=Participants,supervisor=Sup}=State) ->
{From, {ask_commit, Protocol, Tid, Commit, DiscNs, RamNs}} ->
   ?eval_debug_fun({?MODULE, doit_ask_commit},
   [{tid, Tid}, {prot, Protocol}]),
   mnesia_checkpoint:tm_enter_pending(Tid, DiscNs, RamNs),
   Pid =
case Protocol of
   asym_trans when node(Tid#tid.pid) /= node() ->
Args = [tmpid(From), Tid, Commit, DiscNs, RamNs],
spawn_link(?MODULE, commit_participant, Args);
   _ when node(Tid#tid.pid) /= node() -> %% *_sym_trans
reply(From, {vote_yes, Tid}),
nopid
end,
   P = #participant{tid = Tid,
    pid = Pid,
    commit = Commit,
    disc_nodes = DiscNs,
    ram_nodes = RamNs,
    protocol = Protocol},
   State2 = State#state{participants = gb_trees:insert(Tid,P,Participants)},
   doit_loop(State2);
每个结点的事务管理器接收第一阶段的提交,记录提交的内容,并表示参与此次提交。
事务发起进程等待各个结点的事务管理器的第一阶段提交结果:
rec_all([Node | Tail], Tid, Res, Pids) ->
    receive
{?MODULE, Node, {vote_yes, Tid}} ->
   rec_all(Tail, Tid, Res, Pids);
{?MODULE, Node, {vote_yes, Tid, Pid}} ->
   rec_all(Tail, Tid, Res, [Pid | Pids]);
{?MODULE, Node, {vote_no, Tid, Reason}} ->
   rec_all(Tail, Tid, {do_abort, Reason}, Pids);
{?MODULE, Node, {committed, Tid}} ->
   rec_all(Tail, Tid, Res, Pids);
{?MODULE, Node, {aborted, Tid}} ->
   rec_all(Tail, Tid, Res, Pids);
{mnesia_down, Node} ->
   Abort = {do_abort, {bad_commit, Node}},
   catch {?MODULE, Node} ! {Tid, Abort},
   rec_all(Tail, Tid, Abort, Pids)
    end;
rec_all([], _Tid, Res, Pids) ->
    {Res, Pids}.
发起事务的结点等待事务提交决议,待所有结点都返回结果后,向上层通知决议结果。
第一阶段提交完成后,multi_commit将进行第二阶段提交:
multi_commit(sym_trans, _Maj = [], Tid, CR, Store) ->
    {DiscNs, RamNs} = commit_nodes(CR, [], []),
    Pending = mnesia_checkpoint:tm_enter_pending(Tid, DiscNs, RamNs),
    ?ets_insert(Store, Pending),

    {WaitFor, Local} = ask_commit(sym_trans, Tid, CR, DiscNs, RamNs),
    {Outcome, []} = rec_all(WaitFor, Tid, do_commit, []),
    ?eval_debug_fun({?MODULE, multi_commit_sym},
   [{tid, Tid}, {outcome, Outcome}]),
    rpc:abcast(DiscNs -- [node()], ?MODULE, {Tid, Outcome}),
    rpc:abcast(RamNs -- [node()], ?MODULE, {Tid, Outcome}),
    case Outcome of
do_commit ->
   mnesia_recover:note_decision(Tid, committed),
   do_dirty(Tid, Local),
   mnesia_locker:release_tid(Tid),
   ?MODULE ! {delete_transaction, Tid};
{do_abort, _Reason} ->
   mnesia_recover:note_decision(Tid, aborted)
    end,
    ?eval_debug_fun({?MODULE, multi_commit_sym, post},
   [{tid, Tid}, {outcome, Outcome}]),
Outcome;
第二阶段提交,这次是异步过程,通知每个结点的事务管理器写入日志和数据。
各个结点的事务管理器继续参与第二阶段提交:
doit_loop(#state{coordinators=Coordinators,participants=Participants,supervisor=Sup}=State) ->
receive
{Tid, do_commit} ->
   case gb_trees:lookup(Tid, Participants) of
none ->
   verbose("Tried to commit a non participant transaction ~p~n",[Tid]),
   doit_loop(State);
{value, P} ->
   ?eval_debug_fun({?MODULE,do_commit,pre},[{tid,Tid},{participant,P}]),
   case P#participant.pid of
nopid ->
   Commit = P#participant.commit,
   Member = lists:member(node(), P#participant.disc_nodes),
   if Member == false ->
   ignore;
      P#participant.protocol == sym_trans ->
   mnesia_log:log(Commit);
      P#participant.protocol == sync_sym_trans ->
   mnesia_log:slog(Commit)
   end,
   mnesia_recover:note_decision(Tid, committed),
   do_commit(Tid, Commit),
   if
P#participant.protocol == sync_sym_trans ->
   Tid#tid.pid ! {?MODULE, node(), {committed, Tid}};
true ->
   ignore
   end,
   mnesia_locker:release_tid(Tid),
   transaction_terminated(Tid),
   ?eval_debug_fun({?MODULE,do_commit,post},[{tid,Tid},{pid,nopid}]),
   doit_loop(State#state{participants=gb_trees:delete(Tid,Participants)});
Pid when is_pid(Pid) ->
   Pid ! {Tid, committed},
   ?eval_debug_fun({?MODULE, do_commit, post}, [{tid, Tid}, {pid, Pid}]),
   doit_loop(State)
   end
   end;
参与结点的事务管理器首先取回事务第一阶段提交的commit结构,然后进行日志记录和写入数据,并结束事务,后续过程由do_commit完成。
对于事务发起结点,其过程由do_dirty完成:
do_dirty(Tid, Commit) when Commit#commit.schema_ops == [] ->
    mnesia_log:log(Commit),
    do_commit(Tid, Commit).
事务发起结点的过程也是如此,记录日志,然后由do_commit完成后续过程,mnesia的日志由disk_log实现。
do_commit(Tid, Bin) when is_binary(Bin) ->
    do_commit(Tid, binary_to_term(Bin));
do_commit(Tid, C) ->
    do_commit(Tid, C, optional).
do_commit(Tid, Bin, DumperMode) when is_binary(Bin) ->
    do_commit(Tid, binary_to_term(Bin), DumperMode);
do_commit(Tid, C, DumperMode) ->
    mnesia_dumper:update(Tid, C#commit.schema_ops, DumperMode),
    R  = do_snmp(Tid, C#commit.snmp),
    R2 = do_update(Tid, ram_copies, C#commit.ram_copies, R),
    R3 = do_update(Tid, disc_copies, C#commit.disc_copies, R2),
    R4 = do_update(Tid, disc_only_copies, C#commit.disc_only_copies, R3),
    mnesia_subscr:report_activity(Tid),
R4.
do_commit实际完成后续的表操作,将数据真正写入表中。
do_update(Tid, Storage, [Op | Ops], OldRes) ->
    case catch do_update_op(Tid, Storage, Op) of
ok ->
   do_update(Tid, Storage, Ops, OldRes);
{'EXIT', Reason} ->
   verbose("do_update in ~w failed: ~p -> {'EXIT', ~p}~n",
   [Tid, Op, Reason]),
   do_update(Tid, Storage, Ops, OldRes);
NewRes ->
   do_update(Tid, Storage, Ops, NewRes)
    end;
do_update(_Tid, _Storage, [], Res) ->
Res.
do_update_op(Tid, Storage, {{Tab, K}, Obj, write}) ->
    commit_write(?catch_val({Tab, commit_work}), Tid, Tab, K, Obj, undefined),
    mnesia_lib:db_put(Storage, Tab, Obj);
mnesia_lib:db_put是真正完成数据表的ets表(非临时表)操作的地方。
commit_write([], _, _, _, _, _) -> ok;
commit_write([{checkpoints, CpList}|R], Tid, Tab, K, Obj, Old) ->
    mnesia_checkpoint:tm_retain(Tid, Tab, K, write, CpList),
    commit_write(R, Tid, Tab, K, Obj, Old);
commit_write([H|R], Tid, Tab, K, Obj, Old)
  when element(1, H) == subscribers ->
    mnesia_subscr:report_table_event(H, Tab, Tid, Obj, write, Old),
    commit_write(R, Tid, Tab, K, Obj, Old);
commit_write([H|R], Tid, Tab, K, Obj, Old)
  when element(1, H) == index ->
    mnesia_index:add_index(H, Tab, K, Obj, Old),
commit_write(R, Tid, Tab, K, Obj, Old).
commit_write的操作主要用于辅助工作,如检查点,写事件通报,索引添加等
mnesia_lib.erl
db_put(Tab, Val) ->
    db_put(val({Tab, storage_type}), Tab, Val).
db_put(ram_copies, Tab, Val) -> ?ets_insert(Tab, Val), ok;
db_put(disc_copies, Tab, Val) -> ?ets_insert(Tab, Val), ok;
db_put(disc_only_copies, Tab, Val) -> dets:insert(Tab, Val).
将数据真正写入ets表。
重新回到multi_commit中,此后将清除事务。
multi_commit(sym_trans, _Maj = [], Tid, CR, Store) ->
    {DiscNs, RamNs} = commit_nodes(CR, [], []),
    Pending = mnesia_checkpoint:tm_enter_pending(Tid, DiscNs, RamNs),
    ?ets_insert(Store, Pending),

    {WaitFor, Local} = ask_commit(sym_trans, Tid, CR, DiscNs, RamNs),
    {Outcome, []} = rec_all(WaitFor, Tid, do_commit, []),
    ?eval_debug_fun({?MODULE, multi_commit_sym},
   [{tid, Tid}, {outcome, Outcome}]),
    rpc:abcast(DiscNs -- [node()], ?MODULE, {Tid, Outcome}),
    rpc:abcast(RamNs -- [node()], ?MODULE, {Tid, Outcome}),
    case Outcome of
do_commit ->
   mnesia_recover:note_decision(Tid, committed),
   do_dirty(Tid, Local),
   mnesia_locker:release_tid(Tid),
   ?MODULE ! {delete_transaction, Tid};
{do_abort, _Reason} ->
   mnesia_recover:note_decision(Tid, aborted)
    end,
    ?eval_debug_fun({?MODULE, multi_commit_sym, post},
   [{tid, Tid}, {outcome, Outcome}]),
    Outcome;
事务发起结点通知本地事务管理器清除事务:
doit_loop(#state{coordinators=Coordinators,participants=Participants,supervisor=Sup}=State) ->
receive
{delete_transaction, Tid} ->
   case gb_trees:is_defined(Tid, Participants) of
false ->
   case gb_trees:lookup(Tid, Coordinators) of
none ->
   verbose("** ERROR ** Tried to delete a non transaction ~p~n", [Tid]),
   doit_loop(State);
{value, Etabs} ->
   clear_fixtable(Etabs),
   erase_ets_tabs(Etabs),
   transaction_terminated(Tid),
   doit_loop(State#state{coordinators = gb_trees:delete(Tid,Coordinators)})
   end;
true ->
   transaction_terminated(Tid),
   State2 = State#state{participants=gb_trees:delete(Tid,Participants)},
   doit_loop(State2)
   end;
与事务参与结点不同,事务发起结点清除的是coordinator,而事务参与结点清除的是participants域。
之后的清除过程由transaction_terminated完成,该函数将做一些检查点相关的工作并递增事务序列号,以供下次事务使用。
至此,mnesia的普通事务写就完成了,总结一下,transaction上下文中,一次mnesia:write写行记录至少涉及:
一次行锁,一次同步提交,一次异步提交,一次临时ets表写,一次正式写,一次异步日志。

 

分享到:
评论

相关推荐

    Mnesia User's Guide

    session, specify a Mnesia database directory, initialize a database schema, start Mnesia, and create tables. Initial prototyping of record definitions is also discussed. • Build a Mnesia Database ...

    Mnesia table fragmentation 过程及算法分析

    Mnesia table fragmentation 过程及算法分析。erlang就算在64位下dets的空间限制仍旧是2g,同样影响了mnesia,如果有更大需求,就必须使用Mnesia的 table fragmentation 技术

    amnesia:失忆备忘录

    B站视频地址: 做了文字校验,已经成功上线,有兴趣的小伙伴可以扫码体验:可以微信搜索:失忆备忘录一、失忆的由来之所以开发这款软件,是因为在那段时间事情很多,但是经常忘记。虽然市面上类似的功能很多,我之前...

    Api-Social-Amnesia.zip

    Api-Social-Amnesia.zip,忘记过去。社交健忘症确保你的社交媒体帐户只显示你最近的历史,而不是5年前“那个阶段”的帖子。,一个api可以被认为是多个软件设备之间通信的指导手册。例如,api可用于web应用程序之间的...

    Chrome Amnesia-crx插件

    语言:English (United States) 遗忘的延伸 Chrome失忆症是一个Chrome扩展程序,可让您有选择地不记得自己的任何浏览历史记录。...有关更多信息,请访问https://github.com/DanielBok/chrome-amnesia。

    erlang——Mnesia用户手册.pdf

    8.附录.A:Mnesia.错误信息 8.1.Mnesia.中的错误 9.附录.B:备份回调函数接口 9.1.Mnesia.备份回调行为 10.附录.C:作业存取回调接口 10.1.Mnnesia.存取回调行为 11.附录.D:分片表哈希回调接口 11.1....

    Mnesia用户手册

    Mnesia用户手册Mnesia用户手册

    Amnesia-开源

    失忆症是一种提醒,允许您定义警报,贴纸(贴子)以提醒您一些重要的内容以及有关所需内容的注释。 可以将警报编程为在给定时间显示,可以在桌面上放置贴纸以随时查看。

    Mnesia用户手册.pdf

    Mnesia用户手册.pdf

    Amnesia

    Amnesia

    Mnesia用户手册.zip

    Mnesia是一个分布式数据库管理系统(DBMS),适合于电信和其它需要持续运行和具备软实时 特性的Erlang应用。

    mnesia数据库文档

    erlang系统自带的数据库mnesia的官方文档。

    Mnesia用户手册(docx版)

    Mnesia用户手册(docx版) 详细讲解Mnesia数据库操作

    Mnesia用户手册(PDF版本)

    Mnesia用户手册(PDF版本) 详细讲述Mnesia数据库操作。

    Mnesia用户手册 4.4.10版.rar

    8 附录 A : Mnesia 错误信息 . . .. . . 75 8.1 Mnesia 中的错误 . . . . .. . 75 9 附录 B :备份回调函数接口 . . .. . .. . . .. . 76 9.1 Mnesia 备份回调行为 . . .. . . . .. . 76 10 附录 C :作业存取...

    amnesia-开源

    AMNESIA是一个Erlang库,提供了用于连接关系DBMS的抽象层。 它允许设计人员使用本机Erlang类型和语言构造将关系数据库集成到Erlang程序中。

    Erlang Mnesia

    Mnesia is a distributed Database Management System, appropriate for telecommunications applications and other Erlang applications which require continuous operation and soft real-time properties. It ...

Global site tag (gtag.js) - Google Analytics