RabbitMQ源码分析 – 消息生命周期
(注:分析代码基于RabbitMQ 2.8.2)
match_routing_key(SrcName, [RoutingKey]) -> find_routes(#route{binding = #binding{source = SrcName, destination = '$1', key = RoutingKey, _ = '_'}}, []);match_routing_key(SrcName, [_|_] = RoutingKeys) -> find_routes(#route{binding = #binding{source = SrcName, destination = '$1', key = '$2', _ = '_'}}, [list_to_tuple(['orelse' | [{'=:=', '$2', RKey} || RKey <- RoutingKeys]])]).find_routes(MatchHead, Conditions) -> ets:select(rabbit_route, [{MatchHead, Conditions, ['$1']}]).
(参见[$RABBIT_SRC/src/rabbit_router.erl --> match_routing_key/2])
?
从node1到node2的路径,rabbit会首先尝试用“test”匹配,发现没到直达路径,然后尝试以“*”匹配,成功,node2到node3以“abc”匹配成功,node3到node4同理,node4到node5以“#”号匹配,然后在node5结点要跳过任何不能匹配node5到node6路径的单词,这里是“456”,最后匹配到“end”。
rabbit在这个算法的实现上有点奇怪,例如从node2到node3的匹配,即使“abc”路径已经匹配,但还是会尝试通过“*”和“#”匹配,增加了很多无意义的比较。
(代码参见[$RABBIT_SRC/src/rabbit_exchange_type_topic.erl --> trie_match/2])
?
headers匹配算法
AMQP协议对headers类型的exchange的匹配算法有如下规定:由“x-match”头来控制匹配模式,分两种,all和any,类似于布尔运算里的AND和OR:如果匹配模式是all,则目标消息中的头信息的值必须匹配所有在绑定时指定的头信息;如果为any,则只要有一个头信息的值匹配就可以。其中的匹配是指,要么绑定时指定的头信息值为空,要么目标消息中的头信息的值与绑定时指定的值完全一致。
rabbit在实现时,会对绑定时指定的头信息和目标消息中的头信息进行排序(以头信息的键升序排列),然后逐个比对。
想不通这里为什么进行字母序排序(代码里也称这里的匹配算法是Horrendous matching algorithm,具体参见[$RABBIT_SRC/src/rabbit_exchange_type_headers.erl --> headers_match/2]),基于hash map类的数据结构更高效一些。
?
查找队列进程
每一个队列在创建时,会在rabbit_queue数据表中写入#amqqueue,其中跟进程相关的两个域是pid和slave_pids(在HA策略下slave_pids才有效,代表各个slave结点上的镜像队列的进程ID),pid代表的进程由[$RABBIT_SRC/src/rabbit_amqqueue_process.erl]。
查找的过程其实很简单,就是从rabbit_queue数据表中,根据队列名称找到相应的#amqqueue记录,并将相应的pid和slave_pids全部返回。
?
投递消息
找到队列对应的处理进程后,通过代理的方式(见[$RABBIT_SRC/src/delegate.erl])向各个队列进程(包含镜像进程)发送deliver消息。rabbit_amqqueue_process在收到deliver消息后,会尝试将消息投递到某个消费者(参见[$RABBIT_SRC/src/rabbit_amqqueue_process --> attempt_delivery/3])。最终会通过[$RABBIT_SRC/src/rabbit_writer.erl --> send_command_and_notify/5]调用以basic.deliver命令(AMQP)将消息内容发送给消费者。如果投递失败,有两种可能的结果:一种直接丢弃,另一种会将消息保存在队列中,等待后续有新的消费者加入时重新投递。保存在队列中的消息会根据durable属性来判断是不是需要写入磁盘,一般情况下,此值为false,消息只保存在内存中,如果需要持久化,此值为true,消息会写入磁盘。
?
队列机制
跟这部分相关的涉及到rabbit_msg_store模块和rabbit_queue_index模块。而且backing queue为rabbit_mirror_queue_master的队列还涉及到GM(Guaranteed Multicast)相关的东西,所以打算专门写一篇分析这一块。
?