<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming_2.10</artifactId>
<version>${spark.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>com.xiaomi.infra.galaxy</groupId>
<artifactId>galaxy-talos-spark</artifactId>
<version>${galaxy.version}</version>
</dependency>
其中,spark.version需要和spark集群保持一致,galaxy.version一般选择最新即可; spark-streaming的scope设置与集群情况相关, 如果集群上包含了streaming依赖, 则设置为provided即可; 否则设置为compile.
import org.apache.spark.streaming.talos._
val streamingContext = TalosUtils.createStreamingContext( [spark conf], [batch duration], [map of Talos parameters], [Talos credential], [set of topics to consume]){[dstream function]}
参数说明:
通过上述方式创建的StreamingContext, 默认是开启checkpointing的, 且checkpointing dir是app name; 这样当重新提交作业的时候, StreamingContext可以从checkpointing数据恢复, 从而可以从上次作业退出的位置(即保存的talos offset信息)继续消费; 如果想要禁止自动开启checkpointing, 可以配置SparkConf spark.streaming.talos.checkpointing.enable 为false.
请查看galaxy-sdk-java/galaxy-talos-client/galaxy-talos-spark/src/main/scala/com/xiaomi/infra/galaxy/talos/spark/example
TalosUtils提供如下API
import org.apache.spark.streaming.talos._
val streamingContext = TalosUtils.createStreamingContext( [spark conf], [batch duration], [map of Talos parameters], [Talos credential], [set of topics to consume], [custom message handler]){[dstream function]}
其中,[custom message handler]是类型为MessageAndOffset => T的参数,T为泛型参数。 比如,想要在DStream中的数据里添加message offset信息的话,可以自定义如下message handler:
val messageHandler = (mo: MessageAndOffset) => (mo.message.partitionKey, mo.messageOffset, new String(mo.message.message.array(), Charset.forName("UTF-8")))
对于message.array(), 除了转化为String, 也可以转化为其他自定义的内容, 或者直接返回array.
以示例代码TalosSparkDemo为参照, 通过一下方式可以获取offset 信息:
// Hold a reference to the current offset ranges, so it can be used downstream
var offsetRanges = Array[OffsetRange]()
val ssc = TalosUtils.createStreamingContext(
sparkConf,
Seconds(batchSec),
Map(TalosClientConfigKeys.GALAXY_TALOS_SERVICE_ENDPOINT -> uri,
"auto.offset.reset" -> "smallest"),
credential,
Set(topic)) { inputDstream => {
inputDstream.transform { rdd =>
offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
rdd
}.foreachRDD(rdd => {
// scalastyle:off println
rdd.foreach(println)
// scalastyle:on println
})
}
注意两点
以下配置都可以在调用TalosUtils.createDirectStream时,设置在中streamingContext的sparkConf中的,设置在[map of Talos parameters]中是无效的。
config | default | meaning |
---|---|---|
spark.streaming.talos.maxRatePerPartition | 0 | 每个partition消费数据的速率限制,message/seconds;如果每个batch数据量太大,导致spark job delay严重时,可以考虑限制每个batch的数据量。 |
spark.streaming.talos.maxRetries | -1 | 与Talos交互时,失败重试次数;网络不好时,可尝试配置多一些重试次数。默认-1为无限重试。 |
spark.streaming.talos.backoff.ms | 200 | 与Talos交互时,失败重试次数之间的时间间隔。 |
spark.streaming.talos.checkpointing.enable | true | 是否自动开启checkpointing,并设置目录为AppName;默认为true,会自动执行streamingContext.checkpoint(AppName); |
有使用问题,请联系 wangjiasheng@xiaomi.com 。