【RocketMQ源码精读】(三)RocketMQ消息存储

RocketMQ消息存储

1. 存储设计原理图

1.1.基本概念

RocketMQ 存储涉及的核心概念:CommitLog、ConsumeQueue、IndexFile、offset

CommitLog:消息内容原文的存储文件

image-20210711170030530

ConsumeQueue:不需要存储消息的内容,而存储的是消息在CommitLog中的offset,ConsumeQueue是CommitLog的索引文件

image-20210711170237376

1824809-c91d5ee415285f28

IndexFile:如果需要根据消息 ID(或消息 Key)查找消息,由于 ConsumeQueue 中没有存储消息 ID,若不采取其他措施,就只能遍历 CommitLog 文件;IndexFile 正是为了解决这个问题而设计的索引文件。

1.2. 原理图

img

2. 消息存储类

org.apache.rocketmq.store.DefaultMessageStore.java

private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.STORE_LOGGER_NAME);

// Message store configuration.
private final MessageStoreConfig messageStoreConfig;
// CommitLog file storage implementation.
private final CommitLog commitLog;

// Consume queue table, grouped by topic: topic -> (queueId -> ConsumeQueue).
private final ConcurrentMap<String/* topic */, ConcurrentMap<Integer/* queueId */, ConsumeQueue>> consumeQueueTable;

// Flush (write-to-disk) service thread for ConsumeQueue files.
private final FlushConsumeQueueService flushConsumeQueueService;

// Service that cleans up CommitLog files.
private final CleanCommitLogService cleanCommitLogService;

// Service that cleans up ConsumeQueue files.
private final CleanConsumeQueueService cleanConsumeQueueService;

// IndexFile implementation.
private final IndexService indexService;

// MappedFile allocation service.
private final AllocateMappedFileService allocateMappedFileService;

// Dispatches CommitLog messages: builds the ConsumeQueue and IndexFile files from the CommitLog.
private final ReputMessageService reputMessageService;

// High-availability (HA) mechanism of the store.
private final HAService haService;

private final ScheduleMessageService scheduleMessageService;

private final StoreStatsService storeStatsService;

// Message write cache: a pool of pre-allocated direct (off-heap) ByteBuffers
// (TransientStorePool.init allocates them with ByteBuffer.allocateDirect).
private final TransientStorePool transientStorePool;

private final RunningFlags runningFlags = new RunningFlags();
private final SystemClock systemClock = new SystemClock();

// Single-threaded scheduler named "StoreScheduledThread".
private final ScheduledExecutorService scheduledExecutorService =
  Executors.newSingleThreadScheduledExecutor(new ThreadFactoryImpl("StoreScheduledThread"));
private final BrokerStatsManager brokerStatsManager;

// Listener notified when messages arrive, used by long-polling message pulls.
private final MessageArrivingListener messageArrivingListener;

// Broker configuration.
private final BrokerConfig brokerConfig;

// Starts as true; volatile so that changes are visible across threads.
private volatile boolean shutdown = true;

// Store checkpoint for file flushing.
private StoreCheckpoint storeCheckpoint;

private AtomicLong printTimes = new AtomicLong(0);

// Dispatchers that CommitLog entries are forwarded to.
private final LinkedList<CommitLogDispatcher> dispatcherList;

private RandomAccessFile lockFile;

private FileLock lock;

boolean shutDownNormal = false;

// Single-threaded scheduler named "DiskCheckScheduledThread".
private final ScheduledExecutorService diskCheckScheduledExecutorService =
  	Executors.newSingleThreadScheduledExecutor(new ThreadFactoryImpl("DiskCheckScheduledThread"));

3. 消息发送存储流程

3.1. 流程图

如果不太熟悉,可以跑一遍单测DefaultMessageStoreTest#testLookMessageByOffset_OffsetIsFirst

image-20210712211813626

3.2. 源码解析

// DefaultMessageStore
/**
 * Stores a single message by delegating to the CommitLog, after checking store state and
 * validating the message.
 *
 * @param msg the message to store
 * @return the CommitLog result; a result with a null append-result when the store-status check
 *     fails or the message is rejected as {@code MESSAGE_ILLEGAL}
 */
