rocketmq

RocketMQ简介#

**消息队列:**一句话解释消息队列就是提供了包含远程通讯、发送消息、存储消息、消费消息等功能的一个中间件。可以使得我们的发送方和接受方进行解耦、异步通讯。一般用在系统解耦、广播、流量销峰、分布式事务、异步消息传递等场景。

RocketMQ没有完全遵守JMS标准,JMS标准可能在吞吐或者设计上有一些不太好的要求,导致了性能不及预期。RocketMQ进行了修改。

核心概念:#

下面这块搬运总结自官方readme,只为快速查阅

主要由 Producer、Broker、Consumer 三部分组成:#

  • 其中Producer 负责生产消息,Consumer 负责消费消息,Broker 负责存储消息。

其他组件:

  • Broker:

    • 在实际部署过程中对应一台服务器,每个 Broker 可以存储多个Topic的消息每个Topic的消息也可以分片存储于不同的 Broker。
    • 消息中转角色,负责存储消息、转发消息。代理服务器在RocketMQ系统中负责接收从生产者发送来的消息并存储、同时为消费者的拉取请求作准备。代理服务器也存储消息相关的元数据,包括消费者组、消费进度偏移和主题和队列消息等。
  • 名称服务(Name Server)

  • 名称服务充当路由消息的提供者。生产者或消费者能够通过名字服务查找主题相应的Broker IP列表。多个Namesrv实例组成集群,但相互独立,没有信息交换。 (类似与FastDFS的Tracker,或者SpringCloud的注册中心)

  • Message Queue 用于存储消息的物理地址,每个Topic中的消息地址存储于多个 Message Queue 中。ConsumerGroup 由多个Consumer 实例构成。

两种消费方式(push/pull):#

拉取式消费(Pull Consumer)#

Consumer消费的一种类型,应用通常主动调用Consumer的拉消息方法**从Broker服务器拉消息、主动权由应用控制。**一旦获取了批量消息,应用就会启动消费过程。

推动式消费(Push Consumer)#

Consumer消费的一种类型,该模式下Broker收到数据后会主动推送给消费端,该消费模式一般实时性较高。

顺序:#

普通顺序消息(Normal Ordered Message)#

普通顺序消费模式下,消费者通过同一个消费队列收到的消息是有顺序的,不同消息队列收到的消息则可能是无顺序的。

严格顺序消息(Strictly Ordered Message)#

严格顺序消息模式下,消费者收到的所有消息均是有顺序的。

消息Msg特点:#

消息(Message)#

消息系统所传输信息的物理载体,生产和消费数据的最小单位,每条消息必须属于一个主题。RocketMQ中每个消息拥有唯一的Message ID,且可以携带具有业务标识的Key。系统提供了通过Message ID和Key查询消息的功能。

标签(Tag)#

为消息设置的标志,用于同一主题下区分不同类型的消息。来自同一业务单元的消息,可以根据不同业务目的在同一主题下设置不同标签。标签能够有效地保持代码的清晰度和连贯性,并优化RocketMQ提供的查询系统。消费者可以根据Tag实现对不同子主题的不同消费逻辑,实现更好的扩展性。

组group的概念:#

生产者、消费者都有group的概念。

一个topic下面可以有多个组生产组、消费组,可以根据组进行业务隔离、消息过滤(MessageFilter)

出现问题也可以根据组名去排查

部署架构#

  • 有2m2s,两主两从的集群模式。配置文件有异步的2m-2s-async和同步的2m-2s-sync
  • 2m-noslave,2主没有从模式

一个经典的RocketMQ的网络部署图:

有一个NameServer集群用于处理Broker的IP寻址等命名服务

image-20200803130631460

  • 如client

  • remoting是远程服务,如netty

  • Producer和Cunsomer相对于Broker都是Client

image-20200803130341795

特性#

Producer的特性#

发送方式#

sync:#

同步发送,需要一个响应

async:#

异步发送,给一个callback,发送完毕会来回调。可以指定timeout。

oentway:#

发出去后不管,直接返回。如日志等不重要的数据。

发送结果SendStatus#

