kafkaのドキュメントのQuick StartにはKafkaのクラスタを構成する方法は記述があるが、zookeeper(以下、zk)のHA構成については書かれていないのでzkのアンサンブル構成の構築をやってみたので手順を書いておきます。
使用したバージョンはv0.10.0。
AWSのEC2を3台使って以下の構成を作ります。draw.io便利。

3つのインスタンス(172.31.0.[6-8])のそれぞれにkafkaとzkをインストールして3台のzkをアンサンブル構成にします。このzkを3つのブローカーから構成されたkafkaクラスタに参照させる構成を作ります。
セキュリティグループで使用するポート(2181, 2888, 3888, 9092)の通信を許可しておくこと。
kafkaとzkのインストール
言うまでも無いが3つのサーバそれぞれに対してインストールすること。
インストールはkafkaのアーカイブをダウンロードして解凍するだけ。
ファイルは/rootに設置するが必要があれば移動すること。
$ sudo su - |
zkの設定
まず、zkのクラスタリング設定を追加する。
root$ vi config/zookeeper.properties |
各設定項目を簡単に説明すると以下のとおり。
- tickTime: ハートビートを行うために使用され、tickTimeの2倍が最小セッションタイムアウトになる。
- initLimit: zkサーバのクオーラムがリーダーに接続しなければならない制限時間。tickTime * initLimitが設定される。
- syncLimit: リーダーに同期されるまでの制限時間。tickTime * syncLimitが設定される。
- server.X: 通信に使う接続情報。XはサーバのID。
このまま起動すると以下の様にmyidがないと怒られるので追加してから起動すること。
root$ bin/zookeeper-server-start.sh config/zookeeper.properties |
以下のようにmyidを追加する。myidに記述するidはconfig/zookeeper.propertiesのserver.Xで書いたものを指定する。root$ mkdir -p /tmp/zookeeper
root$ echo "1" > /tmp/zookeeper/myid
zkを起動する。root$ nohup bin/zookeeper-server-start.sh config/zookeeper.properties &
kafkaの設定
サーバ毎に設定を作成する。broker.idとlistenersはサーバ毎に読み替えること。
root$ vi config/server.properties |
ec2のt2.microを使っておりメモリ不足で起動出来ないため割当てるメモリを減らした。
デフォルトのkafkaの使用メモリが1Gなので必要があれば調整すること。起動スクリプトの設定を書き換えるか、環境変数を以下の様に書き換えると変更できる。
root$ export KAFKA_HEAP_OPTS="-Xmx256M -Xms256M" |
zk-shellでkafkaの状態を確認する
全てのサーバでkafkaを起動した後、zkに接続してブローカーが登録されているか確認する。kafkaにzkのshellが用意されているのでそれを利用する。
root$ bin/zookeeper-shell.sh 172.31.0.8:2181 |
zkのshellから接続されているBrokerを確認する。表示されたbrokerのidをgetすると各brokerの状態がわかる。
ls /brokers/ids |
とりあえず、ブローカーが3つ登録されていることが分かる。
topicを作ったり、kafkaを落としてみたりする
適当に以下の3つのTopicを作ってみる。
トピック名 | パーティション | レプリカ |
---|---|---|
test01 | 2 | 2 |
test02 | 1 | 2 |
test03 | 2 | 3 |
3つのトピックを作る。root$ bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 2 --partitions 2 --topic test01
Created topic "test01".
root$ bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 2 --partitions 1 --topic test02
Created topic "test02".
root$ bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 3 --partitions 2 --topic test03
Created topic "test03".
root$ bin/kafka-topics.sh --list --zookeeper localhost:2181
test01
test02
test03
それぞれのトピックを確認する。それっぽいものが設定したとおりに作成されている。
root$ bin/kafka-topics.sh --describe --zookeeper 172.31.0.6:2181,172.31.0.7:2181,172.31.0.8:2181 --topic test01 |
Broker単位でもなくTopicごとでもなくPartition単位でリーダーの割当がされていることが確認できる。
それっぽいデータのやり取りをしてみる。root$ bin/kafka-console-consumer.sh --zookeeper 172.31.0.6:2181,172.31.0.7:2181,172.31.0.8:2181 --topic test03 --from-beginning
別コンソールで起動。root$ bin/kafka-console-producer.sh --broker-list 172.31.0.6:9092,172.31.0.7:9092,172.31.0.8:9092 --topic test03
データを送信すると正しく受信されることが確認できる。
次に2のkafka落とした後に確認するとin sync replicaの2が消えていることがわかる。
root$ bin/kafka-topics.sh --describe --zookeeper 172.31.0.6:2181,172.31.0.7:2181,172.31.0.8:2181 --topic test01 |
次に2のkafkaを復帰させるとisrに2が復帰していることがわかる。root$ bin/kafka-topics.sh --describe --zookeeper 172.31.0.6:2181,172.31.0.7:2181,172.31.0.8:2181 --topic test01
Topic:test01 PartitionCount:2 ReplicationFactor:2 Configs:
Topic: test01 Partition: 0 Leader: 1 Replicas: 1,2 Isr: 1,2
Topic: test01 Partition: 1 Leader: 2 Replicas: 2,0 Isr: 0,2
root$ bin/kafka-topics.sh --describe --zookeeper 172.31.0.6:2181,172.31.0.7:2181,172.31.0.8:2181 --topic test02
Topic:test02 PartitionCount:1 ReplicationFactor:2 Configs:
Topic: test02 Partition: 0 Leader: 0 Replicas: 0,2 Isr: 0,2
root$ bin/kafka-topics.sh --describe --zookeeper 172.31.0.6:2181,172.31.0.7:2181,172.31.0.8:2181 --topic test03
Topic:test03 PartitionCount:2 ReplicationFactor:3 Configs:
Topic: test03 Partition: 0 Leader: 2 Replicas: 2,0,1 Isr: 0,1,2
Topic: test03 Partition: 1 Leader: 0 Replicas: 0,1,2 Isr: 0,1,2
zkは接続先にリストを指定しておけば接続先が落ちていると勝手にフェイルオーバーする仕組みになっている。producerの接続先のkafkaもリストを指定しておけば落ちている場合は自動的にフェイルオーバーする。
まとめ
zkに簡単に接続できるようになってていい感じ。zkは奇数台の構成が基本なので3, 5, 7台あたりで設定するといいかもしれない。brokerが多くなってもzkに接続しておけば勝手にリソースの管理ができる様になっているっぽい。
おわり。