The write process of an ordinary mnesia transaction (3): requesting locks
The previous post walked through what mnesia's write function does; part of that work is the lock request, which this post continues to analyse.
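For context, a minimal transaction that exercises the code path analysed below might look like this (the person table and its record are purely illustrative, not taken from this series):

%% Illustrative only: `person` is a hypothetical table with attributes [id, name].
mnesia:transaction(fun() ->
        mnesia:write({person, 1, "alice"})   %% acquires a write lock on {person, 1}
    end).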
mnesia_locker.erl
wlock(Tid, Store, Oid) ->
    wlock(Tid, Store, Oid, _CheckMajority = true).

wlock(Tid, Store, Oid, CheckMajority) ->
    {Tab, Key} = Oid,
    case need_lock(Store, Tab, Key, write) of
        yes ->
            {Ns, Majority} = w_nodes(Tab),
            if CheckMajority ->
                    check_majority(Majority, Tab, Ns);
               true ->
                    ignore
            end,
            Op = {self(), {write, Tid, Oid}},
            ?ets_insert(Store, {{locks, Tab, Key}, write}),
            get_wlocks_on_nodes(Ns, Ns, Store, Op, Oid);
        no when Key /= ?ALL, Tab /= ?GLOBAL ->
            [];
        no ->
            element(2, w_nodes(Tab))
    end.
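To make the role of Store concrete, here is a sketch of the kind of entries wlock leaves in the transaction's temporary ets store (table, key and node names are illustrative); the rest of this post shows how they get there:

%% Sketch only: Store is the per-transaction temporary ets table.
%% After wlock succeeds for key 1 of table person on two replica nodes:
%%   {{locks, person, 1}, write}   %% inserted by ?ets_insert(Store, ...) above
%%   {nodes, 'n1@host'}            %% inserted per contacted node by get_wlocks_on_nodes
%%   {nodes, 'n2@host'}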
First, wlock checks whether the record to be written still needs a lock at all:

need_lock(Store, Tab, Key, LockPattern) ->
    TabL = ?ets_match_object(Store, {{locks, Tab, ?ALL}, LockPattern}),
    if
        TabL == [] ->
            KeyL = ?ets_match_object(Store, {{locks, Tab, Key}, LockPattern}),
            if
                KeyL == [] -> yes;
                true -> no
            end;
        true -> no
    end.

The check simply looks in the transaction's temporary ets store to see whether a row lock or table lock has already been taken for this record.

w_nodes(Tab) ->
    case ?catch_val({Tab, where_to_wlock}) of
        {[_ | _], _} = Where -> Where;
        _ -> mnesia:abort({no_exists, Tab})
    end.

mnesia caches its global variables and every table's attributes in the ets table mnesia_gvars, which works as a data dictionary. Here the table's where_to_wlock attribute is read from it to determine which nodes the write lock must be requested from. The attribute is equivalent to the table's where_to_write attribute and is a dynamic value: it is updated when the table is loaded at mnesia start-up, when a node joins the cluster and creates a replica of the table, when a node leaves the cluster, and so on. It corresponds to the table's active replica nodes, i.e. all of its online ram_copies, disc_copies and disc_only_copies nodes, and can be inspected with mnesia:schema(TableName). Loading a table from disc or over the network is a more involved process than an mnesia transaction itself, so it is not covered here; it is enough to remember that the transaction originator has to request the lock from every online replica node of the table.

check_majority(true, Tab, HaveNs) ->
    check_majority(Tab, HaveNs);
check_majority(false, _, _) ->
    ok.

check_majority(Tab, HaveNs) ->
    case ?catch_val({Tab, majority}) of
        true ->
            case mnesia_lib:have_majority(Tab, HaveNs) of
                true -> ok;
                false -> mnesia:abort({no_majority, Tab})
            end;
        _ -> ok
    end.

mnesia_lib.erl

have_majority(Tab, HaveNodes) ->
    have_majority(Tab, val({Tab, all_nodes}), HaveNodes).

have_majority(_Tab, AllNodes, HaveNodes) ->
    Missing = AllNodes -- HaveNodes,
    Present = AllNodes -- Missing,
    length(Present) > length(Missing).

When the table's majority attribute is true, mnesia checks that the table's online nodes (taken here from where_to_wlock) make up a majority of all its replica nodes (the all_nodes attribute). By default this check is not performed.
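As a hedged illustration of how the check plays out (the table, attributes and node names below are placeholders): the majority option is set per table, and have_majority then reduces to simple counting.

%% Illustrative only: enabling the documented majority option on a hypothetical table.
mnesia:create_table(person,
                    [{attributes, [id, name]},
                     {disc_copies, ['a@h1', 'b@h2', 'c@h3']},
                     {majority, true}]).
%% With AllNodes  = ['a@h1', 'b@h2', 'c@h3']  ({Tab, all_nodes}) and
%%      HaveNodes = ['a@h1', 'b@h2']          (from where_to_wlock):
%%   Missing = ['c@h3'], Present = ['a@h1', 'b@h2'], and 2 > 1 holds,
%%   so the write may proceed. With only one replica node up, 1 > 2 fails
%%   and the transaction aborts with {no_majority, person}.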
Looking at wlock/4 again:

wlock(Tid, Store, Oid, CheckMajority) ->
    {Tab, Key} = Oid,
    case need_lock(Store, Tab, Key, write) of
        yes ->
            {Ns, Majority} = w_nodes(Tab),
            if CheckMajority ->
                    check_majority(Majority, Tab, Ns);
               true ->
                    ignore
            end,
            Op = {self(), {write, Tid, Oid}},
            ?ets_insert(Store, {{locks, Tab, Key}, write}),
            get_wlocks_on_nodes(Ns, Ns, Store, Op, Oid);
        no when Key /= ?ALL, Tab /= ?GLOBAL ->
            [];
        no ->
            element(2, w_nodes(Tab))
    end.

The latter half of wlock records the locked record in the temporary ets store, which is exactly what the need_lock check above looks for.

get_wlocks_on_nodes([Node | Tail], Orig, Store, Request, Oid) ->
    {?MODULE, Node} ! Request,
    ?ets_insert(Store, {nodes, Node}),
    receive_wlocks([Node], undefined, Store, Oid),
    case node() of
        Node -> %% Local done try one more
            get_wlocks_on_nodes(Tail, Orig, Store, Request, Oid);
        _ ->    %% The first succeded cont with the rest
            get_wlocks_on_nodes(Tail, Store, Request),
            receive_wlocks(Tail, Orig, Store, Oid)
    end;
get_wlocks_on_nodes([], Orig, _Store, _Request, _Oid) ->
    Orig.

get_wlocks_on_nodes([Node | Tail], Store, Request) ->
    {?MODULE, Node} ! Request,
    ?ets_insert(Store, {nodes, Node}),
    get_wlocks_on_nodes(Tail, Store, Request);
get_wlocks_on_nodes([], _, _) ->
    ok.

get_wlocks_on_nodes is the function that initiates the lock request: it sends a write-lock request to the lock manager on every node in the table's write-lock node list. Only if every lock manager grants the request does the transaction go on; otherwise it is aborted. After each request is sent, a {nodes, Node} record is also inserted into the temporary ets store; these records are what the later commit phase relies on.

receive_wlocks([], Res, _Store, _Oid) ->
    del_debug(),
    Res;
receive_wlocks(Nodes = [This|Ns], Res, Store, Oid) ->
    add_debug(Nodes),
    receive
        {?MODULE, Node, granted} ->
            receive_wlocks(lists:delete(Node, Nodes), Res, Store, Oid);
        {?MODULE, Node, {granted, Val}} -> %% for rwlocks
            case opt_lookup_in_client(Val, Oid, write) of
                C = #cyclic{} ->
                    flush_remaining(Nodes, Node, {aborted, C});
                Val2 ->
                    receive_wlocks(lists:delete(Node, Nodes), Val2, Store, Oid)
            end;
        ...
    end.

This is where the replies are collected: the transaction may only execute once every lock manager has granted the request. mnesia's lock manager, mnesia_locker, handles lock requests coming from everywhere.

mnesia_locker.erl

loop(State) ->
    receive
        {From, {write, Tid, Oid}} ->
            try_sticky_lock(Tid, write, From, Oid),
            loop(State);
        ...

The lock request sent by the transaction originator lands in the lock manager's main loop and is handled there.

try_sticky_lock(Tid, Op, Pid, {Tab, _} = Oid) ->
    case ?ets_lookup(mnesia_sticky_locks, Tab) of
        [] ->
            try_lock(Tid, Op, Pid, Oid);
        [{_,N}] when N == node() ->
            try_lock(Tid, Op, Pid, Oid);
        [{_,N}] ->
            Req = {Pid, {Op, Tid, Oid}},
            Pid ! {?MODULE, node(), {switch, N, Req}}
    end.

try_lock(Tid, read_write, Pid, Oid) ->
    try_lock(Tid, read_write, read, write, Pid, Oid);
try_lock(Tid, Op, Pid, Oid) ->
    try_lock(Tid, Op, Op, Op, Pid, Oid).

try_lock(Tid, Op, SimpleOp, Lock, Pid, Oid) ->
    case can_lock(Tid, Lock, Oid, {no, bad_luck}) of
        {yes, Default} ->
            Reply = grant_lock(Tid, SimpleOp, Lock, Oid, Default),
            reply(Pid, Reply);
        {{no, Lucky}, _} ->
            C = #cyclic{op = SimpleOp, lock = Lock, oid = Oid, lucky = Lucky},
            ?dbg("Rejected ~p ~p ~p ~p ~n", [Tid, Oid, Lock, Lucky]),
            reply(Pid, {not_granted, C});
        {{queue, Lucky}, _} ->
            ?dbg("Queued ~p ~p ~p ~p ~n", [Tid, Oid, Lock, Lucky]),
            %% Append to queue: Nice place for trace output
            ?ets_insert(mnesia_lock_queue,
                        #queue{oid = Oid, tid = Tid, op = Op,
                               pid = Pid, lucky = Lucky}),
            ?ets_insert(mnesia_tid_locks, {Tid, Oid, {queued, Op}})
    end.

Handling a lock request is fairly involved. Sticky locks are checked first (they play no role in this scenario), and then whether the lock may be taken: if so the reply is granted or {granted, R}, otherwise {not_granted, C}. The lock manager also supports lock queueing: when the lock cannot be granted right away, the requester may be made to wait a while and request again once the previous transaction has finished.

can_lock(Tid, write, Oid = {Tab, Key}, AlreadyQ) when Key /= ?ALL ->
    ObjLocks = ?ets_lookup(mnesia_held_locks, Oid),
    TabLocks = ?ets_lookup(mnesia_held_locks, {Tab, ?ALL}),
    {check_lock(Tid, Oid, ObjLocks, TabLocks, yes, AlreadyQ, write), ObjLocks};

can_lock decides whether the lock may be taken. In this scenario we request a row write lock and no lock is held yet, so it must be checked whether a lock on this row, or a lock on the whole table, already exists.

check_lock(Tid, Oid = {Tab, Key}, [], [], X, AlreadyQ, Type) ->
    if
        Type == write ->
            check_queue(Tid, Tab, X, AlreadyQ);
        Key == ?ALL ->
            %% hmm should be solvable by a clever select expr but not today...
            check_queue(Tid, Tab, X, AlreadyQ);
        ...
    end;

It must also be checked whether taking this lock would interfere with lock requests already waiting in the queue; this check exists to avoid deadlocks. Here the X argument is yes.

check_queue(Tid, Tab, X, AlreadyQ) ->
    TabLocks = ets:lookup(mnesia_lock_queue, {Tab, ?ALL}),
    Greatest = max(TabLocks),
    case Greatest of
        empty -> X;
        Tid -> X;
        WaitForTid ->
            case allowed_to_be_queued(WaitForTid, Tid) of
                true ->
                    {queue, WaitForTid};
                false when AlreadyQ =:= {no, bad_luck} ->
                    {no, WaitForTid}
            end
    end.

allowed_to_be_queued(WaitForTid, Tid) ->
    case get(pid_sort_order) of
        undefined -> WaitForTid > Tid;
        r9b_plain ->
            cmp_tid(true, WaitForTid, Tid) =:= 1;
        standard ->
            cmp_tid(false, WaitForTid, Tid) =:= 1
    end.

If no table-level lock request is queued for this table, or the greatest waiter is the requesting transaction itself, the desired result X is returned. If there is such a waiter and the ordering check permits it, the current request is queued; otherwise the request is rejected. In the simplest case (pid_sort_order unset) the check is just that the waiting transaction's id compares greater than the requester's.
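A toy sketch of that ordering rule, assuming simplified transaction ids (this is not mnesia code, just the shape of the default WaitForTid > Tid comparison):

%% Toy sketch only: a request may queue behind a transaction whose id is
%% greater, so every wait edge points the same way and no wait-for cycle
%% (deadlock) can build up.
WaitForTid = {tid, 12, self()},   %% hypothetical, simplified transaction ids
Tid        = {tid,  7, self()},
case WaitForTid > Tid of
    true  -> {queue, WaitForTid};   %% allowed to wait in mnesia_lock_queue
    false -> {no, WaitForTid}       %% rejected; the requester must not wait here
end.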
Looking at try_lock/6 again, once the lock can be taken the lock manager returns the result in the latter half of the function: the {yes, Default} branch calls grant_lock and replies to the requester.

grant_lock(Tid, write, Lock, Oid, Default) ->
    set_lock(Tid, Oid, Lock, Default),
    granted.

set_lock(Tid, Oid, Op, []) ->
    ?ets_insert(mnesia_tid_locks, {Tid, Oid, Op}),
    ?ets_insert(mnesia_held_locks, {Oid, Op, [{Op, Tid}]});

When the write lock is granted in this scenario, the lock is recorded in two lock tables and granted is sent back to the requester. The lock manager keeps four lock tables in total:

mnesia_held_locks: table locks and row locks currently held
mnesia_tid_locks: all locks involved in a given transaction
mnesia_sticky_locks: sticky locks
mnesia_lock_queue: queued lock requests

To sum up, the lock request has to be sent to every node named by the table's where_to_wlock attribute, i.e. the table's online replica nodes. After a series of checks on each of those nodes (sticky locks, table locks, row locks, the lock queue, and so on), the lock managers send the result back to the transaction originator, which records both the result and the participating nodes in its temporary ets store, for use later when the transaction commits.

To be continued...
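P.S. Not part of the original walkthrough: mnesia also exposes the held locks and the lock queue through mnesia:system_info/1, which is handy for watching the tables above from a shell while a transaction is holding locks (the exact element format differs between OTP releases).

%% For exploration only:
mnesia:system_info(held_locks).
mnesia:system_info(lock_queue).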