Prestoを使えばKafkaに入っているメッセージにSQLでクエリが投げれるみたいなのでやってみました。

KafkaからHBaseやらNoSQLに入れ直すのはめんどうなのでPrestoを使ってみた感じです。
ちなみにPrestoは複数のデータソースに対して分散SQLクエリを発行するエンジンです。Kafkaは接続出来るデータソースの1つにすぎません。

環境

CentOS7に全部まとめてインストールして動作を確認します。

  • CentOS 7
  • Java 1.8
  • Presto 0.180
  • Kafka 0.11.0.0

kafkaインストール

まず、kafkaのインストールから。

root$ yum update -y
root$ yum install java

Kafkaのサポートバージョンはpresto 0.180のドキュメントを見るとKafkaは0.8.xでテストしていると書かれています。

Prestoのチケットを見るとkafka 0.10まではサポートしているみたいです。0.11.0は動くかわからないですが、試しに0.11.0を使ってみることにしました。

ダウンロードして起動する。

$ wget http://ftp.jaist.ac.jp/pub/apache/kafka/0.11.0.0/kafka_2.11-0.11.0.0.tgz
$ tar zxfv kafka_2.11-0.11.0.0.tgz
$ cd kafka_2.11-0.11.0.0
$ bin/zookeeper-server-start.sh config/zookeeper.properties &
$ bin/kafka-server-start.sh config/server.properties &

Prestoインストール

Singleで動かす場合も若干設定が必要なのでココを参考に設定します。

ダウンロードします。

$ wget https://repo1.maven.org/maven2/com/facebook/presto/presto-server/0.180/presto-server-0.180.tar.gz
$ tar zxfv presto-server-0.180.tar.gz

最小構成の設定をコピペします。

$ cd presto-server-0.180
$ mkdir -p etc/catalog

$ cat etc/node.properties
node.environment=production
node.id=ffffffff-ffff-ffff-ffff-ffffffffffff
node.data-dir=/var/presto/data

$ cat etc/config.properties
coordinator=true
node-scheduler.include-coordinator=true
http-server.http.port=8080
query.max-memory=2GB
query.max-memory-per-node=1GB
discovery-server.enabled=true
discovery.uri=http://localhost:8080

$ cat etc/jvm.config
-server
-Xmx2G
-XX:+UseG1GC
-XX:G1HeapRegionSize=32M
-XX:+UseGCOverheadLimit
-XX:+ExplicitGCInvokesConcurrent
-XX:+HeapDumpOnOutOfMemoryError
-XX:+ExitOnOutOfMemoryError

$ cat etc/log.properties
com.facebook.presto=INFO

$ cat etc/catalog/kafka.properties
connector.name=kafka
kafka.nodes=localhost:9092
kafka.table-names=tpch.customer,tpch.orders,tpch.lineitem,tpch.part,tpch.partsupp,tpch.supplier,tpch.nation,tpch.region
kafka.hide-internal-columns=false

VMに割り当てているメモリが少なかったらしく、jvmに割り当てるメモリを少し減らしました。

起動します。

$ bin/launcher start

クエリを投げるためにpresto-cliをインストールします。

$ wget https://repo1.maven.org/maven2/com/facebook/presto/presto-cli/0.180/presto-cli-0.180-executable.jar
$ mv presto-cli-0.180-executable.jar presto
$ chmod +x presto

Kafkaにデータを入れる

Kafka Connector Tutorialではtpchのデータを使うのでダウンロードしてデータをロードします。

$ curl -o kafka-tpch https://repo1.maven.org/maven2/de/softwareforge/kafka_tpch_0811/1.0/kafka_tpch_0811-1.0.sh
$ chmod 755 kafka-tpch
$ ./kafka-tpch load --brokers localhost:9092 --prefix tpch. --tpch-type tiny

いくつかERRORが出ますが、データが格納できているようなので大丈夫そうです。
念のため確認したところちゃんとロードできているようです。

$ bin/kafka-topics.sh --zookeeper localhost:2181 --list
tpch.customer
tpch.lineitem
tpch.nation
tpch.orders
tpch.part
tpch.partsupp
tpch.region
tpch.supplier

$ bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic tpch.orders --from-beginning
{"rowNumber":14997,"orderKey":59973,"customerKey":229,"orderStatus":"O","totalPrice":230718.14,"orderDate":"1995-11-16","orderPriority":"5-LOW","clerk":"Clerk#000000891","shipPriority":0,"comment":"lyly among the furiously fluffy theodolites. carefully f"}
{"rowNumber":14998,"orderKey":59974,"customerKey":761,"orderStatus":"O","totalPrice":63273.39,"orderDate":"1995-12-05","orderPriority":"2-HIGH","clerk":"Clerk#000000862","shipPriority":0,"comment":"counts. even, ironic packages cajole ironic "}
{"rowNumber":14999,"orderKey":59975,"customerKey":706,"orderStatus":"F","totalPrice":59995.27,"orderDate":"1993-12-22","orderPriority":"5-LOW","clerk":"Clerk#000000193","shipPriority":0,"comment":". even packages affix fluffily against "}
{"rowNumber":15000,"orderKey":60000,"customerKey":1426,"orderStatus":"P","totalPrice":299401.61,"orderDate":"1995-04-21","orderPriority":"2-HIGH","clerk":"Clerk#000000194","shipPriority":0,"comment":"usual frets use alongside of the furiou"}

