首页 诗词 字典 板报 句子 名言 友答 励志 学校 网站地图
当前位置: 首页 > 教程频道 > 数据库 > 其他数据库 >

mnesia的普通transaction写过程(五)事务提交

2012-07-04 
mnesia的普通transaction写过程(五)事务提交:上一篇博文介绍了mnesia的事务提交准备过程,为每个事务参与结点构造了其提交结构commit……

mnesia的普通transaction写过程(五)事务提交

上一篇博文介绍了mnesia的事务提交准备过程,为每个事务参与结点构造了其提交结构commit,下面将进入到提交过程中,此后将继续分析。

mnesia_tm.erl

%% multi_commit/5, clause for the sym_trans protocol with an empty majority
%% list (the meaning of _Maj is not visible in this excerpt).
%%
%% Tid   - identifier of the transaction being committed.
%% CR    - list of #commit{} records; judging by commit_nodes/3 and
%%         ask_commit/5 there is one record per participating node.
%% Store - table handle that receives the value returned by
%%         mnesia_checkpoint:tm_enter_pending/3.
%%
%% Returns Outcome: do_commit or {do_abort, Reason}.
multi_commit(sym_trans, _Maj = [], Tid, CR, Store) ->
    {DiscNs, RamNs} = commit_nodes(CR, [], []),
    Pending = mnesia_checkpoint:tm_enter_pending(Tid, DiscNs, RamNs),
    ?ets_insert(Store, Pending),

    %% Phase 1: hand every remote node its commit record, then collect one
    %% answer per node in WaitFor. The match on [] asserts that no
    %% participant pids were collected; anything else is a badmatch crash.
    {WaitFor, Local} = ask_commit(sym_trans, Tid, CR, DiscNs, RamNs),
    {Outcome, []} = rec_all(WaitFor, Tid, do_commit, []),
    ?eval_debug_fun({?MODULE, multi_commit_sym},
                    [{tid, Tid}, {outcome, Outcome}]),

    %% Phase 2: broadcast the decision to the other nodes without waiting
    %% for any reply (rpc:abcast/3 is fire-and-forget).
    rpc:abcast(DiscNs -- [node()], ?MODULE, {Tid, Outcome}),
    rpc:abcast(RamNs -- [node()], ?MODULE, {Tid, Outcome}),
    case Outcome of
        do_commit ->
            mnesia_recover:note_decision(Tid, committed),
            %% Local is the commit record of this node (see ask_commit/7).
            do_dirty(Tid, Local),
            mnesia_locker:release_tid(Tid),
            ?MODULE ! {delete_transaction, Tid};
        {do_abort, _Reason} ->
            mnesia_recover:note_decision(Tid, aborted)
    end,
    ?eval_debug_fun({?MODULE, multi_commit_sym, post},
                    [{tid, Tid}, {outcome, Outcome}]),
    Outcome;

