首页 诗词 字典 板报 句子 名言 友答 励志 学校 网站地图
当前位置: 首页 > 教程频道 > 数据库 > 其他数据库 >

mnesia的普普通通transaction写过程(四)事务提交准备

2012-06-29 
mnesia的普通transaction写过程(四)事务提交准备上一篇博文介绍了mnesia的锁请求过程,在请求到锁后,mnesia

mnesia的普通transaction写过程(四)事务提交准备

上一篇博文介绍了mnesia的锁请求过程,在请求到锁后,mnesia:write将更新写入临时ets表,此后mnesia:write将完成其使命,重新回到mnesia_tm:apply_fun函数中,此后将继续分析。


apply_fun(Fun, Args, Type) ->

? ? Result = apply(Fun, Args),

? ? case t_commit(Type) of

do_commit ->

? ? ? ? ? ? {atomic, Result};

? ? ? ? do_commit_nested ->

? ? ? ? ? ? {nested_atomic, Result};

? ? ? ? {do_abort, {aborted, Reason}} ->

? ? ? ? ? ? {'EXIT', {aborted, Reason}};

? ? ? ? {do_abort, Reason} ->

? ? ? ? ? ? {'EXIT', {aborted, Reason}}

? ? end.

Type参数为async

t_commit(Type) ->

? ? {_Mod, Tid, Ts} = get(mnesia_activity_state),

? ? Store = Ts#tidstore.store,

? ? if

Ts#tidstore.level == 1 ->

? ?intercept_friends(Tid, Ts),

? ?%% N is number of updates

? ?case arrange(Tid, Store, Type) of

{N, Prep} when N > 0 ->

? ?multi_commit(Prep#prep.protocol,

majority_attr(Prep),

Tid, Prep#prep.records, Store);

{0, Prep} ->

? ?multi_commit(read_only,

majority_attr(Prep),

Tid, Prep#prep.records, Store)

? ?end;

...

? ? end.

在进行事务提交前,需要为每个事务参与结点计算其更新内容。

arrange(Tid, Store, Type) ->

? ? Nodes = get_elements(nodes,Store),

? ? Recs = prep_recs(Nodes, []),

? ? Key = ?ets_first(Store),

? ? N = 0,

? ? Prep =

case Type of

? ?async -> #prep{protocol = sym_trans, records = Recs};

? ?sync -> #prep{protocol = sync_sym_trans, records = Recs}

end,

? ? case catch do_arrange(Tid, Store, Key, Prep, N) of

{'EXIT', Reason} ->

? ?dbg_out("do_arrange failed ~p ~p~n", [Reason, Tid]),

? ?case Reason of

{aborted, R} ->

? ?mnesia:abort(R);

_ ->

? ?mnesia:abort(Reason)

? ?end;

{New, Prepared} ->

? ?{New, Prepared#prep{records = reverse(Prepared#prep.records)}}

? ? end.

临时ets表的nodes就是之前请求锁的目的结点,它们都是事务参与结点,此时事务提交类型为异步,其事务提交协议为sym_trans,及异步同构事务。

prep_recs([N | Nodes], Recs) ->

? ? prep_recs(Nodes, [#commit{decision = presume_commit, node = N} | Recs]);

prep_recs([], Recs) ->

Recs.

从临时ets表中取得所有的表操作记录,根据这些表操作,为每个事务参与结点构建一个事务提交结构commit并进行填充。


end.

构建事务提交的准备结构,将同一个表的操作合并在一起。

do_prepare_items(Tid, Tab, Key, Types, Snmp, Items, Recs) ->

? ? Recs2 = prepare_snmp(Tid, Tab, Key, Types, Snmp, Items, Recs), % May exit

prepare_nodes(Tid, Types, Items, Recs2, normal).

prepare_nodes(Tid, [{Node, Storage} | Rest], Items, C, Kind) ->

? ? {Rec, C2} = pick_node(Tid, Node, C, []),

? ? Rec2 = prepare_node(Node, Storage, Items, Rec, Kind),

? ? [Rec2 | prepare_nodes(Tid, Rest, Items, C2, Kind)];

prepare_nodes(_Tid, [], _Items, CommitRecords, _Kind) ->

? ? CommitRecords.

prepare_nodes根据表的where_to_commit属性进行处理,该表属性为一个{Node,StorageType}列表。

pick_node(Tid, Node, [Rec | Rest], Done) ->

? ? if

Rec#commit.node == Node ->

? ?{Rec, Done ++ Rest};

true ->

? ?pick_node(Tid, Node, Rest, [Rec | Done])

? ? end;

pick_node({dirty,_}, Node, [], Done) ->

? ? {#commit{decision = presume_commit, node = Node}, Done};

pick_node(_Tid, Node, [], _Done) ->

? ? mnesia:abort({bad_commit, {missing_lock, Node}}).

pick_node为一个事务参与结点整理出表的操作,形成{NodeCommit, Ops}元组

prepare_node(Node, Storage, [Item | Items], Rec, Kind) when Kind == snmp ->

? ? Rec2 = Rec#commit{snmp = [Item | Rec#commit.snmp]},

? ? prepare_node(Node, Storage, Items, Rec2, Kind);

prepare_node(Node, Storage, [Item | Items], Rec, Kind) when Kind /= schema ->

? ? Rec2 =

case Storage of

? ?ram_copies ->

Rec#commit{ram_copies = [Item | Rec#commit.ram_copies]};

? ?disc_copies ->

Rec#commit{disc_copies = [Item | Rec#commit.disc_copies]};

? ?disc_only_copies ->

Rec#commit{disc_only_copies =?[Item | Rec#commit.disc_only_copies]}

end,

? ? prepare_node(Node, Storage, Items, Rec2, Kind);

prepare_node(_Node, _Storage, Items, Rec, Kind)

? when Kind == schema, Rec#commit.schema_ops == [] ?->

? ? Rec#commit{schema_ops = Items};

prepare_node(_Node, _Storage, [], Rec, _Kind) ->

Rec.

prepare_node根据事务参与结点的存储类型,进一步将表操作填充到事务参与结点的commit结构的具体存储类型中

t_commit(Type) ->? ? {_Mod, Tid, Ts} = get(mnesia_activity_state),? ? Store = Ts#tidstore.store,? ? ifTs#tidstore.level == 1 -> ? ?intercept_friends(Tid, Ts), ? ?case arrange(Tid, Store, Type) of{N, Prep} when N > 0 -> ? ?multi_commit(Prep#prep.protocol,?majority_attr(Prep),?Tid, Prep#prep.records, Store);{0, Prep} -> ? ?multi_commit(read_only,?majority_attr(Prep),?Tid, Prep#prep.records, Store) ? ?end;? ? ? ? ...? ? end.不考虑嵌套事务的情况,此时,已经为每个事务参与结点构建了一个commit结构,该结构根据事务参与结点的存储类型,在ram_copies、disc_copies、disc_only_copies之一中记录了所有的表操作记录。事务提交准备过程需要根据此前的更新情况,为每个事务参与结点构造好commit结构,该结构记录了在结点的具体副本类型下需要进行的更新,在下一步的提交过程中,将向各个事务参与结点传递该结构,以完成提交。未完待续...

热点排行