Mi Galaxy EMQ Book

TopicQueue is an API designed to guarantee local ordering of Messages in a Queue, that is, ordering among Messages that share the same Topic.

If the topicQueue attribute is set to true when a user creates a Queue, the resulting Queue is a TopicQueue.
When sending a Message to a TopicQueue, the user can set a String-typed Topic parameter for the Message (if it is not specified, the Topic is empty). EMQ ensures that Messages with the same Topic are delivered to receivers strictly in MessageId order. In addition, until the user has called deleteMessage() for a Message with a smaller MessageId, no Message with a larger MessageId and the same Topic will be received by any client.

Note
The ordering of MessageIds is based on sendTime + delayTime.
An empty Topic does not match any Topic, including another empty Topic. That is, Messages with an empty Topic are not ordered and behave like Messages in a nonTopicQueue.
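
The delivery rule described above can be illustrated with a small in-memory model. This is only a sketch under stated assumptions: `ToyTopicQueue` and its methods are hypothetical names, not part of the EMQ SDK, message ids are plain integers assigned in send order, and the receive timeout is not modeled at all.

```python
class ToyTopicQueue:
    """Toy in-memory model of per-Topic ordering (no timeouts, no partitions)."""

    def __init__(self):
        self._next_id = 0
        self._messages = {}      # message id -> (topic, body); undeleted messages only
        self._in_flight = set()  # ids that were received but not yet deleted

    def send(self, body, topic=""):
        # Assumption: ids grow in send order (the real rank is sendTime + delayTime).
        self._next_id += 1
        self._messages[self._next_id] = (topic, body)
        return self._next_id

    def receive(self):
        # A Topic with a received-but-undeleted message is locked.
        # The empty Topic is never locked, so such messages are not ordered.
        locked = {self._messages[i][0] for i in self._in_flight} - {""}
        for msg_id in sorted(self._messages):
            topic, body = self._messages[msg_id]
            if msg_id in self._in_flight or topic in locked:
                continue
            self._in_flight.add(msg_id)
            return msg_id, topic, body
        return None  # nothing is deliverable right now

    def delete(self, msg_id):
        # Acknowledge a message; this unlocks the next message of its Topic.
        self._in_flight.discard(msg_id)
        self._messages.pop(msg_id, None)


q = ToyTopicQueue()
a1 = q.send("a1", topic="A")
a2 = q.send("a2", topic="A")
b1 = q.send("b1", topic="B")
n1 = q.send("n1")  # empty Topic
n2 = q.send("n2")  # empty Topic

received = [q.receive() for _ in range(5)]  # a2 stays locked behind a1
q.delete(a1)
after_ack = q.receive()                     # a2 becomes deliverable
```

In this model a2 is withheld until a1 has been deleted, while the two empty-Topic messages are both handed out without waiting for each other.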

Messages in a TopicQueue are also subject to the receive timeout that backs Elastic Message Queue's "at-least-once" delivery semantics. Once a Message has been redelivered after a timeout, two concurrent client threads may receive the same Message, which disrupts the order of the Messages. A detailed example follows:

For Messages with the same Topic, the order of the messageIds is m1, m2, m3.
The user starts two threads, t1 and t2, to process Messages; the timeout is set to 30 seconds.
t1 receives m1 for processing. During processing, m2 and m3 are locked by EMQ and cannot be received by any other thread.
t1 quickly finishes processing m1 and calls `deleteMessage()` to ACK it. After the ACK, m2 is unlocked and received by t2, while m3 remains locked.
After t2 receives m2 but before it begins processing, it is suspended (e.g. by a Java GC pause). The suspension lasts more than 30 seconds.
After the timeout, m2 becomes visible again and is received and processed by t1. After t1 ACKs m2, m3 is unlocked and is also received and processed by t1.
When t2 resumes, it does not know that m2 has already been processed by t1, and it continues to process m2.
In the above process, because of timeout redelivery, the receive order from the perspective of the EMQ server is m1, m2, m2, m3.
However, from the perspective of the user's overall processing, the processing order is m1, m2, m3, m2.
In this case, the order of the messages is broken.

Similar problems can exist in any message system that automatically redelivers messages after a timeout.
If strict ordering is required, the client side needs to cooperate.
For example, the client keeps the messageId of the last message it processed and simply discards any received message whose messageId is not larger than it. This not only ensures strict ordering of the messages but also implements at-most-once semantics, that is, it guarantees that no message is processed more than once.
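
A client-side guard of this kind might look like the sketch below. `OrderGuard` and `try_claim` are hypothetical names introduced for illustration, and message ids are assumed to be comparable values that increase in delivery order (integers here). The check and the update happen under one lock so that two threads cannot both claim the same message.

```python
import threading


class OrderGuard:
    """Remembers the last claimed message id per Topic and rejects stale deliveries."""

    def __init__(self):
        self._lock = threading.Lock()
        self._last_claimed = {}  # topic -> highest message id claimed so far

    def try_claim(self, topic, message_id):
        """Return True if the caller may process this message, False to discard it."""
        if not topic:
            # Messages with an empty Topic are not ordered, so they are not filtered.
            return True
        with self._lock:
            last = self._last_claimed.get(topic)
            if last is not None and message_id <= last:
                return False  # already handled, or overtaken by a later message
            self._last_claimed[topic] = message_id
            return True


# Replay of the processing order from the example: m1, m2, m3, then m2 again.
guard = OrderGuard()
deliveries = [("orders", 1), ("orders", 2), ("orders", 3), ("orders", 2)]
processed = [mid for topic, mid in deliveries if guard.try_claim(topic, mid)]
```

Here the late copy of m2 is discarded, so `processed` contains each id once and in order. Because the claim is recorded before processing starts, a thread that fails after claiming leaves that message unprocessed, which is the at-most-once trade-off.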

TopicQueue is implemented by hashing the Topic so that Messages with the same Topic are placed in the same partition of the queue, which can lead to the performance costs listed below. If the Messages in a queue do not require strict ordering, create the queue as a nonTopicQueue where possible:

  1. The number of partitions is not adjustable
    For a nonTopicQueue, users can increase the number of partitions as message traffic grows in order to improve the concurrency of the queue. However, in the current implementation (which may be improved later), once the number of partitions of a TopicQueue has been set, it cannot be changed. Users need to estimate the message traffic when calling createQueue() and set an appropriate number of partitions. For messages of about 1 KB, a partition can be assumed to have a throughput of about 500 messages/sec.
  2. Messages of different Topics affect each other
    One partition contains one or more Topics;
    If a Message of one Topic is not processed successfully for a long time, it may affect all Messages in the same partition.
  3. Messages are unevenly distributed between partitions
    A Message that specifies a Topic can only be sent to one specific partition of the queue. If the underlying storage for that partition is temporarily unavailable, sendMessage() fails;
    A Message that does not specify a Topic (any Message in a nonTopicQueue, or a Message in a TopicQueue sent without a Topic) enters a random partition, and if that partition is unavailable, another partition is selected and the send is retried.
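
The partition placement and the sizing rule above can be sketched as follows. The hash function EMQ actually uses is not stated here, so CRC32 is only a stand-in; `partition_for` and `estimate_partitions` are hypothetical helper names, and the retry on an unavailable partition is not modeled.

```python
import math
import random
import zlib


def partition_for(topic: str, num_partitions: int) -> int:
    """Pick a partition index for a message (illustrative only)."""
    if not topic:
        # No Topic: any partition will do.
        return random.randrange(num_partitions)
    # Same Topic -> same hash -> same partition, as long as the count is fixed.
    return zlib.crc32(topic.encode("utf-8")) % num_partitions


def estimate_partitions(peak_messages_per_sec: float,
                        per_partition_rate: float = 500.0) -> int:
    """Partitions needed for a peak rate, using ~500 msg/s per partition for ~1 KB messages."""
    return max(1, math.ceil(peak_messages_per_sec / per_partition_rate))


needed = estimate_partitions(1800)         # 1800 / 500 = 3.6, rounded up to 4
index = partition_for("order-42", needed)  # stable for a fixed partition count
```

Since the partition count of a TopicQueue is fixed at creation, it is safer to size it from the expected peak rate rather than the average.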