@Override
public PutMessageResult putMessage(MessageExtBrokerInner msg) {
    // Guard 1: the broker must be in a state that accepts writes.
    PutMessageStatus storeStatus = this.checkStoreStatus();
    if (storeStatus != PutMessageStatus.PUT_OK) {
        return new PutMessageResult(storeStatus, null);
    }

    // Guard 2: the message itself must pass validation.
    PutMessageStatus validationStatus = this.checkMessage(msg);
    if (PutMessageStatus.MESSAGE_ILLEGAL == validationStatus) {
        return new PutMessageResult(validationStatus, null);
    }

    final long startMillis = this.getSystemClock().now();
    final PutMessageResult putResult = this.commitLog.putMessage(msg);
    final long costMillis = this.getSystemClock().now() - startMillis;

    // Warn about slow writes (more than 500 ms end to end).
    if (costMillis > 500) {
        log.warn("not in lock elapsed time(ms)={}, bodyLength={}", costMillis, msg.getBody().length);
    }
    this.storeStatsService.setPutMessageEntireTimeMax(costMillis);

    final boolean failed = putResult == null || !putResult.isOk();
    if (failed) {
        this.storeStatsService.getPutMessageFailedTimes().incrementAndGet();
    }
    return putResult;
}
// Excerpt from CommitLog.putMessage(...): `msg`, `result` and `elapsedTimeInLock` are declared
// outside this excerpt.
// Set to the previously active file when the message rolls over to a new file (END_OF_FILE case);
// presumably used after this excerpt — not visible here.
MappedFile unlockMappedFile = null;
// Look up the last MappedFile in the queue (the one currently being appended to).
MappedFile mappedFile = this.mappedFileQueue.getLastMappedFile();

putMessageLock.lock(); // spin lock or ReentrantLock, depending on store config
try {
    long beginLockTimestamp = this.defaultMessageStore.getSystemClock().now();
    // Time the lock was acquired; reset to 0 before each return below and after a normal append.
    this.beginTimeInLock = beginLockTimestamp;

    // Assign the store timestamp while holding the lock so that store timestamps
    // are globally ordered.
    msg.setStoreTimestamp(beginLockTimestamp);

    // No file yet, or the current one is full: ask the queue for a new last file.
    if (null == mappedFile || mappedFile.isFull()) {
      mappedFile = this.mappedFileQueue.getLastMappedFile(0); // Mark: NewFile may be cause noise
    }
    if (null == mappedFile) {
      log.error("create mapped file1 error, topic: " + msg.getTopic() + " clientAddr: " + msg.getBornHostString());
      beginTimeInLock = 0;
      return new PutMessageResult(PutMessageStatus.CREATE_MAPEDFILE_FAILED, null);
    }

    // Append the message to the MappedFile.
    result = mappedFile.appendMessage(msg, this.appendMessageCallback);
    switch (result.getStatus()) {
      case PUT_OK:
        break;
      // The message exceeds the remaining capacity of the current file:
      // move on to a new file and write the message again there.
      case END_OF_FILE:
        unlockMappedFile = mappedFile;
        // Create a new file, re-write the message
        mappedFile = this.mappedFileQueue.getLastMappedFile(0);
        if (null == mappedFile) {
          // XXX: warn and notify me
          log.error("create mapped file2 error, topic: " + msg.getTopic() + " clientAddr: " + msg.getBornHostString());
          beginTimeInLock = 0;
          return new PutMessageResult(PutMessageStatus.CREATE_MAPEDFILE_FAILED, result);
        }
        result = mappedFile.appendMessage(msg, this.appendMessageCallback);
        break;
      // Oversized message or properties are reported to the caller as an illegal message.
      case MESSAGE_SIZE_EXCEEDED:
      case PROPERTIES_SIZE_EXCEEDED:
        beginTimeInLock = 0;
        return new PutMessageResult(PutMessageStatus.MESSAGE_ILLEGAL, result);
      case UNKNOWN_ERROR:
        beginTimeInLock = 0;
        return new PutMessageResult(PutMessageStatus.UNKNOWN_ERROR, result);
      default:
        beginTimeInLock = 0;
        return new PutMessageResult(PutMessageStatus.UNKNOWN_ERROR, result);
    }

    // Time spent holding the lock for this append.
    elapsedTimeInLock = this.defaultMessageStore.getSystemClock().now() - beginLockTimestamp;
    beginTimeInLock = 0;
} finally {
  	// Always release the put lock, including on the early-return paths above.
  	putMessageLock.unlock();
}
// MappedFile
/**
 * Appends a single message or a message batch at the current write position of this file,
 * delegating the serialization to {@code cb}.
 *
 * <p>The data goes into {@code writeBuffer} when it is set, otherwise into the memory-mapped
 * buffer. On success the write position advances by the number of bytes written and the store
 * timestamp is updated.
 *
 * @param messageExt a {@code MessageExtBrokerInner} or a {@code MessageExtBatch}
 * @param cb callback that serializes the message into the buffer
 * @return the callback's result, or an {@code UNKNOWN_ERROR} result when the file is already full
 *     or the message type is unsupported
 */
