【RocketMQ源码精读】(四)RocketMQ消息消费

RocketMQ消息消费

1. 消费者启动

消费者的启动主要是DefaultMQPushConsumer

public class DefaultMQPushConsumer extends ClientConfig implements MQPushConsumer {...}
/**
 * Starts this push consumer.
 *
 * <p>Wraps the consumer group name with the configured namespace, starts the
 * underlying {@code defaultMQPushConsumerImpl}, and finally starts the trace
 * dispatcher when one is configured.
 *
 * @throws MQClientException propagated from starting the underlying consumer implementation
 */
@Override
public void start() throws MQClientException {
    // The namespaced group name must be in place before the implementation starts.
    final String namespacedGroup = NamespaceUtil.wrapNamespace(this.getNamespace(), this.consumerGroup);
    setConsumerGroup(namespacedGroup);
    this.defaultMQPushConsumerImpl.start();

    // Message-trace dispatching is optional; nothing more to do without a dispatcher.
    if (traceDispatcher == null) {
        return;
    }
    try {
        traceDispatcher.start(this.getNamesrvAddr(), this.getAccessChannel());
    } catch (MQClientException e) {
        // A trace dispatcher failure is only logged; it does not fail the consumer start.
        log.warn("trace dispatcher start failed ", e);
    }
}

DefaultMQPushConsumer在这里做了3件事

  1. 设置ConsumerGroup名称
  2. DefaultMQPushConsumerImpl启动服务
  3. 异步传输 消息跟踪数据

// DefaultMQPushConsumerImpl启动 
public synchronized void start() throws MQClientException {....}

在这个方法中我只分析刚启动,CREATE_JUST这个case

// CREATE_JUST branch of DefaultMQPushConsumerImpl.start().
// Mark the state as START_FAILED up front; it is switched to RUNNING only at the end of this branch.
this.serviceState = ServiceState.START_FAILED;

this.checkConfig();

// Build the topic subscription data held by rebalanceImpl
this.copySubscription();

if (this.defaultMQPushConsumer.getMessageModel() == MessageModel.CLUSTERING) {
  this.defaultMQPushConsumer.changeInstanceNameToPID();
}

// Obtain (or create) the client instance from the MQClientManager singleton accessor
// NOTE(review): presumably one MQClientManager per JVM — confirm against MQClientManager
this.mQClientFactory = MQClientManager.getInstance().getOrCreateMQClientInstance(this.defaultMQPushConsumer, this.rpcHook);

this.rebalanceImpl.setConsumerGroup(this.defaultMQPushConsumer.getConsumerGroup());
this.rebalanceImpl.setMessageModel(this.defaultMQPushConsumer.getMessageModel());
this.rebalanceImpl.setAllocateMessageQueueStrategy(this.defaultMQPushConsumer.getAllocateMessageQueueStrategy());
this.rebalanceImpl.setmQClientFactory(this.mQClientFactory);

this.pullAPIWrapper = new PullAPIWrapper(
  mQClientFactory,
  this.defaultMQPushConsumer.getConsumerGroup(), isUnitMode());
this.pullAPIWrapper.registerFilterMessageHook(filterMessageHookList);

// Use the offset store supplied on the consumer if there is one; otherwise pick one by message model
if (this.defaultMQPushConsumer.getOffsetStore() != null) {
  	this.offsetStore = this.defaultMQPushConsumer.getOffsetStore();
} else {
    switch (this.defaultMQPushConsumer.getMessageModel()) {
        // Broadcasting: consume progress is kept in a local file store
      case BROADCASTING:
        this.offsetStore = new LocalFileOffsetStore(this.mQClientFactory, this.defaultMQPushConsumer.getConsumerGroup());
        break;
        // Clustering: consume progress is kept on the broker
      case CLUSTERING:
        this.offsetStore = new RemoteBrokerOffsetStore(this.mQClientFactory, this.defaultMQPushConsumer.getConsumerGroup());
        break;
      default:
        break;
    }
    // Publish the chosen offset store back onto the consumer
    this.defaultMQPushConsumer.setOffsetStore(this.offsetStore);
}
this.offsetStore.load();


if (this.getMessageListenerInner() instanceof MessageListenerOrderly) { // orderly consumption
  this.consumeOrderly = true;
  this.consumeMessageService =
    new ConsumeMessageOrderlyService(this, (MessageListenerOrderly) this.getMessageListenerInner());
} else if (this.getMessageListenerInner() instanceof MessageListenerConcurrently) { // concurrent consumption
  this.consumeOrderly = false;
  this.consumeMessageService =
    new ConsumeMessageConcurrentlyService(this, (MessageListenerConcurrently) this.getMessageListenerInner());
}

// Start the consume service. Orderly: periodically locks message queues (clustering mode only);
// concurrent: periodically cleans up expired messages.
this.consumeMessageService.start();

// Register this consumer with mQClientFactory under its consumer group
boolean registerOK = mQClientFactory.registerConsumer(this.defaultMQPushConsumer.getConsumerGroup(), this);
if (!registerOK) {
  // Registration failed: reset the state, stop the consume service that was just started, and report the failure
  this.serviceState = ServiceState.CREATE_JUST;
  this.consumeMessageService.shutdown(defaultMQPushConsumer.getAwaitTerminationMillisWhenShutdown());
  throw new MQClientException("The consumer group[" + this.defaultMQPushConsumer.getConsumerGroup()
                              + "] has been created before, specify another name please." + FAQUrl.suggestTodo(FAQUrl.GROUP_NAME_DUPLICATE_URL),
                              null);
}

mQClientFactory.start();
log.info("the consumer [{}] start OK.", this.defaultMQPushConsumer.getConsumerGroup());
this.serviceState = ServiceState.RUNNING;
break;
  1. rebalanceImpl构建主题订阅信息
    1. 将正常订阅消息加入到rebalanceImpl
    2. 将retry订阅消息加入到rebalanceImpl
/**
 * Copies the consumer's subscriptions into {@code rebalanceImpl}.
 *
 * <p>Every configured topic/expression pair is turned into a {@code SubscriptionData}
 * and stored in the rebalance subscription table. In clustering mode the group's
 * retry topic is subscribed as well; broadcasting mode adds nothing extra.
 *
 * @throws MQClientException wrapping any exception raised while building the subscriptions
 */
private void copySubscription() throws MQClientException {
  try {
    final Map<String, String> configured = this.defaultMQPushConsumer.getSubscription();
    if (configured != null) {
      for (final Map.Entry<String, String> topicToExpression : configured.entrySet()) {
        final String topic = topicToExpression.getKey();
        final String expression = topicToExpression.getValue();
        final SubscriptionData data = FilterAPI.buildSubscriptionData(
          this.defaultMQPushConsumer.getConsumerGroup(), topic, expression);
        // Register the regular subscription with rebalanceImpl
        this.rebalanceImpl.getSubscriptionInner().put(topic, data);
      }
    }

    // Fall back to the listener configured on the consumer when none has been set yet
    if (this.messageListenerInner == null) {
      this.messageListenerInner = this.defaultMQPushConsumer.getMessageListener();
    }

    switch (this.defaultMQPushConsumer.getMessageModel()) {
      case CLUSTERING:
        // Clustering additionally subscribes to the consumer group's retry topic
        final String retryTopic = MixAll.getRetryTopic(this.defaultMQPushConsumer.getConsumerGroup());
        final SubscriptionData retryData = FilterAPI.buildSubscriptionData(
          this.defaultMQPushConsumer.getConsumerGroup(), retryTopic, SubscriptionData.SUB_ALL);
        this.rebalanceImpl.getSubscriptionInner().put(retryTopic, retryData);
        break;
      case BROADCASTING:
        // Broadcasting has no retry subscription
      default:
        break;
    }
  } catch (Exception e) {
    throw new MQClientException("subscription exception", e);
  }
}
  1. 加载消费进度,注意集群和广播模式的存储位置不同
this.offsetStore.load();
/**
 * load() of the broadcasting-mode offset store: reads the locally stored offsets,
 * merges them into {@code offsetTable}, and logs each loaded queue offset.
 *
 * @throws MQClientException declared by the method; presumably raised by {@code readLocalOffset()} — confirm
 */
@Override
public void load() throws MQClientException {
    final OffsetSerializeWrapper persisted = this.readLocalOffset();
    // Nothing stored locally (or no offset table in it): leave offsetTable untouched.
    if (persisted == null || persisted.getOffsetTable() == null) {
        return;
    }

    offsetTable.putAll(persisted.getOffsetTable());

    for (MessageQueue queue : persisted.getOffsetTable().keySet()) {
        final AtomicLong storedOffset = persisted.getOffsetTable().get(queue);
        log.info("load consumer's offset, {} {} {}",
                 this.groupName,
                 queue,
                 storedOffset.get());
    }
}
  1. 确定消费模式,并启动消费线程消费消息(后续再补充,这里简单提一嘴)
this.consumeMessageService.start();
/**
 * Orderly consumption: in clustering mode, schedules {@code lockMQPeriodically()}
 * at a fixed rate (first run after one second, then every
 * {@code ProcessQueue.REBALANCE_LOCK_INTERVAL} milliseconds). Does nothing otherwise.
 */
public void start() {
    final boolean clustering = MessageModel.CLUSTERING.equals(
        ConsumeMessageOrderlyService.this.defaultMQPushConsumerImpl.messageModel());
    if (!clustering) {
        return;
    }

    final Runnable lockTask = new Runnable() {
        @Override
        public void run() {
            ConsumeMessageOrderlyService.this.lockMQPeriodically();
        }
    };
    this.scheduledExecutorService.scheduleAtFixedRate(
        lockTask, 1000 * 1, ProcessQueue.REBALANCE_LOCK_INTERVAL, TimeUnit.MILLISECONDS);
}

/**
 * Concurrent consumption: schedules {@code cleanExpireMsg()} at a fixed rate.
 * Both the initial delay and the period are the consumer's consume timeout,
 * interpreted in minutes.
 */
public void start() {
    final Runnable cleanTask = new Runnable() {
        @Override
        public void run() {
            cleanExpireMsg();
        }
    };

    this.cleanExpireMsgExecutors.scheduleAtFixedRate(
        cleanTask,
        this.defaultMQPushConsumer.getConsumeTimeout(),
        this.defaultMQPushConsumer.getConsumeTimeout(),
        TimeUnit.MINUTES);
}
  1. 为mQClientFactory注册defaultMQPushConsumer,并启动服务
    1. 获取一次nameserver的addr并更新路由表
    2. 启动通信管道
    3. 启动各种定时任务
    4. 启动消息拉取服务
    5. 启动负载均衡服务
    6. 启动生产者,不过这个版本被弃用了
mQClientFactory.start();
/**
 * Starts this client instance, guarded by {@code synchronized (this)}.
 *
 * <p>Only the CREATE_JUST state performs the start-up sequence; START_FAILED
 * throws, and every other state is a no-op.
 *
 * @throws MQClientException if a previous start attempt left the instance in START_FAILED
 */
public void start() throws MQClientException {

    synchronized (this) {
      switch (this.serviceState) {
        case CREATE_JUST:
          // Mark as failed first; the state becomes RUNNING only after all steps below complete
          this.serviceState = ServiceState.START_FAILED;
          // If not specified,looking address from name server
          if (null == this.clientConfig.getNamesrvAddr()) {
            this.mQClientAPIImpl.fetchNameServerAddr();
          }
          // Start request-response channel
          this.mQClientAPIImpl.start();
          // Start various schedule tasks
          this.startScheduledTask();
          // Start pull service
          this.pullMessageService.start();
          // Start rebalance service
          this.rebalanceService.start();
          // Start push service (this call starts the implementation behind the internal defaultMQProducer)
          this.defaultMQProducer.getDefaultMQProducerImpl().start(false);
          log.info("the client factory [{}] start OK", this.clientId);
          this.serviceState = ServiceState.RUNNING;
          break;
        case START_FAILED:
          throw new MQClientException("The Factory object[" + this.getClientId() + "] has been created before, and failed.", null);
        default:
          break;
      }
    }
}

2. 消费者拉取消息

让我们把目光放到PullMessageService

// Blocking queue of pending pull requests
private final LinkedBlockingQueue<PullRequest> pullRequestQueue = new LinkedBlockingQueue<PullRequest>();

/**
 * Service loop: until the service is stopped, takes the next pull request from
 * {@code pullRequestQueue} (blocking while the queue is empty) and executes it.
 */
@Override
public void run() {
    log.info(this.getServiceName() + " service started");

    while (!this.isStopped()) {
        try {
            // take() blocks until a pull request is available
            this.pullMessage(this.pullRequestQueue.take());
        } catch (InterruptedException ignored) {
            // Deliberately ignored: the loop condition re-checks isStopped()
        } catch (Exception e) {
            log.error("Pull Message Service Run Method exception", e);
        }
    }

    log.info(this.getServiceName() + " service end");
}

PullMessageService启动了一个死循环,获取拉取请求再根据该请求拉取消息

那在哪里会放入这些请求呢?让我们先把这个问题放一放,先来看一下pullMessage

/**
 * Dispatches a pull request to the consumer registered for its consumer group.
 * The request is dropped (with a warning) when no such consumer is found.
 *
 * @param pullRequest the pull request to execute
 */
private void pullMessage(final PullRequest pullRequest) {
    final MQConsumerInner selected = this.mQClientFactory.selectConsumer(pullRequest.getConsumerGroup());
    if (selected == null) {
        log.warn("No matched consumer for the PullRequest {}, drop it", pullRequest);
        return;
    }

    // The selected consumer is cast to the push implementation before pulling
    ((DefaultMQPushConsumerImpl) selected).pullMessage(pullRequest);
}

2.1. 校验

// Get the process queue attached to this pull request; a dropped queue is not pulled any further
final ProcessQueue processQueue = pullRequest.getProcessQueue();
if (processQueue.isDropped()) {
  log.info("the pull request[{}] is dropped.", pullRequest.toString());
  return;
}

// Record the time of this pull attempt on the process queue
pullRequest.getProcessQueue().setLastPullTimestamp(System.currentTimeMillis());

try {
  this.makeSureStateOK();
} catch (MQClientException e) {
  // State check failed: log it and re-submit the pull request after a delay
  log.warn("pullMessage exception, consumer state not ok", e);
  this.executePullRequestLater(pullRequest, pullTimeDelayMillsWhenException);
  return;
}

// The consumer is paused
if (this.isPause()) {
  log.warn("consumer was paused, execute pull request later. instanceName={}, group={}", this.defaultMQPushConsumer.getInstanceName(), this.defaultMQPushConsumer.getConsumerGroup());
  // Re-submit the pull request after a delay
  this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_SUSPEND);
  return;
}

如果有异常就延迟拉取消息

2.2. 流量控制

// Pull flow control: number and total size (in MiB) of the messages currently cached in the process queue
long cachedMessageCount = processQueue.getMsgCount().get();
long cachedMessageSizeInMiB = processQueue.getMsgSize().get() / (1024 * 1024);

// Cached message count exceeds pullThresholdForQueue: delay the pull
if (cachedMessageCount > this.defaultMQPushConsumer.getPullThresholdForQueue()) {
  this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_FLOW_CONTROL);
  // Warn on the first hit and then once every 1000 flow-control hits
  if ((queueFlowControlTimes++ % 1000) == 0) {
    log.warn(
      "the cached message count exceeds the threshold {}, so do flow control, minOffset={}, maxOffset={}, count={}, size={} MiB, pullRequest={}, flowControlTimes={}",
      this.defaultMQPushConsumer.getPullThresholdForQueue(), processQueue.getMsgTreeMap().firstKey(), processQueue.getMsgTreeMap().lastKey(), cachedMessageCount, cachedMessageSizeInMiB, pullRequest, queueFlowControlTimes);
  }
  return;
}

// Cached message size (MiB) exceeds pullThresholdSizeForQueue: delay the pull
if (cachedMessageSizeInMiB > this.defaultMQPushConsumer.getPullThresholdSizeForQueue()) {
  this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_FLOW_CONTROL);
  // Same log throttling as above (shares the queueFlowControlTimes counter)
  if ((queueFlowControlTimes++ % 1000) == 0) {
    log.warn(
      "the cached message size exceeds the threshold {} MiB, so do flow control, minOffset={}, maxOffset={}, count={}, size={} MiB, pullRequest={}, flowControlTimes={}",
      this.defaultMQPushConsumer.getPullThresholdSizeForQueue(), processQueue.getMsgTreeMap().firstKey(), processQueue.getMsgTreeMap().lastKey(), cachedMessageCount, cachedMessageSizeInMiB, pullRequest, queueFlowControlTimes);
  }
  return;
}
// Concurrent consumption: delay the pull when the process queue's max span exceeds consumeConcurrentlyMaxSpan
if (!this.consumeOrderly) {
  if (processQueue.getMaxSpan() > this.defaultMQPushConsumer.getConsumeConcurrentlyMaxSpan()) {
    this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_FLOW_CONTROL);
    if ((queueMaxSpanFlowControlTimes++ % 1000) == 0) {
      log.warn(
        "the queue's messages, span too long, so do flow control, minOffset={}, maxOffset={}, maxSpan={}, pullRequest={}, flowControlTimes={}",
        processQueue.getMsgTreeMap().firstKey(), processQueue.getMsgTreeMap().lastKey(), processQueue.getMaxSpan(),
        pullRequest, queueMaxSpanFlowControlTimes);
    }
    return;
  }
} else {
  // Orderly consumption: only pull while the process queue is locked
  if (processQueue.isLocked()) {
    // First pull since the lock was obtained: recompute the starting offset and use it as the next offset
    if (!pullRequest.isLockedFirst()) {
      final long offset = this.rebalanceImpl.computePullFromWhere(pullRequest.getMessageQueue());
      // True when the recomputed offset is behind the offset the pull request was about to use
      boolean brokerBusy = offset < pullRequest.getNextOffset();
      log.info("the first time to pull message, so fix offset from broker. pullRequest: {} NewOffset: {} brokerBusy: {}",
               pullRequest, offset, brokerBusy);
      if (brokerBusy) {
        log.info("[NOTIFYME]the first time to pull message, but pull request offset larger than broker consume offset. pullRequest: {} NewOffset: {}",
                 pullRequest, offset);
      }

      pullRequest.setLockedFirst(true);
      pullRequest.setNextOffset(offset);
    }
  } else {
    // Not locked yet: retry the pull later
    this.executePullRequestLater(pullRequest, pullTimeDelayMillsWhenException);
    log.info("pull message later because not locked in broker, {}", pullRequest);
    return;
  }
}

  1. 待处理消息条数大于1000条
  2. 待处理消息的大小大于100MB
  3. 并发消费时待处理消息的offset跨度超过consumeConcurrentlyMaxSpan,或者顺序消费时processQueue尚未被锁定

都会延迟拉取消息

2.3. 构建系统标记

// Build the system flag sent with the pull request
boolean commitOffsetEnable = false;
long commitOffsetValue = 0L;
// Clustering mode only: read the consume offset from memory; a positive value enables the commit-offset flag
if (MessageModel.CLUSTERING == this.defaultMQPushConsumer.getMessageModel()) {
  commitOffsetValue = this.offsetStore.readOffset(pullRequest.getMessageQueue(), ReadOffsetType.READ_FROM_MEMORY);
  if (commitOffsetValue > 0) {
    commitOffsetEnable = true;
  }
}

String subExpression = null;
boolean classFilter = false;
SubscriptionData sd = this.rebalanceImpl.getSubscriptionInner().get(pullRequest.getMessageQueue().getTopic());
if (sd != null) {
  // The subscription expression is sent only when postSubscriptionWhenPull is on and class-filter mode is off
  if (this.defaultMQPushConsumer.isPostSubscriptionWhenPull() && !sd.isClassFilterMode()) {
    subExpression = sd.getSubString();
  }

  classFilter = sd.isClassFilterMode();
}

int sysFlag = PullSysFlag.buildSysFlag(
  // Set when the consume offset read from memory is greater than 0
  commitOffsetEnable, // commitOffset
  // Always true here (suspend flag)
  true, // suspend
  // Set when a subscription expression is sent with the request
  subExpression != null, // subscription
  // Set when the subscription is in class-filter mode
  classFilter // class filter
);

2.4. 拉取消息的核心代码

 // Issue the pull through pullAPIWrapper in ASYNC mode; the outcome is delivered to pullCallback.
 // NOTE(review): subscriptionData is not declared in the excerpts shown here; presumably it is the
 // subscription looked up for this queue's topic — confirm against the full method.
 this.pullAPIWrapper.pullKernelImpl(
     pullRequest.getMessageQueue(),
     subExpression,
     subscriptionData.getExpressionType(),
     subscriptionData.getSubVersion(),
     pullRequest.getNextOffset(),
     this.defaultMQPushConsumer.getPullBatchSize(),
     sysFlag,
     commitOffsetValue,
     BROKER_SUSPEND_MAX_TIME_MILLIS,
     CONSUMER_TIMEOUT_MILLIS_WHEN_SUSPEND,
     CommunicationMode.ASYNC,
     pullCallback
 );

/**
 * Resolves the broker address for the given message queue, builds the pull request
 * header and sends the pull through the client API.
 *
 * @return whatever {@code MQClientAPIImpl.pullMessage} returns for the given communication mode
 * @throws MQClientException if no broker address can be found even after refreshing the topic route,
 *         or if a non-tag expression type is used against a broker older than V4_1_0_SNAPSHOT
 */
public PullResult pullKernelImpl(
    // Message queue to pull from
    final MessageQueue mq,
    // Filter (subscription) expression
    final String subExpression,
    // Expression type
    final String expressionType,
    // Subscription version
    final long subVersion,
    // Queue offset to start pulling from
    final long offset,
    // Maximum number of messages for this pull
    final int maxNums,
    // Pull system flag
    final int sysFlag,
    // Consume progress to commit along with the request
    final long commitOffset,
    // How long the broker may suspend the request
    final long brokerSuspendMaxTimeMillis,
    // Timeout of the pull request
    final long timeoutMillis,
    // Communication mode used for the pull
    final CommunicationMode communicationMode,
    // Callback handed to the client API together with the request
    final PullCallback pullCallback
) throws MQClientException, RemotingException, MQBrokerException, InterruptedException
  {

        // Choose the broker to pull from
        FindBrokerResult findBrokerResult =
                this.mQClientFactory.findBrokerAddressInSubscribe(mq.getBrokerName(),
                        this.recalculatePullFromWhichNode(mq), false);

        // Not found (the original note attributes this to the broker being down): refresh the route and look again
        if (null == findBrokerResult) {

            // Refresh the topic route from the name server
            this.mQClientFactory.updateTopicRouteInfoFromNameServer(mq.getTopic());
            findBrokerResult =
                    this.mQClientFactory.findBrokerAddressInSubscribe(mq.getBrokerName(),
                            this.recalculatePullFromWhichNode(mq), false);
        }

        if (findBrokerResult != null) {
            {
                // check version
                if (!ExpressionType.isTagType(expressionType)
                        && findBrokerResult.getBrokerVersion() < MQVersion.Version.V4_1_0_SNAPSHOT.ordinal()) {
                    throw new MQClientException("The broker[" + mq.getBrokerName() + ", "
                            + findBrokerResult.getBrokerVersion() + "] does not upgrade to support for filter message by " + expressionType, null);
                }
            }
            int sysFlagInner = sysFlag;

            // When pulling from a slave, the commit-offset flag is cleared
            if (findBrokerResult.isSlave()) {
                sysFlagInner = PullSysFlag.clearCommitOffsetFlag(sysFlagInner);
            }

            PullMessageRequestHeader requestHeader = new PullMessageRequestHeader();
            requestHeader.setConsumerGroup(this.consumerGroup);
            requestHeader.setTopic(mq.getTopic());
            requestHeader.setQueueId(mq.getQueueId());
            requestHeader.setQueueOffset(offset);
            requestHeader.setMaxMsgNums(maxNums);
            requestHeader.setSysFlag(sysFlagInner);
            requestHeader.setCommitOffset(commitOffset);
            requestHeader.setSuspendTimeoutMillis(brokerSuspendMaxTimeMillis);
            requestHeader.setSubscription(subExpression);
            requestHeader.setSubVersion(subVersion);
            requestHeader.setExpressionType(expressionType);

            // Class filter: the request goes to a filter server address computed from the topic and broker address
            String brokerAddr = findBrokerResult.getBrokerAddr();
            if (PullSysFlag.hasClassFilterFlag(sysFlagInner)) {
                brokerAddr = computePullFromWhichFilterServer(mq.getTopic(), brokerAddr);
            }

            // Send the pull using the caller-supplied communication mode
            PullResult pullResult = this.mQClientFactory.getMQClientAPIImpl().pullMessage(
                    brokerAddr,
                    requestHeader,
                    timeoutMillis,
                    communicationMode,
                    pullCallback);

            return pullResult;
        }

        throw new MQClientException("The broker[" + mq.getBrokerName() + "] not exist", null);
    }

2.4.1. 如何选择broker

/**
 * Looks up a broker address to subscribe from.
 *
 * @param brokerName     broker name used as the key into {@code brokerAddrTable}
 * @param brokerId       preferred broker id under that name
 * @param onlyThisBroker when {@code false}, another broker id under the same name may be returned as a fallback
 * @return the chosen address with its slave flag and version, or {@code null} when nothing suitable is found
 */
public FindBrokerResult findBrokerAddressInSubscribe(
        final String brokerName,
        final long brokerId,
        final boolean onlyThisBroker
    ) {
    String brokerAddr = null;
    boolean slave = false;
    boolean found = false;

    // All brokers registered under this broker name, keyed by broker id
    HashMap<Long/* brokerId */, String/* address */> map = this.brokerAddrTable.get(brokerName);
    if (map != null && !map.isEmpty()) {
      // Address of the requested broker id
      brokerAddr = map.get(brokerId);
      // Any id other than MASTER_ID is treated as a slave
      slave = brokerId != MixAll.MASTER_ID;
      // Whether the requested broker id has an address
      found = brokerAddr != null;

      // The requested id has no address and it is a slave id
      if (!found && slave) {
        // Try the next broker id (brokerId + 1)
        brokerAddr = map.get(brokerId + 1);
        found = brokerAddr != null;
      }


      // Still not found and the caller accepts another broker: take the first entry under the same broker name
      if (!found && !onlyThisBroker) {
        Entry<Long, String> entry = map.entrySet().iterator().next();
        brokerAddr = entry.getValue();
        slave = entry.getKey() != MixAll.MASTER_ID;
        found = true;
      }
    }

    if (found) {
      return new FindBrokerResult(brokerAddr, slave, findBrokerVersion(brokerName, brokerAddr));
    }

    return null;
}
  1. 按brokerName取出地址表,先用指定的brokerId查找,找到就直接返回
  2. 没找到且指定的是从机(brokerId不是MASTER_ID)时,尝试brokerId + 1对应的地址;仍然没找到且onlyThisBroker为false时,退而取同一brokerName下地址表中的第一个节点
  3. 如果这些都不满足,返回null

2.4.2. 过滤

跳过。。。。。

2.4.3. 异步拉取消息

public PullResult pullMessage(
        final String addr,
        final PullMessageRequestHeader requestHeader,
        final long timeoutMillis,
        final CommunicationMode communicationMode,
        final PullCallback pullCallback
    ) throws RemotingException, MQBrokerException, InterruptedException {....}

// 异步拉取消息的case
case ASYNC: this.pullMessageAsync(addr, request, timeoutMillis, pullCallback);

2.4.4. 拉取消息完成后的回调

/**
 * Sends the pull request asynchronously and routes the outcome to {@code pullCallback}:
 * a response is processed and passed to {@code onSuccess}; a processing error or a
 * missing response is reported through {@code onException}.
 */
private void pullMessageAsync(
        final String addr,
        final RemotingCommand request,
        final long timeoutMillis,
        final PullCallback pullCallback
    ) throws RemotingException, InterruptedException {
        this.remotingClient.invokeAsync(addr, request, timeoutMillis, new InvokeCallback() {
            // Invoked when the asynchronous request completes
            @Override
            public void operationComplete(ResponseFuture responseFuture) {
                RemotingCommand response = responseFuture.getResponseCommand();
                if (response != null) {
                    try {
                        // Convert the response into a PullResult and hand it to the callback
                        PullResult pullResult = MQClientAPIImpl.this.processPullResponse(response, addr);
                        assert pullResult != null;
                        pullCallback.onSuccess(pullResult);
                    } catch (Exception e) {
                        pullCallback.onException(e);
                    }
                } else {
                    // No response: distinguish send failure, timeout and unknown cause
                    if (!responseFuture.isSendRequestOK()) {
                        pullCallback.onException(new MQClientException("send request failed to " + addr + ". Request: " + request, responseFuture.getCause()));
                    } else if (responseFuture.isTimeout()) {
                        pullCallback.onException(new MQClientException("wait response from " + addr + " timeout :" + responseFuture.getTimeoutMillis() + "ms" + ". Request: " + request,
                            responseFuture.getCause()));
                    } else {
                        pullCallback.onException(new MQClientException("unknown reason. addr: " + addr + ", timeoutMillis: " + timeoutMillis + ". Request: " + request, responseFuture.getCause()));
                    }
                }
            }
        });
    }

当异步拉取消息完成后会触发operationComplete方法,从而触发pullCallback的onSuccess和onException方法

2.4.5. 如何处理拉取消息的响应报文

/**
 * Translates a pull response into a {@code PullResult}.
 *
 * @param response the response command received for the pull request
 * @param addr     the address the request was sent to; used only in the exception
 * @return a {@code PullResultExt} carrying the pull status, the offsets from the
 *         response header, the suggested broker id and the raw response body
 * @throws MQBrokerException        if the response code is not one of the handled pull codes
 * @throws RemotingCommandException if the response header cannot be decoded
 */
private PullResult processPullResponse(
        final RemotingCommand response,
        final String addr) throws MQBrokerException, RemotingCommandException {
    // Map the response code onto the client-side pull status
    final int code = response.getCode();
    final PullStatus status;
    if (code == ResponseCode.SUCCESS) {
        status = PullStatus.FOUND;
    } else if (code == ResponseCode.PULL_NOT_FOUND) {
        status = PullStatus.NO_NEW_MSG;
    } else if (code == ResponseCode.PULL_RETRY_IMMEDIATELY) {
        status = PullStatus.NO_MATCHED_MSG;
    } else if (code == ResponseCode.PULL_OFFSET_MOVED) {
        status = PullStatus.OFFSET_ILLEGAL;
    } else {
        throw new MQBrokerException(code, response.getRemark(), addr);
    }

    // Decode the custom header of the response into PullMessageResponseHeader
    final PullMessageResponseHeader header =
        (PullMessageResponseHeader) response.decodeCommandCustomHeader(PullMessageResponseHeader.class);

    // Note: the object built here is a PullResultExt, while onSuccess receives it as a PullResult
    return new PullResultExt(
        status,
        header.getNextBeginOffset(),
        header.getMinOffset(),
        header.getMaxOffset(),
        null,
        header.getSuggestWhichBrokerId(),
        response.getBody());
}

decodeCommandCustomHeader中比较关键的一行代码是

// Look up the value for this field name in the extension fields
String value = this.extFields.get(fieldName);

有兴趣可以看看

2.4.6. 拉取消息成功后的回调

// Callback invoked with the outcome of a pull: onSuccess handles each pull status, onException re-schedules the pull.
// It reads pullRequest, processQueue, subscriptionData and beginTimestamp from the enclosing scope.
PullCallback pullCallback = new PullCallback() {
  @Override
  public void onSuccess(PullResult pullResult) {
    if (pullResult != null) {
      // Convert the raw pull result through pullAPIWrapper.processPullResult before branching on its status
      pullResult = DefaultMQPushConsumerImpl.this.pullAPIWrapper.processPullResult(pullRequest.getMessageQueue(), pullResult,
                                                                                   subscriptionData);

      switch (pullResult.getPullStatus()) {
        case FOUND:
          // Advance the next pull offset and record the pull round-trip time
          long prevRequestOffset = pullRequest.getNextOffset();
          pullRequest.setNextOffset(pullResult.getNextBeginOffset());
          long pullRT = System.currentTimeMillis() - beginTimestamp;
          DefaultMQPushConsumerImpl.this.getConsumerStatsManager().incPullRT(pullRequest.getConsumerGroup(),
                                                                             pullRequest.getMessageQueue().getTopic(), pullRT);

          long firstMsgOffset = Long.MAX_VALUE;
          // No messages left in the result: pull again immediately
          if (pullResult.getMsgFoundList() == null || pullResult.getMsgFoundList().isEmpty()) {
            DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest);
          } else {
            firstMsgOffset = pullResult.getMsgFoundList().get(0).getQueueOffset();

            DefaultMQPushConsumerImpl.this.getConsumerStatsManager().incPullTPS(pullRequest.getConsumerGroup(),
                                                                                pullRequest.getMessageQueue().getTopic(), pullResult.getMsgFoundList().size());

            // Put the pulled messages into the processQueue
            boolean dispatchToConsume = processQueue.putMessage(pullResult.getMsgFoundList());

            // Submit the pulled messages to consumeMessageService for consumption
            DefaultMQPushConsumerImpl.this.consumeMessageService.submitConsumeRequest(
              pullResult.getMsgFoundList(),
              processQueue,
              pullRequest.getMessageQueue(),
              dispatchToConsume);

            // With a positive pull interval the next pull is delayed by that interval; otherwise it is issued immediately
            if (DefaultMQPushConsumerImpl.this.defaultMQPushConsumer.getPullInterval() > 0) {
              DefaultMQPushConsumerImpl.this.executePullRequestLater(pullRequest,
                                                                     DefaultMQPushConsumerImpl.this.defaultMQPushConsumer.getPullInterval());
            } else {
              DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest);
            }
          }

          // Sanity check: neither the next begin offset nor the first message offset should be behind the requested offset
          if (pullResult.getNextBeginOffset() < prevRequestOffset
              || firstMsgOffset < prevRequestOffset) {
            log.warn(
              "[BUG] pull message result maybe data wrong, nextBeginOffset: {} firstMsgOffset: {} prevRequestOffset: {}",
              pullResult.getNextBeginOffset(),
              firstMsgOffset,
              prevRequestOffset);
          }

          break;
        case NO_NEW_MSG:
        case NO_MATCHED_MSG:

          // Correct the consume progress
          pullRequest.setNextOffset(pullResult.getNextBeginOffset());
          DefaultMQPushConsumerImpl.this.correctTagsOffset(pullRequest);

          // Pull again immediately
          DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest);
          break;
        case OFFSET_ILLEGAL:
          log.warn("the pull request offset illegal, {} {}",
                   pullRequest.toString(), pullResult.toString());
          pullRequest.setNextOffset(pullResult.getNextBeginOffset());

          // Mark the process queue as dropped
          pullRequest.getProcessQueue().setDropped(true);
          // Fix the stored offset in a task scheduled 10000 ms later
          DefaultMQPushConsumerImpl.this.executeTaskLater(new Runnable() {

            @Override
            public void run() {
              try {
                DefaultMQPushConsumerImpl.this.offsetStore.updateOffset(pullRequest.getMessageQueue(),
                                                                        pullRequest.getNextOffset(), false);

                // Persist the consume progress
                DefaultMQPushConsumerImpl.this.offsetStore.persist(pullRequest.getMessageQueue());

                // Remove the process queue from rebalanceImpl
                // NOTE(review): presumably pulling for this queue resumes after a later rebalance — confirm
                DefaultMQPushConsumerImpl.this.rebalanceImpl.removeProcessQueue(pullRequest.getMessageQueue());

                log.warn("fix the pull request offset, {}", pullRequest);
              } catch (Throwable e) {
                log.error("executeTaskLater Exception", e);
              }
            }
          }, 10000);
          break;
        default:
          break;
      }
    }
  }

  @Override
  public void onException(Throwable e) {
    // Failures on retry topics are not logged; every failure re-schedules the pull after a delay
    if (!pullRequest.getMessageQueue().getTopic().startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
      log.warn("execute the pull request exception", e);
    }

    DefaultMQPushConsumerImpl.this.executePullRequestLater(pullRequest, pullTimeDelayMillsWhenException);
  }
};
  1. onSuccess:将获取到的消息放置到processQueue中,再提交给consumeMessageService异步消费,随后立即或按pullInterval延迟发起下一次拉取
  2. NO_NEW_MSG、NO_MATCHED_MSG和OFFSET_ILLEGAL这几种情况会校准offset;onException则只是延迟一段时间后重新拉取。这部分就不解释了,上面的注释感觉很完整了~

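还记得刚才的那个问题嘛,什么时候会放置到pullRequestQueue中,答案就在这里,执行拉取消息任务的时候(马上和延迟两种模式):executePullRequestImmediately直接把请求放入队列;延迟模式(executePullRequestLater)则是在延迟到期后再把请求放入队列。下面一并贴出的executeTaskLater用于延迟执行任意Runnable(例如上面OFFSET_ILLEGAL分支里的修正任务)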

/**
 * Enqueues the pull request onto {@code pullRequestQueue} right away.
 *
 * <p>If the calling thread is interrupted while enqueuing, the failure is logged and
 * the thread's interrupt status is restored so that callers can still observe it.
 *
 * @param pullRequest the pull request to enqueue
 */
public void executePullRequestImmediately(final PullRequest pullRequest) {
    try {
        this.pullRequestQueue.put(pullRequest);
    } catch (InterruptedException e) {
        log.error("executePullRequestImmediately pullRequestQueue.put", e);
        // Catching InterruptedException clears the flag; set it again instead of swallowing the interrupt
        Thread.currentThread().interrupt();
    }
}

    /**
     * Schedules a task to run once after the given delay, unless the service has stopped,
     * in which case only a warning is logged.
     *
     * @param r         the task to run
     * @param timeDelay delay before execution, in milliseconds
     */
    public void executeTaskLater(final Runnable r, final long timeDelay) {
        if (isStopped()) {
            log.warn("PullMessageServiceScheduledThread has shutdown");
            return;
        }
        this.scheduledExecutorService.schedule(r, timeDelay, TimeUnit.MILLISECONDS);
    }

2.4.7. 响应报文的参数转化

/**
 * Post-processes a pull result in place: records the suggested broker id for the queue and,
 * when messages were found, decodes the binary body into messages, filters them by tag,
 * runs the filter hooks, stamps per-message properties and stores the list on the result.
 * The binary body is cleared in every case.
 *
 * @param mq               the message queue the result was pulled from
 * @param pullResult       the pull result; it is cast to {@code PullResultExt}
 * @param subscriptionData subscription used for the tag filtering
 * @return the same {@code pullResult} instance, updated in place
 */
public PullResult processPullResult(final MessageQueue mq, final PullResult pullResult,
                                        final SubscriptionData subscriptionData) {
        // The pullResult passed in is actually a PullResultExt, so this cast is expected to succeed
        PullResultExt pullResultExt = (PullResultExt) pullResult;

        this.updatePullFromWhichNode(mq, pullResultExt.getSuggestWhichBrokerId());

        if (PullStatus.FOUND == pullResult.getPullStatus()) {
            ByteBuffer byteBuffer = ByteBuffer.wrap(pullResultExt.getMessageBinary());
            // Decode the binary body into the message list
            List<MessageExt> msgList = MessageDecoder.decodes(byteBuffer);

            List<MessageExt> msgListFilterAgain = msgList;
            // Tag filtering: with a non-empty tag set and class-filter mode off, keep only messages
            // whose tags are in the set (messages without tags are dropped)
            if (!subscriptionData.getTagsSet().isEmpty() && !subscriptionData.isClassFilterMode()) {
                msgListFilterAgain = new ArrayList<MessageExt>(msgList.size());
                for (MessageExt msg : msgList) {
                    if (msg.getTags() != null) {
                        if (subscriptionData.getTagsSet().contains(msg.getTags())) {
                            msgListFilterAgain.add(msg);
                        }
                    }
                }
            }

            // Run the registered filter hooks over the filtered list
            if (this.hasHook()) {
                FilterMessageContext filterMessageContext = new FilterMessageContext();
                filterMessageContext.setUnitMode(unitMode);
                filterMessageContext.setMsgList(msgListFilterAgain);
                this.executeHook(filterMessageContext);
            }

            for (MessageExt msg : msgListFilterAgain) {
                // Messages flagged as transaction-prepared take their transaction id from the unique client message id property
                String traFlag = msg.getProperty(MessageConst.PROPERTY_TRANSACTION_PREPARED);
                if (Boolean.parseBoolean(traFlag)) {
                    msg.setTransactionId(msg.getProperty(MessageConst.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX));
                }
                // Stamp the min/max offsets of the pull result and the broker name onto each message
                MessageAccessor.putProperty(msg, MessageConst.PROPERTY_MIN_OFFSET,
                        Long.toString(pullResult.getMinOffset()));
                MessageAccessor.putProperty(msg, MessageConst.PROPERTY_MAX_OFFSET,
                        Long.toString(pullResult.getMaxOffset()));
                msg.setBrokerName(mq.getBrokerName());
            }

            // messageBinary -----> msgFoundList
            pullResultExt.setMsgFoundList(msgListFilterAgain);
        }

        // Clear the binary body regardless of the pull status
        pullResultExt.setMessageBinary(null);

        return pullResult;
    }

整体的流程就是messageBinary -----> msgFoundList的转化

2.4.8. 流程图

如果看完上面的源码感觉还是很懵逼的话,可以看看下面这张流程图,等看完再看源码

image-20211010192616541

3. 思考

ConsumeGroup中的consumer是一个个线程还是一台台机器实例?

  1. 个人理解:应该是一个个线程,不然每个消费者做负载的时候还要负载到其他机器,不是耦合在一起了嘛,而且这样排错也会很困难,所以我更倾向于consumer是一个个线程,ConsumeGroup则是将某台机器上的消费者线程聚合在一起。
  2. 消费组管理同group的消费线程

评论

Your browser is out-of-date!

Update your browser to view this website correctly. Update my browser now

×