Xiaomi Galaxy Talos Book

TalosConsumer API & Conf


TalosConsumer API

As mentioned earlier, TalosConsumer is a high-level API for consuming data. It automatically performs rebalancing, commits, and other operations.

TalosConsumer does not expose consumption interfaces to the user; once it is instantiated, it starts working automatically. The processing logic applied after messages are pulled is supplied through the registered callback interface MessageProcessor, described below.

After messages are consumed, a commit operation is required to record the consumption checkpoint. For checkpoint management, Talos provides best practices for three scenarios, described below.

MessageProcessor interface

The MessageProcessor interface holds the message consumption logic, and the user needs to implement it. In TalosConsumer, each Partition has its own fetch thread and its own MessageProcessor instance, so a MessageProcessor only processes the data pulled from its own Partition. The interface declares three methods:

  public void init(TopicAndPartition topicAndPartition, long startMessageOffset);
  public void process(List<MessageAndOffset> messages, MessageCheckpointer messageCheckpointer);
  public void shutdown(MessageCheckpointer messageCheckpointer);

In general, users only need to implement process; the other two methods can be left empty.
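As a rough illustration of the interface above, the following sketch implements only process and leaves init and shutdown empty. The nested types are simplified stand-ins that mirror the signatures shown here, not the real Talos SDK classes; the payload field, the boolean return type of checkpoint, and the class names ProcessorSketch and CollectingProcessor are assumptions made for the example.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

class ProcessorSketch {
    // Stand-ins mirroring only the signatures shown above; the real types
    // come from the Talos SDK and have more members.
    static class TopicAndPartition {
        final String topic;
        final int partition;
        TopicAndPartition(String topic, int partition) {
            this.topic = topic;
            this.partition = partition;
        }
    }

    static class MessageAndOffset {
        final long offset;
        final String payload; // hypothetical field, for illustration only
        MessageAndOffset(long offset, String payload) {
            this.offset = offset;
            this.payload = payload;
        }
        long getMessageOffset() { return offset; }
    }

    interface MessageCheckpointer {
        boolean checkpoint();                  // return type is an assumption
        boolean checkpoint(long commitOffset); // return type is an assumption
    }

    interface MessageProcessor {
        void init(TopicAndPartition topicAndPartition, long startMessageOffset);
        void process(List<MessageAndOffset> messages, MessageCheckpointer messageCheckpointer);
        void shutdown(MessageCheckpointer messageCheckpointer);
    }

    // One instance serves exactly one partition, so it keeps plain per-partition state.
    static class CollectingProcessor implements MessageProcessor {
        final List<String> seen = new ArrayList<>();

        @Override
        public void init(TopicAndPartition topicAndPartition, long startMessageOffset) {
            // nothing to prepare in this sketch
        }

        @Override
        public void process(List<MessageAndOffset> messages, MessageCheckpointer messageCheckpointer) {
            for (MessageAndOffset message : messages) {
                seen.add(message.payload); // business logic goes here
            }
            // no checkpoint call: the system's default automatic commit is assumed
        }

        @Override
        public void shutdown(MessageCheckpointer messageCheckpointer) {
            // nothing to release in this sketch
        }
    }

    public static void main(String[] args) {
        CollectingProcessor processor = new CollectingProcessor();
        processor.init(new TopicAndPartition("demo-topic", 0), 0L);
        processor.process(Arrays.asList(
                new MessageAndOffset(0L, "a"),
                new MessageAndOffset(1L, "b")), null);
        processor.shutdown(null);
        System.out.println(processor.seen); // prints [a, b]
    }
}
```

In a real application the instance would be created and driven by TalosConsumer rather than by a main method.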

TalosConsumer can commit messages automatically, and it also lets the user control the commit. Depending on how the process interface is implemented, TalosConsumer supports the following usage scenarios:

  A. By default, the system commits by itself according to certain strategies; for the specific strategies, see [Scenario 3] in the configuration instructions. In this case, users only need to implement their own message processing logic and do not need to care how far consumption has progressed or where the checkpoints are;
  B. The user calls `MessageCheckpointer.checkpoint()` to control the timing of the commit;
  C. The user calls `MessageCheckpointer.checkpoint(long commitOffset)` to control both the timing and the specific offset of the commit.

Scenario A is the system default. To use scenario B or C, the galaxy.talos.consumer.checkpoint.auto.commit switch must be turned off; for details, see [Scenario 2] of the configuration description.

Best Practice for Checkpoint Management

The following describes the best practice for each of the three scenarios above.

A. Auto Commit

[Figure: auto-commit]

As shown in the figure, Auto Commit is the default mode of the system. In this scenario, the user only needs to implement the message processing logic in the process function and does not need to care about when to commit. For the system's commit policy, see [Scenario 3] of the configuration description.

In this scenario, the system may invoke process several times before it performs a single checkpoint commit, which records the current consumption offset.

B. Using MessageCheckpointer.checkpoint()

In this scenario, the user controls when the checkpoint is performed. Typically, the user wants either to commit the current batch immediately after processing it, or to commit once a certain condition is met. In both cases, we recommend calling MessageCheckpointer.checkpoint() inside the process interface.

[Figure: checkpoint]

As shown in the figure, in the first case the user performs a checkpoint every time process is invoked; in the second case the user commits only after some desired condition is met.

Please note:

  1. Don't forget to set galaxy.talos.consumer.checkpoint.auto.commit to false;

  2. When checkpoint() is invoked, the committed offset is the Offset of the last Message in the messages list passed to the current call of process(List<MessageAndOffset> messages, MessageCheckpointer messageCheckpointer).
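The "commit when a condition is met" case can be sketched as follows. This is a minimal illustration with stand-in types, not SDK code: the threshold condition, the boolean return value of checkpoint(), and the names ConditionalCheckpointSketch, BatchingProcessor, and CountingCheckpointer are all assumptions chosen for the example.

```java
import java.util.Arrays;
import java.util.List;

class ConditionalCheckpointSketch {
    // Stand-ins for the SDK types; only what this sketch needs.
    static class MessageAndOffset {
        final long offset;
        MessageAndOffset(long offset) { this.offset = offset; }
        long getMessageOffset() { return offset; }
    }

    interface MessageCheckpointer {
        boolean checkpoint(); // boolean result is an assumption
    }

    // Commits once at least `threshold` messages were handled since the last commit.
    static class BatchingProcessor {
        private final int threshold;
        private int uncommitted = 0;

        BatchingProcessor(int threshold) { this.threshold = threshold; }

        void process(List<MessageAndOffset> messages, MessageCheckpointer messageCheckpointer) {
            for (MessageAndOffset message : messages) {
                uncommitted++; // business logic would run here
            }
            // checkpoint() is called inside process, as recommended above
            if (uncommitted >= threshold && messageCheckpointer.checkpoint()) {
                uncommitted = 0;
            }
        }
    }

    // Fake checkpointer that only counts how often it was asked to commit.
    static class CountingCheckpointer implements MessageCheckpointer {
        int calls = 0;
        @Override
        public boolean checkpoint() { calls++; return true; }
    }

    static int demo() {
        BatchingProcessor processor = new BatchingProcessor(3);
        CountingCheckpointer checkpointer = new CountingCheckpointer();
        // first batch: 2 uncommitted messages, below the threshold, no commit
        processor.process(Arrays.asList(new MessageAndOffset(0), new MessageAndOffset(1)), checkpointer);
        // second batch: 4 uncommitted messages, threshold reached, one commit
        processor.process(Arrays.asList(new MessageAndOffset(2), new MessageAndOffset(3)), checkpointer);
        return checkpointer.calls;
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints 1
    }
}
```

Setting the threshold to 1 reduces this to the first case, a checkpoint after every process call.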

C. Using MessageCheckpointer.checkpoint(long commitOffset)

Generally speaking, this scenario applies when the user has strict requirements on the processing of each individual message and wants to commit every message as soon as it has been processed.

[Figure: checkpoint-offset]

As shown in the figure, the user commits after processing each Message, and each commit must specify the Offset of the message that was just processed.

Please note:

  1. Don't forget to set galaxy.talos.consumer.checkpoint.auto.commit to false;

  2. When checkpoint(long commitOffset) is invoked, commitOffset must fall within the range (lastCommitOffset, messages.get(messages.size()-1).getMessageOffset()], that is, greater than the last committed offset and no greater than the offset of the last message in the current batch.
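The per-message commit and the range rule in note 2 can be sketched as below. The checkpointer here is a stand-in that enforces the half-open range itself and rejects out-of-range values by returning false; how the real SDK reacts to an out-of-range offset is not described in this document, so that behavior and all class names are assumptions.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

class PerMessageCommitSketch {
    static class MessageAndOffset {
        final long offset;
        MessageAndOffset(long offset) { this.offset = offset; }
        long getMessageOffset() { return offset; }
    }

    interface MessageCheckpointer {
        boolean checkpoint(long commitOffset); // boolean result is an assumption
    }

    // Stand-in that accepts only offsets in (lastCommitOffset, lastOffsetInBatch].
    static class RangeCheckingCheckpointer implements MessageCheckpointer {
        long lastCommitOffset;
        final long lastOffsetInBatch;
        final List<Long> committed = new ArrayList<>();

        RangeCheckingCheckpointer(long lastCommitOffset, long lastOffsetInBatch) {
            this.lastCommitOffset = lastCommitOffset;
            this.lastOffsetInBatch = lastOffsetInBatch;
        }

        @Override
        public boolean checkpoint(long commitOffset) {
            boolean inRange = commitOffset > lastCommitOffset && commitOffset <= lastOffsetInBatch;
            if (inRange) {
                lastCommitOffset = commitOffset;
                committed.add(commitOffset);
            }
            return inRange;
        }
    }

    // Processes one message at a time and commits its offset right away.
    static void process(List<MessageAndOffset> messages, MessageCheckpointer messageCheckpointer) {
        for (MessageAndOffset message : messages) {
            // business logic for this single message would run here
            messageCheckpointer.checkpoint(message.getMessageOffset());
        }
    }

    static List<Long> demo() {
        List<MessageAndOffset> batch = Arrays.asList(
                new MessageAndOffset(10), new MessageAndOffset(11), new MessageAndOffset(12));
        RangeCheckingCheckpointer checkpointer = new RangeCheckingCheckpointer(9L, 12L);
        process(batch, checkpointer);
        checkpointer.checkpoint(12L); // rejected: not greater than lastCommitOffset
        checkpointer.checkpoint(13L); // rejected: beyond the last offset of the batch
        return checkpointer.committed;
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints [10, 11, 12]
    }
}
```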

TalosConsumer's configuration instructions

Required configuration items
  • galaxy.talos.service.endpoint

    Specifies the URI of the Talos Server; both http and https can be configured. For the URI of each cluster, see Cluster information.

    Default: --

Optional configuration items & scenarios (for High Level TalosConsumer)
[Scenario 1] The user wants to pull data faster or slower
  • galaxy.talos.consumer.fetch.interval.ms

    Specifies the interval between data pulls for each partition in the Consumer

    Default: 200ms

  • galaxy.talos.consumer.max.fetch.records

    Specifies the maximum batch size (number of records) of a single data pull for each partition in the Consumer

    Default: 1000
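To get a feel for how the two settings interact, the sketch below builds a java.util.Properties object with the keys above and estimates a rough per-partition ceiling, assuming at most one full batch is pulled per interval and ignoring network and processing time. How the properties are handed to TalosConsumer is not shown here, and the class and method names are hypothetical.

```java
import java.util.Properties;

class FetchTuningSketch {
    static final String FETCH_INTERVAL_KEY = "galaxy.talos.consumer.fetch.interval.ms";
    static final String MAX_FETCH_RECORDS_KEY = "galaxy.talos.consumer.max.fetch.records";

    // Builds a Properties object holding the two Scenario 1 settings.
    static Properties fetchSettings(int intervalMs, int maxRecords) {
        Properties properties = new Properties();
        properties.setProperty(FETCH_INTERVAL_KEY, String.valueOf(intervalMs));
        properties.setProperty(MAX_FETCH_RECORDS_KEY, String.valueOf(maxRecords));
        return properties;
    }

    // Rough upper bound per partition: one full batch per interval (an assumption).
    static long maxRecordsPerSecond(Properties properties) {
        long intervalMs = Long.parseLong(properties.getProperty(FETCH_INTERVAL_KEY));
        long maxRecords = Long.parseLong(properties.getProperty(MAX_FETCH_RECORDS_KEY));
        return maxRecords * 1000 / intervalMs;
    }

    public static void main(String[] args) {
        System.out.println(maxRecordsPerSecond(fetchSettings(200, 1000))); // defaults: prints 5000
        System.out.println(maxRecordsPerSecond(fetchSettings(100, 1000))); // halved interval: prints 10000
    }
}
```

Under this simplified model, shortening the interval or enlarging the batch raises the ceiling; actual throughput also depends on the server and on how long process takes.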

[Scenario 2] The user chooses whether to control message commits manually
  • galaxy.talos.consumer.checkpoint.auto.commit

    Specifies whether the system commits consumption progress automatically. The default is true, meaning the system performs the checkpoint commit itself; set it to false if the user wants to commit manually.

    Default: true

[Scenario 3] When the user lets the Consumer commit automatically, control the commit frequency; a commit is triggered as soon as either of the following conditions is met
  • galaxy.talos.consumer.commit.offset.record.fetched.num

    Specifies the number of consumed records after which the Consumer commits the offset once

    Default: 10000

  • galaxy.talos.consumer.commit.offset.interval.milli

    Specifies the commit frequency in the time dimension, that is, the interval between one commit and the next

    Default: 5000ms
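The "either condition triggers a commit" rule of Scenario 3 can be expressed as a small predicate. This is only a sketch of the rule as described here, not the SDK's internal code; whether the real comparison is inclusive (>=) is an assumption, and the names are hypothetical.

```java
class AutoCommitPolicySketch {
    static final long DEFAULT_RECORD_THRESHOLD = 10000L; // commit.offset.record.fetched.num
    static final long DEFAULT_INTERVAL_MS = 5000L;       // commit.offset.interval.milli

    // True when either the record-count condition or the time condition is met.
    static boolean shouldCommit(long recordsSinceLastCommit, long millisSinceLastCommit,
                                long recordThreshold, long intervalMs) {
        return recordsSinceLastCommit >= recordThreshold
                || millisSinceLastCommit >= intervalMs;
    }

    public static void main(String[] args) {
        // many records in a short time: the count condition fires
        System.out.println(shouldCommit(10000, 300, DEFAULT_RECORD_THRESHOLD, DEFAULT_INTERVAL_MS)); // prints true
        // few records but a long wait: the time condition fires
        System.out.println(shouldCommit(42, 5000, DEFAULT_RECORD_THRESHOLD, DEFAULT_INTERVAL_MS));   // prints true
        // neither condition met yet
        System.out.println(shouldCommit(42, 300, DEFAULT_RECORD_THRESHOLD, DEFAULT_INTERVAL_MS));    // prints false
    }
}
```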

[Scenario 4] How the user wants MESSAGE_OFFSET_OUT_OF_RANGE to be handled when it occurs while pulling messages
  • galaxy.talos.consumer.out.of.range.reset.latest.offset

    Specifies whether to resume from the partition's latest offset when the requested offset is out of range at pull time. The default is false: reading restarts from the valid StartOffset of the current Partition. If it is set to true, reading restarts from the EndOffset of the Partition.

    When does MESSAGE_OFFSET_OUT_OF_RANGE appear? For example, suppose messages are retained for 1 day. The user consumes some data with TalosConsumer and then stops the program. One or more days later, the user restarts TalosConsumer. By default it reads from LastCommitOffset (this can be changed through Scenario 5), but the data that LastCommitOffset points to has expired, so TalosConsumer uses this configuration to decide where to resume consuming.

    Default: false
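The choice this switch makes once an out-of-range error has occurred can be sketched as a one-line decision. The offsets in the example are made up, and the function and class names are hypothetical; detection of the error itself happens on the Talos side and is not modeled here.

```java
class OutOfRangeResetSketch {
    // Picks the offset to resume from after MESSAGE_OFFSET_OUT_OF_RANGE.
    // resetToLatest corresponds to galaxy.talos.consumer.out.of.range.reset.latest.offset.
    static long resumeOffset(boolean resetToLatest, long startOffset, long endOffset) {
        return resetToLatest ? endOffset : startOffset;
    }

    public static void main(String[] args) {
        long startOffset = 500L; // oldest offset still retained (example value)
        long endOffset = 900L;   // latest offset of the partition (example value)
        System.out.println(resumeOffset(false, startOffset, endOffset)); // default: prints 500
        System.out.println(resumeOffset(true, startOffset, endOffset));  // prints 900
    }
}
```

With the default, nothing that is still retained gets skipped; with true, the consumer jumps ahead and only sees data that arrives later.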

[Scenario 5] The user wants to reset the offset from which reading starts when TalosConsumer starts
  • galaxy.talos.consumer.start.reset.offset.value

    This configuration accepts two values, -1 and -2, which mean reading from the StartOffset and the EndOffset of the current Partition respectively. The default is -1 (StartOffset)

    Default: -1

  • galaxy.talos.consumer.start.whether.reset.offset

    Indicates whether to reset the offset when TalosConsumer starts. The default is false, i.e., no reset

    Default: false

    [Note] Please read these two configurations carefully. The program follows this rule: when the user starts the Consumer for the first time (that is, LastCommitOffset does not exist), or when 'galaxy.talos.consumer.start.whether.reset.offset' is true, the program chooses the starting offset according to 'galaxy.talos.consumer.start.reset.offset.value'; otherwise it reads from LastCommitOffset. This produces the following cases:

    1) By default, when the Consumer starts for the first time, it reads from StartOffset. To read from EndOffset on the first start, the user only needs to set 'galaxy.talos.consumer.start.reset.offset.value' to '-2'.

    2) By default, when the Consumer restarts (not the first start), it reads from LastCommitOffset. To read from StartOffset or EndOffset on restart, the user must set both options: 'galaxy.talos.consumer.start.whether.reset.offset' to 'true' and 'galaxy.talos.consumer.start.reset.offset.value' to the corresponding value (-1 or -2).
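The rule in the note can be written down as a small decision function. This is a sketch of the rule as stated in this document rather than the SDK's actual code; the enum, the method name, and the choice to throw on values other than -1 and -2 are assumptions.

```java
class StartOffsetRuleSketch {
    enum StartPoint { START_OFFSET, END_OFFSET, LAST_COMMIT_OFFSET }

    static final long RESET_TO_START = -1L; // galaxy.talos.consumer.start.reset.offset.value
    static final long RESET_TO_END = -2L;

    // hasLastCommitOffset is false on the very first start of the Consumer.
    // whetherResetOffset mirrors galaxy.talos.consumer.start.whether.reset.offset.
    static StartPoint resolve(boolean hasLastCommitOffset, boolean whetherResetOffset,
                              long resetOffsetValue) {
        if (!hasLastCommitOffset || whetherResetOffset) {
            if (resetOffsetValue == RESET_TO_START) {
                return StartPoint.START_OFFSET;
            }
            if (resetOffsetValue == RESET_TO_END) {
                return StartPoint.END_OFFSET;
            }
            throw new IllegalArgumentException("reset value must be -1 or -2: " + resetOffsetValue);
        }
        return StartPoint.LAST_COMMIT_OFFSET;
    }

    public static void main(String[] args) {
        System.out.println(resolve(false, false, -1L)); // first start, defaults: START_OFFSET
        System.out.println(resolve(false, false, -2L)); // first start, value -2: END_OFFSET
        System.out.println(resolve(true, false, -1L));  // restart, defaults: LAST_COMMIT_OFFSET
        System.out.println(resolve(true, true, -2L));   // restart with reset: END_OFFSET
    }
}
```

Note that on a restart the reset value alone has no effect; it only takes part in the decision when the reset switch is true or no LastCommitOffset exists.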