kafkaのドキュメントのQuick StartにはKafkaのクラスタを構成する方法は記述があるが、zookeeper(以下、zk)のHA構成については書かれていないのでzkのアンサンブル構成の構築をやってみたので手順を書いておきます。

使用したバージョンはv0.10.0。

AWSのEC2を3台使って以下の構成を作ります。draw.io便利。

3つのインスタンス(172.31.0.[6-8])のそれぞれにkafkaとzkをインストールして3台のzkをアンサンブル構成にします。このzkを3つのブローカーから構成されたkafkaクラスタに参照させる構成を作ります。

セキュリティグループで使用するポート(2181, 2888, 3888, 9092)の通信を許可しておくこと。

kafkaとzkのインストール

言うまでも無いが3つのサーバそれぞれに対してインストールすること。

インストールはkafkaのアーカイブをダウンロードして解凍するだけ。
ファイルは/rootに設置するが必要があれば移動すること。

$ sudo su -
root$ yum update -y
root$ wget http://ftp.jaist.ac.jp/pub/apache/kafka/0.10.0.0/kafka_2.11-0.10.0.0.tgz
root$ tar -xzf kafka_2.11-0.10.0.0.tgz
root$ cd kafka_2.11-0.10.0.0

zkの設定

まず、zkのクラスタリング設定を追加する。

root$ vi config/zookeeper.properties
# 以下を追記
tickTime=2000
initLimit=5
syncLimit=2
server.1=172.31.0.6:2888:3888
server.2=172.31.0.7:2888:3888
server.3=172.31.0.8:2888:3888

各設定項目を簡単に説明すると以下のとおり。

  • tickTime: ハートビートを行うために使用され、tickTimeの2倍が最小セッションタイムアウトになる。
  • initLimit: zkサーバのクオーラムがリーダーに接続しなければならない制限時間。tickTime * initLimitが設定される。
  • syncLimit: リーダーに同期されるまでの制限時間。tickTime * syncLimitが設定される。
  • server.X: 通信に使う接続情報。XはサーバのID。

このまま起動すると以下の様にmyidがないと怒られるので追加してから起動すること。

root$ bin/zookeeper-server-start.sh config/zookeeper.properties
[2016-07-31 08:36:37,977] INFO Reading configuration from: config/zookeeper.properties (org.apache.zookeeper.server.quorum.QuorumPeerConfig)
[2016-07-31 08:36:37,983] INFO Defaulting to majority quorums (org.apache.zookeeper.server.quorum.QuorumPeerConfig)
[2016-07-31 08:36:37,987] ERROR Invalid config, exiting abnormally (org.apache.zookeeper.server.quorum.QuorumPeerMain)
org.apache.zookeeper.server.quorum.QuorumPeerConfig$ConfigException: Error processing config/zookeeper.properties
at org.apache.zookeeper.server.quorum.QuorumPeerConfig.parse(QuorumPeerConfig.java:123)
at org.apache.zookeeper.server.quorum.QuorumPeerMain.initializeAndRun(QuorumPeerMain.java:101)
at org.apache.zookeeper.server.quorum.QuorumPeerMain.main(QuorumPeerMain.java:78)
Caused by: java.lang.IllegalArgumentException: /tmp/zookeeper/myid file is missing
at org.apache.zookeeper.server.quorum.QuorumPeerConfig.parseProperties(QuorumPeerConfig.java:350)
at org.apache.zookeeper.server.quorum.QuorumPeerConfig.parse(QuorumPeerConfig.java:119)
... 2 more
Invalid config, exiting abnormally

以下のようにmyidを追加する。myidに記述するidはconfig/zookeeper.propertiesのserver.Xで書いたものを指定する。

root$ mkdir -p /tmp/zookeeper
root$ echo "1" > /tmp/zookeeper/myid

zkを起動する。

root$ nohup bin/zookeeper-server-start.sh config/zookeeper.properties &

kafkaの設定

サーバ毎に設定を作成する。broker.idとlistenersはサーバ毎に読み替えること。

root$ vi config/server.properties
broker.id=0
listeners=PLAINTEXT://172.31.0.6:9092
zookeeper.connect=172.31.0.6:2181,172.31.0.7:2181,172.31.0.8:2181

ec2のt2.microを使っておりメモリ不足で起動出来ないため割当てるメモリを減らした。
デフォルトのkafkaの使用メモリが1Gなので必要があれば調整すること。起動スクリプトの設定を書き換えるか、環境変数を以下の様に書き換えると変更できる。

root$ export KAFKA_HEAP_OPTS="-Xmx256M -Xms256M"
root$ bin/kafka-server-start.sh config/server.properties

zk-shellでkafkaの状態を確認する

全てのサーバでkafkaを起動した後、zkに接続してブローカーが登録されているか確認する。kafkaにzkのshellが用意されているのでそれを利用する。

root$ bin/zookeeper-shell.sh 172.31.0.8:2181
Connecting to 172.31.0.8:2181

zkのshellから接続されているBrokerを確認する。表示されたbrokerのidをgetすると各brokerの状態がわかる。

ls /brokers/ids
[2, 1, 0]

とりあえず、ブローカーが3つ登録されていることが分かる。

topicを作ったり、kafkaを落としてみたりする

適当に以下の3つのTopicを作ってみる。

トピック名 パーティション レプリカ
test01 2 2
test02 1 2
test03 2 3

3つのトピックを作る。

