I read json data from Kafka and tried to process the data with flink table API.
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
tEnv.executeSql(
    "create table inputTable(" +
    "`src_ip` STRING," +
    "`src_port` STRING," +
    "`bytes_from_src` BIGINT," +
    "`pkts_from_src` BIGINT," +
    "`ts` TIMESTAMP(2) METADATA FROM 'timestamp'," +
    "WATERMARK FOR ts AS ts" +
") WITH (" +
    "'connector' = 'kafka'," +
    "'topic' = 'test'," +
    "'properties.bootstrap.servers' = 'localhost:9092'," +
    "'properties.group.id' = 'testGroup'," +
    "'scan.startup.mode' = 'earliest-offset'," +
    "'format' = 'json'," +
    "'json.fail-on-missing-field' = 'true'," +
    "'json.ignore-parse-errors' = 'false'" +
")");
Table inputTable = tEnv.from("inputTable");
inputTable.printSchema();
inputTable.execute().print();
Table windowedTable = inputTable
   .window(Tumble.over(lit(5).seconds()).on($("ts")).as("w"))
   .groupBy($("w"), $("src_ip"))
   .select($("w").start().as("window_start"),
           $("src_ip"),
           $("src_ip").count().as("src_ip_count"),                         
           $("bytes_from_src").avg().as("bytes_from_src_mean")                     
    );
windowedTable.execute().print();
There are 4 records in Kafka. The flink program prints out the schema info and the inputTable as the following:
Connected to the target VM, address: '127.0.0.1:62348', transport: 'socket'
(
  `src_ip` STRING,
  `src_port` STRING,
  `bytes_from_src` BIGINT,
  `pkts_from_src` BIGINT,
  `ts` TIMESTAMP(2) *ROWTIME* METADATA FROM 'timestamp',
  WATERMARK FOR `ts`: TIMESTAMP(2) AS `ts`
)
+----+--------------------------------+--------------------------------+----------------------+----------------------+-------------------------+
| op |                         src_ip |                       src_port |       bytes_from_src |        pkts_from_src |                      ts |
+----+--------------------------------+--------------------------------+----------------------+----------------------+-------------------------+
| +I |                     44.38.5.31 |                          53159 |                  120 |                    3 |  2021-08-13 14:59:56.00 |
| +I |                   44.38.132.51 |                          39409 |                  100 |                    2 |  2021-08-13 14:58:11.00 |
| +I |                     44.38.4.44 |                          56758 |                  336 |                    6 |  2021-08-13 14:59:14.00 |
| +I |                     44.38.5.34 |                          40001 |                   80 |                    2 |  2021-08-13 14:57:04.00 |
After that, nothing is printed out. The program did not exit. I am running the flink within IDEA. At this point, it seems like a black box. There is no output, and I do not know how to trace a flink program.
If I commented out the line inputTable.execute().print();, the schema info is printed out, but nothing after that and the program does not exit.
The flink version used is 1.14.2.
 
    