org.apache.rocketmq.client.producer.SendStatus

  • SEND-OK:消息发送成功
  • FLUSH_DISK_TIMEOUT:消息发送成功,但是服务器刷盘超时,消息已经进入服务器队列,只有此时服务器宕机,消息才会丢失.(消息在broker内存中)
  • FLUSH_SLAVE_TIMEOUT:消息发送成功,但是服务器同步到Slave时超时,消息已经进入务器队列,只有此时服务器宕机,消息才会丢
  • SLAVE_NOT_AVAILABLE:消息发送成功,但是此时slave不可用,消息已经进入服务器队列,只有此时服务器宕机消息才会丢

消息过滤:

给同一个topic发送的时候可以带着tag,

消息类型#

普通消息#

定时消息#

发送到broker之后不能立刻消费,需要特定的时间之后才可以消费。如和第三方对接的回调延时。

原理和普通消息一样,只是多了一个定时参数。由服务器去决定。

顺序消息#

强一致性的先进先出,普通消息只能每个queue里面先进先出。

  • 底层原理:一个订单下面的不同状态消息发送到同一个queue里面,就能保证严格个先进先出。需要实现MessageQueueSelector

image-20200804163215333

事务消息#

经典的两阶段提交、或者三阶段提交:

image-20200804164014890

整个流程:

image-20200804164117815

官方说明:

http://rocketmq.apache.org/rocketmq/the-design-of-transactional-message/

img

官方描述的整个过程

  1. 生产者将一半消息(HalfMessage,其实是message里面设置了一个属性,transaction=true)发送到MQ服务器。
  2. 发送成功一半消息后,执行本地事务。
  3. 根据本地事务结果将提交或回滚消息发送到MQ Server。
  4. 如果在本地事务执行过程中缺少提交/回退消息或生产者处于等待状态,MQ服务器将向同一组中的每个生产者发送检查消息,以获取交易状态。
  5. 生产者根据本地事务状态回复提交/回退消息。
  6. 提交的消息将传递给使用者,但是回滚的消息将被MQ服务器丢弃。

screenshot

为了掩盖存储的基础实现,所有事务性消息操作都集中在事务服务接口上。 RocketMQ提供了自己的存储系统的默认实现,我们使用事务桥来实现事务存储逻辑,而不是直接修改RocketMQ的存储层。

要注意noslave模式是不支持事务消息的。

接收特性#

消息幂等性#

rocketMQ没有实现消息去重,需要业务自己实现。当消费方因为网络闪断没有及时返回ACK消息,可能会导致重复消费。

终极解决方案是给message的key用redis incr实现一个全局唯一标识。然后消费的时候搞一个redis set看看有没有消费过,没有再使用。

消息过滤:#

订阅的时候实现一个MesageSelector,可以bySQL,使用类SQL语句进行过滤如MesageSelector.bySQL("TAGS is not null and TAGS in('TAGA', 'TAGB')")

也可以subscribe的时候自己实现一个MessageFilter接口,有一个Match方法,可以根据msg信息进行过滤。

PUSH和PULL:#

PULL是由业务系统自己去决定何时进行去拉取消息。DefaultMQPullConsumer

PUSH方式消费,需要注册一个消息监听器。在start方法中启动了一个线程池去死循环在里面长轮询take拉取消息。DefaultMQPushConsumer

也就是底层核心都是使用pull实现的,调用broker。可以简化应用使用MQ的方式。

源码目录

image-20200803130440835

高性能#

一般来说,MQ作为一个中间件需要有一个持久化过程来保证高可用,以防万一宕机可以从持久化文件恢复数据。(纯内存型的ZeroMQ等除外)。

  • ActiveMQ可以使用JDBC持久化到数据库,也可以使用NFS进行持久化来保证高可用。
  • Redis和RockDB使用KV数据库的方式进行持久化
  • RocketMQ、RabbitMQ、Kafka使用文件系统持久化,磁盘坏了就挂了。

存储架构的高性能:CommitLog定长内存映射+ConsumeQueue对Topic的缓存页顺序读取#

image-20200804122939317

