<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-streaming_2.10</artifactId>
    <version>${spark.version}</version>
    <scope>provided</scope>
</dependency>
<dependency>
    <groupId>com.xiaomi.infra.galaxy</groupId>
    <artifactId>galaxy-talos-spark</artifactId>
    <version>${galaxy.version}</version>
</dependency>
Here, spark.version must match the version of Spark running on your cluster, and galaxy.version should generally be the latest release. The scope of spark-streaming depends on the cluster: if the cluster already provides the streaming dependency, set it to provided; otherwise set it to compile.
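For example, the two version properties could be declared in the POM as follows (the version numbers below are placeholders, not recommendations; substitute the values that match your cluster and the latest SDK release):

```xml
<properties>
    <!-- Placeholder values: align spark.version with the cluster's Spark release
         and galaxy.version with the latest published Talos SDK. -->
    <spark.version>2.1.0</spark.version>
    <galaxy.version>1.0-SNAPSHOT</galaxy.version>
</properties>
```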
import org.apache.spark.streaming.talos._
val streamingContext = TalosUtils.createStreamingContext(
  [spark conf],
  [batch duration],
  [map of Talos parameters],
  [Talos credential],
  [set of topics to consume]) { [dstream function] }
Parameter description:
The StreamingContext created by this method enables checkpointing by default, with the checkpoint directory set to the app name. When the job is re-submitted, the StreamingContext can then be recovered from the checkpoint data, so consumption resumes from the position where the previous job exited (that is, the saved Talos offsets). To disable automatic checkpointing, set spark.streaming.talos.checkpointing.enable to false in SparkConf.
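As a configuration sketch (assuming spark-core on the classpath; the app name below is a placeholder), disabling automatic checkpointing looks like:

```scala
import org.apache.spark.SparkConf

// "talos-demo" is a placeholder app name. Setting the flag to "false"
// turns off the automatic checkpoint(AppName) behavior described above.
val sparkConf = new SparkConf()
  .setAppName("talos-demo")
  .set("spark.streaming.talos.checkpointing.enable", "false")
```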
For a complete example, see galaxy-sdk-java/galaxy-talos-client/galaxy-talos-spark/src/main/scala/com/xiaomi/infra/galaxy/talos/spark/example.
TalosUtils also provides the following API:
import org.apache.spark.streaming.talos._
val streamingContext = TalosUtils.createStreamingContext(
  [spark conf],
  [batch duration],
  [map of Talos parameters],
  [Talos credential],
  [set of topics to consume],
  [custom message handler]) { [dstream function] }
Here, [custom message handler] is a function of type MessageAndOffset => T, where T is a generic type parameter. For example, to include message offset information in the DStream data, you can define a message handler like the following:
val messageHandler = (mo: MessageAndOffset) => (mo.message.partitionKey, mo.messageOffset, new String(mo.message.message.array(), Charset.forName("UTF-8")))
Besides being decoded into a String, message.array() can be transformed into any custom type, or returned directly as a raw byte array.
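To illustrate, the sketch below is self-contained: the Message and MessageAndOffset case classes are simplified stand-ins for the SDK's types (their field names are assumptions, not the real API), showing a UTF-8 text handler and a raw-bytes handler side by side.

```scala
import java.nio.ByteBuffer
import java.nio.charset.StandardCharsets

// Simplified stand-ins for the Talos SDK types, for illustration only;
// in a real job these come from galaxy-talos-spark.
case class Message(partitionKey: String, message: ByteBuffer)
case class MessageAndOffset(message: Message, messageOffset: Long)

// Handler that decodes the payload to UTF-8 text, as in the example above.
val textHandler = (mo: MessageAndOffset) =>
  (mo.message.partitionKey, mo.messageOffset,
    new String(mo.message.message.array(), StandardCharsets.UTF_8))

// Handler that keeps the raw bytes instead of decoding them.
val rawBytesHandler = (mo: MessageAndOffset) =>
  (mo.message.partitionKey, mo.messageOffset, mo.message.message.array())
```

Returning the raw byte array avoids an unnecessary decode step when downstream operators work on binary payloads.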
Using the sample code TalosSparkDemo as a reference, you can obtain offset information as follows:
// Hold a reference to the current offset ranges, so it can be used downstream
var offsetRanges = Array[OffsetRange]()
val ssc = TalosUtils.createStreamingContext(
  sparkConf,
  Seconds(batchSec),
  Map(TalosClientConfigKeys.GALAXY_TALOS_SERVICE_ENDPOINT -> uri,
    "auto.offset.reset" -> "smallest"),
  credential,
  Set(topic)) { inputDstream =>
  inputDstream.transform { rdd =>
    offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
    rdd
  }.foreachRDD(rdd => {
    // scalastyle:off println
    rdd.foreach(println)
    // scalastyle:on println
  })
}
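Each element of offsetRanges describes the slice of one partition consumed in a batch. As a self-contained illustration (the OffsetRange case class below is a simplified stand-in for the SDK's class; its field names are assumptions), per-batch message counts could be derived like this:

```scala
// Simplified stand-in for the SDK's OffsetRange; field names are assumptions.
case class OffsetRange(topic: String, partition: Int,
                       fromOffset: Long, untilOffset: Long) {
  // Number of messages covered by this range.
  def count: Long = untilOffset - fromOffset
}

// Illustrative ranges as they might appear after one batch.
val ranges = Array(
  OffsetRange("my-topic", 0, 100L, 150L),
  OffsetRange("my-topic", 1, 200L, 230L))

// Total messages consumed in this batch across all partitions.
val total = ranges.map(_.count).sum
```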
Pay attention to the following points.
The following configurations must be set in the SparkConf passed to streamingContext when invoking TalosUtils.createDirectStream; setting them in [map of Talos parameters] has no effect.
config | default | meaning |
---|---|---|
spark.streaming.talos.maxRatePerPartition | | Rate limit on each partition's consumption, in messages/second. If each batch carries so much data that Spark jobs fall severely behind, consider limiting the per-batch data volume. |
spark.streaming.talos.maxRetries | -1 | Number of retries when an interaction with Talos fails; on an unreliable network, consider allowing more retries. The default of -1 retries indefinitely. |
spark.streaming.talos.backoff.ms | 200 | Interval, in milliseconds, between retries when an interaction with Talos fails. |
spark.streaming.talos.checkpointing.enable | true | Whether to enable checkpointing automatically with the checkpoint directory set to the app name; when true (the default), streamingContext.checkpoint(AppName) is executed automatically. |
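Putting the table together, a SparkConf tuned for an unreliable network might look like the following sketch (the values are illustrative only, not recommendations):

```scala
import org.apache.spark.SparkConf

// Illustrative values; tune to your cluster and traffic.
val conf = new SparkConf()
  .set("spark.streaming.talos.maxRatePerPartition", "1000") // messages/second per partition
  .set("spark.streaming.talos.maxRetries", "10")            // finite retries instead of -1
  .set("spark.streaming.talos.backoff.ms", "500")           // wait longer between retries
  .set("spark.streaming.talos.checkpointing.enable", "true")
```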
If you run into problems, please contact wangjiasheng@xiaomi.com.