%% commit_nodes(Commits, AccD, AccR) -> {DiscNodes, RamNodes}
%%
%% Walks a list of #commit{} records and sorts their node names into two
%% accumulators:
%%   * AccR receives the node when the record's disc_copies,
%%     disc_only_copies and schema_ops fields are all empty lists;
%%   * AccD receives the node in every other case.
%% Nodes are prepended, so each result list is in reverse input order.
commit_nodes([C | Tail], AccD, AccR)
        when C#commit.disc_copies == [],
             C#commit.disc_only_copies == [],
             C#commit.schema_ops == [] ->
    commit_nodes(Tail, AccD, [C#commit.node | AccR]);
commit_nodes([C | Tail], AccD, AccR) ->
    commit_nodes(Tail, [C#commit.node | AccD], AccR);
commit_nodes([], AccD, AccR) ->
    {AccD, AccR}.

取出所有参与事务的结点,这实质上等同于where_to_commit属性记录的所有结点。

ask_commit(Protocol, Tid, CR, DiscNs, RamNs) ->
    ask_commit(Protocol, Tid, CR, DiscNs, RamNs, [], no_local).

ask_commit(Protocol, Tid, [Head | Tail], DiscNs, RamNs, WaitFor, Local) ->
    Node = Head#commit.node,
    if
        Node == node() ->
            ask_commit(Protocol, Tid, Tail, DiscNs, RamNs, WaitFor, Head);
        true ->
            Bin = opt_term_to_binary(Protocol, Head, DiscNs++RamNs),
            Msg = {ask_commit, Protocol, Tid, Bin, DiscNs, RamNs},
            {?MODULE, Node} ! {self(), Msg},
            ask_commit(Protocol, Tid, Tail, DiscNs, RamNs, [Node | WaitFor], Local)
    end;
ask_commit(_Protocol, _Tid, [], _DiscNs, _RamNs, WaitFor, Local) ->
    {WaitFor, Local}.

第一阶段提交,这次是同步过程,由ask_commit和rec_all组成,向每个结点的事务管理器通知事务属性,包括事务tid和事务协议等,及该结点的commit结构,此处使用的事务协议是sym_trans,为异步同构协议,异步表示该事务一旦在所有参与结点完成提交,则立即完成,不必等待每个结点都将事务日志落盘;同构表示所有事务参与结点的表结构都相同。

同时观察各个结点的事务管理器的执行过程:

doit_loop(#state{coordinators=Coordinators,participants=Participants,supervisor=Sup}=State) ->
    receive
        {From, {ask_commit, Protocol, Tid, Commit, DiscNs, RamNs}} ->
            ?eval_debug_fun({?MODULE, doit_ask_commit},
                            [{tid, Tid}, {prot, Protocol}]),
            mnesia_checkpoint:tm_enter_pending(Tid, DiscNs, RamNs),
            Pid =
                case Protocol of
                    asym_trans when node(Tid#tid.pid) /= node() ->
                        Args = [tmpid(From), Tid, Commit, DiscNs, RamNs],
                        spawn_link(?MODULE, commit_participant, Args);
                    _ when node(Tid#tid.pid) /= node() -> %% *_sym_trans
                        reply(From, {vote_yes, Tid}),
                        nopid
                end,
            P = #participant{tid = Tid,
                             pid = Pid,
                             commit = Commit,
                             disc_nodes = DiscNs,
                             ram_nodes = RamNs,
                             protocol = Protocol},
            State2 = State#state{participants = gb_trees:insert(Tid,P,Participants)},
            doit_loop(State2);

每个结点的事务管理器接收第一阶段的提交,记录提交的内容,并表示参与此次提交。

事务发起进程等待各个结点的事务管理器的第一阶段提交结果:

rec_all([Node | Tail], Tid, Res, Pids) ->
    receive
        {?MODULE, Node, {vote_yes, Tid}} ->
            rec_all(Tail, Tid, Res, Pids);
        {?MODULE, Node, {vote_yes, Tid, Pid}} ->
            rec_all(Tail, Tid, Res, [Pid | Pids]);
        {?MODULE, Node, {vote_no, Tid, Reason}} ->
            rec_all(Tail, Tid, {do_abort, Reason}, Pids);
        {?MODULE, Node, {committed, Tid}} ->
            rec_all(Tail, Tid, Res, Pids);
        {?MODULE, Node, {aborted, Tid}} ->
            rec_all(Tail, Tid, Res, Pids);
        {mnesia_down, Node} ->
            Abort = {do_abort, {bad_commit, Node}},
            catch {?MODULE, Node} ! {Tid, Abort},
            rec_all(Tail, Tid, Abort, Pids)
    end;
rec_all([], _Tid, Res, Pids) ->
    {Res, Pids}.

发起事务的结点等待事务提交决议,待所有结点都返回结果后,向上层通知决议结果。

第一阶段提交完成后,multi_commit将进行第二阶段提交:

multi_commit(sym_trans, _Maj = [], Tid, CR, Store) ->
    {DiscNs, RamNs} = commit_nodes(CR, [], []),
    Pending = mnesia_checkpoint:tm_enter_pending(Tid, DiscNs, RamNs),
    ?ets_insert(Store, Pending),

    {WaitFor, Local} = ask_commit(sym_trans, Tid, CR, DiscNs, RamNs),
    {Outcome, []} = rec_all(WaitFor, Tid, do_commit, []),
    ?eval_debug_fun({?MODULE, multi_commit_sym},
                    [{tid, Tid}, {outcome, Outcome}]),
    rpc:abcast(DiscNs -- [node()], ?MODULE, {Tid, Outcome}),
    rpc:abcast(RamNs -- [node()], ?MODULE, {Tid, Outcome}),
    case Outcome of
        do_commit ->
            mnesia_recover:note_decision(Tid, committed),
            do_dirty(Tid, Local),
            mnesia_locker:release_tid(Tid),
            ?MODULE ! {delete_transaction, Tid};
        {do_abort, _Reason} ->
            mnesia_recover:note_decision(Tid, aborted)
    end,
    ?eval_debug_fun({?MODULE, multi_commit_sym, post},
                    [{tid, Tid}, {outcome, Outcome}]),
    Outcome;

第二阶段提交,这次是异步过程,通知每个结点的事务管理器写入日志和数据。

各个结点的事务管理器继续参与第二阶段提交:

doit_loop(#state{coordinators=Coordinators,participants=Participants,supervisor=Sup}=State) ->
    receive
        {Tid, do_commit} ->
            case gb_trees:lookup(Tid, Participants) of
                none ->
                    verbose("Tried to commit a non participant transaction ~p~n",[Tid]),
                    doit_loop(State);
                {value, P} ->
                    ?eval_debug_fun({?MODULE,do_commit,pre},[{tid,Tid},{participant,P}]),
                    case P#participant.pid of
                        nopid ->
                            Commit = P#participant.commit,
                            Member = lists:member(node(), P#participant.disc_nodes),
                            if Member == false ->
                                    ignore;
                               P#participant.protocol == sym_trans ->
                                    mnesia_log:log(Commit);
                               P#participant.protocol == sync_sym_trans ->
                                    mnesia_log:slog(Commit)
                            end,
                            mnesia_recover:note_decision(Tid, committed),
                            do_commit(Tid, Commit),
                            if
                                P#participant.protocol == sync_sym_trans ->
                                    Tid#tid.pid ! {?MODULE, node(), {committed, Tid}};
                                true ->
                                    ignore
                            end,
                            mnesia_locker:release_tid(Tid),
                            transaction_terminated(Tid),
                            ?eval_debug_fun({?MODULE,do_commit,post},[{tid,Tid},{pid,nopid}]),
                            doit_loop(State#state{participants=gb_trees:delete(Tid,Participants)});
                        Pid when is_pid(Pid) ->
                            Pid ! {Tid, committed},
                            ?eval_debug_fun({?MODULE, do_commit, post}, [{tid, Tid}, {pid, Pid}]),
                            doit_loop(State)
                    end
            end;

参与结点的事务管理器首先取回事务第一阶段提交的commit结构,然后进行日志记录和写入数据,并结束事务,后续过程由do_commit完成。

对于事务发起结点,其过程由do_dirty完成:

do_dirty(Tid, Commit) when Commit#commit.schema_ops == [] ->
    mnesia_log:log(Commit),
    do_commit(Tid, Commit).

事务发起结点的过程也是如此,记录日志,然后由do_commit完成后续过程,mnesia的日志由disk_log实现。

do_commit(Tid, Bin) when is_binary(Bin) ->
    do_commit(Tid, binary_to_term(Bin));
do_commit(Tid, C) ->
    do_commit(Tid, C, optional).

do_commit(Tid, Bin, DumperMode) when is_binary(Bin) ->
    do_commit(Tid, binary_to_term(Bin), DumperMode);
do_commit(Tid, C, DumperMode) ->
    mnesia_dumper:update(Tid, C#commit.schema_ops, DumperMode),
    R  = do_snmp(Tid, C#commit.snmp),
    R2 = do_update(Tid, ram_copies, C#commit.ram_copies, R),
    R3 = do_update(Tid, disc_copies, C#commit.disc_copies, R2),
    R4 = do_update(Tid, disc_only_copies, C#commit.disc_only_copies, R3),
    mnesia_subscr:report_activity(Tid),
    R4.

do_commit实际完成后续的表操作,将数据真正写入表中。

do_update(Tid, Storage, [Op | Ops], OldRes) ->
    case catch do_update_op(Tid, Storage, Op) of
        ok ->
            do_update(Tid, Storage, Ops, OldRes);
        {'EXIT', Reason} ->
            verbose("do_update in ~w failed: ~p -> {'EXIT', ~p}~n",
                    [Tid, Op, Reason]),
            do_update(Tid, Storage, Ops, OldRes);
        NewRes ->
            do_update(Tid, Storage, Ops, NewRes)
    end;
do_update(_Tid, _Storage, [], Res) ->
    Res.

do_update_op(Tid, Storage, {{Tab, K}, Obj, write}) ->
    commit_write(?catch_val({Tab, commit_work}), Tid,
                 Tab, K, Obj, undefined),
    mnesia_lib:db_put(Storage, Tab, Obj);

mnesia_lib:db_put是真正完成数据表的ets表(非临时表)操作的地方。

commit_write([], _, _, _, _, _) -> ok;
commit_write([{checkpoints, CpList}|R], Tid, Tab, K, Obj, Old) ->
    mnesia_checkpoint:tm_retain(Tid, Tab, K, write, CpList),
    commit_write(R, Tid, Tab, K, Obj, Old);
commit_write([H|R], Tid, Tab, K, Obj, Old)
  when element(1, H) == subscribers ->
    mnesia_subscr:report_table_event(H, Tab, Tid, Obj, write, Old),
    commit_write(R, Tid, Tab, K, Obj, Old);
commit_write([H|R], Tid, Tab, K, Obj, Old)
  when element(1, H) == index ->
    mnesia_index:add_index(H, Tab, K, Obj, Old),
    commit_write(R, Tid, Tab, K, Obj, Old).

commit_write的操作主要用于辅助工作,如检查点,写事件通报,索引添加等。

mnesia_lib.erl

db_put(Tab, Val) ->
    db_put(val({Tab, storage_type}), Tab, Val).

db_put(ram_copies, Tab, Val) -> ?ets_insert(Tab, Val), ok;
db_put(disc_copies, Tab, Val) -> ?ets_insert(Tab, Val), ok;
db_put(disc_only_copies, Tab, Val) -> dets:insert(Tab, Val).

将数据真正写入ets表。

重新回到multi_commit中,此后将清除事务。

multi_commit(sym_trans, _Maj = [], Tid, CR, Store) ->
    {DiscNs, RamNs} = commit_nodes(CR, [], []),
    Pending = mnesia_checkpoint:tm_enter_pending(Tid, DiscNs, RamNs),
    ?ets_insert(Store, Pending),

    {WaitFor, Local} = ask_commit(sym_trans, Tid, CR, DiscNs, RamNs),
    {Outcome, []} = rec_all(WaitFor, Tid, do_commit, []),
    ?eval_debug_fun({?MODULE, multi_commit_sym},
                    [{tid, Tid}, {outcome, Outcome}]),
    rpc:abcast(DiscNs -- [node()], ?MODULE, {Tid, Outcome}),
    rpc:abcast(RamNs -- [node()], ?MODULE, {Tid, Outcome}),
    case Outcome of
        do_commit ->
            mnesia_recover:note_decision(Tid, committed),
            do_dirty(Tid, Local),
            mnesia_locker:release_tid(Tid),
            ?MODULE ! {delete_transaction, Tid};
        {do_abort, _Reason} ->
            mnesia_recover:note_decision(Tid, aborted)
    end,
    ?eval_debug_fun({?MODULE, multi_commit_sym, post},
                    [{tid, Tid}, {outcome, Outcome}]),
    Outcome;

事务发起结点通知本地事务管理器清除事务:

doit_loop(#state{coordinators=Coordinators,participants=Participants,supervisor=Sup}=State) ->
    receive
        {delete_transaction, Tid} ->
            case gb_trees:is_defined(Tid, Participants) of
                false ->
                    case gb_trees:lookup(Tid, Coordinators) of
                        none ->
                            verbose("** ERROR ** Tried to delete a non transaction ~p~n", [Tid]),
                            doit_loop(State);
                        {value, Etabs} ->
                            clear_fixtable(Etabs),
                            erase_ets_tabs(Etabs),
                            transaction_terminated(Tid),
                            doit_loop(State#state{coordinators =
                                                  gb_trees:delete(Tid,Coordinators)})
                    end;
                true ->
                    transaction_terminated(Tid),
                    State2 = State#state{participants=gb_trees:delete(Tid,Participants)},
                    doit_loop(State2)
            end;

与事务参与结点不同,事务发起结点清除的是coordinator,而事务参与结点清除的是participants域。之后的清除过程由transaction_terminated完成,该函数将做一些检查点相关的工作并递增事务序列号,以供下次事务使用。

至此,mnesia的普通事务写就完成了,总结一下,transaction上下文中,一次mnesia:write写行记录至少涉及:一次行锁,一次同步提交,一次异步提交,一次临时ets表写,一次正式写,一次异步日志。

?

热点排行