`
wqtn22
  • 浏览: 99801 次
  • 性别: Icon_minigender_1
  • 来自: 杭州
社区版块
存档分类
最新评论

mnesia的普通transaction写过程(三)锁请求

 
阅读更多

上一篇博文介绍了mnesia的写操作函数write完成的工作,其中涉及了锁请求过程,此后将继续分析。


mnesia_locker.erl

wlock(Tid, Store, Oid) ->

    wlock(Tid, Store, Oid, _CheckMajority = true).

wlock(Tid, Store, Oid, CheckMajority) ->

    {Tab, Key} = Oid,

    case need_lock(Store, Tab, Key, write) of

yes ->

   {Ns, Majority} = w_nodes(Tab),

   if CheckMajority ->

   check_majority(Majority, Tab, Ns);

      true ->

   ignore

   end,

   Op = {self(), {write, Tid, Oid}},

   ?ets_insert(Store, {{locks, Tab, Key}, write}),

   get_wlocks_on_nodes(Ns, Ns, Store, Op, Oid);

no when Key /= ?ALL, Tab /= ?GLOBAL ->

   [];

no ->

   element(2, w_nodes(Tab))

    end.

首先检查是否需要对要写的记录上锁。
need_lock(Store, Tab, Key, LockPattern) ->
    TabL = ?ets_match_object(Store, {{locks, Tab, ?ALL}, LockPattern}),
    if
TabL == [] ->
   KeyL = ?ets_match_object(Store, {{locks, Tab, Key}, LockPattern}),
   if
KeyL == [] -> yes;
true  -> no
   end;
true -> no
    end.
检查过程仅仅是检查临时ets表内是否对记录上了行锁或表锁。
w_nodes(Tab) ->
    case ?catch_val({Tab, where_to_wlock}) of
{[_ | _], _} = Where -> Where;
_ ->  mnesia:abort({no_exists, Tab})
    end.
mnesia使用ets表mnesia_gvars缓存了mnesia的全局变量和表的各项属性,相当于数据字典,此处从该表中取得数据表的where_to_wlock属性,以得到写该表时应该向哪些结点请求写锁。
该属性等同于表的where_to_write属性,在mnesia启动时进行表加载时、有结点加入该集群并创建表副本时、有结点退出该集群时等条件下进行修改,是一个动态值,等同于表的活动副本结点,包括其在线的所有ram_copies、disc_copies、disc_only_copies结点,可以通过mnesia:schema(TableName)找到表的活动副本结点。
由于mnesia表的磁盘及网络加载是一个比mnesia事务更复杂的过程,此处就不介绍其加载过程。此处仅需记得,事务发起进程需要向该表的所有在线副本结点请求锁即可。
check_majority(true, Tab, HaveNs) ->
    check_majority(Tab, HaveNs);
check_majority(false, _, _) ->
    ok.
check_majority(Tab, HaveNs) ->
    case ?catch_val({Tab, majority}) of
true ->
   case mnesia_lib:have_majority(Tab, HaveNs) of
true -> ok;
false -> mnesia:abort({no_majority, Tab})
   end;
_ -> ok
    end.
mnesia_lib.erl
have_majority(Tab, HaveNodes) ->
    have_majority(Tab, val({Tab, all_nodes}), HaveNodes).
have_majority(_Tab, AllNodes, HaveNodes) ->
    Missing = AllNodes -- HaveNodes,
    Present = AllNodes -- Missing,
    length(Present) > length(Missing).
在表的majority属性为true时,需要检查表的在线结点(此处来自于表的where_to_wlock属性)是否占其其全部结点(来自于表的all_nodes属性)的大多数,默认情况下不需要执行此项检查。
wlock(Tid, Store, Oid, CheckMajority) ->
    {Tab, Key} = Oid,
    case need_lock(Store, Tab, Key, write) of
yes ->
   {Ns, Majority} = w_nodes(Tab),
   if CheckMajority ->
   check_majority(Majority, Tab, Ns);
      true ->
   ignore
   end,
   Op = {self(), {write, Tid, Oid}},
   ?ets_insert(Store, {{locks, Tab, Key}, write}),
   get_wlocks_on_nodes(Ns, Ns, Store, Op, Oid);
no when Key /= ?ALL, Tab /= ?GLOBAL ->
   [];
no ->
   element(2, w_nodes(Tab))
    end.
wlock函数的后半部分将在临时ets表中记录需要锁定的记录,正好与前面need_lock的检查对应起来
get_wlocks_on_nodes([Node | Tail], Orig, Store, Request, Oid) ->
    {?MODULE, Node} ! Request,
    ?ets_insert(Store, {nodes, Node}),
    receive_wlocks([Node], undefined, Store, Oid),
    case node() of
Node -> %% Local done try one more
   get_wlocks_on_nodes(Tail, Orig, Store, Request, Oid);
_ ->    %% The first succeded cont with the rest
   get_wlocks_on_nodes(Tail, Store, Request),
   receive_wlocks(Tail, Orig, Store, Oid)
    end;
get_wlocks_on_nodes([], Orig, _Store, _Request, _Oid) ->
    Orig.
get_wlocks_on_nodes([Node | Tail], Store, Request) ->
    {?MODULE, Node} ! Request,
    ?ets_insert(Store,{nodes, Node}),
    get_wlocks_on_nodes(Tail, Store, Request);
get_wlocks_on_nodes([], _, _) ->
    ok.
get_wlocks_on_nodes函数是锁请求的发起函数,主要过程是,向表的所有写锁请求结点的锁管理器发出写锁请求,若所有锁管理器均同意锁请求,则继续执行事务,否则中止事务。
在每个写锁请求发出后,还会向临时ets表内插入一条{nodes,Node}记录,这条记录是之后事务提交的依据。
receive_wlocks([], Res, _Store, _Oid) ->
    del_debug(),
    Res;
receive_wlocks(Nodes = [This|Ns], Res, Store, Oid) ->
    add_debug(Nodes),
    receive
{?MODULE, Node, granted} ->
   receive_wlocks(lists:delete(Node,Nodes), Res, Store, Oid);
{?MODULE, Node, {granted, Val}} -> %% for rwlocks
   case opt_lookup_in_client(Val, Oid, write) of
C = #cyclic{} ->
   flush_remaining(Nodes, Node, {aborted, C});
Val2 ->
   receive_wlocks(lists:delete(Node,Nodes), Val2, Store, Oid)
   end;
...
    end.
接收锁请求结果的过程,只有所有的锁管理器同意此次锁请求才能执行事务。
mnesia的所管理器mnesia_locker处理来自各处的锁请求。
mnesia_locker.erl
loop(State) ->
    receive
{From, {write, Tid, Oid}} ->
   try_sticky_lock(Tid, write, From, Oid),
   loop(State);
    ...
事务发起进程的锁请求将发射到锁管理器主循环处,并由其处理。
try_sticky_lock(Tid, Op, Pid, {Tab, _} = Oid) ->
    case ?ets_lookup(mnesia_sticky_locks, Tab) of
[] ->
   try_lock(Tid, Op, Pid, Oid);
[{_,N}] when N == node() ->
   try_lock(Tid, Op, Pid, Oid);
[{_,N}] ->
   Req = {Pid, {Op, Tid, Oid}},
   Pid ! {?MODULE, node(), {switch, N, Req}}
    end.
try_lock(Tid, read_write, Pid, Oid) ->
    try_lock(Tid, read_write, read, write, Pid, Oid);
try_lock(Tid, Op, Pid, Oid) ->
    try_lock(Tid, Op, Op, Op, Pid, Oid).
try_lock(Tid, Op, SimpleOp, Lock, Pid, Oid) ->
    case can_lock(Tid, Lock, Oid, {no, bad_luck}) of
{yes, Default} ->
   Reply = grant_lock(Tid, SimpleOp, Lock, Oid, Default),
   reply(Pid, Reply);
{{no, Lucky},_} ->
   C = #cyclic{op = SimpleOp, lock = Lock, oid = Oid, lucky = Lucky},
   ?dbg("Rejected ~p ~p ~p ~p ~n", [Tid, Oid, Lock, Lucky]),
   reply(Pid, {not_granted, C});
{{queue, Lucky},_} ->
   ?dbg("Queued ~p ~p ~p ~p ~n", [Tid, Oid, Lock, Lucky]),
   %% Append to queue: Nice place for trace output
   ?ets_insert(mnesia_lock_queue,
#queue{oid = Oid, tid = Tid, op = Op,
      pid = Pid, lucky = Lucky}),
   ?ets_insert(mnesia_tid_locks, {Tid, Oid, {queued, Op}})
    end.
锁请求过程是个较为复杂的过程。
首先检查粘着锁,此场景不涉及粘着锁,然后检查是否允许上锁,若允许则返回granted或{granted, R},否则返回{not_granted, C},mnesia锁管理器还支持锁排队,在初次上锁不成功时,允许延迟请求者一会后,等到上一个事务完成后再次请求。
can_lock(Tid, write, Oid = {Tab, Key}, AlreadyQ) when Key /= ?ALL ->
    ObjLocks = ?ets_lookup(mnesia_held_locks, Oid),
    TabLocks = ?ets_lookup(mnesia_held_locks, {Tab, ?ALL}),
    {check_lock(Tid, Oid, ObjLocks, TabLocks, yes, AlreadyQ, write), ObjLocks};
检查是否允许上锁,此场景中,我们请求一个行写锁,并且之前没有任何锁,需要检查表在之前是否已经有该行的锁或者表锁。
check_lock(Tid, Oid = {Tab, Key}, [], [], X, AlreadyQ, Type) ->
    if
Type == write ->
   check_queue(Tid, Tab, X, AlreadyQ);
Key == ?ALL ->
   %% hmm should be solvable by a clever select expr but not today...
   check_queue(Tid, Tab, X, AlreadyQ);
        ...
    end;
还必须检查此次上锁是否影响排队中的锁,这个检查是为了防止死锁,X参数为yes
check_queue(Tid, Tab, X, AlreadyQ) ->
    TabLocks = ets:lookup(mnesia_lock_queue, {Tab,?ALL}),
    Greatest = max(TabLocks),
    case Greatest of
empty ->  X;
Tid ->    X;
WaitForTid ->
   case allowed_to_be_queued(WaitForTid,Tid) of
true ->
   {queue, WaitForTid};
false when AlreadyQ =:= {no, bad_luck} ->
   {no, WaitForTid}
   end
    end.
allowed_to_be_queued(WaitForTid, Tid) ->
    case get(pid_sort_order) of
undefined -> WaitForTid > Tid;
r9b_plain ->
   cmp_tid(true, WaitForTid, Tid) =:= 1;
standard  ->
   cmp_tid(false, WaitForTid, Tid) =:= 1
    end.
若表没有表锁,或其等待者为当前锁请求事务,则返回请求需要的结果X;若表有表锁,且经过检查允许锁排队,则允许当前请求事务排队,否则拒绝锁请求,最简单的检查方式为检查等待中的事务其pid大于当前请求事务。
try_lock(Tid, Op, SimpleOp, Lock, Pid, Oid) ->
    case can_lock(Tid, Lock, Oid, {no, bad_luck}) of
{yes, Default} ->
   Reply = grant_lock(Tid, SimpleOp, Lock, Oid, Default),
   reply(Pid, Reply);
{{no, Lucky},_} ->
   C = #cyclic{op = SimpleOp, lock = Lock, oid = Oid, lucky = Lucky},
   ?dbg("Rejected ~p ~p ~p ~p ~n", [Tid, Oid, Lock, Lucky]),
   reply(Pid, {not_granted, C});
{{queue, Lucky},_} ->
   ?dbg("Queued ~p ~p ~p ~p ~n", [Tid, Oid, Lock, Lucky]),
   %% Append to queue: Nice place for trace output
   ?ets_insert(mnesia_lock_queue,
#queue{oid = Oid, tid = Tid, op = Op,
      pid = Pid, lucky = Lucky}),
   ?ets_insert(mnesia_tid_locks, {Tid, Oid, {queued, Op}})
    end.
在请求到锁后,锁管理器在try_lock的后半部分返回锁请求结果。
grant_lock(Tid, write, Lock, Oid, Default) ->
    set_lock(Tid, Oid, Lock, Default),
    granted.
set_lock(Tid, Oid, Op, []) ->
    ?ets_insert(mnesia_tid_locks, {Tid, Oid, Op}),
    ?ets_insert(mnesia_held_locks, {Oid, Op, [{Op, Tid}]});
该场景写锁请求成功后,将会向两张锁表中记录锁,并向请求者返回granted
锁管理器一共有四张锁表:
mnesia_held_locks:表锁或行锁
mnesia_tid_locks:一个事务涉及的所有锁
mnesia_sticky_locks:粘着锁
mnesia_lock_queue:排队锁请求
锁请求过程需要向表的where_to_wlock属性涉及的结点,也即表的在线副本结点发出锁请求,这些结点经过一系列的检查(如粘着锁,表锁,行锁,锁排队信息等)后,向事务发起进程返回锁请求结果,事务请求结点会在临时ets表中记录锁请求结果,以及参与锁请求的结点,以在将来事务提交的时候使用。
未完待续...
分享到:
评论

相关推荐

    Mnesia User's Guide

    session, specify a Mnesia database directory, initialize a database schema, start Mnesia, and create tables. Initial prototyping of record definitions is also discussed. • Build a Mnesia Database ...

    Mnesia table fragmentation 过程及算法分析

    Mnesia table fragmentation 过程及算法分析。erlang就算在64位下dets的空间限制仍旧是2g,同样影响了mnesia,如果有更大需求,就必须使用Mnesia的 table fragmentation 技术

    Amnesia-开源

    失忆症是一种提醒,允许您定义警报,贴纸(贴子)以提醒您一些重要的内容以及有关所需内容的注释。 可以将警报编程为在给定时间显示,可以在桌面上放置贴纸以随时查看。

    amnesia:失忆备忘录

    虽然市面上类似的功能很多,我之前也一直使用日事清这款软件,但是使用这样的软件太重了,就相当于我要去三千米之外的地方你让我坐飞机去,虽然也能达到,但是... 最开始它的目的就是记录当天的要完成的事项,但是...

    Api-Social-Amnesia.zip

    Api-Social-Amnesia.zip,忘记过去。社交健忘症确保你的社交媒体帐户只显示你最近的历史,而不是5年前“那个阶段”的帖子。,一个api可以被认为是多个软件设备之间通信的指导手册。例如,api可用于web应用程序之间的...

    Chrome Amnesia-crx插件

    语言:English (United States) 遗忘的延伸 Chrome失忆症是一个Chrome扩展程序,可让您有选择地不记得自己的任何浏览历史记录。...有关更多信息,请访问https://github.com/DanielBok/chrome-amnesia。

    Mnesia用户手册

    Mnesia用户手册Mnesia用户手册

    Mnesia用户手册.pdf

    Mnesia用户手册.pdf

    erlang——Mnesia用户手册.pdf

    8.附录.A:Mnesia.错误信息 8.1.Mnesia.中的错误 9.附录.B:备份回调函数接口 9.1.Mnesia.备份回调行为 10.附录.C:作业存取回调接口 10.1.Mnnesia.存取回调行为 11.附录.D:分片表哈希回调接口 11.1....

    Amnesia

    Amnesia

    Mnesia用户手册.zip

    Mnesia是一个分布式数据库管理系统(DBMS),适合于电信和其它需要持续运行和具备软实时 特性的Erlang应用。

    mnesia数据库文档

    erlang系统自带的数据库mnesia的官方文档。

    Mnesia用户手册(docx版)

    Mnesia用户手册(docx版) 详细讲解Mnesia数据库操作

    Mnesia用户手册(PDF版本)

    Mnesia用户手册(PDF版本) 详细讲述Mnesia数据库操作。

    amnesia-开源

    AMNESIA是一个Erlang库,提供了用于连接关系DBMS的抽象层。 它允许设计人员使用本机Erlang类型和语言构造将关系数据库集成到Erlang程序中。

    Erlang Mnesia

    Mnesia is a distributed Database Management System, appropriate for telecommunications applications and other Erlang applications which require continuous operation and soft real-time properties. It ...

    Mnesia用户手册 4.4.10版.rar

    8 附录 A : Mnesia 错误信息 . . .. . . 75 8.1 Mnesia 中的错误 . . . . .. . 75 9 附录 B :备份回调函数接口 . . .. . .. . . .. . 76 9.1 Mnesia 备份回调行为 . . .. . . . .. . 76 10 附录 C :作业存取...

Global site tag (gtag.js) - Google Analytics