<properties>
<talos.version>2.0.1</talos.version>
<storm.version>0.9.1-incubating-mdh1.0-SNAPSHOT</storm.version>
</properties>
<dependencies>
<dependency>
<groupId>com.xiaomi.infra.galaxy</groupId>
<artifactId>galaxy-talos-storm</artifactId>
<version>${talos.version}</version>
</dependency>
<dependency>
<groupId>org.apache.storm</groupId>
<artifactId>storm-core</artifactId>
<version>${storm.version}</version>
<scope>provided</scope>
</dependency>
</dependencies>
storm.version and talos.version are set according to the actual situation.
The constructor of TalosStormConfig is very simple:
public TalosStormConfig(String topic, String consumerGroupName, Credential credential, String talosEndpoint)
Among them, topic is the name of talos topic that needs to read data; consumerGroupName is mainly related to saving and reading offset information; for the usage of Credential, please refer to authentication and authorization model; talosEndpoint is the Talos cluster address, that is, the galaxy.talos.service.endpoint that TalosConsumer needs to configure during the initialization process.
Some customizable parameters can be set in TalosStormConfig.parameters. For example:
talosStormConfig.parameters.put(TalosStormConfigKeys.COMMIT_INTERVAL_MS, "6000");
The configurable parameters can be found in TalosStormConfigKeys. The specific meanings and default configurations are as follows:
| Configuration | Description | Default Value | |:-:|:-:|:-:| |PARTITION_QUEUE_SIZE| Cache queue size for messages read from Talos and not emitted | 1000 | | COMMIT_INTERVAL_MS |commit offset interval | 60000 | | MAX_RETRIES | Failed retries for initializing TalosConsumer | 2 | |SPOUT_IDLE_MS| When no message can be emitted, spout will sleep this configured time and try again |50| If there are other TalosConsumer-related custom parameters, they can also be configured in TalosStormConfig.parameters, for example:
talosStormConfig.parameters.put(TalosClientConfigKeys.GALAXY_TALOS_CONSUMER_FETCH_INTERVAL, 200)
Scheme controls the content of the message emitted by TalosStormSpout. The default Scheme of TalosStormConfig is DefaultTalosStormScheme. The generated tuple has a total of four fields, in the following order: toipic, offset, topic, partition The Bolt in the downstream of TalosStormSpout gets the data in the tuple as follows:
String msg = tuple.getStringByField(DefaultTalosStormScheme.MESSAGE_STRING_SCHEME_KEY);
long offset = tuple.getLongByField(DefaultTalosStormScheme.OFFSET_LONG_SCHEME_KEY);
String topic = tuple.getStringByField(DefaultTalosStormScheme.TOPIC_STRING_SCHEME_KEY);
int partition = tuple.getIntegerByField(DefaultTalosStormScheme.PARTITION_INT_SCHEME_KEY);
If you need to customize Scheme, implement the TalosStormScheme interface and configure it in TalosStormConfig. Main interface implementation example:
@Override
public Iterable<List<Object>> generateTuples(TopicAndPartition topicPartition, MessageAndOffset msg) {
List<Object> tuple = new ArrayList<Object>();
tuple.add(new String(msg.getMessage().getMessage(), Charsets.UTF_8));
tuple.add(msg.messageOffset);
tuple.add(topicPartition.topicName);
tuple.add(topicPartition.partitionId);
return Arrays.asList(tuple);
}
@Override
public Fields getOutputFields() {
return new Fields("message", "offset", "topic", "partition");
}
TalosStormSpout has only one constructor and it can be passed to the already configured TalosStormConfig. For the construction and submission of Topology, please refer to the storm official wiki.
private final String topic = "talos-storm-topic";
private final String group = "talos-storm-group";
private final String keyId = "";
private final String key = "";
private final String talosEndpoint = "";
public StormTopology buildTopology() {
Credential credential = new Credential()
.setSecretKeyId(keyId)
.setSecretKey(key)
.setType(UserType.DEV_XIAOMI);
TalosStormConfig config = new TalosStormConfig(topic,
group, credential, talosEndpoint);
config.parameters.put(TalosStormConfigKeys.COMMIT_INTERVAL_MS, "60000");
config.parameters.put(TalosClientConfigKeys.GALAXY_TALOS_CONSUMER_FETCH_INTERVAL, "200");
TopologyBuilder builder = new TopologyBuilder();
builder.setSpout("talos-reader", new TalosStormSpout(config), 4);
builder.setBolt("word-count", new WordCountBolt(), 1)
.shuffleGrouping("talos-reader");
return builder.createTopology();
}
If you have feedback on usage problems and BUG, please contact wangjiasheng@xiaomi.com.