RabbitMQ 原理解析

2020/02/10 Knowledge

IaaS使用RabbitMQ来进行模块间的消息传递,但主要是一些运维工作,看过RabbitMQ的一些代码,并没有实际写过。本文将先从RabbitMQ原理入手,介绍此消息中间件,之后和同为消息中间件的Kafka做对比,分析两款消息中间件的差异。

目录

RabbitMQ 概念

RabbitMQ 是一个由 Erlang 语言开发的 AMQP 的开源实现。
AMQP:Advanced Message Queue,高级消息队列协议。它是应用层协议的一个开放标准,为面向消息的中间件设计,基于此协议的客户端与消息中间件可传递消息,并不受产品、开发语言等条件的限制。
RabbitMQ 最初起源于金融系统,用于在分布式系统中存储转发消息,在易用性、扩展性、高可用性等方面表现不俗。具体特点包括:

  1. 可靠性(Reliability):RabbitMQ 使用一些机制来保证可靠性,如持久化、传输确认、发布确认。
  2. 灵活的路由(Flexible Routing):在消息进入队列之前,通过 Exchange 来路由消息的。对于典型的路由功能,RabbitMQ 已经提供了一些内置的 Exchange 来实现。针对更复杂的路由功能,可以将多个 Exchange 绑定在一起,也通过插件机制实现自己的 Exchange 。
  3. 消息集群(Clustering):多个 RabbitMQ 服务器可以组成一个集群,形成一个逻辑 Broker 。
  4. 高可用(Highly Available Queues):队列可以在集群中的机器上进行镜像,使得在部分节点出问题的情况下队列仍然可用。
  5. 多种协议(Multi-protocol):RabbitMQ 支持多种消息队列协议,比如 STOMP、MQTT等等。
  6. 多语言客户端(Many Clients):RabbitMQ 几乎支持所有常用语言,比如 Java、.NET、Ruby 等等。
  7. 管理界面(Management UI):RabbitMQ 提供了一个易用的用户界面,使得用户可以监控和管理消息 Broker 的许多方面。
  8. 跟踪机制(Tracing):如果消息异常,RabbitMQ 提供了消息跟踪机制,使用者可以找出发生了什么。
  9. 插件机制(Plugin System):RabbitMQ 提供了许多插件,来从多方面进行扩展,也可以编写自己的插件。

在我们秒杀抢购商品的时候,系统会提醒我们稍等排队中,而不是像几年前一样页面卡死或报错给用户。像这种排队结算就用到了消息队列机制,放入通道里面一个一个结算处理,而不是某个时间断突然涌入大批量的查询新增把数据库给搞宕机,所以RabbitMQ本质上起到的作用就是削峰填谷,为业务保驾护航。

基本架构

RabbitMQ架构

  • Messge:消息,由消息头和消息体组成。消息体是不透明的,消息头由一些列可选属性组成,这些属性包括:routing-key(路由键)、priority(优先级)、delivery-mode(消息是否可持久性存储)。
  • Publisher:消息的生产者,也是一个向交换器发布消息的客户端应用程序。
  • Consumer:消息的消费者,表示一个从消息队列中取得消息的客户端程序。
  • Exchange:交换器,用来接收生产者发送的消息,并将这些消息路由给服务器中的队列。三种常见的交换器:direct(发布与订阅,完全匹配)、fanout(广播)、topic(主题,规则匹配)
  • Binding:绑定,用于消息队列和交换器之间的关联。一个绑定就是基于路由键将交换器与消息队列连接起来的路由规则,所以可以将交换器理解成一个由绑定构成的路由表。
  • Queue:消息队列,用来保存消息直到发送给消费者。它是消息的容器,也是消息的终点。一个消息可以投入一个或多个队列。消息一直在队列里面,等待消费者连接到这个队列取走消息。
  • Routing-key:路由键,RabbitMQ决定消息该投递到哪个队列的规则。队列通过路由键绑定到交换器。消息发送到MQ服务器时,消息拥有一个路由键,即便是空的,RabbitMQ也会将其和绑定使用的路由键进行匹配。如果相匹配,消息会投递到该队列,如果不匹配,消息将会进入黑洞。
  • Connection:链接,指rabbitMQ服务器和服务建立的TCP连接。
  • Channel:信道,多路复用连接中的一条独立的双向数据流通道。信道是建立在真实的TCP 连接内地虚拟连接,AMQP 命令都是通过信道发出去的,不管是发布消息、订阅队列还是接收消息,这些动作都是通过信道完成。因为对于操作系统来说建立和销毁 TCP 都是非常昂贵的开销,所以引入了信道的概念,以复用一条 TCP 连接。
  • Virtual Host:虚拟主机,表示一批交换机,消息队列和相关对象。虚拟主机是共享相同的身份认证和加密环境的独立服务器。每个virtualhost本质上就是一个mini版的RabbitMQ服务器,拥有自己的队列、交换器、绑定和权限机制。vhost是AMQP概念的基础,必须在连接时指定,RabbitMQ默认的vhost是/。
  • Broker:表示消息队列服务器实体。

