Post on: 2026-1-15Last edited: 2026-1-15Words 5536Read Time 14 min

type
status
date
slug
summary
tags
category
icon
password
腾讯云的这篇关于rocketmq的文章写的很不错,做了一个很好的大致框架。

这篇文章也不错,是一个简短的概括的图文说明。以及「文末的参考文献」也很有阅读价值。图解RocketMQ架构 防止文章下架,参考文件也先备份在此。
本文主要会以本人在学习过程中的疑惑,采用提问的形式,对相关核心点进行原理性介绍。

消息和消费点位刷盘和ACK策略

提问
Rocketmq的刷盘策略是什么样的。
回答
默认都是写内存后就ACK,可配置
  1. 写消息的刷盘
  1. 消费点位的刷盘

如何保证CommitLog读取性能

  • ConsumeQueue 中 offset 高度局部以及顺序
  • mmap + page cache,使查询发生在内存,并且offset属于物理定位。
  • 一次 page fault → 提前cache多条后续可能会查询的消息。
  • Broker 会合并连续 offset 读取一个batch,并不是一条offset一次IO。

Rocketmq vs Kafka

维度
RocketMQ
Kafka
存储
CommitLog + ConsumeQueue
Partition
写控制
JVM 可控
OS page cache
mmap + slice
sendfile
过滤
Broker 侧
Consumer 侧
事务消息
支持
很难
适合
复杂业务
极致吞吐

CL消息同步CQ时机

提问
刷盘了才会推送到consume queue队列中是吗?

回答
不是。
👉 写入 CommitLog 成功(在内存中)就会异步构建 ConsumeQueue,和是否刷盘无关。
更精确地说:
  • CommitLog append 成功
    • → ReputMessageService 扫描 CommitLog
      生成 ConsumeQueue 索引
  • 刷盘只是持久化保证,不影响 ConsumeQueue 的生成
⚠️ 所以:
  • 没刷盘但 Broker 崩溃 →
    • CommitLog + ConsumeQueue 都可能丢(一起丢)
  • 不存在「刷盘后才进 ConsumeQueue」的严格顺序关系

CL和CQ的同步时机和一致性

提问
那如果写入commitLog内存,但是还没持久化到磁盘,并且也写入到consume Queue了,此时机器掉电,导致commitLog里没有持久化,但是consumqueue里面却有,会出现这种情况吗。

回答

关于同步时机
ConsumeQueue 并不是在写 CommitLog 时同步顺便写的,而是由一个后台线程异步同步进来的。
在 Broker 内部,有一个非常核心的后台线程:ReputMessageService。
它的职责是:
  • 顺序扫描 CommitLog。
  • 解析每条消息。
  • 把需要的信息分发给:
    • ConsumeQueue(队列索引)
    • IndexFile(Key 查询索引)
为什么不是“同步立即写 CQ”?
  • CommitLog 写入是 极高频路径
  • CQ 写入涉及:
    • 计算目录
    • mmap 文件定位
    • 多文件切换
👉 把 CQ 写入挪到异步线程,避免拖慢 Producer 写入
关于一致性
RocketMQ 永远以 CommitLog 为唯一真源。ConsumeQueue 是派生索引,可丢、可重建。
  • 丢了 CQ?
    • 顺序扫 CommitLog 重建
  • 丢了 CommitLog?
    • 数据直接没了
不存在 ConsumeQueue 比 CommitLog 更真实的情况。

CQ为什么不存完整数据

提问
那为什么在写CQ的时候,不直接把完整消息写进 CQ,而是写索引后,再重新查?

回答

因为 ConsumeQueue 的目标是"快速定位 + 顺序消费",不是"存数据"。
如果 ConsumeQueue 存完整消息,会发生什么?
  • 同一条消息:
    • Topic 有 N 个 queue
    • 会被复制 N 次?
  • 磁盘空间暴涨
  • IO 成倍增加
1000 万条完整消息:几个 GB(视消息大小而不定,假定一条业务消息1KB,则是约10G)。而在ConsumeQueue里:固定~200 MB。
👉 完全不可接受
个人总结
虽然理解上,直接在队列里面存完整消息,可以少一次反查,用空间换时间。
但是细细深入思考,这里也不能忽略多写入N次完整消息带来的额外磁盘空间消耗以及时间消耗,虽然一开始是写在内存里,但最终刷盘还是要落盘的,并且写入越多,由于OS的刷盘策略,可能刷盘也越频繁,造成IO负担。
相反,只存offset,由于Consume Queue里面每个消息体都是固定大小20B,非常小,能够加快写入速度,并且在同等空间下,能够一次读/写更多条目数据,更加IO和友好,吞吐量也更大。并且根据offset反查commitLog也会充分利用page cache,是内存操作,不会太消耗速度,并且是可以根据offset物理地址快速定位到commitLog中的位置,找一条消息相当于在内存中进行O(1)时间的定位,非常快,因此反查不会是瓶颈所在。
综合考虑,这样进行反查,反而能比直接存完整消息,更加快速度。
AI精炼总结:
RocketMQ 通过 CommitLog + ConsumeQueue 的分层设计
避免完整消息体的多份写入
用极小、固定长度的逻辑队列索引换取
高吞吐、低 IO、强 PageCache 友好性
offset 反查在绝大多数情况下是内存 O(1) 操作

