As mentioned earlier, TalosConsumer is a high-level API for consuming data. It automatically performs rebalancing, commits, and other operations.
TalosConsumer does not expose these operations to the user: once it is instantiated, it does its work automatically. What happens after messages are pulled is defined through the registered callback interface MessageProcessor, as described below.
After messages are consumed, a commit is required to record the consumption checkpoint. For checkpointing, Talos provides best practices for three scenarios, described below.
The MessageProcessor interface holds the message-processing logic, and the user must implement it. TalosConsumer fetches data for each partition in a separate thread, and each partition has its own MessageProcessor instance, which processes only the data pulled from that partition. The interface declares three methods:
```java
public interface MessageProcessor {
    void init(TopicAndPartition topicAndPartition, long startMessageOffset);
    void process(List<MessageAndOffset> messages, MessageCheckpointer messageCheckpointer);
    void shutdown(MessageCheckpointer messageCheckpointer);
}
```
In general, users only need to implement `process`; the other methods can be left empty.
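A minimal sketch of a processor that relies on the default automatic commit, for illustration only. Because the Talos SDK is not available here, `TopicAndPartition`, `MessageAndOffset`, and `MessageCheckpointer` are hypothetical, simplified stand-ins (the `boolean` return type of `checkpoint` is an assumption), and the class name `RecordingMessageProcessor` is likewise hypothetical. Only `process` does work; `init` and `shutdown` are left empty.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-ins for the SDK types named in the signatures above.
class TopicAndPartition {
    final String topic;
    final int partition;

    TopicAndPartition(String topic, int partition) {
        this.topic = topic;
        this.partition = partition;
    }
}

class MessageAndOffset {
    private final String payload;      // simplified: a plain string payload
    private final long messageOffset;

    MessageAndOffset(String payload, long messageOffset) {
        this.payload = payload;
        this.messageOffset = messageOffset;
    }

    String getPayload() { return payload; }
    long getMessageOffset() { return messageOffset; }
}

interface MessageCheckpointer {
    boolean checkpoint();                  // assumed return type
    boolean checkpoint(long commitOffset); // assumed return type
}

interface MessageProcessor {
    void init(TopicAndPartition topicAndPartition, long startMessageOffset);
    void process(List<MessageAndOffset> messages, MessageCheckpointer messageCheckpointer);
    void shutdown(MessageCheckpointer messageCheckpointer);
}

// Hypothetical processor: one instance per partition; only process() has logic.
class RecordingMessageProcessor implements MessageProcessor {
    final List<Long> handledOffsets = new ArrayList<>();

    @Override
    public void init(TopicAndPartition topicAndPartition, long startMessageOffset) {
        // Nothing to set up in this sketch.
    }

    @Override
    public void process(List<MessageAndOffset> messages, MessageCheckpointer messageCheckpointer) {
        for (MessageAndOffset message : messages) {
            // Business logic goes here; the sketch only records each offset.
            handledOffsets.add(message.getMessageOffset());
        }
        // No checkpoint call: with auto commit on, TalosConsumer commits by itself.
    }

    @Override
    public void shutdown(MessageCheckpointer messageCheckpointer) {
        // Nothing to release in this sketch.
    }
}
```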
TalosConsumer can commit messages automatically, and it also lets the user control the commit. Depending on how `process` is implemented, TalosConsumer supports the following usage scenarios:
A. By default, the system commits on its own according to certain policies; for details, see [Scenario 3] in the configuration description. In this case, users only implement their message-processing logic and do not need to track how far messages have been consumed or where the checkpoints are;
B. The user calls the `MessageCheckpointer.checkpoint()` interface to control when the commit happens;
C. The user calls the `MessageCheckpointer.checkpoint(long commitOffset)` interface to control both when the commit happens and which offset is committed.
Scenario A is the system default. To use scenario B or C, turn off the `galaxy.talos.consumer.checkpoint.auto.commit` switch; for details, see [Scenario 2] of the configuration description.
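A minimal sketch of turning off automatic commit, assuming the consumer settings are collected in a `java.util.Properties` object. The key name comes from this document; the helper class `ManualCommitConfig` is hypothetical, and how the properties are handed to TalosConsumer is left to the SDK's own configuration API and is not shown.

```java
import java.util.Properties;

// Hypothetical helper for scenarios B and C (manual commit).
class ManualCommitConfig {
    static final String AUTO_COMMIT_KEY = "galaxy.talos.consumer.checkpoint.auto.commit";

    static Properties manualCommitProperties() {
        Properties props = new Properties();
        // Turn off the system's automatic commit (documented default: true).
        props.setProperty(AUTO_COMMIT_KEY, "false");
        return props;
    }

    // Reads the switch back, treating a missing key as the documented default (true).
    static boolean isAutoCommit(Properties props) {
        return Boolean.parseBoolean(props.getProperty(AUTO_COMMIT_KEY, "true"));
    }
}
```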
The best practices for the three scenarios above are described below.
As shown in the figure, Auto Commit is the system's default mode. In this scenario, the user only implements the message-processing logic in `process` and does not need to decide when to commit. For the system's commit policy, see [Scenario 3] of the configuration description.
In this scenario, the system may call `process` several times before it performs a single checkpoint commit that records the current consumption offset.
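To make "many `process` calls, one commit" concrete, here is a hypothetical model of such a policy. It assumes a commit fires as soon as either the record-count threshold or the time interval from the configuration section is reached (defaults 10000 records and 5000 ms); the class `AutoCommitPolicy` and its methods are illustrative names, and the real TalosConsumer policy may differ in detail.

```java
// Hypothetical model of an auto-commit policy: several process() calls may pass
// before one commit records the latest consumed offset.
// Assumption: a commit fires when EITHER threshold is reached.
class AutoCommitPolicy {
    private final long recordThreshold;   // e.g. 10000 records
    private final long intervalMillis;    // e.g. 5000 ms
    private long recordsSinceCommit = 0;
    private long lastCommitTimeMillis;
    private long lastCommittedOffset = -1;

    AutoCommitPolicy(long recordThreshold, long intervalMillis, long nowMillis) {
        this.recordThreshold = recordThreshold;
        this.intervalMillis = intervalMillis;
        this.lastCommitTimeMillis = nowMillis;
    }

    // Called after each process() batch; returns true if a commit happened.
    boolean afterBatch(int batchSize, long lastOffsetInBatch, long nowMillis) {
        recordsSinceCommit += batchSize;
        boolean due = recordsSinceCommit >= recordThreshold
                || nowMillis - lastCommitTimeMillis >= intervalMillis;
        if (due) {
            lastCommittedOffset = lastOffsetInBatch; // record the consumption position
            recordsSinceCommit = 0;
            lastCommitTimeMillis = nowMillis;
        }
        return due;
    }

    long lastCommittedOffset() { return lastCommittedOffset; }
}
```

With a 10000-record threshold and 1000-message batches arriving well within 5000 ms, ten `process` calls pass before the first commit.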
In this scenario, the user controls when the checkpoint is performed. Typically, the user wants either to commit the current batch immediately after `process` has handled it, or to commit once some condition holds. In both cases, we recommend calling `MessageCheckpointer.checkpoint()` inside `process`.
As shown in the figure, in the first case the user performs a checkpoint every time `process` is invoked; in the second case the user commits only after some desired condition is met.
Please note:
1) Don't forget to set `galaxy.talos.consumer.checkpoint.auto.commit` to `false`;
2) When `checkpoint()` is invoked, the committed offset is the offset of the last message in the `messages` list passed to the current `process(List<MessageAndOffset> messages, MessageCheckpointer messageCheckpointer)` call.
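A minimal sketch of scenario B, assuming simplified stand-ins for `MessageAndOffset` and `MessageCheckpointer` (not the real SDK classes). `FakeCheckpointer` is a hypothetical test double that mimics the note above: `checkpoint()` commits the offset of the last message in the current batch. `BatchCheckpointProcessor` is also hypothetical; "commit every N batches" is only an example of a condition, and `init`/`shutdown` are omitted for brevity.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-ins for the SDK types (simplified).
class MessageAndOffset {
    private final String payload;
    private final long messageOffset;

    MessageAndOffset(String payload, long messageOffset) {
        this.payload = payload;
        this.messageOffset = messageOffset;
    }

    String getPayload() { return payload; }
    long getMessageOffset() { return messageOffset; }
}

interface MessageCheckpointer {
    boolean checkpoint();                  // assumed return type
    boolean checkpoint(long commitOffset); // assumed return type
}

// Fake checkpointer: checkpoint() commits the last offset of the current batch.
class FakeCheckpointer implements MessageCheckpointer {
    private List<MessageAndOffset> currentBatch = new ArrayList<>();
    final List<Long> commits = new ArrayList<>();

    void setCurrentBatch(List<MessageAndOffset> batch) { currentBatch = batch; }

    @Override
    public boolean checkpoint() {
        return checkpoint(currentBatch.get(currentBatch.size() - 1).getMessageOffset());
    }

    @Override
    public boolean checkpoint(long commitOffset) {
        commits.add(commitOffset);
        return true;
    }
}

// Scenario B: commit whole batches with checkpoint(). commitEveryNBatches == 1
// commits after every process() call; larger values stand in for "some condition".
class BatchCheckpointProcessor {
    private final int commitEveryNBatches;
    private int batchesSinceCommit = 0;

    BatchCheckpointProcessor(int commitEveryNBatches) {
        this.commitEveryNBatches = commitEveryNBatches;
    }

    public void process(List<MessageAndOffset> messages, MessageCheckpointer messageCheckpointer) {
        for (MessageAndOffset message : messages) {
            handle(message);
        }
        batchesSinceCommit++;
        if (batchesSinceCommit >= commitEveryNBatches) {
            messageCheckpointer.checkpoint(); // commits the last offset of this batch
            batchesSinceCommit = 0;
        }
    }

    private void handle(MessageAndOffset message) {
        // Business logic placeholder.
    }
}
```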
This scenario typically suits users who care about the processing of each individual message and want to commit every message as soon as it has been processed.
As shown in the figure, the user invokes a commit after processing each message, passing that message's offset as the offset to commit.
Please note:
1) Don't forget to set `galaxy.talos.consumer.checkpoint.auto.commit` to `false`;
2) When invoking `checkpoint(long commitOffset)`, `commitOffset` must lie in the range `(lastCommitOffset, messages.get(messages.size()-1).getMessageOffset()]`, that is, greater than the last committed offset and no greater than the offset of the last message in the current batch.
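A minimal sketch of scenario C under the range rule above, again with simplified stand-ins rather than the real SDK classes. `RangeCheckingCheckpointer` is a hypothetical test double that rejects offsets outside `(lastCommitOffset, lastOffsetInBatch]` by throwing; how the real SDK reacts to an invalid offset is not described here. `PerMessageCheckpointProcessor` is also hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-ins for the SDK types (simplified).
class MessageAndOffset {
    private final String payload;
    private final long messageOffset;

    MessageAndOffset(String payload, long messageOffset) {
        this.payload = payload;
        this.messageOffset = messageOffset;
    }

    String getPayload() { return payload; }
    long getMessageOffset() { return messageOffset; }
}

interface MessageCheckpointer {
    boolean checkpoint();                  // assumed return type
    boolean checkpoint(long commitOffset); // assumed return type
}

// Enforces: lastCommitOffset < commitOffset <= offset of the last message in the batch.
class RangeCheckingCheckpointer implements MessageCheckpointer {
    private long lastCommitOffset;
    private long batchLastOffset;
    final List<Long> commits = new ArrayList<>();

    RangeCheckingCheckpointer(long lastCommitOffset) {
        this.lastCommitOffset = lastCommitOffset;
        this.batchLastOffset = lastCommitOffset;
    }

    void setCurrentBatch(List<MessageAndOffset> batch) {
        batchLastOffset = batch.get(batch.size() - 1).getMessageOffset();
    }

    @Override
    public boolean checkpoint() {
        return checkpoint(batchLastOffset);
    }

    @Override
    public boolean checkpoint(long commitOffset) {
        if (commitOffset <= lastCommitOffset || commitOffset > batchLastOffset) {
            throw new IllegalArgumentException("commitOffset " + commitOffset
                    + " is outside (" + lastCommitOffset + ", " + batchLastOffset + "]");
        }
        lastCommitOffset = commitOffset;
        commits.add(commitOffset);
        return true;
    }
}

// Scenario C: handle one message, then commit exactly that message's offset.
class PerMessageCheckpointProcessor {
    final List<String> handled = new ArrayList<>();

    public void process(List<MessageAndOffset> messages, MessageCheckpointer messageCheckpointer) {
        for (MessageAndOffset message : messages) {
            handled.add(message.getPayload());                          // business logic placeholder
            messageCheckpointer.checkpoint(message.getMessageOffset()); // commit this message
        }
    }
}
```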
Name | Description | Default |
---|---|---|
galaxy.talos.service.endpoint | Specifies the URI of the Talos server; both http and https are supported. For the URI of each cluster, see Cluster information | -- |
This configuration specifies the interval between data pulls from each partition in the consumer.
Default: 200ms
This configuration specifies the maximum number of messages the consumer fetches from each partition in a single pull.
Default: 1000
This configuration specifies whether the system commits consumption progress automatically. The default, true, means the system performs the checkpoint commit itself; set it to false if the user wants to commit manually.
Default: true
This configuration specifies how many records the consumer consumes before it commits an offset.
Default: 10000
This configuration specifies the time-based commit frequency, i.e. the interval between one commit and the next.
Default: 5000ms
This configuration specifies where to resume pulling when the requested offset is out of range. With the default value, false, reading restarts from the current partition's earliest valid offset (StartOffset); with true, reading restarts from the partition's EndOffset.
When does MESSAGE_OFFSET_OUT_OF_RANGE occur? Suppose messages are retained for 1 day. The user consumes some data with TalosConsumer and then stops the program. One or more days later, the user restarts TalosConsumer. By default, consumption resumes from LastCommitOffset (this can be changed through Scenario 5), but because the data at LastCommitOffset has already expired, TalosConsumer uses this configuration to decide where to resume.
Default: false
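The out-of-range rule can be sketched as a small decision function. `OutOfRangeReset`, `resumeOffset`, and its parameters are hypothetical names (the configuration key for this switch is not given in this section), and treating an offset above `endOffset` as out of range is an assumption about the boundary; TalosConsumer performs this decision internally.

```java
// Hypothetical helper illustrating the documented out-of-range rule.
final class OutOfRangeReset {
    private OutOfRangeReset() {}

    // Returns the offset to resume from for a requested offset.
    static long resumeOffset(long requestedOffset, long startOffset, long endOffset,
                             boolean resetFromLatest) {
        boolean outOfRange = requestedOffset < startOffset || requestedOffset > endOffset;
        if (!outOfRange) {
            return requestedOffset;            // still valid: keep reading from it
        }
        return resetFromLatest ? endOffset     // true: jump to the partition's EndOffset
                               : startOffset;  // false (default): earliest valid offset
    }
}
```

In the expiry example above, a LastCommitOffset older than the partition's StartOffset falls into the out-of-range branch.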
This configuration accepts two values: -1 reads messages from the StartOffset of the current partition, and -2 reads from its EndOffset. The default is -1 (StartOffset).
Default: -1
This configuration specifies whether to reset the offset when TalosConsumer starts. The default is false, i.e. no reset.
Default: false
[Note] Please make sure you understand these two configurations. The program follows this rule: when the user starts the Consumer for the first time (that is, no LastCommitOffset exists), or when 'galaxy.talos.consumer.start.whether.reset.offset' is true, the program chooses its starting offset from the value of 'galaxy.talos.consumer.start.reset.offset.value'; otherwise it reads from LastCommitOffset. This leads to the following cases:
1) When the Consumer is started for the first time with the defaults, the program starts reading from StartOffset; to start reading from EndOffset on the first start instead, the user only needs to set 'galaxy.talos.consumer.start.reset.offset.value' to '-2';
2) When the Consumer restarts with the defaults (not the first start), the program starts reading from LastCommitOffset; to start reading from StartOffset or EndOffset on restart instead, the user must set both configurations: 'galaxy.talos.consumer.start.whether.reset.offset' to 'true', and the other one to the corresponding value (-1/-2).
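The start-position rule in the note can be sketched as a pure function. The two key names come from this document; `StartOffsetRule`, `chooseStart`, and the use of `null` for "no LastCommitOffset yet" are hypothetical choices for illustration. The return value is either the committed offset to resume from or the sentinel -1/-2 meaning StartOffset/EndOffset, mirroring the configuration's own convention.

```java
// Hypothetical helper encoding the start-position rule described in the note above.
final class StartOffsetRule {
    static final long START_OFFSET = -1L; // start.reset.offset.value = -1
    static final long END_OFFSET = -2L;   // start.reset.offset.value = -2

    private StartOffsetRule() {}

    /**
     * @param lastCommitOffset   committed offset, or null if the consumer never committed
     * @param whetherResetOffset value of galaxy.talos.consumer.start.whether.reset.offset
     * @param resetOffsetValue   value of galaxy.talos.consumer.start.reset.offset.value
     * @return LastCommitOffset to resume from, or -1/-2 for StartOffset/EndOffset
     */
    static long chooseStart(Long lastCommitOffset, boolean whetherResetOffset, long resetOffsetValue) {
        if (resetOffsetValue != START_OFFSET && resetOffsetValue != END_OFFSET) {
            throw new IllegalArgumentException("reset offset value must be -1 or -2");
        }
        // First start, or an explicit reset request: use the configured position.
        if (lastCommitOffset == null || whetherResetOffset) {
            return resetOffsetValue;
        }
        // Otherwise resume from LastCommitOffset.
        return lastCommitOffset;
    }
}
```

Cases 1) and 2) correspond to `chooseStart(null, false, -2L)` (first start from EndOffset) and `chooseStart(lastCommit, true, -2L)` (restart from EndOffset).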