RabbitMQ

Exchange 类型

Exchange 分发消息时根据类型的不同分发策略有区别,目前共四种类型:direct、fanout、topic、headers 。headers 匹配 AMQP 消息的 header 而不是路由键,此外 headers 交换器和direct 交换器完全一致,但性能差很多,目前几乎用不到了,所以直接看另外三种类型:

direct

Direct:消息中的路由键(routing key)如果和 Binding 中的 binding key 一致,交换器就将消息发到对应的队列中。它是完全匹配、单播的模式。

direct

fanout

Fanout:每个发到 fanout 类型交换器的消息都会分到所有绑定的队列上去。很像子网广播,每台子网内的主机都获得了一份复制的消息。fanout 类型转发消息是最快的。

fanout

topic 交换器(模式匹配)

topic 交换器:topic 交换器通过模式匹配分配消息的路由键属性,将路由键和某个模式进行匹配,此时队列需要绑定到一个模式上。它将路由键和绑定键的字符串切分成单词,这些单词之间用点隔开。它同样也会识别两个通配符:符号“#”和符号“”。#匹配0 个或多个单词,匹配不多不少一个单词。

topic

常见问题

1.RabbitMQ使用信道而不是直接使用TCP通信的原因?

TCP的创建和销毁开销特别大,创建需要3次握手,销毁需要4次挥手。如果不用信道,那么应用程序就会以TCP连接到RabbitMQ,高峰时每秒成千上万条连接会造成资源的巨大浪费,而且操作系统每秒处理TCP连接数也是有限制的,必定造成性能瓶颈。

信道的原理是一条线程一条信道,多条线程多条信道同用一条TCP连接。一条TCP连接可以容纳无线信道即使每秒成千上万的请求也不会成为性能的瓶颈。

2.使用RabbitMQ有什么好处?

1.解耦,系统A在代码中直接调用系统B和系统C的代码,如果将来D系统接入,系统A还需要修改代码,过于麻烦!

2.异步,将消息写入消息队列,非必要的业务逻辑以异步的方式运行,加快响应速度

3.削峰,并发量大的时候,所有的请求直接怼到数据库,造成数据库连接异常

3.如何确保消息正确地发送至RabbitMQ?

RabbitMQ使用发送方确认模式,确保消息正确地发送到RabbitMQ。发送方确认模式:将信道设置成confirm模式(发送方确认模式),则所有在信道上发布的消息都会被指派一个唯一的ID。一旦消息被投递到目的队列后,或者消息被写入磁盘后(可持久化的消息),信道会发送一个确认给生产者(包含消息唯一ID)。如果RabbitMQ发生内部错误从而导致消息丢失,会发送一条nack(not acknowledged,未确认)消息。发送方确认模式是异步的,生产者应用程序在等待确认的同时,可以继续发送消息。当确认消息到达生产者应用程序,生产者应用程序的回调方法就会被触发来处理确认消息。

4.如何确保消息接收方消费了消息?

接收方消息确认机制:消费者接收每一条消息后都必须进行确认(消息接收和消息确认是两个不同操作)。只有消费者确认了消息,RabbitMQ才能安全地把消息从队列中删除。这里并没有用到超时机制,RabbitMQ仅通过Consumer的连接中断来确认是否需要重新发送消息。也就是说,只要连接不中断,RabbitMQ给了Consumer足够长的时间来处理消息。

5.死信队列和延迟队列的使用?

DLX 全称(Dead-Letter-Exchange),称之为死信交换器,当消息变成一个死信之后,如果这个消息所在的队列存在x-dead-letter-exchange参数,那么它会被发送到x-dead-letter-exchange对应值的交换器上,这个交换器就称之为死信交换器,与这个死信交换器绑定的队列就是死信队列。

死信消息:

  1. 消息被拒绝(Basic.Reject或Basic.Nack)并且设置 requeue 参数的值为 false
  2. 消息过期了,在 rabbitmq 中存在2种方可设置消息的过期时间,第一种通过对队列进行设置,第二种通过对消息本身进行设置
  3. 队列达到最大的长度