root$ bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 2 --partitions 2 --topic test01
Created topic "test01".
root$ bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 2 --partitions 1 --topic test02
Created topic "test02".
root$ bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 3 --partitions 2 --topic test03
Created topic "test03".
root$ bin/kafka-topics.sh --list --zookeeper localhost:2181
test01
test02
test03

それぞれのトピックを確認する。それっぽいものが設定したとおりに作成されている。

root$ bin/kafka-topics.sh --describe --zookeeper 172.31.0.6:2181,172.31.0.7:2181,172.31.0.8:2181 --topic test01
Topic:test01 PartitionCount:2 ReplicationFactor:2 Configs:
Topic: test01 Partition: 0 Leader: 0 Replicas: 0,1 Isr: 0,1
Topic: test01 Partition: 1 Leader: 1 Replicas: 1,2 Isr: 1,2
root$ bin/kafka-topics.sh --describe --zookeeper 172.31.0.6:2181,172.31.0.7:2181,172.31.0.8:2181 --topic test02
Topic:test02 PartitionCount:1 ReplicationFactor:2 Configs:
Topic: test02 Partition: 0 Leader: 2 Replicas: 2,0 Isr: 2,0
root$ bin/kafka-topics.sh --describe --zookeeper 172.31.0.6:2181,172.31.0.7:2181,172.31.0.8:2181 --topic test03
Topic:test03 PartitionCount:2 ReplicationFactor:3 Configs:
Topic: test03 Partition: 0 Leader: 2 Replicas: 2,0,1 Isr: 2,0,1
Topic: test03 Partition: 1 Leader: 0 Replicas: 0,1,2 Isr: 0,1,2

Broker単位でもなくTopicごとでもなくPartition単位でリーダーの割当がされていることが確認できる。

それっぽいデータのやり取りをしてみる。

root$ bin/kafka-console-consumer.sh --zookeeper 172.31.0.6:2181,172.31.0.7:2181,172.31.0.8:2181 --topic test03 --from-beginning

別コンソールで起動。

root$ bin/kafka-console-producer.sh --broker-list 172.31.0.6:9092,172.31.0.7:9092,172.31.0.8:9092 --topic test03

データを送信すると正しく受信されることが確認できる。

次に2のkafka落とした後に確認するとin sync replicaの2が消えていることがわかる。

root$ bin/kafka-topics.sh --describe --zookeeper 172.31.0.6:2181,172.31.0.7:2181,172.31.0.8:2181 --topic test01
Topic:test01 PartitionCount:2 ReplicationFactor:2 Configs:
Topic: test01 Partition: 0 Leader: 1 Replicas: 1,2 Isr: 1
Topic: test01 Partition: 1 Leader: 0 Replicas: 2,0 Isr: 0
root$ bin/kafka-topics.sh --describe --zookeeper 172.31.0.6:2181,172.31.0.7:2181,172.31.0.8:2181 --topic test02
Topic:test02 PartitionCount:1 ReplicationFactor:2 Configs:
Topic: test02 Partition: 0 Leader: 0 Replicas: 0,2 Isr: 0
root$ bin/kafka-topics.sh --describe --zookeeper 172.31.0.6:2181,172.31.0.7:2181,172.31.0.8:2181 --topic test03
Topic:test03 PartitionCount:2 ReplicationFactor:3 Configs:
Topic: test03 Partition: 0 Leader: 0 Replicas: 2,0,1 Isr: 0,1
Topic: test03 Partition: 1 Leader: 0 Replicas: 0,1,2 Isr: 0,1

次に2のkafkaを復帰させるとisrに2が復帰していることがわかる。

root$ bin/kafka-topics.sh --describe --zookeeper 172.31.0.6:2181,172.31.0.7:2181,172.31.0.8:2181 --topic test01
Topic:test01 PartitionCount:2 ReplicationFactor:2 Configs:
Topic: test01 Partition: 0 Leader: 1 Replicas: 1,2 Isr: 1,2
Topic: test01 Partition: 1 Leader: 2 Replicas: 2,0 Isr: 0,2
root$ bin/kafka-topics.sh --describe --zookeeper 172.31.0.6:2181,172.31.0.7:2181,172.31.0.8:2181 --topic test02
Topic:test02 PartitionCount:1 ReplicationFactor:2 Configs:
Topic: test02 Partition: 0 Leader: 0 Replicas: 0,2 Isr: 0,2
root$ bin/kafka-topics.sh --describe --zookeeper 172.31.0.6:2181,172.31.0.7:2181,172.31.0.8:2181 --topic test03
Topic:test03 PartitionCount:2 ReplicationFactor:3 Configs:
Topic: test03 Partition: 0 Leader: 2 Replicas: 2,0,1 Isr: 0,1,2
Topic: test03 Partition: 1 Leader: 0 Replicas: 0,1,2 Isr: 0,1,2

zkは接続先にリストを指定しておけば接続先が落ちていると勝手にフェイルオーバーする仕組みになっている。producerの接続先のkafkaもリストを指定しておけば落ちている場合は自動的にフェイルオーバーする。

まとめ

zkに簡単に接続できるようになってていい感じ。zkは奇数台の構成が基本なので3, 5, 7台あたりで設定するといいかもしれない。brokerが多くなってもzkに接続しておけば勝手にリソースの管理ができる様になっているっぽい。

おわり。

参考

  1. http://kafka.apache.org/documentation.html#quickstart
  2. https://zookeeper.apache.org/