My Kafka Streams aggregation reads a compact topic and does this:
(0_10, ..), (0_11, ..) ---> (0, [10]) (0, [10, 11])
I would like to know how to control aggregation time-window, so it doesn't spit a message for each incoming message, but waits and aggregates some of them. Imagine Stream App consumes these messages:
- (0_10, ..)
- (1_11, ..)
- (0_13, ..)
and if the 3 previous messages arrive in a short time window, I expect to see this:
- (0,[10])
- (0, [10, 13])
- (1, [11])
I cannot figure out, how to tell my Kafka Stream application how long to wait for more aggregations, before spitting a new value.
My code is very simple
builder
    .table(keySerde, valueSerde, sourceTopic)
    .groupBy(StreamBuilder::groupByMapper)
    .aggregate(
        StreamBuilder::aggregateInitializer,
        StreamBuilder::aggregateAdder,
        StreamBuilder::aggregateSubtractor)
    .to(...);
Currently, it sometime aggregates in batches, but not sure how to tweak it:
{"Aggregate":[100]}
{"Aggregate":[100,300,301,302]}
{"Aggregate":[100,300,301,302,404]}
 
    