6.RabbitMQ 概念里的 channel、exchange 和 queue 是逻辑概念,还是对应着进程实体?分别起什么作用?

queue 具有自己的 erlang 进程;exchange 内部实现为保存 binding 关系的查找表;channel 是实际进行路由工作的实体,即负责按照 routing_key 将 message 投递给 queue 。由 AMQP 协议描述可知,channel 是真实 TCP 连接之上的虚拟连接,所有 AMQP 命令都是通过 channel 发送的,且每一个 channel 有唯一的 ID。一个 channel 只能被单独一个操作系统线程使用,故投递到特定 channel 上的 message 是有顺序的。但一个操作系统线程上允许使用多个 channel 。

Kafka 和 RabbitMQ 对比

  1. 吞吐量 kafka吞吐量更高:
    1. Zero Copy机制,内核copy数据直接copy到网络设备,不必经过内核到用户再到内核的copy,减小了copy次数和上下文切换次数,大大提高了效率。
    2. 磁盘顺序读写,减少了寻道等待的时间。
    3. 批量处理机制,服务端批量存储,客户端主动批量pull数据,消息处理效率高。
    4. 存储具有O(1)的复杂度,读物因为分区和segment,是O(log(n))的复杂度。
    5. 分区机制,有助于提高吞吐量。
  2. 可靠性 rabbitmq可靠性更好:
    1. 确认机制(生产者和exchange,消费者和队列);
    2. 支持事务,但会造成阻塞;
    3. 委托(添加回调来处理发送失败的消息)和备份交换器(将发送失败的消息存下来后面再处理)机制;
  3. 高可用
    1. rabbitmq采用mirror queue,即主从模式,数据是异步同步的,当消息过来,主从全部写完后,回ack,这样保障了数据的一致性。
    2. Kafka 每个分区都可以有一个或多个副本,这些副本保存在不同的broker上,broker信息存储在zookeeper上,当broker不可用会重新选举leader。kafka支持同步负责消息和异步同步消息(有丢消息的可能),生产者从zk获取leader信息,发消息给leader,follower从leader pull数据然后回ack给leader。
  4. 负载均衡
    1. kafka通过zk和分区机制实现:zk记录broker信息,生产者可以获取到并通过策略完成负载均衡;通过分区,投递消息到不同分区,消费者通过服务组完成均衡消费。
    2. rabbitmq需要外部支持。
  5. 模型
    1. rabbitmq:
      1. producer,broker遵循AMQP(exchange,bind,queue),consumer;
      2. broker为中心,exchange分topic,direct,fanout和header,路由模式适合多种场景;
      3. consumer消费位置由broker通过确认机制保存;
    2. kafka:
      1. producer,broker,consumer,未遵循AMQP;
      2. consumer为中心,获取消息模式由consumer自己决定;
      3. offset保存在消费者这边,broker无状态;
      4. 消息是名义上的永久存储,每个parttition按segment保存自己的消息为文件(可配置清理周期);
      5. consumer可以通过重置offset消费历史消息;
      6. 需要绑定zk;

RocketMQ

消息队列作为高并发系统的核心组件之一,能够帮助业务系统解构提升开发效率和系统稳定性。主要具有以下优势:

  • 削峰填谷(主要解决瞬时写压力大于应用服务能力导致消息丢失、系统奔溃等问题)
  • 系统解耦(解决不同重要程度、不同能力级别系统之间依赖导致一死全死)
  • 提升性能(当存在一对多调用时,可以发一条消息给消息系统,让消息系统通知相关系统)
  • 蓄流压测(线上有些链路不好压测,可以通过堆积一定量消息再放开来压测)

目前主流的MQ主要是Rocketmq、kafka、Rabbitmq,Rocketmq相比于Rabbitmq、kafka具有主要优势特性有:

  • 支持事务型消息(消息发送和DB操作保持两方的最终一致性,rabbitmq和kafka不支持)
  • 支持结合rocketmq的多个系统之间数据最终一致性(多方事务,二方事务是前提)
  • 支持18个级别的延迟消息(rabbitmq和kafka不支持)
  • 支持指定次数和时间间隔的失败消息重发(kafka不支持,rabbitmq需要手动确认)
  • 支持consumer端tag过滤,减少不必要的网络传输(rabbitmq和kafka不支持)
  • 支持重复消费(rabbitmq不支持,kafka支持)

Search

    Table of Contents