Apache Kafkaのv0.10.で追加されたKafka Streamsは名前の通りストリーミング処理アプリケーションがかけるライブラリです。今までKafkaのTopicのログを集計したり、1つのTopicにまとめるためにSparkやStormなどKafkaの外側でアプリケーションを動作させていたのが、Kafka Streamsを使うとKafkaの中でStream処理が記述できる様になります。
Topicに書き込まれたログに対して処理を施して別のTopicに書き出すといったことが簡単に記述できます。
以下で詳しく説明されているので読むとイメージが掴めるかもしれません。
confluentのQuickstart ですぐに動かすことのできるアーカイブがダウンロードできてすぐに試せるので、すぐに試したい人はQuickstart で確認するといいかも。公式のドキュメントもQuickstart にStreamsが追加されている。
今回はkafka v0.10.0.1 を使ってアプリケーションをpackageしてkafkaにロードして動作させることをやってみる。
Kafka StreamsのAPIはLow-Level Processor APIとHigh-Level Streams DSLがあるが今回は後者を使う。
サンプルとして、上記のExampleを参考にTopicに入ってきたデータをupcaseして別のTopicに書き出すだけのstreams-map
を動かしてみた。
環境
Java 8
mvn 3.3.9
Apacke kafka 0.10.0.1
ソースはココ
プロジェクトを作る 今回はmavenを使う。インストールは割愛。
適当なプロジェクトを作る。quickstartで良いかと思います。
$ mvn archetype:generate \ -DarchetypeArtifactId=maven-archetype-quickstart \ -DinteractiveMode=false \ -DgroupId=com.example \ -DartifactId=kafka-streams-sample
pom.xmlにkafka-streamsとkafka-clientsを記述。
<dependency > <groupId > org.apache.kafka</groupId > <artifactId > kafka-streams</artifactId > <version > 0.10.0.1</version > </dependency > <dependency > <groupId > org.apache.kafka</groupId > <artifactId > kafka-clients</artifactId > <version > 0.10.0.1</version > </dependency >
fat-jarを作る必要があるらしいのでmaven-assembly-pluginかmaven-shade-pluginをpom.xmlに設定する。
サンプルソース KStreamはデータ+処理の流れ、KTableはデータ+状態を持つためのトピックでKStreamからKTableを操作することができるし、KTableをKStreamに変換することもできるというイメージ。KTableにはJoinなどTable特有の操作が実装されている。SparkでいうとRDDにメソッドチェーンで処理を記述していくようなことがKStreamに対してもできる。
public class App2 { public static void main (String[] args) throws Exception { Properties props = new Properties(); props.put(StreamsConfig.APPLICATION_ID_CONFIG, "streams-map" ); props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092" ); props.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass()); props.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass()); props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest" ); final Serde<String> stringSerde = Serdes.String(); final Serde<byte []> byteArraySerde = Serdes.ByteArray(); KStreamBuilder builder = new KStreamBuilder(); KStream<byte [], String> textLines = builder.stream(byteArraySerde, stringSerde, "TextLinesTopic" ); KStream<byte [], String> uppercasedWithMapValues = textLines.mapValues(String::toUpperCase); uppercasedWithMapValues.to("UppercasedTextLinesTopic" ); KStream<byte [], String> uppercasedWithMap = textLines.map((key, value) -> new KeyValue<>(key, value.toUpperCase())); KStream<String, String> originalAndUppercased = textLines.map((key, value) -> KeyValue.pair(value, value.toUpperCase())); originalAndUppercased.to(stringSerde, stringSerde, "OriginalAndUppercasedTopic" ); KafkaStreams streams = new KafkaStreams(builder, props); streams.start(); Thread.sleep(500000L ); streams.close(); } }
パッケージ $ mvn clean package $ cp target/kafka-streams-sample-1.0-SNAPSHOT.jar ${kafka_root} /libs
実行 同一のapplication id
とtopic
を使いまわして実行すると動作しないので、実行毎にトピックを削除する必要がある。
Kafka起動 $ cd ${kafka_root} $ nohup ./bin/zookeeper-server-start.sh config/zookeeper.properties & $ nohup ./bin/kafka-server-start.sh config/server.properties &
トピックを作る $ ./bin/kafka-topics.sh --create --zookeeper localhost:2181 \ --replication-factor 1 --partitions 1 --topic TextLinesTopic $ ./bin/kafka-topics.sh --create --zookeeper localhost:2181 \ --replication-factor 1 --partitions 1 --topic UppercasedTextLinesTopic $ ./bin/kafka-topics.sh --create --zookeeper localhost:2181 \ --replication-factor 1 --partitions 1 --topic OriginalAndUppercasedTopic
データを書き込む echo -e "all streams lead to kafka\nhello kafka streams\njoin kafka summit" > /tmp/file-input.txtcat /tmp/file-input.txt | ./bin/kafka-console-producer.sh --broker-list localhost:9092 --topic TextLinesTopic
Kafka Streams実行 PackageしたJarを以下のコマンドでKafkaにロードしてStreams処理が実行できます。
$ ./bin/kafka-run-class.sh com.example.App2
kafka-run-class.shの中を確認するとlibs以下がCLASSPATHに追加されているので基本的にlibs以下に作ったjarを置けば問題なさそうである。
(...) for file in "$base_dir " /libs/*;do if should_include_file "$file " ; then CLASSPATH="$CLASSPATH " :"$file " fi done (...)
そんな設定しらん!とWARNが出るが動くので今は無視。StreamsConfigがConsumerConfigを参照しているとかで出てるっぽい(?)
[2016-09-11 08:01:46,493] WARN The configuration replication.factor = 1 was supplied but isn't a known config. (org.apache.kafka.clients.consumer.ConsumerConfig) [2016-09-11 08:01:46,494] WARN The configuration num.standby.replicas = 0 was supplied but isn't a known config. (org.apache.kafka.clients.consumer.ConsumerConfig) [2016-09-11 08:01:46,494] WARN The configuration zookeeper.connect = localhost:2181 was supplied but isn't a known config. (org.apache.kafka.clients.consumer.ConsumerConfig) [2016-09-11 08:01:46,494] WARN The configuration __stream.thread.instance__ = Thread[StreamThread-1,5,main] was supplied but isn't a known config. (org.apache.kafka.clients.consumer.ConsumerConfig)
確認 $ ./bin/kafka-console-consumer.sh --zookeeper localhost:2181 \ --topic OriginalAndUppercasedTopic --from-beginning ALL STREAMS LEAD TO KAFKA HELLO KAFKA STREAMS JOIN KAFKA SUMMIT ALL STREAMS LEAD TO KAFKA HELLO KAFKA STREAMS JOIN KAFKA SUMMIT
upcaseされるた文字列がTopicに書き込まれていることが確認できた。
まとめ Kafka上でストリーミング処理ができるようになったことで、データ加工や集計などはKafkaだけで完結できるのですごくいい感じ。
おわり。
参考
http://kafka.apache.org/documentation.html#streams
http://docs.confluent.io/3.0.1/streams/index.html
http://developers.linecorp.com/blog/ja/?p=3718
http://qiita.com/_____/items/60f92a1db6dfce4303c5