I am getting following error : py4j.protocol.Py4JJavaError: An error occurred while calling o0.execute. : org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
Version: Windows 10 , Python: 3.8.10, py4j: 0.10.9.7 , apache-flink: 1.17.1 java version : 1.8.0_351
import logging
import os
import sys
from pyflink.common import Types
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.datastream.connectors.kafka import FlinkKafkaProducer, FlinkKafkaConsumer
from pyflink.datastream.formats.json import JsonRowSerializationSchema, JsonRowDeserializationSchema
# Make sure that the Kafka cluster is started and the topic 'test_json_topic' is
# created before executing this job.
def write_to_kafka(env):
    type_info = Types.ROW([Types.INT(), Types.STRING()])
    ds = env.from_collection(
        [(1, 'hi'), (2, 'hello'), (3, 'hi'), (4, 'hello'), (5, 'hi'), (6, 'hello'), (6, 'hello')],
        type_info=type_info)
    serialization_schema = JsonRowSerializationSchema.Builder() \
        .with_type_info(type_info) \
        .build()
    kafka_producer = FlinkKafkaProducer(
        topic='test_json_topic',
        serialization_schema=serialization_schema,
        producer_config={'bootstrap.servers': 'localhost:9092', 'group.id': 'test_group'}
    )
    # note that the output type of ds must be RowTypeInfo
    ds.add_sink(kafka_producer)
    env.execute()
def read_from_kafka(env):
    deserialization_schema = JsonRowDeserializationSchema.Builder() \
        .type_info(Types.ROW([Types.INT(), Types.STRING()])) \
        .build()
    kafka_consumer = FlinkKafkaConsumer(
        topics='test_json_topic',
        deserialization_schema=deserialization_schema,
        properties={'bootstrap.servers': 'localhost:9092', 'group.id': 'test_group_1'}
    )
    kafka_consumer.set_start_from_earliest()
    env.add_source(kafka_consumer).print()
    env.execute()
if __name__ == '__main__':
    logging.basicConfig(stream=sys.stdout, level=logging.INFO, format="%(message)s")
    kafka_jar = os.path.join(os.path.abspath(os.path.dirname(__file__)),
                             'flink-sql-connector-kafka-1.17.1.jar')
    env = StreamExecutionEnvironment.get_execution_environment()
    env.add_jars("file:///{}".format(kafka_jar))
    print("start writing data to kafka")
    write_to_kafka(env)
    print("start reading data from kafka")
    read_from_kafka(env)
Note: I have taken the code from this repo: Official Pylink Examples