rabbitmq 网络层启动代码分析!--?xml version1.0 encodingUTF-8 standaloneno?--?networking m
rabbitmq 网络层启动代码分析
<!--?xml version="1.0" encoding="UTF-8" standalone="no"?-->
?
networking module:
rabbit_networking.erl:
start() ->
??? {ok,_} = supervisor2:start_child(
?????????????? rabbit_sup,
?????????????? {rabbit_tcp_client_sup,
??????????????? {rabbit_client_sup, start_link,
???????????????? [{local, rabbit_tcp_client_sup},
????????????????? {rabbit_connection_sup,start_link,[]}]},
??????????????? transient, infinity, supervisor, [rabbit_client_sup]}),
??? ok.
?
====>
rabbit_sup->rabbit_client_sup->rabbit_connection_sup:
start_link() ->
??? {ok, SupPid} = supervisor2:start_link(?MODULE, []),
??? {ok, Collector} =
??????? supervisor2:start_child(
????????? SupPid,
????????? {collector, {rabbit_queue_collector, start_link, []},
?????????? intrinsic, ?MAX_WAIT, worker, [rabbit_queue_collector]}),
??? {ok, ChannelSupSupPid} =
??????? supervisor2:start_child(
????????? SupPid,
????????? {channel_sup_sup, {rabbit_channel_sup_sup, start_link, []},
?????????? intrinsic, infinity, supervisor, [rabbit_channel_sup_sup]}),
??? {ok, ReaderPid} =
??????? supervisor2:start_child(
????????? SupPid,
????????? {reader, {rabbit_reader, start_link,
??????????????????? [ChannelSupSupPid, Collector,
???????????????????? rabbit_heartbeat:start_heartbeat_fun(SupPid)]},
?????????? intrinsic, ?MAX_WAIT, worker, [rabbit_reader]}),
? ? {ok, SupPid, ReaderPid}.
rabbit_channel_sup_sup.erl:start_link() ->??? supervisor2:start_link(?MODULE, []).
init([]) ->??? {ok, {{simple_one_for_one_terminate, 0, 1},????????? [{channel_sup, {rabbit_channel_sup, start_link, []},??????????? temporary, infinity, supervisor, [rabbit_channel_sup]}]}}.
supervisor2.erl:%% SupName = self%% SupFlags = {simple_one_for_one_terminate, 0, 1}%% Mod = rabbit_channel_sup%% Args = []? ? ?case init_state(SupName, SupFlags, Mod, Args) of????????? {ok, State} when ?is_simple(State) -> ?%% 这里判断的结果确实是simple????????????? init_dynamic(State, StartSpec);????????? {ok, State} ->????????????? init_children(State, StartSpec);????????? Error ->????????????? {stop, {supervisor_data, Error}}? ? ? end;
==> ?{ok, State} =?{ok, #state{name = supname(SupName,Mod),??????????? strategy = Strategy,??????????? intensity = MaxIntensity,??????????? period = Period,??????????? module = Mod,??????????? args = Args}};
==>?到这里为止,目前rabbit_channel_sup_sup并没有把rabbit_channel_sup启动起来%% StartSpec:?{channel_sup, {rabbit_channel_sup, start_link, []},??????????? temporary, infinity, supervisor, [rabbit_channel_sup]}init_dynamic(State, [StartSpec]) ->??? case check_startspec([StartSpec]) of??????? {ok, Children} ->???????? {ok, State#state{children = Children}};??????? Error ->??????????? {stop, {start_spec, Error}}??? end;
==> (由rabbit_reader.erl启动channel):init(Parent, ChannelSupSupPid, Collector, StartHeartbeatFun) ->??? Deb = sys:debug_options([]),??? receive ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ??%% 这里由rabbit_networking.erl主动发起 Reader ! {go, Sock, SockTransform}?? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? %% 以下boot_tcp成功后才通知Reader??????? {go, Sock, SockTransform} ->??????????? start_connection(????????????? Parent, ChannelSupSupPid, Collector, StartHeartbeatFun, Deb, Sock,????????????? SockTransform)??? end.
==>start_connection:recvloop(Deb, switch_callback(rabbit_event:init_stats_timer(?????????????????????????????????????? State, #v1.stats_timer),????????????????????????????????????? handshake, 8))
==>recvloop:recvloop(Deb, handle_input(State#v1.callback, Data,?????????????????????????????? State#v1{buf = [Rest],??????????????????????????????????????? buf_len = BufLen - RecvLen})).
==>handle_input:switch_callback(handle_frame(Type, Channel, Payload, State),??????????????????????????? frame_header, 7);
==>handle_frame:true? -> send_to_new_channel(?????????????????????????????????? Channel, AnalyzedFrame, State);
==>send_to_new_channel:rabbit_channel_sup_sup:start_channel(????????? ChanSupSup, {tcp, Sock, Channel, FrameMax, self(), Protocol, User,?????????????????????? VHost, Capabilities, Collector}),
由此,这里才开始start rabbit_channel_sup
所有(包括networking)都是rabbit.erl 的-rabbit_boot_step加载步骤加载起来
rabbit_networking.erl ?==> boot_tcp:start_tcp_listener -> start_listener -> start_listener0:{ok,_} = supervisor:start_child(?????????????? rabbit_sup,?????????????? {Name,??????????????? {tcp_listener_sup, start_link,???????????????? [IPAddress, Port, [Family | tcp_opts()],????????????????? {?MODULE, tcp_listener_started, [Protocol]},????????????????? {?MODULE, tcp_listener_stopped, [Protocol]},????????????????? OnConnect, Label]}, ? ? ? ? ?%% OnConnect=rabbit_networking:start_client??????????????? transient, infinity, supervisor, [tcp_listener_sup]}).[IPAddress, Port, [Family | tcp_opts()],=> tcp_listener_sup:start_link(Args). ?%% Args=? ? ? ? ? ? ? ?[IPAddress, Port, [Family | tcp_opts()],????????????????? {?MODULE, tcp_listener_started, [Protocol]},????????????????? {?MODULE, tcp_listener_stopped, [Protocol]},????????????????? OnConnect, Label]
{ok, {{one_for_all, 10, 10},????????? [{tcp_acceptor_sup, {tcp_acceptor_sup, start_link,?????????????????????????????? [Name, AcceptCallback]},??????????? transient, infinity, supervisor, [tcp_acceptor_sup]},?????????? {tcp_listener, {tcp_listener, start_link,?????????????????????????? [IPAddress, Port, SocketOpts,??????????????????????????? ConcurrentAcceptorCount, Name,??????????????????????????? OnStartup, OnShutdown, Label]},??????????? transient, 16#ffffffff, worker, [tcp_listener]}]}}.
=>tcp_acceptor_sup.erl:{ok, {{simple_one_for_one, 10, 10},????????? [{tcp_acceptor, {tcp_acceptor, start_link, [Callback]},??????????? transient, brutal_kill, worker, [tcp_acceptor]}]}}.
这里tcp_acceptor_sup start_link后并不会start tcp_acceptor。看代码可以知道"simple_one_for_one"调用init_dynamic,只是把child信息记录,不会调用start_link
下一个tcp_listener start的时候{ok, _APid} = supervisor:start_child(????????????????????????????????????????????????? AcceptorSup, [LSock]) ?%% AcceptorSup就是上面的tcp_acceptor_sup在这里再次调用sup的start_child方法,看supervisor可以知道start_child会把这次和上次保存的child参数合并:Child = hd(State#state.children),??? #child{mfargs = {M, F, A}} = Child,??? Args = A ++ EArgs,??? case do_start_child_i(M, F, Args) of故最终调用tcp_acceptor:start_link方法,解释了为什么tcp_acceptor的start_link两个参数的原因。
rabbit_channel_sup.erl: 通过Reader启动之后1,先对于Reader和Channel启动一个Writer(rabbit_writer.erl)?[{writer, {rabbit_writer, start_link,?????????????? [Sock, Channel, FrameMax, Protocol, ReaderPid]},
2, start channel:{ok, ChannelPid} =??????? supervisor2:start_child(????????? SupPid,????????? {channel, {rabbit_channel, start_link,???????????????????? [Channel, ReaderPid, WriterPid, ReaderPid, Protocol,????????????????????? User, VHost, Capabilities, Collector,????????????????????? rabbit_limiter:make_token(LimiterPid)]},?????????? intrinsic, ?MAX_WAIT, worker, [rabbit_channel]})