public AppendMessageResult appendMessagesInner(final MessageExt messageExt, final AppendMessageCallback cb) {
    assert messageExt != null;
    assert cb != null;

    final int writePos = this.wrotePosition.get();

    // Guard: the file is already full, so nothing can be appended to it.
    if (writePos >= this.fileSize) {
        log.error("MappedFile.appendMessage return null, wrotePosition: {} fileSize: {}", writePos, this.fileSize);
        return new AppendMessageResult(AppendMessageStatus.UNKNOWN_ERROR);
    }

    // Prefer the transient write buffer when present; otherwise write into the mapped buffer.
    final ByteBuffer target = (writeBuffer == null) ? this.mappedByteBuffer.slice() : writeBuffer.slice();
    target.position(writePos);

    final int remaining = this.fileSize - writePos;
    final AppendMessageResult appendResult;
    if (messageExt instanceof MessageExtBrokerInner) {
        appendResult = cb.doAppend(this.getFileFromOffset(), target, remaining, (MessageExtBrokerInner) messageExt);
    } else if (messageExt instanceof MessageExtBatch) {
        appendResult = cb.doAppend(this.getFileFromOffset(), target, remaining, (MessageExtBatch) messageExt);
    } else {
        return new AppendMessageResult(AppendMessageStatus.UNKNOWN_ERROR);
    }

    this.wrotePosition.addAndGet(appendResult.getWroteBytes());
    this.storeTimestamp = appendResult.getStoreTimestamp();
    return appendResult;
}
// CommitLog
// Serializes `msgInner` into `byteBuffer` following the CommitLog storage format
// (the implementation is long, so it is not shown here).
// fileFromOffset: starting offset of the target file (MappedFile passes getFileFromOffset()).
// maxBlank: bytes still free in the file (MappedFile passes fileSize - current write position).
public AppendMessageResult doAppend(final long fileFromOffset, final ByteBuffer byteBuffer, final int maxBlank,
            final MessageExtBrokerInner msgInner);

4. 存储文件组织与内存映射

4.1. MappedFile源码

RocketMQ通过使用内存映射文件(mmap)来提高IO访问性能

MappedFileQueue映射存储目录,MappedFile映射存储文件

// MappedFile fields

// OS memory page size assumed by this class: 4 KB.
public static final int OS_PAGE_SIZE = 1024 * 4;
protected static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.STORE_LOGGER_NAME);

// Static counters shared by all MappedFile instances (presumably total mapped bytes and
// number of mapped files).
private static final AtomicLong TOTAL_MAPPED_VIRTUAL_MEMORY = new AtomicLong(0);

private static final AtomicInteger TOTAL_MAPPED_FILES = new AtomicInteger(0);
// Write pointer: current write position within this file.
protected final AtomicInteger wrotePosition = new AtomicInteger(0);

// Commit pointer: when TransientStorePool is enabled, data is first written into the pooled
// writeBuffer, then committed to the FileChannel, and finally flushed to disk.
protected final AtomicInteger committedPosition = new AtomicInteger(0);

// Flush pointer: position up to which data has been flushed to disk.
private final AtomicInteger flushedPosition = new AtomicInteger(0);
protected int fileSize;

// File channel.
protected FileChannel fileChannel;
/**
 * Message will put to here first, and then reput to FileChannel if writeBuffer is not null.
 */
protected ByteBuffer writeBuffer = null;