mmap和page cache

提问
详细解释mmap,除了os层面的page cache,rocketmq如何保证优先访问内存中数据。

回答
  1. mmap 本身到底做了什么
  1. RocketMQ 如何用 mmap + PageCache
  1. RocketMQ 除了 PageCache,还做了哪些“尽量走内存”的设计

mmap本身到底做了什么

mmap 让程序像读写内存一样直接读写文件,将文件内容映射到进程的虚拟地址空间。
mmap 的关键优势
  • 零拷贝(不需要再 copy 到用户 buffer)。
  • 访问内存,比直接访问磁盘快很多。
  • 可直接通过指针 / ByteBuffer 访问

传统io vs mmap

特性
传统 I/O (read/write)
mmap (内存映射)
数据流向
两次复制:磁盘 → 内核缓冲区 → 用户缓冲区
零复制:磁盘文件直接映射到用户虚拟内存空间
系统调用
频繁调用 read()/write()
mmap() 建立映射,后续像访问内存一样操作
数据一致性
手动调用 fsync() 确保持久化
由操作系统页面缓存机制保证,可手动 msync()
适用场景
小文件、一次性读写、网络传输
大文件随机访问、进程间共享内存、频繁读写
内存开销
需额外分配用户缓冲区
按需加载(缺页中断),节省物理内存
编程复杂度
需要管理缓冲区、偏移量
直接指针操作,更符合编程直觉
性能特点
小文件快,大文件随机访问慢
大文件随机访问快,适合长时间操作

传统io时序图(数据复制,频繁陷入系统调用)

mmap时序图

RocketMQ如何使用mmap+page cache

核心文件全部使用mmap
组件
是否 mmap
CommitLog
ConsumeQueue
IndexFile
RocketMQ 的“访问路径”本质是内存访问
Consumer 拉消息:
这不是 read 系统调用,而是:
  • 普通内存访问
  • 如果命中 PageCache → 纯内存
  • 不命中 → Page Fault + 读盘
📌 RocketMQ 完全信任 OS 的 PageCache

除了PageCache RocketMQ如何充分利用内存


顺序性
  • CommitLog:
    • Producer:顺序append消息。
    • Consumer:按照offset递增读(顺序性)。
  • OS层:
    • 顺序IO → 自动预读,因此PageCache命中率极高,相当于一次缺页预读磁盘,后面很多条消息都不用再访问磁盘。
顺序性=命中率

CunsumeQueue极小,几乎常驻内存
  • 20B / 条
  • 单文件总量远小于 CommitLog
  • 非常容易被 OS 缓存
因此offset 查询几乎永远是内存操作。

文件预分配
  • CommitLog 文件提前创建固定大小(1G)
  • 避免频繁:
    • 扩容
    • inode / block 分配
    • 元数据 IO

通过TransientStorePool(堆外缓冲池)
TransientStorePool管理一堆DirectBuffer,DirectBuffer叫做堆外内存
内存直接读取,不走JVM映射
堆内buffer
堆外buffer
TransientStorePool机制
主要是防止PageCache脏页过多被强制刷盘
也就是TransientStorePool相当于是不对mmap一条一条写,而是用中间堆外内存buffer,后台批量刷到mmap内存。同时mmap也是异步刷盘,这样可以有效防止在PageCache脏页过多导致mmap强制刷盘时,从而导致写入的时候又要等待磁盘io,造成性能突刺。
rocketmq内部可配置TransientStorePool的一些参数,控制大小、写入mmap时机,刷盘时机。
📌 本质:把“写入压力”和“刷盘压力”解耦
随着 Linux 内核版本升级(特别是 2.6.32+ 引入的 writeback 控制),PageCache 管理更加智能。在较新系统中,TransientStorePool 的价值相对降低,建议:
  1. 新部署先尝试默认模式
  1. 遇到具体性能问题再考虑启用
  1. 结合 cgroup 内存控制等现代OS特性

RocketMQ的数据清理

提问
consume queue不删除队列数据,为什么,那mq如何清理队列中被消费掉的数据?

