RocketMQ消息存储
1. 存储设计原理图
1.1.基本概念
RocketMQ 存储相关的几个核心概念:CommitLog、ConsumeQueue、IndexFile、offset
CommitLog:消息内容原文的存储文件

ConsumeQueue:不存储消息内容本身,而是存储消息在CommitLog中的offset(以及消息大小和Tag的hashcode),可以看作是CommitLog按主题、按队列划分的索引文件


IndexFile:如果需要根据消息ID来查找消息,由于ConsumeQueue中没有存储消息ID,不采取其他措施的话就只能遍历CommitLog文件;IndexFile正是为了解决这个问题而设计的索引文件
1.2. 原理图

2. 消息存储类
org.apache.rocketmq.store.DefaultMessageStore.java
// Logger for the store module
private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.STORE_LOGGER_NAME);
// Message store configuration
private final MessageStoreConfig messageStoreConfig;
// CommitLog file storage implementation
private final CommitLog commitLog;
// ConsumeQueue table grouped by topic: topic -> (queueId -> ConsumeQueue)
private final ConcurrentMap<String/* topic */, ConcurrentMap<Integer/* queueId */, ConsumeQueue>> consumeQueueTable;
// Flush (to disk) service for ConsumeQueue files
private final FlushConsumeQueueService flushConsumeQueueService;
// Service that cleans up CommitLog files
private final CleanCommitLogService cleanCommitLogService;
// Service that cleans up ConsumeQueue files
private final CleanConsumeQueueService cleanConsumeQueueService;
// Index file (IndexFile) implementation
private final IndexService indexService;
// MappedFile allocation service
private final AllocateMappedFileService allocateMappedFileService;
// Dispatches CommitLog messages: builds ConsumeQueue and IndexFile entries from the CommitLog
private final ReputMessageService reputMessageService;
// Storage HA (high availability) service
private final HAService haService;
private final ScheduleMessageService scheduleMessageService;
private final StoreStatsService storeStatsService;
// Pool of off-heap (direct, mlock'ed) write buffers
private final TransientStorePool transientStorePool;
private final RunningFlags runningFlags = new RunningFlags();
private final SystemClock systemClock = new SystemClock();
// Single-threaded scheduler whose thread is named "StoreScheduledThread"
private final ScheduledExecutorService scheduledExecutorService =
Executors.newSingleThreadScheduledExecutor(new ThreadFactoryImpl("StoreScheduledThread"));
private final BrokerStatsManager brokerStatsManager;
// Listener notified when messages arrive (used by long-polling pull requests)
private final MessageArrivingListener messageArrivingListener;
// Broker configuration
private final BrokerConfig brokerConfig;
// Starts as true; volatile so every thread sees the latest value
private volatile boolean shutdown = true;
// Store checkpoint (records file flush progress)
private StoreCheckpoint storeCheckpoint;
private AtomicLong printTimes = new AtomicLong(0);
// Dispatchers that receive CommitLog dispatch requests
private final LinkedList<CommitLogDispatcher> dispatcherList;
// NOTE(review): presumably used to lock the store directory against a second process — confirm
private RandomAccessFile lockFile;
private FileLock lock;
boolean shutDownNormal = false;
// Single-threaded scheduler whose thread is named "DiskCheckScheduledThread"
private final ScheduledExecutorService diskCheckScheduledExecutorService =
Executors.newSingleThreadScheduledExecutor(new ThreadFactoryImpl("DiskCheckScheduledThread"));
3. 消息发送存储流程
3.1. 流程图
如果对这一流程不太熟悉,可以调试运行一遍单元测试 DefaultMessageStoreTest#testLookMessageByOffset_OffsetIsFirst
3.2. 源码解析
// DefaultMessageStore
/**
 * Stores a single message in the CommitLog.
 *
 * <p>The method first rejects the write when the store is not writable or when the message is illegal.
 * It then appends the message and records the time that took. A warning is logged when the append
 * takes longer than 500 ms. The failure counter is incremented when the append fails.
 *
 * @param msg message to store
 * @return whatever the CommitLog returned (possibly {@code null}), or an early rejection result
 */
@Override
public PutMessageResult putMessage(MessageExtBrokerInner msg) {
    // Guard: the store itself must accept writes
    final PutMessageStatus storeStatus = this.checkStoreStatus();
    if (storeStatus != PutMessageStatus.PUT_OK) {
        return new PutMessageResult(storeStatus, null);
    }

    // Guard: the message must be well-formed
    final PutMessageStatus messageStatus = this.checkMessage(msg);
    if (messageStatus == PutMessageStatus.MESSAGE_ILLEGAL) {
        return new PutMessageResult(messageStatus, null);
    }

    final long startMillis = this.getSystemClock().now();
    final PutMessageResult putResult = this.commitLog.putMessage(msg);
    final long costMillis = this.getSystemClock().now() - startMillis;

    if (costMillis > 500) {
        log.warn("not in lock elapsed time(ms)={}, bodyLength={}", costMillis, msg.getBody().length);
    }
    this.storeStatsService.setPutMessageEntireTimeMax(costMillis);

    final boolean failed = putResult == null || !putResult.isOk();
    if (failed) {
        this.storeStatsService.getPutMessageFailedTimes().incrementAndGet();
    }
    return putResult;
}
// Excerpt from CommitLog.putMessage: the method header and the code after the locked section are not shown.
// `result` and `elapsedTimeInLock` are assigned here but declared outside this excerpt.
MappedFile unlockMappedFile = null;
// Look up the last MappedFile before taking the lock; it is re-checked under the lock below
MappedFile mappedFile = this.mappedFileQueue.getLastMappedFile();
putMessageLock.lock(); //spin or ReentrantLock ,depending on store config
try {
long beginLockTimestamp = this.defaultMessageStore.getSystemClock().now();
// Remember when the lock was taken; reset to 0 on every exit path below
this.beginTimeInLock = beginLockTimestamp;
// Set the store timestamp while holding the lock so that store timestamps are
// globally ordered
msg.setStoreTimestamp(beginLockTimestamp);
// No file yet, or the last file is full: get (or create) a new last file
if (null == mappedFile || mappedFile.isFull()) {
mappedFile = this.mappedFileQueue.getLastMappedFile(0); // Mark: NewFile may be cause noise
}
if (null == mappedFile) {
log.error("create mapped file1 error, topic: " + msg.getTopic() + " clientAddr: " + msg.getBornHostString());
beginTimeInLock = 0;
return new PutMessageResult(PutMessageStatus.CREATE_MAPEDFILE_FAILED, null);
}
// Append the message to the MappedFile
result = mappedFile.appendMessage(msg, this.appendMessageCallback);
switch (result.getStatus()) {
case PUT_OK:
break;
// The current file has no room left for this message
case END_OF_FILE:
// Keep a reference to the filled file for the code after this excerpt
unlockMappedFile = mappedFile;
// Create a new file, re-write the message
mappedFile = this.mappedFileQueue.getLastMappedFile(0);
if (null == mappedFile) {
// XXX: warn and notify me
log.error("create mapped file2 error, topic: " + msg.getTopic() + " clientAddr: " + msg.getBornHostString());
beginTimeInLock = 0;
return new PutMessageResult(PutMessageStatus.CREATE_MAPEDFILE_FAILED, result);
}
// NOTE(review): the status of this retried append is not checked inside this excerpt —
// confirm that the code following the lock handles a non-PUT_OK result.
result = mappedFile.appendMessage(msg, this.appendMessageCallback);
break;
// The message or its properties are too large: report as MESSAGE_ILLEGAL
case MESSAGE_SIZE_EXCEEDED:
case PROPERTIES_SIZE_EXCEEDED:
beginTimeInLock = 0;
return new PutMessageResult(PutMessageStatus.MESSAGE_ILLEGAL, result);
case UNKNOWN_ERROR:
beginTimeInLock = 0;
return new PutMessageResult(PutMessageStatus.UNKNOWN_ERROR, result);
// Any other status is also reported as UNKNOWN_ERROR
default:
beginTimeInLock = 0;
return new PutMessageResult(PutMessageStatus.UNKNOWN_ERROR, result);
}
// Time spent holding the lock
elapsedTimeInLock = this.defaultMessageStore.getSystemClock().now() - beginLockTimestamp;
beginTimeInLock = 0;
} finally {
// Always release the lock, including on the early returns above
putMessageLock.unlock();
}
// MappedFile
/**
 * Appends one message, or a message batch, at this file's current write position.
 *
 * <p>The bytes go into {@code writeBuffer} when it is set. Otherwise they go into the memory-mapped
 * buffer. The actual serialization is delegated to {@code cb}. On success, the write position is
 * advanced by the number of bytes written and the message store timestamp is recorded.
 *
 * @param messageExt a {@code MessageExtBrokerInner} or {@code MessageExtBatch}
 * @param cb         callback that serializes the message into the buffer
 * @return the callback's result, or {@code UNKNOWN_ERROR} if the file is already full or the
 *         message type is not supported
 */
public AppendMessageResult appendMessagesInner(final MessageExt messageExt, final AppendMessageCallback cb) {
    assert messageExt != null;
    assert cb != null;

    final int writePos = this.wrotePosition.get();
    if (writePos >= this.fileSize) {
        // Nothing more fits into this file
        log.error("MappedFile.appendMessage return null, wrotePosition: {} fileSize: {}", writePos, this.fileSize);
        return new AppendMessageResult(AppendMessageStatus.UNKNOWN_ERROR);
    }

    // Work on an independent view so the shared buffer's position is left untouched
    final ByteBuffer slice;
    if (writeBuffer != null) {
        slice = writeBuffer.slice();
    } else {
        slice = this.mappedByteBuffer.slice();
    }
    slice.position(writePos);
    final int freeBytes = this.fileSize - writePos;

    final AppendMessageResult appended;
    if (messageExt instanceof MessageExtBrokerInner) {
        appended = cb.doAppend(this.getFileFromOffset(), slice, freeBytes, (MessageExtBrokerInner) messageExt);
    } else if (messageExt instanceof MessageExtBatch) {
        appended = cb.doAppend(this.getFileFromOffset(), slice, freeBytes, (MessageExtBatch) messageExt);
    } else {
        return new AppendMessageResult(AppendMessageStatus.UNKNOWN_ERROR);
    }

    this.wrotePosition.addAndGet(appended.getWroteBytes());
    this.storeTimestamp = appended.getStoreTimestamp();
    return appended;
}
// CommitLog
// Stores the message according to the CommitLog storage format (implementation omitted: too long to paste here)
/**
 * Serializes a single broker-side message into {@code byteBuffer}.
 *
 * @param fileFromOffset starting offset of the target file (MappedFile#appendMessagesInner passes
 *                       its getFileFromOffset())
 * @param byteBuffer     buffer positioned at the file's current write position
 * @param maxBlank       bytes still free in the file (the caller passes fileSize minus the current
 *                       write position)
 * @param msgInner       message to append
 * @return append result; the caller advances its write position by getWroteBytes() and records
 *         getStoreTimestamp(). Statuses such as END_OF_FILE and MESSAGE_SIZE_EXCEEDED are
 *         handled in CommitLog.putMessage.
 */
public AppendMessageResult doAppend(final long fileFromOffset, final ByteBuffer byteBuffer, final int maxBlank,
final MessageExtBrokerInner msgInner);
4. 存储文件组织与内存映射
4.1. MappedFile源码
RocketMQ通过使用内存映射文件(mmap)来提高IO访问性能
MappedFileQueue对应一个存储目录(由多个MappedFile组成),MappedFile对应该目录下的一个存储文件
// MappedFile fields
// Page size used by the store: 4 KiB
public static final int OS_PAGE_SIZE = 1024 * 4;
protected static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.STORE_LOGGER_NAME);
// Static counters shared by all MappedFile instances (by name: total mapped virtual memory and number of mapped files)
private static final AtomicLong TOTAL_MAPPED_VIRTUAL_MEMORY = new AtomicLong(0);
private static final AtomicInteger TOTAL_MAPPED_FILES = new AtomicInteger(0);
// Write position: bytes appended to this file so far
protected final AtomicInteger wrotePosition = new AtomicInteger(0);
// Commit position. When TransientStorePool is enabled, data staged in writeBuffer is committed to the
// FileChannel up to this position and is then flushed to disk.
protected final AtomicInteger committedPosition = new AtomicInteger(0);
// Flush position: how far the file has been flushed to disk
private final AtomicInteger flushedPosition = new AtomicInteger(0);
// Size of this file in bytes
protected int fileSize;
// File channel of this file
protected FileChannel fileChannel;
/**
 * Message will put to here first, and then reput to FileChannel if writeBuffer is not null.
 */
protected ByteBuffer writeBuffer = null;
// Off-heap (direct) buffer pool that mlocks its memory so that it is not swapped out to disk
protected TransientStorePool transientStorePool = null;
private String fileName;
// Global offset of the first byte of this file
private long fileFromOffset;
private File file;
// Memory-mapped view of the file; appends go here when writeBuffer is null
private MappedByteBuffer mappedByteBuffer;
// Store timestamp of the most recently appended message
private volatile long storeTimestamp = 0;
// True when this file was the first one created in its (then empty) MappedFileQueue
private boolean firstCreateInQueue = false;
4.2. TransientStorePool分析
TransientStorePool的作用:预先分配堆外内存(DirectByteBuffer)并通过mlock锁定,提供内存锁定机制,避免内存数据被置换到磁盘
(正如其init方法的注释所说,这是一个很重的初始化方法)
思考:什么业务场景会启用这种TransientStorePool机制?
// Number of buffers that init() allocates
private final int poolSize;
// Size in bytes of each buffer (init() calls allocateDirect(fileSize))
private final int fileSize;
// Deque holding the locked (mlock'ed) direct buffers that are available for use
private final Deque<ByteBuffer> availableBuffers;
// Message store configuration
private final MessageStoreConfig storeConfig;
/**
 * Pre-allocates {@code poolSize} direct buffers of {@code fileSize} bytes each.
 * Each buffer is pinned in RAM with mlock and queued as available.
 *
 * <p>This is a heavy method: it allocates and locks {@code poolSize * fileSize} bytes up front.
 */
public void init() {
    int remaining = poolSize;
    while (remaining-- > 0) {
        ByteBuffer buffer = ByteBuffer.allocateDirect(fileSize);
        Pointer nativeAddress = new Pointer(((DirectBuffer) buffer).address());
        // Lock the pages so they cannot be swapped out, which keeps writes fast
        LibC.INSTANCE.mlock(nativeAddress, new NativeLong(fileSize));
        availableBuffers.offer(buffer);
    }
}
4.3. 通过AllocateMappedFileService创建MappedFile
/**
 * Returns the last MappedFile of this queue. A new file is created first when the queue is
 * empty or the last file is full, and {@code needCreate} is true.
 *
 * <p>The new file's start offset is chosen as follows:
 * <ul>
 *   <li>When the queue is empty, it is {@code startOffset} rounded down to a multiple of
 *       {@code mappedFileSize}.</li>
 *   <li>Otherwise it is the last file's start offset plus {@code mappedFileSize}.</li>
 * </ul>
 * There are two creation paths. The CommitLog goes through {@code allocateMappedFileService};
 * all other callers construct the MappedFile directly.
 *
 * @param startOffset offset used to place the first file when the queue is empty
 * @param needCreate  whether a missing or full last file may be replaced by a new one
 * @return the (possibly new) last file. Returns {@code null} when creation fails, or when the
 *         queue is empty and {@code needCreate} is false.
 */
public MappedFile getLastMappedFile(final long startOffset, boolean needCreate) {
    final MappedFile tail = getLastMappedFile();

    long newFileOffset = -1;
    if (tail == null) {
        newFileOffset = startOffset - (startOffset % this.mappedFileSize);
    } else if (tail.isFull()) {
        newFileOffset = tail.getFileFromOffset() + this.mappedFileSize;
    }

    // Nothing to create (or creation not allowed): hand back the current tail as-is
    if (newFileOffset == -1 || !needCreate) {
        return tail;
    }

    final String nextFilePath = this.storePath + File.separator + UtilAll.offset2FileName(newFileOffset);
    // Path of the file after the new one; passed to the allocation service
    // (presumably so that it can pre-allocate ahead — confirm)
    final String nextNextFilePath = this.storePath + File.separator
        + UtilAll.offset2FileName(newFileOffset + this.mappedFileSize);

    MappedFile created = null;
    if (this.allocateMappedFileService == null) {
        try {
            created = new MappedFile(nextFilePath, this.mappedFileSize);
        } catch (IOException e) {
            log.error("create mappedFile exception", e);
        }
    } else {
        created = this.allocateMappedFileService.putRequestAndReturnMappedFile(nextFilePath,
            nextNextFilePath, this.mappedFileSize);
    }

    if (created != null) {
        if (this.mappedFiles.isEmpty()) {
            created.setFirstCreateInQueue(true);
        }
        this.mappedFiles.add(created);
    }
    return created;
}

TODO:分析AllocateMappedFileService如何创建MappedFile
5. 存储文件
5.1. CommitLog文件
/**
 * Reads {@code size} bytes from the CommitLog, starting at the given physical offset.
 *
 * <p>The second argument passed to {@code findMappedFileByOffset} is {@code offset == 0}.
 * Presumably this makes the first file be returned when no file matches — confirm.
 *
 * @param offset global (physical) CommitLog offset
 * @param size   number of bytes to select
 * @return the selected buffer, or {@code null} if no mapped file covers {@code offset}
 */
public SelectMappedBufferResult getMessage(final long offset, final int size) {
    final int commitLogFileSize = this.defaultMessageStore.getMessageStoreConfig().getMappedFileSizeCommitLog();
    final MappedFile target = this.mappedFileQueue.findMappedFileByOffset(offset, offset == 0);
    if (target == null) {
        return null;
    }
    // Convert the global offset into a position inside this single file
    final int positionInFile = (int) (offset % commitLogFileSize);
    return target.selectMappedBuffer(positionInFile, size);
}
5.2. ConsumeQueue文件
5.2.1. 根据Index定位物理文件
/**
 * Selects the ConsumeQueue data starting at the entry with logical index {@code startIndex}.
 *
 * @param startIndex logical entry index; its byte offset is {@code startIndex * CQ_STORE_UNIT_SIZE}
 * @return a buffer from that entry to the end of its file, or {@code null} when the entry lies
 *         below the minimum logical offset or no file contains it
 */
public SelectMappedBufferResult getIndexBuffer(final long startIndex) {
    final int fileSize = this.mappedFileSize;
    final long logicOffset = startIndex * CQ_STORE_UNIT_SIZE;
    if (logicOffset < this.getMinLogicOffset()) {
        return null;
    }
    final MappedFile owner = this.mappedFileQueue.findMappedFileByOffset(logicOffset);
    return owner == null ? null : owner.selectMappedBuffer((int) (logicOffset % fileSize));
}
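这里解释一下 CQ_STORE_UNIT_SIZE=20 的由来,需要从 ConsumeQueue 存储单元的结构说起。每个存储单元固定为 20 字节,由三部分组成:
- commitLog offset(8 字节):该消息在 CommitLog 中的物理偏移量
- size(4 字节):消息大小
- tag hashcode(8 字节):消息 Tag 的哈希码
因此,第 startIndex 个存储单元在 ConsumeQueue 中的逻辑偏移量为 startIndex * 20。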

5.2.2. 时间戳定位物理文件
// Locate a ConsumeQueue entry (logical queue offset) by timestamp
/**
 * Finds the logical offset (entry index) in this ConsumeQueue whose message store timestamp
 * equals {@code timestamp}. When there is no exact match, the entry whose timestamp is closest
 * to it is used.
 *
 * <p>The candidate file comes from {@code mappedFileQueue.getMappedFileByTime(timestamp)}. The
 * entries in that file are binary-searched. For each probed entry, the referenced message's
 * store time is read from the CommitLog. Entries whose physical offset is below the CommitLog's
 * minimum offset are skipped. When the left and right neighbours are equally distant from
 * {@code timestamp}, the left (earlier) one wins.
 *
 * <p>NOTE(review): suppose no right neighbour is recorded and leftIndexValue stays -1. This
 * happens with an empty buffer, or when every probed entry is below minPhysicOffset. Then
 * offset becomes -1 and the method returns
 * {@code (fileFromOffset - 1) / CQ_STORE_UNIT_SIZE}. Confirm this fallback is intended.
 *
 * @param timestamp target store timestamp
 * @return {@code (fileFromOffset + entryOffset) / CQ_STORE_UNIT_SIZE}. Returns 0 when no file is
 *         found, the buffer cannot be selected, or a store time lookup returns a negative value.
 */
public long getOffsetInQueueByTime(final long timestamp) {
MappedFile mappedFile = this.mappedFileQueue.getMappedFileByTime(timestamp);
if (mappedFile != null) {
long offset = 0;
// Start after minLogicOffset when it falls inside this file
int low = minLogicOffset > mappedFile.getFileFromOffset() ? (int) (minLogicOffset - mappedFile.getFileFromOffset()) : 0;
int high = 0;
int midOffset = -1, targetOffset = -1, leftOffset = -1, rightOffset = -1;
long leftIndexValue = -1L, rightIndexValue = -1L;
// Smallest valid physical offset in the CommitLog
long minPhysicOffset = this.defaultMessageStore.getMinPhyOffset();
SelectMappedBufferResult sbr = mappedFile.selectMappedBuffer(0);
if (null != sbr) {
ByteBuffer byteBuffer = sbr.getByteBuffer();
// Byte offset of the last complete entry in the buffer
high = byteBuffer.limit() - CQ_STORE_UNIT_SIZE;
try {
while (high >= low) {
// Midpoint, rounded down to a multiple of CQ_STORE_UNIT_SIZE
midOffset = (low + high) / (2 * CQ_STORE_UNIT_SIZE) * CQ_STORE_UNIT_SIZE;
byteBuffer.position(midOffset);
// commitLog offset (first 8 bytes of the entry)
long phyOffset = byteBuffer.getLong();
// size (next 4 bytes)
int size = byteBuffer.getInt();
// The entry points below the CommitLog's minimum valid offset: move the left bound right
if (phyOffset < minPhysicOffset) {
low = midOffset + CQ_STORE_UNIT_SIZE;
leftOffset = midOffset;
continue;
}
// Store timestamp of the referenced message in the CommitLog
long storeTime =
this.defaultMessageStore.getCommitLog().pickupStoreTimestamp(phyOffset, size);
if (storeTime < 0) {
return 0;
} else if (storeTime == timestamp) {
targetOffset = midOffset;
break;
} else if (storeTime > timestamp) {
high = midOffset - CQ_STORE_UNIT_SIZE;
rightOffset = midOffset;
rightIndexValue = storeTime;
} else {
low = midOffset + CQ_STORE_UNIT_SIZE;
leftOffset = midOffset;
leftIndexValue = storeTime;
}
}
// Exact timestamp match found
if (targetOffset != -1) {
offset = targetOffset;
} else { // No exact match: pick the neighbour whose timestamp is closest to timestamp
if (leftIndexValue == -1) {
offset = rightOffset;
} else if (rightIndexValue == -1) {
offset = leftOffset;
} else {
offset =
Math.abs(timestamp - leftIndexValue) > Math.abs(timestamp
- rightIndexValue) ? rightOffset : leftOffset;
}
}
// Convert the byte offset within the queue into an entry index
return (mappedFile.getFileFromOffset() + offset) / CQ_STORE_UNIT_SIZE;
} finally {
// Always release the selected buffer
sbr.release();
}
}
}
return 0;
}
5.3. IndexFile文件
本质上就是一个基于文件实现的HashMap,用于根据消息ID快速定位消息在CommitLog中的位置,从而避免遍历CommitLog

5.4. ConsumeQueue和IndexFile的恢复
- 首先判断上次是否为异常退出,依据是abort文件是否存在
实现机制:Broker启动时会创建abort文件,正常退出时通过注册的JVM钩子函数(shutdown hook)删除该文件;如果下一次启动Broker时abort文件仍然存在,则说明Broker上次是异常退出的
// A leftover abort file means the previous run did not exit cleanly, so its absence means a clean exit
boolean lastExitOK = !this.isTempFileExist();
/**
 * Checks whether the abort file exists under the store root directory.
 *
 * @return true if the abort file is present
 */
private boolean isTempFileExist() {
    final String abortFilePath = StorePathConfigHelper.getAbortFile(this.messageStoreConfig.getStorePathRootDir());
    return new File(abortFilePath).exists();
}
- 加载CommitLog和ConsumeQueue文件
// These loading steps do not depend on whether the previous exit was abnormal (neither call takes lastExitOK)
// load Commit Log
result = result && this.commitLog.load();
// load Consume Queue (short-circuit: skipped if an earlier step already failed)
result = result && this.loadConsumeQueue();
- 根据上次是否异常退出选择恢复模式(正常恢复或异常恢复)
// Continue only if all earlier loading steps succeeded (result accumulates their status)
if (result) {
// Create the StoreCheckpoint backed by the checkpoint file under the store root directory
this.storeCheckpoint =
new StoreCheckpoint(StorePathConfigHelper.getStoreCheckpoint(this.messageStoreConfig.getStorePathRootDir()));
// Both the index service and recovery receive the previous-exit flag to choose their mode
this.indexService.load(lastExitOK);
this.recover(lastExitOK);
log.info("load over, and the max phy offset = {}", this.getMaxPhyOffset());
}
/**
 * Recovers the store after loading, in three steps:
 * <ol>
 *   <li>Recover the ConsumeQueues.</li>
 *   <li>Recover the CommitLog, using the abnormal path if the previous run did not exit
 *       cleanly and the normal path otherwise.</li>
 *   <li>Rebuild the topic-queue table.</li>
 * </ol>
 *
 * @param lastExitOK true if the previous broker run shut down cleanly
 */
private void recover(final boolean lastExitOK) {
    final long consumeQueueMaxPhyOffset = this.recoverConsumeQueue();
    if (!lastExitOK) {
        this.commitLog.recoverAbnormally(consumeQueueMaxPhyOffset);
    } else {
        this.commitLog.recoverNormally(consumeQueueMaxPhyOffset);
    }
    this.recoverTopicQueueTable();
}
5.4.1. 正常恢复
5.4.2. 异常恢复