// Off-heap buffer pool; its buffers are memory-locked so their data is not swapped out to disk.
protected TransientStorePool transientStorePool = null;
private String fileName;
private long fileFromOffset;
private File file;
private MappedByteBuffer mappedByteBuffer;
private volatile long storeTimestamp = 0;
private boolean firstCreateInQueue = false;

4.2. TransientStorePool分析

TransientStorePool的作用:提供内存锁定机制,避免内存数据被置换到磁盘

(正如 init() 方法的注释所说,这是一个很重的初始化方法)

思考:什么业务场景会启动这种TransientStorePool机制??

// Number of buffers that init() allocates.
private final int poolSize;
// Size in bytes of each buffer.
private final int fileSize;

// Deque holding the memory-locked (mlock'ed) direct buffers that are available for use.
private final Deque<ByteBuffer> availableBuffers;
private final MessageStoreConfig storeConfig;


/**
 * Pre-allocates {@code poolSize} direct buffers of {@code fileSize} bytes each, locks each one
 * in physical memory with mlock so it cannot be swapped out, and makes them available.
 *
 * <p>It's a heavy init method: it allocates and locks poolSize * fileSize bytes of off-heap memory.
 */
public void init() {
  int remaining = poolSize;
  while (remaining-- > 0) {
    ByteBuffer buffer = ByteBuffer.allocateDirect(fileSize);

    // Lock the buffer's pages in RAM to prevent swapping and keep writes fast.
    Pointer bufferStart = new Pointer(((DirectBuffer) buffer).address());
    LibC.INSTANCE.mlock(bufferStart, new NativeLong(fileSize));

    availableBuffers.offer(buffer);
  }
}

4.3. AllocateMappedFileService创建MappedFile

/**
 * Returns the last mapped file. If none exists or it is full and {@code needCreate} is set,
 * a new file is created first.
 *
 * <p>The new file's start offset is {@code startOffset} rounded down to a multiple of
 * mappedFileSize when the queue is empty. Otherwise it directly follows the full last file.
 * CommitLog files are created through the AllocateMappedFileService, which is also given the
 * path of the file after that; other queues construct the MappedFile directly.
 *
 * @param startOffset offset used to place the first file when the queue is empty
 * @param needCreate whether a new file may be created
 * @return the new file (null if creation failed), or the existing last file (may be null)
 */
public MappedFile getLastMappedFile(final long startOffset, boolean needCreate) {
    final MappedFile lastFile = getLastMappedFile();

    // -1 means "no new file is needed".
    long createOffset;
    if (lastFile == null) {
        createOffset = startOffset - (startOffset % this.mappedFileSize);
    } else if (lastFile.isFull()) {
        createOffset = lastFile.getFileFromOffset() + this.mappedFileSize;
    } else {
        createOffset = -1;
    }

    if (createOffset == -1 || !needCreate) {
        return lastFile;
    }

    final String nextFilePath = this.storePath + File.separator + UtilAll.offset2FileName(createOffset);
    final String nextNextFilePath = this.storePath + File.separator
        + UtilAll.offset2FileName(createOffset + this.mappedFileSize);

    MappedFile created = null;
    if (this.allocateMappedFileService == null) {
        // Non-CommitLog queues: construct the file directly.
        try {
            created = new MappedFile(nextFilePath, this.mappedFileSize);
        } catch (IOException e) {
            log.error("create mappedFile exception", e);
        }
    } else {
        // CommitLog: delegate to the allocation service.
        created = this.allocateMappedFileService.putRequestAndReturnMappedFile(nextFilePath,
            nextNextFilePath, this.mappedFileSize);
    }

    if (created != null) {
        // Flag the very first file added to this queue.
        if (this.mappedFiles.isEmpty()) {
            created.setFirstCreateInQueue(true);
        }
        this.mappedFiles.add(created);
    }
    return created;
}

image-20211006173234896

TODO:分析AllocateMappedFileService如何创建MappedFile

5. 存储文件

5.1. CommitLog文件

/**
 * Fetches message data by physical CommitLog offset: selects {@code size} bytes starting at
 * {@code offset} from the mapped file that contains it.
 *
 * @return the selected buffer, or {@code null} when no mapped file is found for {@code offset}
 */
