I've worked out a possible way without any UDFs. (Explanation after the code)
sdf. \
    withColumn('flag_odd', (func.col('epoch')%2 != 0).cast('int')). \
    withColumn('flag_even', (func.col('epoch')%2 == 0).cast('int')). \
    withColumn('sum_curr_and_prev', func.sum('volume').over(wd.orderBy('epoch').rowsBetween(-1, 0))). \
    withColumn('even_collected', func.collect_list(func.when(func.col('flag_odd') == 1, func.col('sum_curr_and_prev'))).over(wd.orderBy('epoch').rowsBetween(-6, -1))). \
    withColumn('odd_collected', func.collect_list(func.when(func.col('flag_even') == 1, func.col('sum_curr_and_prev'))).over(wd.orderBy('epoch').rowsBetween(-6, -1))). \
    withColumn('even_collected', func.when((func.col('flag_even') == 1) & (func.col('epoch') > 6), func.col('even_collected'))). \
    withColumn('odd_collected', func.when((func.col('flag_odd') == 1) & (func.col('epoch') > 6), func.col('odd_collected'))). \
    withColumn('all_collections', func.coalesce('even_collected', 'odd_collected')). \
    withColumn('median_val', func.sort_array('all_collections')[1]). \
    show()
# +-----+------+--------+---------+-----------------+--------------+---------------+---------------+----------+
# |epoch|volume|flag_odd|flag_even|sum_curr_and_prev|even_collected|  odd_collected|all_collections|median_val|
# +-----+------+--------+---------+-----------------+--------------+---------------+---------------+----------+
# |    1|    30|       1|        0|               30|          null|           null|           null|      null|
# |    2|    77|       0|        1|              107|          null|           null|           null|      null|
# |    3|    73|       1|        0|              150|          null|           null|           null|      null|
# |    4|    57|       0|        1|              130|          null|           null|           null|      null|
# |    5|     6|       1|        0|               63|          null|           null|           null|      null|
# |    6|    37|       0|        1|               43|          null|           null|           null|      null|
# |    7|    75|       1|        0|              112|          null| [107, 130, 43]| [107, 130, 43]|       107|
# |    8|    44|       0|        1|              119|[150, 63, 112]|           null| [150, 63, 112]|       112|
# |    9|    50|       1|        0|               94|          null| [130, 43, 119]| [130, 43, 119]|       119|
# |   10|    65|       0|        1|              115| [63, 112, 94]|           null|  [63, 112, 94]|        94|
# |   11|    97|       1|        0|              162|          null| [43, 119, 115]| [43, 119, 115]|       115|
# |   12|    84|       0|        1|              181|[112, 94, 162]|           null| [112, 94, 162]|       112|
# |   13|    18|       1|        0|              102|          null|[119, 115, 181]|[119, 115, 181]|       119|
# |   14|    19|       0|        1|               37|[94, 162, 102]|           null| [94, 162, 102]|       102|
# |   15|    71|       1|        0|               90|          null| [115, 181, 37]| [115, 181, 37]|       115|
# |   16|    46|       0|        1|              117|[162, 102, 90]|           null| [162, 102, 90]|       102|
# |   17|    88|       1|        0|              134|          null| [181, 37, 117]| [181, 37, 117]|       117|
# |   18|    12|       0|        1|              100|[102, 90, 134]|           null| [102, 90, 134]|       102|
# |   19|    24|       1|        0|               36|          null| [37, 117, 100]| [37, 117, 100]|       100|
# |   20|    35|       0|        1|               59| [90, 134, 36]|           null|  [90, 134, 36]|        90|
# +-----+------+--------+---------+-----------------+--------------+---------------+---------------+----------+
I've used the data you provided
data = [(1, 30), (2,77), (3,73), (4,57), (5,6), (6,37), (7,75), (8,44), (9,50), (10,65),
        (11,97), (12,84), (13,18), (14,19), (15,71), (16,46), (17,88), (18,12), (19,24), (20,35)]
sdf = spark.sparkContext.parallelize(data).toDF(['epoch', 'volume'])
# +-----+------+
# |epoch|volume|
# +-----+------+
# |    1|    30|
# |    2|    77|
# |    3|    73|
# |    4|    57|
# |    5|     6|
# +-----+------+
- From there, I create the odd and even record identifiers. They'll help us in creating a collection. Note that I've assumed that epochis continuous and ordered.
- Then, I sum pairs of volume values. epoch 2 volume and epoch 1 volume, epoch 3 volume and epoch 2 volume, and so on.
- Creating collection - I've used the sql function collect_list()with awhen()condition within it. I figured, for every odd epoch, you collect the median of the previous 3 even epoch's pair sums (step 2). e.g. - for epoch 7, you'll need epoch 6's pair sum, epoch 4's pair sum, epoch 2's pair sum, i.e.[43, 130, 107]. Similar approach for even epochs.
- once you have the collection, median would be the middle value of the collection. The collection, being a list of 3 values, can be sorted and the second value can be extracted from the list. e.g. [43, 130, 107]can be sorted to[43, 107, 130]and107can be extracted with[43, 107, 130][1]