Xiaomi Galaxy Talos Book

Talos Spark Streaming


Characteristics

  • Easy-to-understand parallel processing: With DirectTalosInputDStream, each RDD created by Spark Streaming has exactly one partition per partition of the Talos topic. This one-to-one mapping is easy to understand and use.
  • Exactly-once semantics: Offsets are saved to reliable storage through checkpointing. Exactly-once semantics can therefore be guaranteed by enabling checkpointing (see How to configure Checkpointing; it is enabled by default) and writing output to external storage in the correct way (see Semantics of output operations).

How to Use

Add dependency

<dependency>
  <groupId>org.apache.spark</groupId>
  <artifactId>spark-streaming_2.10</artifactId>
  <version>${spark.version}</version>
  <scope>provided</scope>
</dependency>
<dependency>
  <groupId>com.xiaomi.infra.galaxy</groupId>
  <artifactId>galaxy-talos-spark</artifactId>
  <version>${galaxy.version}</version>
</dependency>

Here, spark.version must match the version of the Spark cluster, and galaxy.version should generally be the latest release. The scope of the spark-streaming dependency depends on the cluster: if the cluster already provides the streaming dependency, set it to provided; otherwise, set it to compile.

Common APIs

import org.apache.spark.streaming.talos._

val streamingContext = TalosUtils.createStreamingContext( [spark conf], [batch duration], [map of Talos parameters], [Talos credential], [set of topics to consume]){[dstream function]}

Parameter description:

  • [spark conf] and [batch duration] are used to initialize the StreamingContext;
  • [map of Talos parameters] must map the key TalosClientConfigKeys.GALAXY_TALOS_SERVICE_ENDPOINT to the Talos cluster address. By default, the generated DStream starts consuming from the latest (largest) offset of each Talos topic partition; to start from the oldest (smallest) offset instead, set auto.offset.reset to smallest in [map of Talos parameters];
  • [Talos credential] and [set of topics to consume] are Talos-related concepts;
  • [dstream function] is a user-supplied function of type InputDStream[(String, String)] => Unit that builds the processing logic on the DStream. The (String, String) element type corresponds to the (Key, Message) of a Talos message. To handle other data formats in the DStream, see "Customized DStream data format" under "High level usage" below.

The StreamingContext created this way enables checkpointing by default and uses the app name as the checkpoint directory. When the job is resubmitted, the StreamingContext is recovered from the checkpoint data, so consumption continues from the position where the previous job exited (that is, from the saved Talos offset information). To disable automatic checkpointing, set spark.streaming.talos.checkpointing.enable to false in SparkConf.
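As a minimal sketch of the opt-out described above, the setting can be placed on the SparkConf before it is passed to TalosUtils.createStreamingContext. The app name "talos-demo" is a placeholder chosen for illustration, not a value taken from the SDK.

```scala
import org.apache.spark.SparkConf

// Placeholder app name; with checkpointing enabled, this name would also be
// used as the checkpoint directory.
val sparkConf = new SparkConf()
  .setAppName("talos-demo")
  // Disable the automatic checkpointing that is otherwise on by default.
  .set("spark.streaming.talos.checkpointing.enable", "false")
```

With checkpointing disabled, a resubmitted job no longer resumes from saved offsets; where it starts is then governed by auto.offset.reset.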

Sample code

Please check galaxy-sdk-java/galaxy-talos-client/galaxy-talos-spark/src/main/scala/com/xiaomi/infra/galaxy/talos/spark/example

High level usage

Customized DStream data format

TalosUtils also provides the following API:

import org.apache.spark.streaming.talos._

val streamingContext = TalosUtils.createStreamingContext( [spark conf], [batch duration], [map of Talos parameters], [Talos credential], [set of topics to consume], [custom message handler]){[dstream function]}

Here, [custom message handler] is a function of type MessageAndOffset => T, where T is a generic type parameter. For example, to include the message offset in the DStream data, you can define the following message handler:

import java.nio.charset.Charset
val messageHandler = (mo: MessageAndOffset) => (mo.message.partitionKey, mo.messageOffset, new String(mo.message.message.array(), Charset.forName("UTF-8")))

The result of message.array() does not have to be converted to a String; it can also be converted to any other custom type, or returned directly as a byte array.
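The conversion step can be factored into small helpers. The sketch below assumes the message payload is a java.nio.ByteBuffer, as the call to array() suggests; PayloadDecoding, rawBytes, and decodeUtf8 are hypothetical names and not part of the Talos SDK. It copies only the readable bytes of the buffer, because ByteBuffer.array() returns the whole backing array, which may be larger than the payload.

```scala
import java.nio.ByteBuffer
import java.nio.charset.StandardCharsets

// Hypothetical helpers for illustration; they are not part of the Talos SDK.
object PayloadDecoding {
  // Copy only the readable bytes (from position up to limit) out of the buffer.
  // duplicate() has its own position and limit, so the caller's buffer is not advanced.
  def rawBytes(payload: ByteBuffer): Array[Byte] = {
    val view = payload.duplicate()
    val bytes = new Array[Byte](view.remaining())
    view.get(bytes)
    bytes
  }

  // Decode the readable bytes as a UTF-8 string.
  def decodeUtf8(payload: ByteBuffer): String =
    new String(rawBytes(payload), StandardCharsets.UTF_8)
}
```

Inside a message handler, PayloadDecoding.decodeUtf8(mo.message.message) could then stand in for the new String(...) expression, under the same assumption about the payload type.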

Get offset range information for each batch

Using the sample code TalosSparkDemo as reference, you can obtain offset information in the following ways:

  // Hold a reference to the current offset ranges, so it can be used downstream
  var offsetRanges = Array[OffsetRange]()

  val ssc = TalosUtils.createStreamingContext(
    sparkConf,
    Seconds(batchSec),
    Map(TalosClientConfigKeys.GALAXY_TALOS_SERVICE_ENDPOINT -> uri,
      "auto.offset.reset" -> "smallest"),
    credential,
    Set(topic)) { inputDstream =>
    // transform must be the first call on the input DStream (see the notes below)
    inputDstream.transform { rdd =>
      offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
      rdd
    }.foreachRDD(rdd => {
      // scalastyle:off println
      rdd.foreach(println)
      // scalastyle:on println
    })
  }

Note two points:

  • To obtain the offset range information of a batch, the transform above must be the first method invoked on the input DStream, not one invoked after other methods have already been applied.
  • Once you invoke any method that triggers a shuffle or repartition, such as reduceByKey(), the one-to-one correspondence between RDD partitions and Talos partitions no longer holds.

More configuration parameters

The following options can be set in the sparkConf of the streamingContext when invoking TalosUtils.createDirectStream. Setting them in [map of Talos parameters] has no effect.

config | default | meaning
spark.streaming.talos.maxRatePerPartition | (not specified) | Rate limit for consuming data from each partition, in messages per second. If batches are so large that the Spark job is severely delayed, consider limiting the data volume of each batch.
spark.streaming.talos.maxRetries | -1 | Number of retries after a failed interaction with Talos. Configure more retries when the network is unreliable. The default of -1 means retry indefinitely.
spark.streaming.talos.backoff.ms | 200 | Interval, in milliseconds, between retries after a failed interaction with Talos.
spark.streaming.talos.checkpointing.enable | true | Whether to enable checkpointing automatically and set the checkpoint directory to AppName. When true (the default), streamingContext.checkpoint(AppName) is executed automatically.
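To make the rate limit concrete, the sketch below estimates an upper bound on the number of messages in one batch. It assumes the limit is applied per partition and per second of batch duration, as the description of spark.streaming.talos.maxRatePerPartition suggests. TalosRateLimit and maxMessagesPerBatch are hypothetical names, and the values in settings are illustrative rather than recommended.

```scala
// Hypothetical helper object for illustration; not part of the Talos SDK.
object TalosRateLimit {
  // Upper bound on messages per batch, assuming the limit applies
  // per partition and per second of batch duration.
  def maxMessagesPerBatch(maxRatePerPartition: Long, batchSeconds: Long, partitions: Int): Long =
    maxRatePerPartition * batchSeconds * partitions

  // Illustrative values only; tune them for your own job and cluster.
  val settings: Seq[(String, String)] = Seq(
    "spark.streaming.talos.maxRatePerPartition" -> "1000",
    "spark.streaming.talos.maxRetries" -> "5",
    "spark.streaming.talos.backoff.ms" -> "500"
  )
}
```

Under that assumption, a limit of 1000 messages per second with a 10-second batch and 8 partitions allows at most 80000 messages per batch. The pairs can be applied with new SparkConf().setAll(TalosRateLimit.settings).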

Feedback

If you have problems with usage, please contact wangjiasheng@xiaomi.com.