I have a single-node Kafka instance running locally via docker-compose.
(system: Mac/arm64, image: wurstmeister/kafka:2.13-2.6.0)
I want to use kafkacat (kcat, installed via Homebrew) to instantly produce and consume messages to and from Kafka.
Here is a minimal script:
#!/usr/bin/env bash
NUM_MESSAGES=${1:-3} # use arg1 or default to 3
KCAT_ARGS="-q -u -c $NUM_MESSAGES -b localhost:9092 -t unbuffered"

log() { echo "$*" 1>&2; }

producer() {
  log "starting producer"
  for i in $(seq 1 "$NUM_MESSAGES"); do
    echo "msg $i"
    log "produced: msg $i"
    sleep 1
  done | kcat $KCAT_ARGS -P
}

consumer() {
  log "starting consumer"
  kcat $KCAT_ARGS -C -o end | while read -r line; do
    log "consumed: $line"
  done
}

producer &
consumer &
wait
I would expect (roughly) the following output:
starting producer
starting consumer
produced: msg 1
consumed: msg 1
produced: msg 2
consumed: msg 2
produced: msg 3
consumed: msg 3
However, the produced and consumed messages come out fully batched into two groups, even though the consumer and producer are running in parallel:
starting producer
starting consumer
produced: msg 1
produced: msg 2
produced: msg 3
consumed: msg 1
consumed: msg 2
consumed: msg 3
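To rule out the shell pipeline itself, here is the same producer/consumer shape with plain cat standing in for kcat (my own sanity check, not part of the script above). Each line flows through the pipe as soon as it is written, which points at kcat/librdkafka as the buffering layer:

```shell
#!/usr/bin/env bash
# Sanity check: with `cat` in place of kcat, each line passes through the
# pipe immediately (no batching), so the shell pipeline is not the culprit.
producer() {
  for i in $(seq 1 3); do
    echo "msg $i"
    sleep 1   # same 1s spacing as the real script
  done
}
producer | cat | while read -r line; do
  # each line prints right after it is produced, not in one batch at the end
  echo "consumed: $line"
done
```

Running this interleaves output in real time, one "consumed" line per second.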
Here are some kafkacat options and Kafka producer properties, with the values I already tried, to change the producer behavior:
# kcat options having no effect on the test case
-u # unbuffered output
-T # act like `tee` and echo input
# kafka properties having no effect on the test case
-X queue.buffering.max.messages=1
-X queue.buffering.max.kbytes=1
-X batch.num.messages=1
-X queue.buffering.max.ms=100
-X socket.timeout.ms=100
-X max.in.flight.requests.per.connection=1
-X auto.commit.interval.ms=100
-X request.timeout.ms=100
-X message.timeout.ms=100
-X offset.store.sync.interval.ms=1
-X message.copy.max.bytes=100
-X socket.send.buffer.bytes=100
-X linger.ms=1
-X delivery.timeout.ms=100
None of the options above had any effect on the pipeline.
What am I missing?
Edit: It seems to be a flushing issue with either kcat or librdkafka. Maybe the -X properties are not used correctly.
Here are the current observations (will edit them as I learn more):
- When sending a larger payload of 10000 messages with a smaller delay in the script, kcat will produce several batches of messages. The batching seems to be size-based, but is not configurable by any of the -X options. The batches are then also correctly picked up by the consumer, so it must be a producer issue.
- I also tried the script in Docker with the current kafkacat from the Alpine repos. This one seems to flush a bit earlier, with less data needed to fill the "hidden" buffer. The -X options also had no effect there.
- The -X properties do seem to be checked: if I set out-of-range values, kcat (or maybe librdkafka) will complain. However, setting low values for any of the timeout and buffer-size properties has no effect.
- When calling kcat once for every message (which is a bit of overkill), the messages are produced instantly.
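For reference, the per-message variant that does produce instantly looks roughly like this. The broker address and topic are the ones from my setup; produce() below uses cat as a broker-free stand-in so the sketch runs anywhere, with the real kcat call shown in the comment:

```shell
#!/usr/bin/env bash
# Per-message producing: one kcat process per message means each process
# flushes its internal queue on exit, so every message is sent immediately
# (at the cost of one process and one broker connection per message).
produce() {
  # Broker-free stand-in; with Kafka running, replace the body with:
  #   kcat -q -b localhost:9092 -t unbuffered -P
  cat
}
for i in $(seq 1 3); do
  printf 'msg %d\n' "$i" | produce
done
```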
The question remains:
How do I tell a Kafka pipeline to produce my first message instantly?
If you have an example in Go, that would also help, since I observe similar behavior with a small Go program using kafka-go. I may post a separate question if I can strip it down to a postable format.
UPDATE: I tried using a bitnami image on a pure Linux host. Producing and consuming via kafkacat works as expected on this system. I will post an answer once I know more.