type
status
date
slug
summary
tags
category
icon
password
腾讯云的这篇关于rocketmq的文章写的很不错,做了一个很好的大致框架。
这篇文章也不错,是一个简短的概括的图文说明。以及「文末的参考文献」也很有阅读价值。图解RocketMQ架构
防止文章下架,参考文件也先备份在此。
本文主要会以本人在学习过程中的疑惑,采用提问的形式,对相关核心点进行原理性介绍。
消息和消费点位刷盘和ACK策略
提问
Rocketmq的刷盘策略是什么样的。
回答
默认都是写内存后就ACK,可配置
- 写消息的刷盘
- 消费点位的刷盘
如何保证CommitLog读取性能
- ConsumeQueue 中 offset 高度局部以及顺序
- mmap + page cache,使查询发生在内存,并且offset属于物理定位。
- 一次 page fault → 提前cache多条后续可能会查询的消息。
- Broker 会合并连续 offset 读取一个batch,并不是一条offset一次IO。
Rocketmq vs Kafka
维度 | RocketMQ | Kafka |
存储 | CommitLog + ConsumeQueue | Partition |
写控制 | JVM 可控 | OS page cache |
读 | mmap + slice | sendfile |
过滤 | Broker 侧 | Consumer 侧 |
事务消息 | 支持 | 很难 |
适合 | 复杂业务 | 极致吞吐 |
CL消息同步CQ时机
提问
刷盘了才会推送到consume queue队列中是吗?
回答
不是。
👉 写入 CommitLog 成功(在内存中)就会异步构建 ConsumeQueue,和是否刷盘无关。
更精确地说:
- CommitLog append 成功
→ ReputMessageService 扫描 CommitLog
→ 生成 ConsumeQueue 索引
- 刷盘只是持久化保证,不影响 ConsumeQueue 的生成
⚠️ 所以:
- 没刷盘但 Broker 崩溃 →
CommitLog + ConsumeQueue 都可能丢(一起丢)
- 不存在「刷盘后才进 ConsumeQueue」的严格顺序关系
CL和CQ的同步时机和一致性
提问
那如果写入commitLog内存,但是还没持久化到磁盘,并且也写入到consume Queue了,此时机器掉电,导致commitLog里没有持久化,但是consumqueue里面却有,会出现这种情况吗。
回答
关于同步时机
ConsumeQueue 并不是在写 CommitLog 时同步顺便写的,而是由一个后台线程异步同步进来的。
在 Broker 内部,有一个非常核心的后台线程:ReputMessageService。
它的职责是:
- 顺序扫描 CommitLog。
- 解析每条消息。
- 把需要的信息分发给:
- ConsumeQueue(队列索引)
- IndexFile(Key 查询索引)
为什么不是“同步立即写 CQ”?
- CommitLog 写入是 极高频路径
- CQ 写入涉及:
- 计算目录
- mmap 文件定位
- 多文件切换
👉 把 CQ 写入挪到异步线程,避免拖慢 Producer 写入
关于一致性
RocketMQ 永远以 CommitLog 为唯一真源。ConsumeQueue 是派生索引,可丢、可重建。
- 丢了 CQ?
- 顺序扫 CommitLog 重建
- 丢了 CommitLog?
- 数据直接没了
不存在 ConsumeQueue 比 CommitLog 更真实的情况。
CQ为什么不存完整数据
提问
那为什么在写CQ的时候,不直接把完整消息写进 CQ,而是写索引后,再重新查?
回答
因为 ConsumeQueue 的目标是"快速定位 + 顺序消费",不是"存数据"。
如果 ConsumeQueue 存完整消息,会发生什么?
- 同一条消息:
- Topic 有 N 个 queue
- 会被复制 N 次?
- 磁盘空间暴涨
- IO 成倍增加
1000 万条完整消息:几个 GB(视消息大小而不定,假定一条业务消息1KB,则是约10G)。而在ConsumeQueue里:固定~200 MB。
👉 完全不可接受
个人总结
虽然理解上,直接在队列里面存完整消息,可以少一次反查,用空间换时间。
但是细细深入思考,这里也不能忽略多写入N次完整消息带来的额外磁盘空间消耗以及时间消耗,虽然一开始是写在内存里,但最终刷盘还是要落盘的,并且写入越多,由于OS的刷盘策略,可能刷盘也越频繁,造成IO负担。
相反,只存offset,由于Consume Queue里面每个消息体都是固定大小20B,非常小,能够加快写入速度,并且在同等空间下,能够一次读/写更多条目数据,更加IO和友好,吞吐量也更大。并且根据offset反查commitLog也会充分利用page cache,是内存操作,不会太消耗速度,并且是可以根据offset物理地址快速定位到commitLog中的位置,找一条消息相当于在内存中进行O(1)时间的定位,非常快,因此反查不会是瓶颈所在。
综合考虑,这样进行反查,反而能比直接存完整消息,更加快速度。
AI精炼总结:
RocketMQ 通过 CommitLog + ConsumeQueue 的分层设计
避免完整消息体的多份写入
用极小、固定长度的逻辑队列索引换取
高吞吐、低 IO、强 PageCache 友好性
offset 反查在绝大多数情况下是内存 O(1) 操作
mmap和page cache
提问
详细解释mmap,除了os层面的page cache,rocketmq如何保证优先访问内存中数据。
回答
- mmap 本身到底做了什么
- RocketMQ 如何用 mmap + PageCache
- RocketMQ 除了 PageCache,还做了哪些“尽量走内存”的设计
mmap本身到底做了什么
mmap 让程序像读写内存一样直接读写文件,将文件内容映射到进程的虚拟地址空间。mmap 的关键优势
- 零拷贝(不需要再 copy 到用户 buffer)。
- 访问内存,比直接访问磁盘快很多。
- 可直接通过指针 / ByteBuffer 访问
传统io vs mmap
特性 | 传统 I/O (read/write) | mmap (内存映射) |
数据流向 | 两次复制:磁盘 → 内核缓冲区 → 用户缓冲区 | 零复制:磁盘文件直接映射到用户虚拟内存空间 |
系统调用 | 频繁调用 read()/write() | 仅 mmap() 建立映射,后续像访问内存一样操作 |
数据一致性 | 手动调用 fsync() 确保持久化 | 由操作系统页面缓存机制保证,可手动 msync() |
适用场景 | 小文件、一次性读写、网络传输 | 大文件随机访问、进程间共享内存、频繁读写 |
内存开销 | 需额外分配用户缓冲区 | 按需加载(缺页中断),节省物理内存 |
编程复杂度 | 需要管理缓冲区、偏移量 | 直接指针操作,更符合编程直觉 |
性能特点 | 小文件快,大文件随机访问慢 | 大文件随机访问快,适合长时间操作 |
传统io时序图(数据复制,频繁陷入系统调用)
mmap时序图
RocketMQ如何使用mmap+page cache
核心文件全部使用mmap
组件 | 是否 mmap |
CommitLog | ✅ |
ConsumeQueue | ✅ |
IndexFile | ✅ |
RocketMQ 的“访问路径”本质是内存访问
Consumer 拉消息:
这不是 read 系统调用,而是:
- 普通内存访问
- 如果命中 PageCache → 纯内存
- 不命中 → Page Fault + 读盘
📌 RocketMQ 完全信任 OS 的 PageCache
除了PageCache RocketMQ如何充分利用内存
顺序性
- CommitLog:
- Producer:顺序append消息。
- Consumer:按照offset递增读(顺序性)。
- OS层:
- 顺序IO → 自动预读,因此PageCache命中率极高,相当于一次缺页预读磁盘,后面很多条消息都不用再访问磁盘。
顺序性=命中率
CunsumeQueue极小,几乎常驻内存
- 20B / 条
- 单文件总量远小于 CommitLog
- 非常容易被 OS 缓存
因此offset 查询几乎永远是内存操作。
文件预分配
- CommitLog 文件提前创建固定大小(1G)
- 避免频繁:
- 扩容
- inode / block 分配
- 元数据 IO
通过TransientStorePool(堆外缓冲池)
TransientStorePool管理一堆DirectBuffer,DirectBuffer叫做堆外内存,
内存直接读取,不走JVM映射
堆内buffer
堆外buffer
TransientStorePool机制
主要是防止PageCache脏页过多被强制刷盘
也就是TransientStorePool相当于是不对mmap一条一条写,而是用中间堆外内存buffer,后台批量刷到mmap内存。同时mmap也是异步刷盘,这样可以有效防止在PageCache脏页过多导致mmap强制刷盘时,从而导致写入的时候又要等待磁盘io,造成性能突刺。
rocketmq内部可配置TransientStorePool的一些参数,控制大小、写入mmap时机,刷盘时机。
📌 本质:把“写入压力”和“刷盘压力”解耦
随着 Linux 内核版本升级(特别是 2.6.32+ 引入的 writeback 控制),PageCache 管理更加智能。在较新系统中,TransientStorePool 的价值相对降低,建议:
- 新部署先尝试默认模式
- 遇到具体性能问题再考虑启用
- 结合
cgroup内存控制等现代OS特性
RocketMQ的数据清理
提问
consume queue不删除队列数据,为什么,那mq如何清理队列中被消费掉的数据?
回答
ConsumeQueue 不删除“已消费”的数据,MQ 也不会按消费进度去删消息RocketMQ 只按“时间 / 磁盘水位”清理 CommitLog 和 ConsumeQueue
这就要从RocketMQ的中的commitLog和comsumeQueue的存储说起了。
CommitLog是按照配置文件里的指定文件大小划分的,默认是1G。每个文件名都是按照起始offset命名的。
清理时机机制总览
- 定时检查。
后台线程
CleanCommitLogService 每10秒触发检查,检查是否在删除时间窗口(默认凌晨4点),即使文件过期,也不一定马上删(除非磁盘告急)。- 磁盘压力触发。
Broker监控磁盘使用率。强制清理时,直接删最老的CommitLog文件,不关心是否真正被消费。保Broker,不保数据。
阈值 | 行为 |
diskSpaceWarningLevelRatio | 打 WARN 日志 |
diskSpaceCleanForciblyRatio | 强制清理 CommitLog |
diskMaxUsedSpaceRatio | 正常清理上限 |
- Broker启动/主从切换
启动时扫描 .delete 文件,看是否存在残留,进行补做清理。
清理的顺序
CommitLog 是唯一“真数据”
判断CommitLog文件是否可删除:
被判定需要删除后的删除步骤
- 设置文件为 read-only,重命名加上.delete后缀
- 延迟等待60s,等待OS page cache,以及正在读的消费者返回。
- 物理删除。
ConsumeQueue删除逻辑
根据已删除的CommitLog的offset范围,定位到ConsumeQueue中
offset < minCommitLogOffset 的位置,直接截断文件。每个ConsumeQueue对应队列,包含多个文件。
ConsumeQueue删除是采用逻辑截断。
虽然ConsumeQueue单个文件很小,每个文件存
300000 * 20B = 5.72 MB 的记录,但是仅移动读指针,无限增长不清理物理空间也不是办法。在截断时,遍历文件,如果某个文件完全在截断点之前,就可以删除那个文件。
如果当前文件是最新文件,被截断,并且没有写满,是可以继续追加写的,但是由于截断并不对文件本身修改,只是修改了读指针而已,因此文件大小依然是设置的5.7M不变。
如果ConsumeQueue中消息还在,但是CommitLog已经删了,Comsumer会收到
OFFSET_TOO_SMALL。IndexFile的清理逻辑
IndexFile其实是设计的一个按照Key → CommitLogOffset的索引。
按照IndexFile的结束偏移量和现有CommitLog的偏移量对比。只有结束偏移量比现存CommitLog的偏移量小,才能删除。
消费点位Offset在哪里维护
提问
consume queue的点位是记载在哪里的 是broker为每个 consume group + topic + queue 记载的吗
回答
ConsumeQueue 的消费点位(消费进度)确实是 Broker 为每个 ConsumerGroup + Topic + Queue 单独记录和维护的。
1. 存储位置
- 主要存储:Broker端的
ConsumerOffsetManager
- Key格式:
ConsumerGroup@Topic@QueueId
- 持久化:
/store/config/consumerOffset.json
- 备份存储:Consumer本地文件(用于恢复)
2. 核心特性
- 按消费组隔离:每个ConsumerGroup独立记录
- 按Queue细分:每个Queue独立记录
- 防回退保护:只接受递增的offset
- 定时持久化:默认5秒持久化一次
- 故障恢复:Broker重启后从磁盘恢复
消费者的拉取和提交策略是什么样的
提问
rocketmq中,消费者的拉取策略是什么样的?消息是按照什么粒度进行消费和提交的?从消息消费到返回提交(成功或重试),消费端和broker端都发生了什么状态变化?
RocketMQ 是「消费者主动拉」模型,PushConsumer 只是对 Pull 的一层封装。
拉取策略核心参数
消息消费粒度
其中入参
msg 的数量,和ConsumeMessageBatchMaxSize 指定的值是一样的。在消费的时候,也是按照这个粒度进行一次offset提交,这里offset提交指的是提交更新消费端内存中的offset。ConsumeMessageBatchMaxSize=1 的情况
🔍 ConsumeMessageBatchMaxSize=3 的情况
提交offset
消费端提交offset时,不是一条一条提交,而是先维护已处理的最大offset点位,定时(5s)提交到broker。
broker接收到提交之后,更新Consume Queue的offset点位,并且持久化到
consumerOffset.json这里又引入一个问题:由于消费端5s提交一次offset,那么如果在提交之前,消费服务宕机了?Broker宕机了?网络问题?
提交offset异常分析
- 消费服务宕机。
- Broker端宕机。
- 网络问题
- Author:Ago
- URL:http://www.sunago.top/article/rocketmq-note
- Copyright:All articles in this blog, except for special statements, adopt BY-NC-SA agreement. Please indicate the source!