kafkaにクエリを投げてみる

tpchのテーブルが読み込めているはずです。

$ ./presto --catalog kafka --schema tpch
presto:tpch> SHOW TABLES;
Table
----------
customer
lineitem
nation
orders
part
partsupp
region
supplier
(8 rows)

Query 20170714_182307_00002_9aq43, FINISHED, 1 node
Splits: 18 total, 18 done (100.00%)
0:00 [8 rows, 166B] [19 rows/s, 410B/s]

customerの定義を見ます。

presto:tpch> DESCRIBE customer;
Column | Type | Extra | Comment
-------------------+---------+-------+------------------------------------------
_partition_id | bigint | | Partition Id
_partition_offset | bigint | | Offset for the message within the partiti
_segment_start | bigint | | Segment start offset
_segment_end | bigint | | Segment end offset
_segment_count | bigint | | Running message count per segment
_key | varchar | | Key text
_key_corrupt | boolean | | Key data is corrupt
_key_length | bigint | | Total number of key bytes
_message | varchar | | Message text
_message_corrupt | boolean | | Message data is corrupt
_message_length | bigint | | Total number of message bytes
(11 rows)

Query 20170714_182327_00003_9aq43, FINISHED, 1 node
Splits: 18 total, 18 done (100.00%)
0:06 [11 rows, 1009B] [1 rows/s, 160B/s]

countしてみます。

presto:tpch> SELECT count(*) FROM customer;
_col0
-------
1500
(1 row)

Query 20170714_182346_00004_9aq43, FINISHED, 1 node
Splits: 18 total, 18 done (100.00%)
0:01 [1.5K rows, 411KB] [1.28K rows/s, 352KB/s]

selectします。

presto:tpch> SELECT _message FROM customer LIMIT 5;

--------------------------------------------------------------------------------
{"rowNumber":1,"customerKey":1,"name":"Customer#000000001","address":"IVhzIApeR
{"rowNumber":2,"customerKey":2,"name":"Customer#000000002","address":"XSTf4,NCw
{"rowNumber":3,"customerKey":3,"name":"Customer#000000003","address":"MG9kdTD2W
{"rowNumber":4,"customerKey":4,"name":"Customer#000000004","address":"XxVSJsLAG
{"rowNumber":5,"customerKey":5,"name":"Customer#000000005","address":"KvpyuHCpl
(5 rows)

Query 20170714_182355_00005_9aq43, FINISHED, 1 node
Splits: 18 total, 18 done (100.00%)
0:04 [232 rows, 63.1KB] [58 rows/s, 15.9KB/s]

JSONが返ってきているので、これをテーブルとして取り込むためににはPresto側でSchemeの定義が必要みたいです。

Schemeを設定します。

$ mkdir etc/kafka
$ cat etc/kafka/tpch.customer.json
{
"tableName": "customer",
"schemaName": "tpch",
"topicName": "tpch.customer",
"key": {
"dataFormat": "raw",
"fields": [
{
"name": "kafka_key",
"dataFormat": "LONG",
"type": "BIGINT",
"hidden": "false"
}
]
},
"message": {
"dataFormat": "json",
"fields": [
{
"name": "row_number",
"mapping": "rowNumber",
"type": "BIGINT"
},
{
"name": "customer_key",
"mapping": "customerKey",
"type": "BIGINT"
},
{
"name": "name",
"mapping": "name",
"type": "VARCHAR"
},
{
"name": "address",
"mapping": "address",
"type": "VARCHAR"
},
{
"name": "nation_key",
"mapping": "nationKey",
"type": "BIGINT"
},
{
"name": "phone",
"mapping": "phone",
"type": "VARCHAR"
},
{
"name": "account_balance",
"mapping": "accountBalance",
"type": "DOUBLE"
},
{
"name": "market_segment",
"mapping": "marketSegment",
"type": "VARCHAR"
},
{
"name": "comment",
"mapping": "comment",
"type": "VARCHAR"
}
]
}
}

Scheme定義することでJSONだった部分がテーブルとして読み込むことができます。

$ bin/launcher restart
$ ./presto --catalog kafka --schema tpch
presto:tpch> SELECT * FROM customer LIMIT 5;
kafka_key | row_number | customer_key | name | address |
-----------+------------+--------------+--------------------+--------------------------------+
0 | 1 | 1 | Customer#000000001 | IVhzIApeRb ot,c,E |
1 | 2 | 2 | Customer#000000002 | XSTf4,NCwDVaWNe6tEgvwfmRchLXak |
2 | 3 | 3 | Customer#000000003 | MG9kdTD2WBHm |
3 | 4 | 4 | Customer#000000004 | XxVSJsLAGtn |
4 | 5 | 5 | Customer#000000005 | KvpyuHCplrB84WgAiGV6sYpZq7Tj |
(5 rows)

Schemeの定義が必要なのは面倒ですが、いいかんじですね。

おわり。

参考

  1. https://prestodb.io/docs/current/connector/kafka-tutorial.html
  2. https://kafka.apache.org/quickstart