RocketMQ的消息存储架构如上图所示,可以看到主要由三个跟消息存储相关的文件构成。

  • CommitLog消息及元数据的存储主体。消息内容不是定长的,同时单个文件大小默认1G,文件名长度为20位,左边补零,剩余为起始偏移量。比如00000000000000000000代表了第一个文件,起始偏移量为0,文件大小为1G=1073741824;当第一个文件写满了,第二个文件为00000000001073741824,起始偏移量为1073741824,以此类推。消息主要是顺序写入日志文件,当第一个文件满了,再写入下一个文件。

  • -----存储消息

    1
    2
    3
    commitlog/
    ├── 00000000000000000000
    └── 00000000001073741824
  • ConsumeQueue:**消息消费队列。RocketMQ是基于主题topic的订阅模式,消息消费是针对主题进行的,如果要根据topic在commitlog文件中进行检索消息,效率将会非常低效。**ConsumeQueue(逻辑消费队列)作为消费消息的索引,保存了指定Topic下的队列消息在CommitLog中的起始物理偏移量offset、消息大小size和消息Tag的HashCode值。 **所以ConsumeQueue文件可以看成是基于topic的CommitLog索引文件。**ConsumeQueue文件夹的组织方式如下:topic/queue/file三层组织结构。而ConsumeQueue存储路径为:$HOME/store/consumequeue/{topic}/{queueId}/{fileName}。与Commitlog一样,ConsumeQueue文件采取了定长设计,单个文件由30W个条目组成,每一个条目共20个字节,分别为8字节的CommitLog物理偏移量、4字节的消息长度、8字节tag hashcode,Comsumer可以像数组一样随机访问每一个条目,每个ConsumeQueue文件大小约5.72M。

  • -----作为Topic的索引去CommieLog拿数据,里面是CommitLog的偏移量,消费的时候使用

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    consumequeue/
    └── TopicTest
    ├── 0
    │   └── 00000000000000000000
    ├── 1
    │   └── 00000000000000000000
    ├── 2
    │   └── 00000000000000000000
    └── 3
    └── 00000000000000000000
  • IndexFile:(索引文件)**提供了一种可以通过key或时间区间来查询消息的方法。**Index文件的存储位置是:HOME/store/indexHOME /store/index{fileName},文件名fileName是以创建时的时间戳命名的,固定的单个IndexFile文件大小约为400M,一个IndexFile可以保存2000W个索引。

    1
    2
    3
    index/
    └── 20190804112308158

image-20200804181344806

index文件里面存放的每一个节点如最上面所示,是定长的。有keyHash-CommitLog Offset-

页缓存与内存映射#

  • 页缓存(PageCache)是OS对文件的缓存,用于加速对文件的读写。一般来说,程序对文件进行顺序读写的速度几乎接近于内存的读写速度,主要原因就是由于-OS使用PageCache机制对读写访问操作进行了性能优化,将一部分的内存用作PageCache。
    • 对于数据的写入,OS会先写入至Cache内,随后通过异步的方式由pdflush内核线程将Cache内的数据刷盘至物理磁盘上。
    • 对于数据的读取,如果一次读取文件时出现未命中PageCache的情况,OS从物理磁盘上访问读取文件的同时,会顺序对其他相邻块的数据文件进行预读取。
  • 在RocketMQ中,**ConsumeQueue逻辑消费队列存储的数据较少,并且是顺序读取,在page cache机制的预读取作用下,Consume Queue文件的读性能几乎接近读内存,**即使在有消息堆积情况下也不会影响性能。
  • 而对于CommitLog消息存储的日志数据文件来说,读取消息内容时会产生较多的随机访问读取,严重影响性能。如果选择合适的系统IO调度算法,比如设置调度算法为“Deadline”(此时块存储采用SSD的话),随机读的性能也会有所提升。
  • 另外,RocketMQ主要通过MappedByteBuffer对文件进行读写操作。其中,利用了NIO中的FileChannel模型将磁盘上的物理文件直接映射到用户态的内存地址中(这种Mmap的方式减少了传统IO将磁盘文件数据在操作系统内核地址空间的缓冲区和用户应用程序地址空间的缓冲区之间来回进行拷贝的性能开销),将对文件的操作转化为直接对内存地址进行操作,从而极大地提高了文件的读写效率
  • (正因为需要使用内存映射机制,故RocketMQ的文件存储都使用定长结构来存储,方便一次将整个文件映射至内存)。
