I am trying to read Kafka messages in Google Dataproc using PySpark Structured Streaming.
Version details:
- Dataproc image version is 2.0.0-RC22-debian10 (chosen to get PySpark 3.0.1 with Delta Lake 0.7.0, since I ultimately have to write this data to a Delta table hosted on Google Cloud Storage)
- PySpark version is 3.0.1, and the Python version used by PySpark is 3.7.3
- The packages I am using are:

org.apache.spark:spark-sql-kafka-0-10_2.12:3.0.1
io.delta:delta-core_2.12:0.7.0
org.apache.spark:spark-avro_2.12:3.0.1
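
For completeness, the session itself is created before the snippet below; here is a minimal sketch of how I build it (the app name is a placeholder of mine, the packages can equivalently be passed with --packages at submit time, and the two extension configs are the ones Delta Lake 0.7.0 requires):

from pyspark.sql import SparkSession

# Session setup sketch: register the packages and the Delta Lake 0.7.0 extensions
spark = (SparkSession.builder
         .appName("kafka-to-delta")  # placeholder app name
         .config("spark.jars.packages",
                 "org.apache.spark:spark-sql-kafka-0-10_2.12:3.0.1,"
                 "io.delta:delta-core_2.12:0.7.0,"
                 "org.apache.spark:spark-avro_2.12:3.0.1")
         .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
         .config("spark.sql.catalog.spark_catalog",
                 "org.apache.spark.sql.delta.catalog.DeltaCatalog")
         .getOrCreate())

Snippet of the code is: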
import os
from pyspark.sql.avro.functions import from_avro

__my_dir = os.path.dirname("<directory_path>")
# Read the Avro schema used to decode the Kafka message value
jsonFormatSchema = open(os.path.join(__my_dir, 'avro_schema_file.avsc'), "r").read()

# Stream from Kafka and decode the binary value column with the Avro schema
df = (spark
      .readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "<kafka_broker>")
      .option("subscribe", "<kafka_topic>")
      .option("startingOffsets", "latest")
      .load()
      .select(from_avro("value", jsonFormatSchema).alias("element")))

df.printSchema()

df_output = df.select("element.after.id",
                      "element.after.name",
                      "element.after.attribute",
                      "element.after.quantity")

# Write the decoded columns to the Delta table, processing available data once
StreamQuery = (df_output.writeStream
               .format("delta")
               .outputMode("append")
               .option("checkpointLocation", "<check_point_location>")
               .trigger(once=True)
               .start("<target_delta_table>"))
The error I am getting is:
java.io.InvalidClassException: org.apache.kafka.common.TopicPartition;
class invalid for deserialization
Why does Spark fail to deserialize TopicPartition, and how can I solve it?
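
In case it helps with the diagnosis: my assumption is that this exception points at two different kafka-clients versions ending up on the classpath (a serialVersionUID mismatch during deserialization), so here is a hypothetical probe I can run in the same session to see which jar actually provides TopicPartition. It goes through PySpark's internal py4j gateway (spark.sparkContext._jvm), so treat it as a debugging sketch rather than a supported API:

# Hypothetical diagnostic: ask the driver JVM which jar TopicPartition was loaded from
jvm = spark.sparkContext._jvm
src = (jvm.java.lang.Class.forName("org.apache.kafka.common.TopicPartition")
       .getProtectionDomain()
       .getCodeSource())
print(src.getLocation().toString() if src else "loaded from the bootstrap classpath")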