RocketMQ

关于 RocketMQ pubsub 组件的详细文档

组件格式

要设置 RocketMQ pub/sub,创建一个类型为 pubsub.rocketmq 的组件。请参阅 pub/sub broker 组件文件 了解 ConsumerID 是如何自动生成的。阅读 如何:发布和订阅指南 了解如何创建和应用 pub/sub 配置。

apiVersion: dapr.io/v1alpha1
kind: Component
metadata:
  name: rocketmq-pubsub
spec:
  type: pubsub.rocketmq
  version: v1
  metadata:
    - name: instanceName
      value: dapr-rocketmq-test
    - name: consumerGroup
      value: dapr-rocketmq-test-g-c
    - name: producerGroup 
      value: dapr-rocketmq-test-g-p
    - name: consumerID
      value: channel1
    - name: nameSpace
      value: dapr-test
    - name: nameServer
      value: "127.0.0.1:9876,127.0.0.2:9876"
    - name: retries
      value: 3
    - name: consumerModel
      value: "clustering"
    - name: consumeOrderly
      value: false

规格元数据字段

字段必需详情默认值示例
instanceNameN实例名称time.Now().String()dapr-rocketmq-test
consumerGroupN消费者组名称。建议使用。如果 producerGroupnull,则使用 groupNamedapr-rocketmq-test-g-c
producerGroup (consumerID)N生产者组名称。建议使用。如果 producerGroupnull,则使用 consumerID。如果 consumerID 也为 null,则使用 groupNamedapr-rocketmq-test-g-p
consumerIDN消费者 ID(消费者标签)用于将一个或多个消费者组织成一个组。具有相同消费者 ID 的消费者作为一个虚拟消费者工作;例如,一条消息仅由组中的一个消费者处理一次。如果未提供 consumerID,Dapr 运行时将其设置为 Dapr 应用程序 ID (appID) 值。可以设置为字符串值(如上例中的 "channel1")或字符串格式值(如 "{podName}" 等)。查看可以在组件元数据中使用的所有模板标签。
groupNameN消费者/生产者组名称。已弃用dapr-rocketmq-test-g
nameSpaceNRocketMQ 命名空间dapr-rocketmq
nameServerDomainNRocketMQ 名称服务器域名https://my-app.net:8080/nsaddr
nameServerNRocketMQ 名称服务器,使用 “,” 或 “;” 分隔127.0.0.1:9876;127.0.0.2:9877,127.0.0.3:9877
accessKeyN访问密钥(用户名)"admin"
secretKeyN密钥(密码)"password"
securityTokenN安全令牌
retriesN向 broker 发送消息的重试次数33
producerQueueSelector (queueSelector)N生产者队列选择器。有五种队列选择器实现:hashrandommanualroundRobindaprdaprhash
consumerModelN定义消息如何传递到每个消费者客户端的消息模型。RocketMQ 支持两种消息模型:clusteringbroadcastingclusteringbroadcasting , clustering
fromWhere (consumeFromWhere)N消费者启动时的消费点。有三个消费点:CONSUME_FROM_LAST_OFFSETCONSUME_FROM_FIRST_OFFSETCONSUME_FROM_TIMESTAMPCONSUME_FROM_LAST_OFFSETCONSUME_FROM_LAST_OFFSET
consumeTimestampN以秒为精度回溯消费时间。时间格式为 yyyymmddhhmmss。例如,20131223171201 表示 2013 年 12 月 23 日 17:12:01time.Now().Add(time.Minute * (-30)).Format("20060102150405")20131223171201
consumeOrderlyN确定是否使用 FIFO 顺序的有序消息。falsefalse
consumeMessageBatchMaxSizeN批量消费大小,范围 [1, 1024]51210
consumeConcurrentlyMaxSpanN并发最大跨度偏移。这对顺序消费没有影响。范围:[1, 65535]10001000
maxReconsumeTimesN最大重新消费次数。-1 表示 16 次。如果消息在成功前被重新消费超过 {@link maxReconsumeTimes} 次,它们将被定向到删除队列。顺序消息为 MaxInt32;并发消息为 1616
autoCommitN启用自动提交truefalse
consumeTimeoutN消息可能阻塞消费线程的最大时间。时间单位:分钟1515
consumerPullTimeoutN套接字超时时间,单位为毫秒
pullIntervalN消息拉取间隔100100
pullBatchSizeN一次从 broker 拉取的消息数量。如果 pullBatchSizenull,使用 ConsumerBatchSizepullBatchSize 范围 [1, 1024]3210
pullThresholdForQueueN队列级别的流量控制阈值。默认情况下,每个消息队列将缓存最多 1000 条消息。考虑 PullBatchSize - 瞬时值可能超过限制。范围:[1, 65535]10241000
pullThresholdForTopicN主题级别的流量控制阈值。如果 pullThresholdForQueue 不是无限制的,将被 pullThresholdForTopic 的值覆盖并计算。例如,如果 pullThresholdForTopic 的值为 1000,并且为此消费者分配了 10 个消息队列,则 pullThresholdForQueue 将设置为 100。范围:[1, 6553500]-1(无限制)10
pullThresholdSizeForQueueN限制队列级别的缓存消息大小。考虑 pullBatchSize - 瞬时值可能超过限制。消息的大小仅通过消息体测量,因此不准确。范围:[1, 1024]100100
pullThresholdSizeForTopicN限制主题级别的缓存消息大小。如果 pullThresholdSizeForQueue 不是无限制的,将被 pullThresholdSizeForTopic 的值覆盖并计算。例如,如果 pullThresholdSizeForTopic 的值为 1000 MiB,并且为此消费者分配了 10 个消息队列,则 pullThresholdSizeForQueue 将设置为 100 MiB。范围:[1, 102400]-1100
content-typeN消息内容类型。"text/plain""application/cloudevents+json; charset=utf-8", "application/octet-stream"
logLevelN日志级别warninfo
sendTimeOutN连接 RocketMQ 的 broker 发送消息超时,以纳秒为单位。已弃用3 秒10000000000
sendTimeOutSecN发布消息的超时时间,以秒为单位。如果 sendTimeOutSecnull,则使用 sendTimeOut3 秒3
mspPropertiesNRocketMQ 消息属性集合中的属性传递给应用程序,数据用 “,” 分隔多个属性key,mkey