顺序读写 随机读写
文件数目 读取一个大文件 读取多个小文件
**比较:**明显顺序读写只读取一个大文件,耗时更少。而随机读写需要打开多个文件,写进行多次的训导和旋转延迟,标绿远低于顺序读写
文件预读 顺序读写时磁盘会预读文件,即在读取的起始地址连续读取多个页面,若被预读的页面被使用,则无需再去读取 由于数据不在一起,无法预读
**比较:**在大并发的情况下,磁盘预读能够免去大量的读操作,处理速度肯定更快
系统的overhead 只需要找到一个文件,并对这个文件进行属性和权限的检查 需要找到多个文件,并对每个文件进行属性和权限检查
**比较:**只寻找一个文件,并确认属性和权限,肯定优于处理多个文件
写入数据 写入新文件时,需要寻找磁盘可用空间 写入新文件时,需要寻找磁盘可用空间。但由于一个文件的存储量更小,这个操作触发频率更多
**比较:**顺序读写创建新文件,只需要创建一个大文件就可以用很久,而随机读写可能频繁创建文件。创建文件时需要进行寻找磁盘可用空间等一些列操作,肯定更加耗时

刷盘机制#

image-20200804130631825

  • 同步刷盘:如上图左边所示,当消息真正持久化至磁盘后,RocketMQ的Broker端才会真正返回给Producer端一个成功的ACK响应。同步刷盘对MQ消息可靠性来说是一种不错的保障,但是性能上会有较大影响,一般适用于金融业务应用。

  • 异步刷盘:能够充分利用OS的PageCache的优势,只要消息写入PageCache即可将成功的ACK返回给Producer端。消息刷盘采用后台异步线程提交的方式进行,降低了读写延迟,提高了MQ的性能和吞吐量。

  • # 刷盘方式ASYNC_FLUSH/SYNC_FLUSH
    flushDiskType=ASYNC_FLUSH
    
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52



    | | ** 同步刷盘** | **异步刷盘** |
    | ------------ | ------------------------------------------------------------ | ------------------------------------------------------------ |
    | **消息情况** | 在返回写成功状态时,消息已经被写入磁盘中。即消息被写入内存的PAGECACHE 中后,立刻通知刷新线程刷盘,等待刷盘完成,才会唤醒等待的线程并返回成功状态 | 在返回写成功状态时,消息可能只是被写入内存的 PAGECACHE 中。当内存的消息量积累到一定程度时,触发写操作快速写入 |
    | **性能** | 需要等待刷盘才能返回结果 | 消息写入内存后立刻返回结果,吞吐量更高 |
    | **可靠性** | 可以保持MQ的消息状态和生产者/消费者的消息状态一致 | Master宕机,磁盘损坏的情况下,会丢失少量的消息, 导致MQ的消息状态和生产者/消费者的消息状态不一致 |



    zeroCopy





    # 高可用

    ## NameServer 高可用

    由于 NameServer 节点是无状态的,只是保存了一个BrokerServer的地址,相当于注册中心。且各个节点直接的数据是一致的,故存在多个 NameServer 节点的情况下,部分 NameServer 不可用也可以保证 MQ 服务正常运行。



    ## BrokerServer 高可用

    - RocketMQ是通过 Master 和 Slave 的配合达到 BrokerServer 模块的高可用性的
    - 一个 Master 可以配置多个 Slave,同时也支持配置多个 Master-Slave 组。如2m2s

    **当其中一个 Master 出现问题时:**

    - 由于Slave只负责读,当 Master 不可用,它对应的 Slave 仍能保证消息被正常消费
    - 由于配置多组 Master-Slave 组,其他的 Master-Slave 组也会保证消息的正常发送和消费



    ## 消息消费高可用

    Consumer 的高可用是依赖于 Master-Slave 配置的,由于 Master 能够支持读写消息,Slave 支持读消息,当 Master 不可用或繁忙时, Consumer 会被自动切换到从 Slave 读取(自动切换,无需配置)。故当 Master 的机器故障后,消息仍可从 Slave 中被消费



    ## 消息发送高可用

    在创建Topic的时候,把Topic的多个Message Queue创建在多个Broker组上(相同Broker名称,不同 brokerId的机器组成一个Broker组,就是一个master和一堆slave组成一组。另一个master和一堆slave组成另外一组。2m2s就是2组)。

    ```properties
    #broker名字,名字可重复,为了管理,每个master起一个名字,他的slave同他,eg:Amaster叫broker-a,他的slave也叫broker-a
    brokerName=broker-a
    #0 表示 Master,>0 表示 Slave
    brokerId=0

这样当一个Broker组的Master不可用后,其他组的Master仍然可用,Producer仍然可以发送消息。 RocketMQ开源版本目前还不支持把Slave自动转成Master,如果机器资源不足, 需要把Slave转成Master,则要手动停止Slave角色的Broker,更改配置文 件,用新的配置文件启动Broker。

1
2
# 当前节点角色,重要
brokerRole=SYNC_MASTER

image-20200804131932891

消息主从复制保证高可用#

同步复制和异步复制#

若一个 Broker 组有一个 Master 和 Slave,消息需要从 Master 复制到 Slave 上,有同步复制和异步复制两种方式

同步复制 异步复制
概念 即等 Master 和 Slave 均写成功后才反馈给客户端写成功状态 只要 Master 写成功,就反馈客户端写成功状态
可靠性 可靠性高,若 Master 出现故障,Slave 上有全部的备份数据,容易恢复 若 Master 出现故障,可能存在一些数据还没来得及写入 Slave,可能会丢失
效率 由于是同步复制,会增加数据写入延迟,降低系统吞吐量 由于只要写入 Master 即可,故数据写入延迟较低,吞吐量较高
1
2
3
4
5
# 当前节点角色,重要
# 异步复制
brokerRole=ASYNC_MASTER
# SYNC_MASTER:同步复制
# SLAVE:表明当前是从节点,无需配置 brokerRole

一般在实际应用中,由于同步刷盘方式会频繁触发磁盘写操作,明显降低性能,故通常配置为:

**刷盘方式:**ASYNC_FLUSH(异步刷盘)

**主从复制:**SYNC_MASTER(同步复制)

异步刷盘能够避免频繁触发磁盘写操作,除非服务器宕机,否则不会造成消息丢失。

主从同步复制能够保证消息不丢失,即使 Master 节点异常,也能保证 Slave 节点存储所有消息并被正常消费掉。

但是金融级应用一般是SYNC_FLUSH同步刷盘,能保证不丢消息。

负载均衡#

在实例发送消息时,默认会轮询所有订阅了改 Topic 的 broker 节点上的 message queue,让消息平均落在不同的 queue 上,而由于这些 queue 散落在不同的 broker 节点中,即使某个 broker 节点异常,其他存在订阅了这个 Topic 的 message queue 的 broker 依然能消费消息

image-20200804140402905

Topic和Queue#

RocketMQ的Topic和Queue与常规如ActiveMQ不同:

  • ActiveMQ的Queue是用来做点对点的发布订阅,Topic是用来做广播。

  • 但是RocketMQ不一样:RocketMQ的Topic内部有很多Queue。如:

    • image-20200804134726494

在新建Topic的时候就可以选择哪些borker来处理,并可以制定每个Broker内部开多少个读和写的队列数量,所以上图看到的就是每个broker内有多个queue:

image-20200804134901653

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
// 轮询每个broker下面的每个queue进行处理
发送一个消息,status=SEND_OK,BrokerName=broker-a, getQueueId=0,getQueueOffset=148
发送一个消息,status=SEND_OK,BrokerName=broker-a, getQueueId=1,getQueueOffset=148
发送一个消息,status=SEND_OK,BrokerName=broker-a, getQueueId=2,getQueueOffset=148
发送一个消息,status=SEND_OK,BrokerName=broker-a, getQueueId=3,getQueueOffset=148
发送一个消息,status=SEND_OK,BrokerName=broker-a, getQueueId=4,getQueueOffset=148
发送一个消息,status=SEND_OK,BrokerName=broker-a, getQueueId=5,getQueueOffset=148
发送一个消息,status=SEND_OK,BrokerName=broker-a, getQueueId=6,getQueueOffset=148
发送一个消息,status=SEND_OK,BrokerName=broker-a, getQueueId=7,getQueueOffset=149
发送一个消息,status=SEND_OK,BrokerName=broker-a, getQueueId=8,getQueueOffset=149
发送一个消息,status=SEND_OK,BrokerName=broker-a, getQueueId=9,getQueueOffset=149
发送一个消息,status=SEND_OK,BrokerName=broker-a, getQueueId=10,getQueueOffset=150
发送一个消息,status=SEND_OK,BrokerName=broker-a, getQueueId=11,getQueueOffset=150
发送一个消息,status=SEND_OK,BrokerName=broker-a, getQueueId=12,getQueueOffset=151
发送一个消息,status=SEND_OK,BrokerName=broker-a, getQueueId=13,getQueueOffset=151
发送一个消息,status=SEND_OK,BrokerName=broker-a, getQueueId=14,getQueueOffset=152
发送一个消息,status=SEND_OK,BrokerName=broker-a, getQueueId=15,getQueueOffset=151
发送一个消息,status=SEND_OK,BrokerName=broker-b, getQueueId=0,getQueueOffset=152
发送一个消息,status=SEND_OK,BrokerName=broker-b, getQueueId=1,getQueueOffset=152
发送一个消息,status=SEND_OK,BrokerName=broker-b, getQueueId=2,getQueueOffset=152
发送一个消息,status=SEND_OK,BrokerName=broker-b, getQueueId=3,getQueueOffset=152
发送一个消息,status=SEND_OK,BrokerName=broker-b, getQueueId=4,getQueueOffset=151
发送一个消息,status=SEND_OK,BrokerName=broker-b, getQueueId=5,getQueueOffset=151
发送一个消息,status=SEND_OK,BrokerName=broker-b, getQueueId=6,getQueueOffset=150
发送一个消息,status=SEND_OK,BrokerName=broker-b, getQueueId=7,getQueueOffset=150
发送一个消息,status=SEND_OK,BrokerName=broker-b, getQueueId=8,getQueueOffset=149
发送一个消息,status=SEND_OK,BrokerName=broker-b, getQueueId=9,getQueueOffset=149
发送一个消息,status=SEND_OK,BrokerName=broker-b, getQueueId=10,getQueueOffset=149
发送一个消息,status=SEND_OK,BrokerName=broker-b, getQueueId=11,getQueueOffset=149
发送一个消息,status=SEND_OK,BrokerName=broker-b, getQueueId=12,getQueueOffset=149
发送一个消息,status=SEND_OK,BrokerName=broker-b, getQueueId=13,getQueueOffset=149
发送一个消息,status=SEND_OK,BrokerName=broker-b, getQueueId=14,getQueueOffset=149
发送一个消息,status=SEND_OK,BrokerName=broker-b, getQueueId=15,getQueueOffset=149
发送一个消息,status=SEND_OK,BrokerName=broker-a, getQueueId=0,getQueueOffset=149
发送一个消息,status=SEND_OK,BrokerName=broker-a, getQueueId=1,getQueueOffset=149
发送一个消息,status=SEND_OK,BrokerName=broker-a, getQueueId=2,getQueueOffset=149
/。。。。

源代码流程#

发送普通消息的流程#

  • 准备工作:构建message、网络相关、线程相关
  • 从namesev拿到topic路由,缓存到map中
  • 组装数据,broker需要的序列化数据(json)
  • netty发送

image-20200804161635352

详细来说:

image-20200804144830052

https://yq.aliyun.com/articles/763218

https://blog.csdn.net/qq_34416331/article/details/107310833