Apache Kafkaのv0.10.で追加されたKafka Streamsは名前の通りストリーミング処理アプリケーションがかけるライブラリです。今までKafkaのTopicのログを集計したり、1つのTopicにまとめるためにSparkやStormなどKafkaの外側でアプリケーションを動作させていたのが、Kafka Streamsを使うとKafkaの中でStream処理が記述できる様になります。

Topicに書き込まれたログに対して処理を施して別のTopicに書き出すといったことが簡単に記述できます。

以下で詳しく説明されているので読むとイメージが掴めるかもしれません。

confluentのQuickstartですぐに動かすことのできるアーカイブがダウンロードできてすぐに試せるので、すぐに試したい人はQuickstartで確認するといいかも。公式のドキュメントもQuickstartにStreamsが追加されている。

今回はkafka v0.10.0.1を使ってアプリケーションをpackageしてkafkaにロードして動作させることをやってみる。

Kafka StreamsのAPIはLow-Level Processor APIとHigh-Level Streams DSLがあるが今回は後者を使う。

サンプルとして、上記のExampleを参考にTopicに入ってきたデータをupcaseして別のTopicに書き出すだけのstreams-mapを動かしてみた。

環境

  • Java 8
  • mvn 3.3.9
  • Apacke kafka 0.10.0.1

ソースはココ

プロジェクトを作る

今回はmavenを使う。インストールは割愛。

適当なプロジェクトを作る。quickstartで良いかと思います。

$ mvn archetype:generate \
-DarchetypeArtifactId=maven-archetype-quickstart \
-DinteractiveMode=false \
-DgroupId=com.example \
-DartifactId=kafka-streams-sample

pom.xmlにkafka-streamsとkafka-clientsを記述。

<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-streams</artifactId>
<version>0.10.0.1</version>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>0.10.0.1</version>
</dependency>

fat-jarを作る必要があるらしいのでmaven-assembly-pluginかmaven-shade-pluginをpom.xmlに設定する。

サンプルソース

KStreamはデータ+処理の流れ、KTableはデータ+状態を持つためのトピックでKStreamからKTableを操作することができるし、KTableをKStreamに変換することもできるというイメージ。KTableにはJoinなどTable特有の操作が実装されている。SparkでいうとRDDにメソッドチェーンで処理を記述していくようなことがKStreamに対してもできる。

public class App2 {
public static void main(String[] args) throws Exception {
//接続情報など
Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "streams-map");
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
props.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

// serializersとdeserializersの設定
final Serde<String> stringSerde = Serdes.String();
final Serde<byte[]> byteArraySerde = Serdes.ByteArray();

KStreamBuilder builder = new KStreamBuilder();
// serializersとdeserializersを設定してTopicを指定する
KStream<byte[], String> textLines = builder.stream(byteArraySerde, stringSerde, "TextLinesTopic");
// textLinesをupper caseする
KStream<byte[], String> uppercasedWithMapValues = textLines.mapValues(String::toUpperCase);
// toメソッドでTopicに書き出す
uppercasedWithMapValues.to("UppercasedTextLinesTopic");
KStream<byte[], String> uppercasedWithMap = textLines.map((key, value) -> new KeyValue<>(key, value.toUpperCase()));
KStream<String, String> originalAndUppercased = textLines.map((key, value) -> KeyValue.pair(value, value.toUpperCase()));
originalAndUppercased.to(stringSerde, stringSerde, "OriginalAndUppercasedTopic");

KafkaStreams streams = new KafkaStreams(builder, props);

streams.start();
Thread.sleep(500000L);
streams.close();
}
}

パッケージ

$ mvn clean package
$ cp target/kafka-streams-sample-1.0-SNAPSHOT.jar ${kafka_root}/libs

実行

同一のapplication idtopicを使いまわして実行すると動作しないので、実行毎にトピックを削除する必要がある。

Kafka起動

$ cd ${kafka_root}
$ nohup ./bin/zookeeper-server-start.sh config/zookeeper.properties &
$ nohup ./bin/kafka-server-start.sh config/server.properties &

トピックを作る

$ ./bin/kafka-topics.sh --create --zookeeper localhost:2181 \
--replication-factor 1 --partitions 1 --topic TextLinesTopic
$ ./bin/kafka-topics.sh --create --zookeeper localhost:2181 \
--replication-factor 1 --partitions 1 --topic UppercasedTextLinesTopic
$ ./bin/kafka-topics.sh --create --zookeeper localhost:2181 \
--replication-factor 1 --partitions 1 --topic OriginalAndUppercasedTopic

データを書き込む

echo -e "all streams lead to kafka\nhello kafka streams\njoin kafka summit" > /tmp/file-input.txt
cat /tmp/file-input.txt | ./bin/kafka-console-producer.sh --broker-list localhost:9092 --topic TextLinesTopic

Kafka Streams実行

PackageしたJarを以下のコマンドでKafkaにロードしてStreams処理が実行できます。

$ ./bin/kafka-run-class.sh com.example.App2

kafka-run-class.shの中を確認するとlibs以下がCLASSPATHに追加されているので基本的にlibs以下に作ったjarを置けば問題なさそうである。

(...)
# classpath addition for release
for file in "$base_dir"/libs/*;
do
if should_include_file "$file"; then
CLASSPATH="$CLASSPATH":"$file"
fi
done
(...)

そんな設定しらん!とWARNが出るが動くので今は無視。StreamsConfigがConsumerConfigを参照しているとかで出てるっぽい(?)

[2016-09-11 08:01:46,493] WARN The configuration replication.factor = 1 was supplied but isn't a known config. (org.apache.kafka.clients.consumer.ConsumerConfig)
[2016-09-11 08:01:46,494] WARN The configuration num.standby.replicas = 0 was supplied but isn't a known config. (org.apache.kafka.clients.consumer.ConsumerConfig)
[2016-09-11 08:01:46,494] WARN The configuration zookeeper.connect = localhost:2181 was supplied but isn't a known config. (org.apache.kafka.clients.consumer.ConsumerConfig)
[2016-09-11 08:01:46,494] WARN The configuration __stream.thread.instance__ = Thread[StreamThread-1,5,main] was supplied but isn't a known config. (org.apache.kafka.clients.consumer.ConsumerConfig)

確認

$ ./bin/kafka-console-consumer.sh --zookeeper localhost:2181 \
--topic OriginalAndUppercasedTopic --from-beginning
ALL STREAMS LEAD TO KAFKA
HELLO KAFKA STREAMS
JOIN KAFKA SUMMIT
ALL STREAMS LEAD TO KAFKA
HELLO KAFKA STREAMS
JOIN KAFKA SUMMIT

upcaseされるた文字列がTopicに書き込まれていることが確認できた。

まとめ

Kafka上でストリーミング処理ができるようになったことで、データ加工や集計などはKafkaだけで完結できるのですごくいい感じ。

おわり。

参考

  1. http://kafka.apache.org/documentation.html#streams
  2. http://docs.confluent.io/3.0.1/streams/index.html
  3. http://developers.linecorp.com/blog/ja/?p=3718
  4. http://qiita.com/_____/items/60f92a1db6dfce4303c5