mnesia的普通transaction写过程(四)事务提交准备
上一篇博文介绍了mnesia的锁请求过程,在请求到锁后,mnesia:write将更新写入临时ets表,此后mnesia:write将完成其使命,重新回到mnesia_tm:apply_fun函数中,此后将继续分析。
apply_fun(Fun, Args, Type) ->
? ? Result = apply(Fun, Args),
? ? case t_commit(Type) of
do_commit ->
? ? ? ? ? ? {atomic, Result};
? ? ? ? do_commit_nested ->
? ? ? ? ? ? {nested_atomic, Result};
? ? ? ? {do_abort, {aborted, Reason}} ->
? ? ? ? ? ? {'EXIT', {aborted, Reason}};
? ? ? ? {do_abort, Reason} ->
? ? ? ? ? ? {'EXIT', {aborted, Reason}}
? ? end.
Type参数为async
t_commit(Type) ->
? ? {_Mod, Tid, Ts} = get(mnesia_activity_state),
? ? Store = Ts#tidstore.store,
? ? if
Ts#tidstore.level == 1 ->
? ?intercept_friends(Tid, Ts),
? ?%% N is number of updates
? ?case arrange(Tid, Store, Type) of
{N, Prep} when N > 0 ->
? ?multi_commit(Prep#prep.protocol,
majority_attr(Prep),
Tid, Prep#prep.records, Store);
{0, Prep} ->
? ?multi_commit(read_only,
majority_attr(Prep),
Tid, Prep#prep.records, Store)
? ?end;
...
? ? end.
在进行事务提交前,需要为每个事务参与结点计算其更新内容。
arrange(Tid, Store, Type) ->
? ? Nodes = get_elements(nodes,Store),
? ? Recs = prep_recs(Nodes, []),
? ? Key = ?ets_first(Store),
? ? N = 0,
? ? Prep =
case Type of
? ?async -> #prep{protocol = sym_trans, records = Recs};
? ?sync -> #prep{protocol = sync_sym_trans, records = Recs}
end,
? ? case catch do_arrange(Tid, Store, Key, Prep, N) of
{'EXIT', Reason} ->
? ?dbg_out("do_arrange failed ~p ~p~n", [Reason, Tid]),
? ?case Reason of
{aborted, R} ->
? ?mnesia:abort(R);
_ ->
? ?mnesia:abort(Reason)
? ?end;
{New, Prepared} ->
? ?{New, Prepared#prep{records = reverse(Prepared#prep.records)}}
? ? end.
临时ets表的nodes就是之前请求锁的目的结点,它们都是事务参与结点,此时事务提交类型为异步,其事务提交协议为sym_trans,及异步同构事务。
prep_recs([N | Nodes], Recs) ->
? ? prep_recs(Nodes, [#commit{decision = presume_commit, node = N} | Recs]);
prep_recs([], Recs) ->
Recs.
从临时ets表中取得所有的表操作记录,根据这些表操作,为每个事务参与结点构建一个事务提交结构commit并进行填充。
end.
构建事务提交的准备结构,将同一个表的操作合并在一起。
do_prepare_items(Tid, Tab, Key, Types, Snmp, Items, Recs) ->
? ? Recs2 = prepare_snmp(Tid, Tab, Key, Types, Snmp, Items, Recs), % May exit
prepare_nodes(Tid, Types, Items, Recs2, normal).
prepare_nodes(Tid, [{Node, Storage} | Rest], Items, C, Kind) ->
? ? {Rec, C2} = pick_node(Tid, Node, C, []),
? ? Rec2 = prepare_node(Node, Storage, Items, Rec, Kind),
? ? [Rec2 | prepare_nodes(Tid, Rest, Items, C2, Kind)];
prepare_nodes(_Tid, [], _Items, CommitRecords, _Kind) ->
? ? CommitRecords.
prepare_nodes根据表的where_to_commit属性进行处理,该表属性为一个{Node,StorageType}列表。
pick_node(Tid, Node, [Rec | Rest], Done) ->
? ? if
Rec#commit.node == Node ->
? ?{Rec, Done ++ Rest};
true ->
? ?pick_node(Tid, Node, Rest, [Rec | Done])
? ? end;
pick_node({dirty,_}, Node, [], Done) ->
? ? {#commit{decision = presume_commit, node = Node}, Done};
pick_node(_Tid, Node, [], _Done) ->
? ? mnesia:abort({bad_commit, {missing_lock, Node}}).
pick_node为一个事务参与结点整理出表的操作,形成{NodeCommit, Ops}元组
prepare_node(Node, Storage, [Item | Items], Rec, Kind) when Kind == snmp ->
? ? Rec2 = Rec#commit{snmp = [Item | Rec#commit.snmp]},
? ? prepare_node(Node, Storage, Items, Rec2, Kind);
prepare_node(Node, Storage, [Item | Items], Rec, Kind) when Kind /= schema ->
? ? Rec2 =
case Storage of
? ?ram_copies ->
Rec#commit{ram_copies = [Item | Rec#commit.ram_copies]};
? ?disc_copies ->
Rec#commit{disc_copies = [Item | Rec#commit.disc_copies]};
? ?disc_only_copies ->
Rec#commit{disc_only_copies =?[Item | Rec#commit.disc_only_copies]}
end,
? ? prepare_node(Node, Storage, Items, Rec2, Kind);
prepare_node(_Node, _Storage, Items, Rec, Kind)
? when Kind == schema, Rec#commit.schema_ops == [] ?->
? ? Rec#commit{schema_ops = Items};
prepare_node(_Node, _Storage, [], Rec, _Kind) ->
Rec.
prepare_node根据事务参与结点的存储类型,进一步将表操作填充到事务参与结点的commit结构的具体存储类型中
t_commit(Type) ->? ? {_Mod, Tid, Ts} = get(mnesia_activity_state),? ? Store = Ts#tidstore.store,? ? ifTs#tidstore.level == 1 -> ? ?intercept_friends(Tid, Ts), ? ?case arrange(Tid, Store, Type) of{N, Prep} when N > 0 -> ? ?multi_commit(Prep#prep.protocol,?majority_attr(Prep),?Tid, Prep#prep.records, Store);{0, Prep} -> ? ?multi_commit(read_only,?majority_attr(Prep),?Tid, Prep#prep.records, Store) ? ?end;? ? ? ? ...? ? end.不考虑嵌套事务的情况,此时,已经为每个事务参与结点构建了一个commit结构,该结构根据事务参与结点的存储类型,在ram_copies、disc_copies、disc_only_copies之一中记录了所有的表操作记录。事务提交准备过程需要根据此前的更新情况,为每个事务参与结点构造好commit结构,该结构记录了在结点的具体副本类型下需要进行的更新,在下一步的提交过程中,将向各个事务参与结点传递该结构,以完成提交。未完待续...