TopicQueue is an API that guarantees local ordering of Messages in a Queue, that is, ordering among Messages that share the same Topic.
If the user sets the Queue's topicQueue attribute to True when creating a Queue, the resulting Queue is a TopicQueue.
When sending a Message to a TopicQueue, the user can set a String-type Topic parameter for the Message (if it is not specified, the Topic is empty). EMQ ensures that Messages with the same Topic are delivered to receivers strictly in MessageId order. In addition, until the user has called deleteMessage() for a Message with a smaller MessageId, no Message with a larger MessageId and the same Topic will be received by any Client.
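The delivery rule above can be illustrated with a small in-memory model. This is only a sketch of the described semantics, not the EMQ client API: `ToyTopicQueue`, `send_message`, `receive_message`, and `delete_message` are hypothetical names, and MessageIds are assumed to be increasing integers. Messages with an empty Topic are treated as unordered.

```python
import itertools
from dataclasses import dataclass


@dataclass
class Message:
    message_id: int
    topic: str
    body: str


class ToyTopicQueue:
    """In-memory stand-in for a TopicQueue (hypothetical, not the EMQ API)."""

    def __init__(self):
        self._ids = itertools.count(1)
        self._pending = []       # undeleted messages, in messageId order
        self._in_flight = set()  # messageIds received but not yet deleted

    def send_message(self, body, topic=""):
        msg = Message(next(self._ids), topic, body)
        self._pending.append(msg)
        return msg.message_id

    def receive_message(self):
        blocked_topics = set()
        for msg in self._pending:
            if msg.message_id in self._in_flight:
                # An undeleted message locks every later message of its topic.
                if msg.topic:
                    blocked_topics.add(msg.topic)
                continue
            if msg.topic and msg.topic in blocked_topics:
                continue
            self._in_flight.add(msg.message_id)
            return msg
        return None  # nothing is currently receivable

    def delete_message(self, message_id):
        self._in_flight.discard(message_id)
        self._pending = [m for m in self._pending if m.message_id != message_id]


queue = ToyTopicQueue()
queue.send_message("a1", topic="A")
queue.send_message("a2", topic="A")
queue.send_message("n1")  # empty topic: not ordered
queue.send_message("n2")

first = queue.receive_message()   # a1
second = queue.receive_message()  # n1, because a2 is locked behind a1
third = queue.receive_message()   # n2
fourth = queue.receive_message()  # None, a2 is still locked
queue.delete_message(first.message_id)
fifth = queue.receive_message()   # a2, unlocked by deleting a1
```

In this model, only the two Messages with Topic "A" are held back behind each other; the Messages with an empty Topic are handed out independently.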
Note
MessageIds are ordered by sendTime + delayTime.
An empty Topic does not match any Topic. That is, Messages with an empty Topic are not "ordered" and behave like nonTopic Messages.
Messages in a TopicQueue are also subject to the "time-out" feature that ensures Elastic Message Queue's "at-least-once" delivery semantics. After a time-out triggers redelivery, two concurrent client threads may receive the same message, which disrupts the order of the messages. A detailed example follows:
For a given Topic, the Messages in messageId order are m1, m2, and m3.
The user starts two threads, t1 and t2, to process Messages; the time-out is set to 30 seconds.
t1 receives m1 for processing. During processing, m2 and m3 are locked by EMQ and cannot be received by any other thread.
t1 quickly finishes processing m1 and calls `deleteMessage()` to ACK it. After the ACK, the lock is released; t2 receives m2, and m3 is locked again.
After t2 receives m2, but before it begins processing, it is suspended (for example, by a Java GC pause). The suspension lasts more than 30 seconds.
After the time-out, m2 becomes visible again and is received and processed by t1. After t1 ACKs m2, m3 is unlocked and is then received and processed by t1.
When t2 resumes, it does not know that m2 has already been processed by t1, so it goes on to process m2.
In the above process, because of "time-out re-sending", the receive order of the messages from the perspective of the EMQ Server is m1, m2, m2, m3.
However, from the perspective of the user's overall processing, the processing order is m1, m2, m3, m2.
In this case, the order of the messages is broken.
It should be pointed out that similar problems may exist in any message system with an auto-recovery (automatic redelivery) function.
If strict ordering is required, the client side must cooperate.
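The timeline above can be replayed deterministically with a logical clock. The following is a minimal sketch, assuming a single Topic and a hypothetical `ToyTimeoutQueue` class that models only the lock and the time-out; it is not EMQ's implementation, and the timestamps are illustrative.

```python
class ToyTimeoutQueue:
    """Single-topic toy queue with a receive time-out (hypothetical model)."""

    def __init__(self, message_ids, timeout):
        self.pending = list(message_ids)  # undeleted messages, oldest first
        self.timeout = timeout
        self.locked_until = None          # when the in-flight head becomes visible again
        self.receive_log = []             # order in which the server handed out messages

    def receive(self, now):
        if not self.pending:
            return None
        if self.locked_until is not None and now < self.locked_until:
            return None                   # head is in flight, so the topic is locked
        self.locked_until = now + self.timeout
        head = self.pending[0]
        self.receive_log.append(head)
        return head

    def delete(self, message_id):
        # Only the current head can be ACKed; a stale ACK is ignored.
        if self.pending and self.pending[0] == message_id:
            self.pending.pop(0)
            self.locked_until = None


def replay_scenario():
    queue = ToyTimeoutQueue(["m1", "m2", "m3"], timeout=30)
    processed = []                        # (thread, message) in processing order

    msg = queue.receive(now=0)            # t1 receives m1
    processed.append(("t1", msg))
    queue.delete(msg)                     # t1 ACKs m1

    stalled = queue.receive(now=1)        # t2 receives m2, then stalls for over 30 s

    msg = queue.receive(now=32)           # time-out expired: t1 receives m2 again
    processed.append(("t1", msg))
    queue.delete(msg)
    msg = queue.receive(now=33)           # t1 receives m3
    processed.append(("t1", msg))
    queue.delete(msg)

    processed.append(("t2", stalled))     # t2 resumes and processes its stale m2
    queue.delete(stalled)                 # stale ACK, ignored by the toy queue
    return queue.receive_log, processed


receive_log, processed = replay_scenario()
# receive_log                     -> ['m1', 'm2', 'm2', 'm3']
# [m for _, m in processed]       -> ['m1', 'm2', 'm3', 'm2']
```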
For example, the client keeps the messageId of the last message it processed and simply discards any message it later receives whose messageId is not larger than that one. This not only ensures strict ordering of the messages, but also implements at-most-once semantics, that is, it guarantees that a message is not processed more than once.
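A client-side guard of this kind might look like the sketch below. It assumes that messageIds are comparable values that increase within a Topic (integers here) and that all consumer threads of one process share the guard; `OrderGuard` and `should_process` are hypothetical names, not part of the EMQ SDK.

```python
import threading


class OrderGuard:
    """Tracks the largest messageId processed per topic (hypothetical helper)."""

    def __init__(self):
        self._lock = threading.Lock()
        self._last_id = {}  # topic -> largest messageId accepted so far

    def should_process(self, topic, message_id):
        """Return True and record the id if the message is newer than the last one."""
        with self._lock:
            last = self._last_id.get(topic)
            if last is not None and message_id <= last:
                return False  # stale or duplicate delivery: discard it
            self._last_id[topic] = message_id
            return True


guard = OrderGuard()
arrivals = [1, 2, 3, 2]  # the final 2 is the stale copy held by the stalled thread
handled = [mid for mid in arrivals if guard.should_process("orders", mid)]
# handled -> [1, 2, 3]
```

Because the id is recorded before the message is processed, a message whose processing fails after the check is not retried. That is the at-most-once trade-off described above.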
TopicQueue is implemented by hashing the Topic so that Messages with the same Topic are placed in the same partition of the queue, so some performance loss is possible. If a queue has no strict ordering requirement for its Messages, try to create it as a nonTopicQueue:
createQueue()
and set an appropriate number of partitions. For messages of about 1 KB, you can assume that one partition has a throughput of roughly 500 messages/sec.
sendMessage()
to fail;
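As a rough aid for choosing the number of partitions, the rule of thumb above (about 500 messages/sec per partition for messages of about 1 KB) can be turned into a small calculation. This is only a sketch: `partitions_needed` and its `headroom` parameter are hypothetical, and real throughput depends on message size and workload.

```python
import math

# Rule of thumb from the text: about 500 messages/sec per partition at ~1 KB.
MESSAGES_PER_SEC_PER_PARTITION = 500


def partitions_needed(peak_messages_per_sec, headroom=1.0):
    """Estimate the partition count for a peak send rate.

    headroom > 1.0 reserves spare capacity, e.g. 1.5 for 50% extra.
    """
    if peak_messages_per_sec <= 0:
        return 1
    target = peak_messages_per_sec * headroom
    return max(1, math.ceil(target / MESSAGES_PER_SEC_PER_PARTITION))


estimate = partitions_needed(1800)                   # ceil(1800 / 500) = 4
with_headroom = partitions_needed(1800, headroom=1.5)  # ceil(2700 / 500) = 6
```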