public SelectMappedBufferResult getMessage(final long offset, final int size) {
    final int fileSize = this.defaultMessageStore.getMessageStoreConfig().getMappedFileSizeCommitLog();
    // Second argument is true only for offset 0 (presumably "return the first file if not found").
    final MappedFile file = this.mappedFileQueue.findMappedFileByOffset(offset, offset == 0);
    if (file == null) {
        return null;
    }
    // Translate the global offset into a position within this file.
    final int positionInFile = (int) (offset % fileSize);
    return file.selectMappedBuffer(positionInFile, size);
}

5.2. ConsumeQueue文件

5.2.1. 根据Index定位物理文件

/**
 * Selects the ConsumeQueue data starting at the entry with logical index {@code startIndex}.
 *
 * @param startIndex logical index of the entry (each entry is CQ_STORE_UNIT_SIZE bytes)
 * @return the buffer from that entry's position in its file, or {@code null} when the offset is
 *     below the minimum logical offset or no mapped file contains it
 */
public SelectMappedBufferResult getIndexBuffer(final long startIndex) {
    final int fileSize = this.mappedFileSize;
    final long byteOffset = startIndex * CQ_STORE_UNIT_SIZE;
    if (byteOffset < this.getMinLogicOffset()) {
        return null;
    }
    final MappedFile file = this.mappedFileQueue.findMappedFileByOffset(byteOffset);
    return file == null ? null : file.selectMappedBuffer((int) (byteOffset % fileSize));
}

这里介绍一下 CQ_STORE_UNIT_SIZE=20 字节的由来,可以从 ConsumeQueue 中每个存储单元的结构看出(8 + 4 + 8 = 20 字节):

  1. commitLog offset(8 字节):该消息在 CommitLog 中的物理偏移量
  2. 消息大小(4 字节)
  3. 消息 Tag 的 hashcode(8 字节)

img

5.2.2. 时间戳定位物理文件

// Locates a queue position by timestamp: binary-searches the ConsumeQueue file chosen by
// getMappedFileByTime for the entry whose message store timestamp equals `timestamp`, or is
// closest to it, and returns that entry's logical index
// ((file start offset + entry position) / CQ_STORE_UNIT_SIZE).
// Returns 0 when no file is found, the buffer cannot be selected, or a CommitLog timestamp
// lookup returns a negative value.
public long getOffsetInQueueByTime(final long timestamp) {
  MappedFile mappedFile = this.mappedFileQueue.getMappedFileByTime(timestamp);
  if (mappedFile != null) {
    long offset = 0;
    // If minLogicOffset falls inside this file, start the search there.
    int low = minLogicOffset > mappedFile.getFileFromOffset() ? (int) (minLogicOffset - mappedFile.getFileFromOffset()) : 0;
    int high = 0;
    int midOffset = -1, targetOffset = -1, leftOffset = -1, rightOffset = -1;
    long leftIndexValue = -1L, rightIndexValue = -1L;

    // Minimum valid physical offset of the CommitLog.
    long minPhysicOffset = this.defaultMessageStore.getMinPhyOffset();
    SelectMappedBufferResult sbr = mappedFile.selectMappedBuffer(0);
    if (null != sbr) {
      ByteBuffer byteBuffer = sbr.getByteBuffer();
      // Position of the last entry in the readable region (limit minus one unit).
      high = byteBuffer.limit() - CQ_STORE_UNIT_SIZE;
      try {
        while (high >= low) {
          // Midpoint of [low, high], rounded down to an entry boundary.
          midOffset = (low + high) / (2 * CQ_STORE_UNIT_SIZE) * CQ_STORE_UNIT_SIZE;
          byteBuffer.position(midOffset);

          // commitLog offset
          long phyOffset = byteBuffer.getLong();

          // size
          int size = byteBuffer.getInt();

          // Entry points below the CommitLog's minimum valid offset: move the lower bound right.
          // Note that no timestamp is recorded (leftIndexValue is left unchanged).
          if (phyOffset < minPhysicOffset) {
            low = midOffset + CQ_STORE_UNIT_SIZE;
            leftOffset = midOffset;
            continue;
          }

          // Store timestamp of the referenced message in the CommitLog.
          long storeTime =
            this.defaultMessageStore.getCommitLog().pickupStoreTimestamp(phyOffset, size);
          if (storeTime < 0) {
            return 0;
          } else if (storeTime == timestamp) {
            targetOffset = midOffset;
            break;
          } else if (storeTime > timestamp) {
            high = midOffset - CQ_STORE_UNIT_SIZE;
            rightOffset = midOffset;
            rightIndexValue = storeTime;
          } else {
            low = midOffset + CQ_STORE_UNIT_SIZE;
            leftOffset = midOffset;
            leftIndexValue = storeTime;
          }
        }

        // Exact match found.
        if (targetOffset != -1) {
          // Use the exactly matching entry.
          offset = targetOffset;
        } else {    // No exact match: pick the entry whose timestamp is closest to `timestamp`.
          if (leftIndexValue == -1) {
            // Only a later entry was seen.
            // NOTE(review): if no timestamp was recorded on either side (e.g. every probed
            // entry was below minPhysicOffset), rightOffset is still -1 here and the result
            // is computed from fileFromOffset - 1 — confirm this is intended.
            offset = rightOffset;
          } else if (rightIndexValue == -1) {
            // Only an earlier entry was seen.
            offset = leftOffset;
          } else {
            // Both sides seen: take the nearer one; ties go to the earlier (left) entry.
            offset =
              Math.abs(timestamp - leftIndexValue) > Math.abs(timestamp
                                                              - rightIndexValue) ? rightOffset : leftOffset;
          }
        }

        // Convert the in-file byte position back to a logical queue index.
        return (mappedFile.getFileFromOffset() + offset) / CQ_STORE_UNIT_SIZE;
      } finally {
        sbr.release();
      }
    }
  }
  return 0;
}

