I am trying to use Kafka as a streaming source and Spark to process the data.
config:
- Python 3.9
- Kubuntu 21.10
- echo $JAVA_HOME: /usr/lib/jvm/java-8-openjdk-amd64
- echo $SPARK_HOME: /opt/spark
- Spark version: 3.2.0
- PySpark version: pyspark-3.2.1-py2.py3
- downloaded Kafka version: kafka_2.13-3.1.0.tgz
 kafka status:
- :~$ sudo systemctl status kafka 
 kafka.service - Apache Kafka Server
 Loaded: loaded (/etc/systemd/system/kafka.service; disabled; vendor preset: enabled)
 Active: active (running) since Sat 2022-01-29 19:02:18 +0330; 4s ago
 Docs: http://kafka.apache.org/documentation.html
 Main PID: 5271 (java)
 Tasks: 74 (limit: 19017)
 Memory: 348.7M
 CPU: 5.188s
My Python program:
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *
import time
import os
import findspark as fs
fs.init()
spark_version = '3.2.0'
os.environ['PYSPARK_SUBMIT_ARGS'] = '--packages org.apache.spark:spark-sql-kafka-0-10_3.1.0:{}'.format(spark_version)
os.environ['JAVA_HOME'] = '/usr/lib/jvm/java-8-openjdk-amd64'
# os.environ['PYSPARK_SUBMIT_ARGS'] = "--master local[2] pyspark-shell"
kafka_topic_name = "bdCovid19"
kafka_bootstrap_servers = 'localhost:9092'
if __name__ == "__main__":
    print("Welcome to DataMaking !!!")
    print("Stream Data Processing Application Started ...")
    print(time.strftime("%Y-%m-%d %H:%M:%S"))
    spark = SparkSession \
        .builder \
        .appName("PySpark Structured Streaming with Kafka and Message Format as JSON") \
        .master("local[*]") \
        .getOrCreate()
    spark.sparkContext.setLogLevel("ERROR")
    # Construct a streaming DataFrame that reads from test-topic
    orders_df = spark \
        .readStream \
        .format("kafka") \
        .option("kafka.bootstrap.servers", kafka_bootstrap_servers) \
        .option("subscribe", kafka_topic_name) \
        .option("startingOffsets", "latest") \
        .load()
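For context, once load() works, the Kafka source exposes key and value as binary columns, so the next step would be a cast before any JSON parsing. This is only a sketch continuing from orders_df above; it assumes the topic carries UTF-8 JSON text and needs a running session with the Kafka connector on the classpath:

```python
from pyspark.sql.functions import col

# The Kafka source schema is fixed: key/value are binary, plus topic,
# partition, offset, timestamp, and timestampType columns.
decoded_df = orders_df.select(
    col("key").cast("string"),    # message key (may be null)
    col("value").cast("string"),  # JSON payload as text, ready for from_json()
    col("timestamp"),             # broker timestamp of each record
)
```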
I am running this in PyCharm.
Error:
raise RuntimeError("Java gateway process exited before sending its port number")
RuntimeError: Java gateway process exited before sending its port number
on this line:  spark = SparkSession \
If I remove the os.environ lines from the code, that error disappears, but then I get this:
raise converted from None
pyspark.sql.utils.AnalysisException: Failed to find data source: kafka. Please deploy the application as per the deployment section of "Structured Streaming + Kafka Integration Guide".
on this line:  orders_df = spark \
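One thing I noticed while debugging: per the Structured Streaming + Kafka Integration Guide, --packages takes a Maven coordinate of the form groupId:artifactId_scalaVersion:version, and when PYSPARK_SUBMIT_ARGS is set from Python it should end with pyspark-shell. A plain-Python sanity check of the string I believe it should be (assuming the default Scala 2.12 build of Spark 3.2.0):

```python
spark_version = "3.2.0"
scala_version = "2.12"  # assumption: Spark 3.2.0 pre-built binaries use Scala 2.12

# Maven coordinate format: groupId:artifactId_scalaVersion:version
coordinate = f"org.apache.spark:spark-sql-kafka-0-10_{scala_version}:{spark_version}"

# PYSPARK_SUBMIT_ARGS should end with pyspark-shell when set programmatically
submit_args = f"--packages {coordinate} pyspark-shell"
print(submit_args)
# → --packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.2.0 pyspark-shell
```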
I have read these:
- Pyspark: Exception: Java gateway process exited before sending the driver its port number 
- Spark + Python - Java gateway process exited before sending the driver its port number? 
- Exception: Java gateway process exited before sending the driver its port number #743 
- Structured Streaming + Kafka Integration Guide (Kafka broker version 0.10.0 or higher) 
- pyspark.sql.utils.AnalysisException: Failed to find data source: kafka 
None of them worked for me. Any suggestions?