I am looking for logging capabilities in PySpark's DataFrameReader and DataFrameWriter. When DataFrames are read or written, the number of files, partitions, records/rows, raw bytes etc. involved in the operation should be tracked and returned so that it can be used afterwards.
I have checked the official Apache Spark documentation and searched with Google, but there seems to be no such capability implemented in PySpark right now. These are the ideas I came up with:
- Checking the same directories (used at read/write) with dbutils in Databricks, cloud provider libraries (boto3 for AWS S3 or azure-storage-blob for Azure) or Python's built-in os package (see the first sketch after this list)
- Interacting with the SparkContext and using something like StatusTracker to monitor job/stage progress and return job information via jobId (-> not checked; see the second sketch after this list)
- Running Spark in some kind of debug/dry-run mode and extracting the information from loggers (-> not checked)
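For the first idea, a minimal sketch (my own, untested) of measuring a written output directory with Python's built-in os package could look like the following; the function name directory_statistics and the path are placeholders, and it only covers local or mounted file systems:

import os

def directory_statistics(path):
    # Count Spark part files and sum their sizes under an output directory
    num_files, total_bytes = 0, 0
    for root, _, files in os.walk(path):
        for name in files:
            if name.startswith("part-"):  # skip _SUCCESS and other metadata files
                num_files += 1
                total_bytes += os.path.getsize(os.path.join(root, name))
    return {"files": num_files, "bytes": total_bytes}

df.write.csv("outputfile.csv")
print(directory_statistics("outputfile.csv"))

This gives file counts and byte sizes but no record counts, and a boto3 or azure-storage-blob variant would have to list objects under the same prefix instead.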
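For the second idea, PySpark exposes a StatusTracker via spark.sparkContext.statusTracker(); as far as I can tell it only reports job/stage/task progress, not byte or record counts, so a sketch (untested, assuming an active SparkSession named spark) would look roughly like this:

tracker = spark.sparkContext.statusTracker()
for job_id in tracker.getJobIdsForGroup():  # jobs that are not assigned to a job group
    job_info = tracker.getJobInfo(job_id)
    for stage_id in job_info.stageIds:
        stage_info = tracker.getStageInfo(stage_id)  # may be None if the info was already cleaned up
        if stage_info:
            print(job_id, stage_id, stage_info.numCompletedTasks, "of", stage_info.numTasks, "tasks completed")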
But what I am really looking for is an easy-to-use implementation, like a fictional option("statistics", "true") setting:
df, read_statistics = spark.read.option("statistics", "true").csv("inputfile.csv")
write_statistics = df.write.option("statistics", "true").csv("outputfile.csv")
Thanks in advance for any insights on alternative implementations or the community's plans for future releases!
SOLUTION (partial):
An easy-to-use solution (for the number of bytes and records), based on collecting and aggregating Spark events, is the ContextSparkListener from the pyspark-spy package:
from pyspark_spy import ContextSparkListener, register_listener

# Attach the listener to the active SparkContext so it receives stage-level events
listener = ContextSparkListener()
register_listener(spark.sparkContext, listener)

# Only events emitted inside this block are collected
with listener as events:
    df = spark.read.csv("inputfile.csv")
    df.write.csv("outputfile.csv")

print("Read statistics: ", listener.stage_input_metrics_aggregate())
print("Write statistics: ", listener.stage_output_metrics_aggregate())
>>> Read statistics:  InputMetrics(bytesRead=140129708, recordsRead=271502)
>>> Write statistics:  OutputMetrics(bytesWritten=136828555, recordsWritten=265106)
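If installing an extra package is not an option, a rough, untested alternative is to query Spark's monitoring REST API after the jobs have finished and sum the input/output metrics over all stages of the application; this assumes the driver UI is reachable at spark.sparkContext.uiWebUrl and that the requests package is available:

import requests

sc = spark.sparkContext
# All stages of the current application, as exposed by the monitoring REST API
stages = requests.get(
    f"{sc.uiWebUrl}/api/v1/applications/{sc.applicationId}/stages"
).json()

print("Read statistics: ",
      sum(s["inputBytes"] for s in stages), sum(s["inputRecords"] for s in stages))
print("Write statistics: ",
      sum(s["outputBytes"] for s in stages), sum(s["outputRecords"] for s in stages))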