5.3. IndexFile文件

本质就是一个文件版的HashMap

img

5.4. ConsumeQueue和IndexFile的恢复

  1. 首先会判断是否是异常退出,看abort文件是否存在

实现机制:Broker在启动时会创建abort文件,退出时通过注册JVM钩子函数删除abort文件,如果下一次启动Broker时,abort文件存在,则说明Broker异常退出

// The previous exit counts as clean only if the abort file does not exist.
boolean lastExitOK = !this.isTempFileExist();

/**
 * Reports whether the abort file exists under the store root directory.
 * A leftover abort file means the previous broker run did not exit normally.
 */
private boolean isTempFileExist() {
    final String abortPath = StorePathConfigHelper.getAbortFile(this.messageStoreConfig.getStorePathRootDir());
    return new File(abortPath).exists();
}
  2. 加载 CommitLog 和 ConsumeQueue
// This loading step is the same whether or not the last exit was clean; the recovery mode
// is chosen afterwards based on lastExitOK.
// load Commit Log
result = result && this.commitLog.load();

// load Consume Queue
result = result && this.loadConsumeQueue();
  3. 根据上次是否异常退出选择恢复模式
if (result) {
    // Open the store checkpoint file under the store root directory.
    this.storeCheckpoint =
      new StoreCheckpoint(StorePathConfigHelper.getStoreCheckpoint(this.messageStoreConfig.getStorePathRootDir()));

    // Both index loading and store recovery are told whether the last exit was clean.
    this.indexService.load(lastExitOK);

    this.recover(lastExitOK);

    log.info("load over, and the max phy offset = {}", this.getMaxPhyOffset());
}

/**
 * Recovers the store at startup: ConsumeQueues first, then the CommitLog, then the topic queue
 * table.
 *
 * @param lastExitOK true when the previous run exited cleanly (no abort file)
 */
private void recover(final boolean lastExitOK) {
    // The max physical offset referenced by the ConsumeQueues is passed on to CommitLog recovery.
    final long maxPhyOffsetOfConsumeQueue = this.recoverConsumeQueue();

    if (!lastExitOK) {
        // Previous run crashed or was killed: use abnormal recovery.
        this.commitLog.recoverAbnormally(maxPhyOffsetOfConsumeQueue);
    } else {
        this.commitLog.recoverNormally(maxPhyOffsetOfConsumeQueue);
    }

    this.recoverTopicQueueTable();
}

5.4.1. 正常恢复

5.4.2. 异常恢复

评论

Your browser is out-of-date!

Update your browser to view this website correctly. Update my browser now

×