I have a large PySpark DataFrame:
import pyspark.sql as ps
spark = (
    ps.SparkSession.builder
    .appName('some_name')
    # ... lots of .config() lines
    .getOrCreate()
)
df = spark.table('some_table')
print(df.rdd.getNumPartitions())
# about 5k
print(df.count())
# about 650M
At a high level, I want to:
- (Preferably) Assert that the dataframe is sorted by a subset of columns (let's say SORT_COLUMNS). The dataframe is written using ALTER TABLE ... WRITE ORDERED BY SORT_COLUMNS, but better safe than sorry (see the sortedness-check sketch after this list).
- Split the dataframe into "reasonably-sized" chunks, where each chunk is a (1) sorted, (2) continuous piece of the dataframe (and the union of all chunks covers the whole dataframe).
- Run a job on each chunk, where the job would have the following interface:
 
def process_chunk(iterator_over_rows_in_chunk):
    # Do some processing, aggregation, and occasionally
    # yield a new row with a new schema.
    # iterator_over_rows_in_chunk should yield rows from
    # a continuous chunk sorted by SORT_COLUMNS.
    # Ideally, rows should be represented as dicts.
    yield new_row
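For the first bullet, the best I could come up with so far is a per-partition check plus a check on partition boundaries, roughly like the sketch below (SORT_COLUMNS is a placeholder list of column names, and I'm assuming those columns contain no NULLs). It only verifies that the rows are sorted in the order Spark happens to read the partitions, which may or may not be the guarantee I actually want:

from pyspark.sql import Row

SORT_COLUMNS = ['col_a', 'col_b']  # placeholder column names

def check_partition(idx, rows):
    # Within one partition: verify rows are non-decreasing by SORT_COLUMNS,
    # and record the first/last key so partition boundaries can be checked too.
    # (Assumes SORT_COLUMNS contain no NULLs; otherwise the tuple comparison breaks.)
    first_key = last_key = None
    sorted_inside = True
    for row in rows:
        key = tuple(row[c] for c in SORT_COLUMNS)
        if first_key is None:
            first_key = key
        elif key < last_key:
            sorted_inside = False
        last_key = key
    if first_key is not None:  # skip empty partitions
        yield Row(partition=idx, first_key=first_key,
                  last_key=last_key, sorted_inside=sorted_inside)

stats = sorted(df.rdd.mapPartitionsWithIndex(check_partition).collect(),
               key=lambda r: r.partition)
assert all(s.sorted_inside for s in stats)
# Consecutive partitions must also follow each other in key order
# for the table to be sorted in the order Spark reads it.
assert all(a.last_key <= b.first_key for a, b in zip(stats, stats[1:]))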
By "reasonably-sized", I mean:
- If that's possible, I want to split the dataframe into approximately constant-size chunks (say, ≈1M rows per chunk; it's fine if some chunks are a lot smaller or a bit bigger);
- Otherwise, I want to group the dataframe by a subset of columns (say, GROUP_COLUMNS) and use those groups as chunks (see the grouping sketch below).
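For the fallback option, the closest thing I can see is groupBy(GROUP_COLUMNS).applyInPandas(...). A rough sketch of what I mean (GROUP_COLUMNS, the column names and the output schema are all placeholders):

import pandas as pd

GROUP_COLUMNS = ['group_col']       # placeholder
SORT_COLUMNS = ['col_a', 'col_b']   # placeholder

def process_group(pdf: pd.DataFrame) -> pd.DataFrame:
    # pdf holds one whole group as a pandas DataFrame; re-sort it locally,
    # since I don't believe the within-group row order is guaranteed.
    pdf = pdf.sort_values(SORT_COLUMNS)
    # Stand-in for the real processing: emit one summary row per group.
    return pd.DataFrame([{
        'group_col': pdf['group_col'].iloc[0],
        'n_rows': len(pdf),
    }])

result = df.groupBy(GROUP_COLUMNS).applyInPandas(
    process_group,
    schema='group_col string, n_rows long',  # placeholder output schema
)

The downsides I see: a whole group gets materialized as one pandas DataFrame rather than streamed as an iterator of rows, and I don't think the within-group order is guaranteed, hence the local re-sort. So I'd still prefer the constant-size option if it exists.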
As far as I understand, the closest thing in the Spark interface to what I want is df.rdd.mapPartitions(), but it seems that partitions are not guaranteed to be continuous (see e.g. this answer).
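For context, this is roughly what I would write if each partition were already a sorted, continuous slice of the table (the output schema below is a placeholder):

from pyspark.sql import types as T

# Placeholder schema for the rows yielded by process_chunk().
NEW_SCHEMA = T.StructType([
    T.StructField('some_key', T.StringType()),
    T.StructField('some_value', T.LongType()),
])

def process_partition(rows):
    # Feed process_chunk() the partition's rows as dicts, per the interface above.
    yield from process_chunk(row.asDict() for row in rows)

result = spark.createDataFrame(
    df.rdd.mapPartitions(process_partition),
    schema=NEW_SCHEMA,
)

With ≈5k partitions over ≈650M rows, the partitions are even roughly the right size (≈130k rows each); the problem is only the continuity/sortedness guarantee.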
So, how do I map a PySpark DataFrame over sorted, continuous chunks?