回答
ConsumeQueue 不删除“已消费”的数据,MQ 也不会按消费进度去删消息
RocketMQ 只按“时间 / 磁盘水位”清理 CommitLog 和 ConsumeQueue
这就要从RocketMQ的中的commitLog和comsumeQueue的存储说起了。
CommitLog是按照配置文件里的指定文件大小划分的,默认是1G。每个文件名都是按照起始offset命名的。

清理时机机制总览

  1. 定时检查。
    1. 后台线程CleanCommitLogService 每10秒触发检查,检查是否在删除时间窗口(默认凌晨4点),即使文件过期,也不一定马上删(除非磁盘告急)。
  1. 磁盘压力触发。
    1. Broker监控磁盘使用率。强制清理时,直接删最老的CommitLog文件,不关心是否真正被消费。保Broker,不保数据。
      阈值
      行为
      diskSpaceWarningLevelRatio
      打 WARN 日志
      diskSpaceCleanForciblyRatio
      强制清理 CommitLog
      diskMaxUsedSpaceRatio
      正常清理上限
  1. Broker启动/主从切换
    1. 启动时扫描 .delete 文件,看是否存在残留,进行补做清理。

清理的顺序

CommitLog 是唯一“真数据”
判断CommitLog文件是否可删除:
被判定需要删除后的删除步骤
  1. 设置文件为 read-only,重命名加上.delete后缀
  1. 延迟等待60s,等待OS page cache,以及正在读的消费者返回。
  1. 物理删除。

ConsumeQueue删除逻辑

根据已删除的CommitLog的offset范围,定位到ConsumeQueue中offset < minCommitLogOffset 的位置,直接截断文件。
每个ConsumeQueue对应队列,包含多个文件。
ConsumeQueue删除是采用逻辑截断。
虽然ConsumeQueue单个文件很小,每个文件存300000 * 20B = 5.72 MB 的记录,但是仅移动读指针,无限增长不清理物理空间也不是办法。
在截断时,遍历文件,如果某个文件完全在截断点之前,就可以删除那个文件。
如果当前文件是最新文件,被截断,并且没有写满,是可以继续追加写的,但是由于截断并不对文件本身修改,只是修改了读指针而已,因此文件大小依然是设置的5.7M不变。
如果ConsumeQueue中消息还在,但是CommitLog已经删了,Comsumer会收到OFFSET_TOO_SMALL

IndexFile的清理逻辑

IndexFile其实是设计的一个按照Key → CommitLogOffset的索引。
按照IndexFile的结束偏移量和现有CommitLog的偏移量对比。只有结束偏移量比现存CommitLog的偏移量小,才能删除。

消费点位Offset在哪里维护

提问
consume queue的点位是记载在哪里的 是broker为每个 consume group + topic + queue 记载的吗

回答
ConsumeQueue 的消费点位(消费进度)确实是 Broker 为每个 ConsumerGroup + Topic + Queue 单独记录和维护的
1. 存储位置
  • 主要存储:Broker端的 ConsumerOffsetManager
  • Key格式ConsumerGroup@Topic@QueueId
  • 持久化/store/config/consumerOffset.json
  • 备份存储:Consumer本地文件(用于恢复)
2. 核心特性
  • 按消费组隔离:每个ConsumerGroup独立记录
  • 按Queue细分:每个Queue独立记录
  • 防回退保护:只接受递增的offset
  • 定时持久化:默认5秒持久化一次
  • 故障恢复:Broker重启后从磁盘恢复

消费者的拉取和提交策略是什么样的

提问
rocketmq中,消费者的拉取策略是什么样的?消息是按照什么粒度进行消费和提交的?从消息消费到返回提交(成功或重试),消费端和broker端都发生了什么状态变化?
RocketMQ 是「消费者主动拉」模型,PushConsumer 只是对 Pull 的一层封装。
拉取策略核心参数
消息消费粒度
其中入参msg 的数量,和ConsumeMessageBatchMaxSize 指定的值是一样的。在消费的时候,也是按照这个粒度进行一次offset提交,这里offset提交指的是提交更新消费端内存中的offset。
ConsumeMessageBatchMaxSize=1 的情况

🔍 ConsumeMessageBatchMaxSize=3 的情况
提交offset
消费端提交offset时,不是一条一条提交,而是先维护已处理的最大offset点位,定时(5s)提交到broker。
broker接收到提交之后,更新Consume Queue的offset点位,并且持久化到consumerOffset.json
这里又引入一个问题:由于消费端5s提交一次offset,那么如果在提交之前,消费服务宕机了?Broker宕机了?网络问题?
提交offset异常分析
  1. 消费服务宕机。
  1. Broker端宕机。
  1. 网络问题

Loading...