What I'd like to do is this:
- Consume records from a numbers topic (Long's)
- Aggregate (count) the values for each 5 sec window
- Send the FINAL aggregation result to another topic
My code looks like this:
KStream<String, Long> longs = builder.stream(
            Serdes.String(), Serdes.Long(), "longs");
// In one ktable, count by key, on a five second tumbling window.
KTable<Windowed<String>, Long> longCounts = 
            longs.countByKey(TimeWindows.of("longCounts", 5000L));
// Finally, sink to the long-avgs topic.
longCounts.toStream((wk, v) -> wk.key())
          .to("long-counts");
It looks like everything works as expected, but the aggregations are sent to the destination topic for each incoming record. My question is how can I send only the final aggregation result of each window?
 
     
     
     
    
