I have an S3 bucket with the structure //storage-layer/raw/__SOME_FOLDERS__, e.g. //storage-layer/raw/GTest and //storage-layer/raw/HTest. Each of these folders can contain further subfolders, such as raw/GTest/abc and raw/HTest/xyz. The subfolders (abc, xyz) will not overlap between GTest and HTest.
I have successfully set up a Spark Structured Streaming job that monitors raw/GTest/abc for incoming parquet files and writes the results to the console:
def process_row(df, epoch_id):
    # intended foreachBatch handler: just prints each micro-batch for now
    df.show()

# Structured Streaming
(
    self.spark
    .readStream
    .format("parquet")
    .option("maxFilesPerTrigger", 20)
    .option("inferSchema", "true")
    .load("s3a://storage-layer/raw/GTest/abc/*")
    .writeStream
    .format("console")
    .outputMode("append")
    .trigger(processingTime="5 seconds")
    # .foreachBatch(process_row)
    .start()
    .awaitTermination()
)
My problem is: how can I set up a single structured streaming app to readStream from the parent folder storage-layer/raw/*, do some processing on it, and save the output into a completely different folder/bucket in S3?
I have taken a look at foreachBatch above, but I'm not sure how to set it up to achieve this end result. When I try, I get the error message: Unable to infer schema for Parquet. It must be specified manually.
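I assume that error means I need to pass an explicit schema instead of relying on inference when reading from the parent folder. This is a rough sketch of what I have in mind; the column names and types here are just placeholders, not my real schema:

from pyspark.sql.types import StructType, StructField, StringType, LongType

# Placeholder schema -- the real columns/types would come from my actual parquet files.
schema = StructType([
    StructField("id", LongType(), True),
    StructField("value", StringType(), True),
])

stream_df = (
    self.spark
    .readStream
    .format("parquet")
    .schema(schema)                       # explicit schema instead of inference
    .option("maxFilesPerTrigger", 20)
    .load("s3a://storage-layer/raw/*/*")  # parent folder, two folder levels deep
)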
Example of end result:
- parquet files saving into S3 under storage-layer/raw/GTest/abc -> structured streamed + processed into storage-layer/processed/GTest/abc as parquet files.
- parquet files saving into S3 under storage-layer/raw/HTest/xyz -> structured streamed + processed into storage-layer/processed/HTest/xyz as parquet files.
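For the routing itself, the unverified idea I've been sketching (building on stream_df above) is to tag each row with the subfolder it came from and rewrite raw/ to processed/ inside foreachBatch. The use of input_file_name(), the regex, and the checkpoint location are all my assumptions, not something I have working:

from pyspark.sql.functions import input_file_name, regexp_extract

# Tag each row with the subfolder it came from, e.g. "GTest/abc" or "HTest/xyz".
# The regex assumes paths look like .../raw/<Folder>/<Subfolder>/<file>.parquet.
tagged_df = stream_df.withColumn(
    "source_prefix",
    regexp_extract(input_file_name(), r"/raw/([^/]+/[^/]+)/", 1),
)

def process_batch(batch_df, epoch_id):
    # One write per distinct source prefix in this micro-batch,
    # mirroring the raw/ layout under processed/.
    prefixes = [
        row["source_prefix"]
        for row in batch_df.select("source_prefix").distinct().collect()
    ]
    for prefix in prefixes:
        (
            batch_df
            .filter(batch_df.source_prefix == prefix)
            .drop("source_prefix")
            .write
            .mode("append")
            .parquet(f"s3a://storage-layer/processed/{prefix}")
        )

(
    tagged_df
    .writeStream
    .trigger(processingTime="5 seconds")
    .option("checkpointLocation", "s3a://storage-layer/checkpoints/raw")  # assumed location
    .foreachBatch(process_batch)
    .start()
    .awaitTermination()
)

Is this roughly the right direction, or is there a better way to fan a single stream over raw/* out into the matching processed/ paths?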