出于向后兼容的原因,元数据中的以下值仍然支持,尽管不推荐使用。

字段(支持但已弃用)必需详情示例
groupNameNRocketMQ 发布者的生产者组名称"my_unique_group_name"
sendTimeOutN发布消息的超时时间,以纳秒为单位0
consumerBatchSizeN一次从 broker 拉取的消息数量32

设置 RocketMQ

请参阅 https://rocketmq.apache.org/docs/quick-start/ 以设置本地 RocketMQ 实例。

每次调用的元数据字段

分区键

在调用 RocketMQ pub/sub 时,可以通过在请求 URL 中使用 metadata 查询参数提供可选的分区键。

您需要在 metadata 中指定 rocketmq-tag"rocketmq-key"rocketmq-shardingkeyrocketmq-queue

示例:

curl -X POST http://localhost:3500/v1.0/publish/myRocketMQ/myTopic?metadata.rocketmq-tag=?&metadata.rocketmq-key=?&metadata.rocketmq-shardingkey=key&metadata.rocketmq-queue=1 \
  -H "Content-Type: application/json" \
  -d '{
        "data": {
          "message": "Hi"
        }
      }'

队列选择器

RocketMQ 组件提供了五种队列选择器。RocketMQ 客户端提供以下队列选择器:

  • HashQueueSelector
  • RandomQueueSelector
  • RoundRobinQueueSelector
  • ManualQueueSelector

要了解有关这些 RocketMQ 客户端队列选择器的更多信息,请阅读 RocketMQ 文档

Dapr RocketMQ 组件实现了以下队列选择器:

  • DaprQueueSelector

本文重点介绍 DaprQueueSelector 的设计。

DaprQueueSelector

DaprQueueSelector 集成了三个队列选择器:

  • HashQueueSelector
  • RoundRobinQueueSelector
  • ManualQueueSelector

DaprQueueSelector 从请求参数中获取队列 ID。您可以通过运行以下命令设置队列 ID:

http://localhost:3500/v1.0/publish/myRocketMQ/myTopic?metadata.rocketmq-queue=1

ManualQueueSelector 是通过上述方法实现的。

接下来,DaprQueueSelector 尝试:

  • 获取 ShardingKey
  • 哈希 ShardingKey 以确定队列 ID。

您可以通过以下方式设置 ShardingKey

http://localhost:3500/v1.0/publish/myRocketMQ/myTopic?metadata.rocketmq-shardingkey=key

如果 ShardingKey 不存在,则使用 RoundRobin 算法确定队列 ID。

相关链接