I have a test Kafka cluster with a topic 'topic1' (its schema is registered in Schema Registry).
I created a stream from this topic:
CREATE STREAM topic1_basic_stream
WITH (KAFKA_TOPIC='topic1', VALUE_FORMAT='AVRO', TIMESTAMP='triggertime', TIMESTAMP_FORMAT='yyyyMMddHHmmssX');
Then I created an aggregated table from a push query (CREATE TABLE ... AS SELECT, without windowing), limited to today's data:
CREATE TABLE topic1_agg_table_JSON WITH(VALUE_FORMAT='JSON') AS
select
   concat(product_type,' - ',region) id
  ,sum(sale_charge) sum_sale_charge
  ,count(1) cnt
from topic1_basic_stream
where TIMESTAMPTOSTRING(rowtime, 'yyyyMMdd') = TIMESTAMPTOSTRING(UNIX_TIMESTAMP(), 'yyyyMMdd')
group by concat(product_type,' - ',region)
EMIT CHANGES;
From the ksqlDB CLI I run the following and get the expected results:
SET 'auto.offset.reset'='earliest';
select * from topic1_agg_table_JSON EMIT CHANGES;
+----------------------+---------------------------+-----+
|id                    |sum_sale_charge            |CNT  |
+----------------------+---------------------------+-----+
|Pen - London          |90.0                       |45   |
|Book - Paris          |45.0                       |9    |
|Pen - Amsterdam       |26.0                       |13   |
|Keyboard - Oslo       |60.0                       |6    |
|Pen - London          |92.0                       |46   |
Press CTRL-C to interrupt
I can also see topic1_agg_table_JSON in the topic list, with JSON messages inside.
Goal: I want to write a consumer in Node.js that emits these messages to the browser (client) over WebSockets, so they can be visualized in real time on the client side.
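To make the goal concrete: the table's topic is a changelog, so the same id shows up more than once as its aggregate is updated (Pen - London in the output above). Below is a minimal sketch of the step that would sit between the consumer and the WebSocket. It assumes each record carries the id as the message key and a JSON value with SUM_SALE_CHARGE and CNT fields; those key and field names are assumptions and have not been checked against the actual topic. The applyUpdate helper and the sample records are hypothetical.

```javascript
// Hypothetical helper: fold one changelog record into a latest-state map.
// Assumes message.key holds the id and message.value is a JSON string;
// the field names used below are assumptions about the topic's payload.
function applyUpdate(state, message) {
  const id = String(message.key);
  if (message.value === null || message.value === '') {
    // A null or empty value is treated here as "row removed".
    state.delete(id);
    return { id, deleted: true };
  }
  const row = JSON.parse(message.value);
  const update = { id, sum: row.SUM_SALE_CHARGE, cnt: row.CNT };
  state.set(id, update);
  return update; // the object that would be sent to the browser
}

// Sample records mirroring the CLI output above (payload shape is made up).
const state = new Map();
const records = [
  { key: 'Pen - London', value: '{"SUM_SALE_CHARGE":90.0,"CNT":45}' },
  { key: 'Book - Paris', value: '{"SUM_SALE_CHARGE":45.0,"CNT":9}' },
  { key: 'Pen - London', value: '{"SUM_SALE_CHARGE":92.0,"CNT":46}' },
];
for (const record of records) {
  applyUpdate(state, record);
}
console.log(state.size, state.get('Pen - London'));
```

Keeping only the latest row per id means a newly connected browser can be sent the current state first and then individual updates as they arrive.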
Already tried: the kafka-node module. The example code from https://github.com/SOHU-Co/kafka-node/blob/master/example/consumer.js works fine with a plain Kafka topic, but when I change the topic to topic1_agg_table_JSON, it throws no error and prints no messages.
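A minimal sketch of the kind of consumer described above, loosely modeled on the kafka-node example rather than copied from it. The broker address localhost:9092, partition 0, and the startConsumer wrapper are assumptions for illustration. The kafka-node module is passed in as an argument, so the topic name is the only thing that differs between the working and the non-working case.

```javascript
// Hypothetical wrapper around the kafka-node Consumer API.
// `kafka` is the module object returned by require('kafka-node').
function startConsumer(kafka, topic, onMessage) {
  // Assumed broker address; adjust to the test cluster.
  const client = new kafka.KafkaClient({ kafkaHost: 'localhost:9092' });
  // Read partition 0 starting at offset 0; fromOffset: true makes the
  // offset given in the payload take effect.
  const consumer = new kafka.Consumer(
    client,
    [{ topic, partition: 0, offset: 0 }],
    { autoCommit: false, fromOffset: true }
  );
  consumer.on('message', onMessage);
  consumer.on('error', (err) => console.error('consumer error:', err));
  return consumer;
}

// Usage (needs the kafka-node package and a reachable broker):
// const kafka = require('kafka-node');
// startConsumer(kafka, 'topic1', (m) => console.log(m.value));
// startConsumer(kafka, 'topic1_agg_table_JSON', (m) => console.log(m.value));
```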
Question: What is the proper way to consume data from topic1_agg_table_JSON using Node.js?