Mi Galaxy EMQ Book

死信队列(Dead Letter Queue)本质上同普通的Queue没有区别,只是它的产生是为了隔离和分析其他Queue(源Queue)未成功处理的消息。 创建死信队列的方法参见createQueue() API,与创建普通队列无异, 死信队列不可调用deadMessage(), deadMessageBatch API,其他操作都与对普通Queue的操作无异。

为了将源Queue的未能成功处理的消息发送到死信队列,我们为源Queue添加了“重新驱动策略”的概念,详细的定义和API如下:

setQueueRedrivePolicy()


为(源)Queue 添加重新驱动策略。

方法参数 SetQueueRedrivePolicyRequest

queueName : String : required

源Queue的名称。

redrivePolicy : RedrivePolicy : required

重新驱动策略,每个Queue只能设置一个重新驱动策略。

- dlqName : String : required

死信队列的名称。

- maxReceiveTime : int : required

从源Queue中被最大接收的次数。取值范围为[1, 100]。 超过这个次数的消息会从源Queue中被发送到死信队列中。

注: 若重新驱动策略已设置好,但死信队列被删除,则该死信队列所有的源Queue的重新驱动策略无效。 源Queue和死信队列不可为Topic/Priority Queue,且源Queue不可为其他Queue的死信队列, 死信队列不可有重新驱动策略。 若要查看当前Queue的重新驱动策略,使用 getQueueInfo() API。

方法返回 SetQueueRedrivePolicyResponse

queueName : String : required

源Queue的名称。

redrivePolicy : RedrivePolicy : required

重新驱动策略。

removeQueueRedrivePolicy()


移除Queue的重新驱动策略。

方法参数 RemoveQueueRedrivePolicyRequest

queueName : String : required

源Queue的名称。

从死信队列中接收消息


使用 receiveMessage() API。接收到的message属性中额外包含以下内容:

  1. sourceQueueName:源Queue的名称
  2. sourceTag:消息在源Queue中被以该Tag接收后,达到重新驱动的条件死亡
  3. deadTimestamp:消息在源Queue中被宣布死亡(不会再从源Queue中被相应的Tag接收)的时间
  4. originalMessageID:消息原始的MessageID,即在源Queue中的MessageID
  5. originalReceiveCount:消息在原始的Queue中被receive过的次数

因为一个死信队列可能有多个源Queue,为了更方便用户区分源Queue的消息,我们为来自源Queue的消息定义了用户属性: 名称为“DLQ.sourceQueue”,类型是“STRING”,值为源Queue的名称。 用户可以用这个属性过滤接收死信队列中特定源Queue的消息。

注: DLQ.sourceQueue是系统保留的用户属性关键字,用户不可自定义这个名称的属性。

向死信队列发送消息


可以采用两种方式向死信队列“发送”消息,一种方式是同普通队列的发送方式,使用 sendMessage(), sendMessageBatch API。 这种方式发送的消息被接收时,不带有上述的属性。 另一种方式是从源Queue调用deadMessage(), deadMessageBatch API, 这个发送消息的过程可以理解为(源)对消息的另一种类型的ACK, 当用户检测出接收到的某些消息是无效的消息后,用户可以主动将这些消息发送到死信队列中以便后续分析。

多读者/Tag相关


重新驱动策略对所有的Tag都有效,但Tag之间是独立的。 当使用某个Tag在源Queue中接收的某些消息达到了重新驱动的条件时, 这些消息会被发送到死信队列中,不会被使用该Tag的receiver再次接收到, 而使用其他Tag的receiver还能接收到这些消息。