The docker-compose.yml file referenced here allows for easy setup of ZooKeeper and Kafka across 3 machines.
Say the IP addresses of the 3 machines are 1.1.1.1, 1.1.1.2, and 1.1.1.3.
- Machine 1: zk1, kafka1
- Machine 2: zk2, kafka2
- Machine 3: zk3, kafka3
All the ZooKeeper IP addresses are specified for each zk instance. That is:
- zk1: zookeepers=[zk1 (1.1.1.1), zk2 (1.1.1.2), zk3 (1.1.1.3)]
- zk2: zookeepers=[zk1 (1.1.1.1), zk2 (1.1.1.2), zk3 (1.1.1.3)]
- zk3: zookeepers=[zk1 (1.1.1.1), zk2 (1.1.1.2), zk3 (1.1.1.3)]
We do the same for each Kafka instance:
- kafka1: zookeepers (KAFKA_ZOOKEEPER_CONNECT)=[zk1 (1.1.1.1), zk2 (1.1.1.2), zk3 (1.1.1.3)]
- kafka2: zookeepers (KAFKA_ZOOKEEPER_CONNECT)=[zk1 (1.1.1.1), zk2 (1.1.1.2), zk3 (1.1.1.3)]
- kafka3: zookeepers (KAFKA_ZOOKEEPER_CONNECT)=[zk1 (1.1.1.1), zk2 (1.1.1.2), zk3 (1.1.1.3)]
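To make the layout concrete, here is a hedged sketch of what the compose file for machine 1 might look like. The image names (`zookeeper`, `wurstmeister/kafka`) and the `ZOO_MY_ID`/`ZOO_SERVERS` variables are assumptions about which images are in use; adapt them to your actual docker-compose.yml.

```yaml
# Hypothetical fragment for machine 1 (1.1.1.1); machines 2 and 3 are the
# same with the IDs and advertised addresses changed.
version: '2'
services:
  zk1:
    image: zookeeper
    environment:
      ZOO_MY_ID: 1
      # Every zk node gets the full server list; its own entry binds to 0.0.0.0
      ZOO_SERVERS: server.1=0.0.0.0:2888:3888 server.2=1.1.1.2:2888:3888 server.3=1.1.1.3:2888:3888
    ports:
      - "2181:2181"
  kafka1:
    image: wurstmeister/kafka
    environment:
      KAFKA_BROKER_ID: 1
      # Every broker gets the full ZooKeeper ensemble
      KAFKA_ZOOKEEPER_CONNECT: 1.1.1.1:2181,1.1.1.2:2181,1.1.1.3:2181
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://1.1.1.1:9092
    ports:
      - "9092:9092"
```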
This works well. I can send (produce) to one Kafka broker and get it from another broker using kafka-python:
from kafka import KafkaProducer, KafkaConsumer
p = KafkaProducer(bootstrap_servers='1.1.1.2:9092')  # Send to one broker
p.send('topic1', b'1')
p.flush()  # Make sure the message is actually sent before consuming
c = KafkaConsumer('topic1', bootstrap_servers='1.1.1.3:9092', auto_offset_reset='earliest')  # Retrieve from another broker
next(c)  # This retrieves it properly
Now, the questions are:
- (1) How many ZooKeeper nodes do I need to specify for each ZooKeeper instance?
- (2) How many ZooKeeper nodes do I need to specify for each Kafka instance (broker)?
- (3) How many Kafka brokers do I need to specify for Kafka producers (bootstrap_servers)?
- (4) How many Kafka brokers do I need to specify for Kafka consumers (bootstrap_servers)?
I've done some experiments and read a bit as well, but it helps to have someone verify (esp. since some of the answers I've read are still from earlier Kafka versions).
Question 1
Each zk instance must specify all the other zk instances. This makes sense, as this defines the whole zk cluster. I am not sure, though, whether zk can do node discovery from partial lists, something like:
- zk1: zookeepers=[zk1, zk2]
- zk2: zookeepers=[zk2, zk3]
- zk3: zookeepers=[zk3, zk1]
in which it discovers all the other nodes on its own.
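For reference, the standard static ensemble configuration looks like the sketch below (paths and timing values are just example defaults): every node carries the identical, complete `server.N` list, which is why the partial-overlap scheme above is not how a static ensemble is defined. ZooKeeper 3.5+ does add dynamic reconfiguration, but that changes the membership at runtime rather than discovering it from partial lists.

```
# zoo.cfg -- identical on all three nodes
tickTime=2000
initLimit=10
syncLimit=5
dataDir=/var/lib/zookeeper
clientPort=2181
server.1=1.1.1.1:2888:3888
server.2=1.1.1.2:2888:3888
server.3=1.1.1.3:2888:3888
```

Each node additionally needs a `myid` file in `dataDir` containing only its own ID (1, 2, or 3), which is how it finds itself in the shared list.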
Questions 2-4
I've tried setting kafka1 to only use zk1.
Let's assume that we are talking about topic1, which has one partition, a replication factor of 1, and lives on machine 1.
Observations:
- Assuming zk1 is up, I can produce to kafka2 and consume from kafka3.
- If zk1 is down, I can produce to kafka1 successfully.
- If zk1 is down, I can also produce to kafka2 or kafka3 successfully.
- If zk1 is down, I can still consume from kafka1.
- If zk1 is down, consumption from kafka2 or kafka3 blocks until zk1 is back up. The messages that were sent while zk1 was down, but were marked by kafka2 or kafka3 as successfully sent, can be retrieved at that point.
Based on these observations:
- Supposedly you only need to specify at least one broker in bootstrap_servers, but it is probably still better to specify all of them.
- While ZooKeeper is down, we can produce, but consumers block because they need ZooKeeper to track consumer offsets. This is explained here, although that explanation is from 2015, when there were separate consumer APIs. I assume the explanation still holds.
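A small sketch of the "specify all of them" recommendation, using the hypothetical broker addresses from above. kafka-python accepts `bootstrap_servers` as either a list of `host:port` strings or a single comma-separated string; the client only needs one reachable entry to discover the rest of the cluster, so listing all brokers just adds fail-over:

```python
# Hypothetical broker addresses from the setup above.
brokers = ['1.1.1.1:9092', '1.1.1.2:9092', '1.1.1.3:9092']

# Equivalent comma-separated string form.
bootstrap = ','.join(brokers)
print(bootstrap)  # 1.1.1.1:9092,1.1.1.2:9092,1.1.1.3:9092

# With a live cluster, either form works; the client tries each entry in
# turn, so a single dead broker in the list is not fatal:
# p = KafkaProducer(bootstrap_servers=brokers)               # list form
# c = KafkaConsumer('topic1', bootstrap_servers=bootstrap)   # string form
```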