Given the following graph, where A is the parent of both B and C:

A -> B
A -> C

A has a value of 20, B has a value of 5, and C has a value of 10. I would like to use pyspark/graphframes to compute the power mean of each vertex together with its children. That is,

M_p = (1/n * (x_1^p + x_2^p + ... + x_n^p))^(1/p)

In this case n is the number of items (3 in our case, for the three vertices at A, including A itself), p is taken to be n * 2, and the normalization factor is 1/n, or 1/3. So the resulting value for A should be:
n = 3
norm_factor = 1/n
p = n * 2
result = (norm_factor * (20^p + 5^p + 10^p))^(1/p) = 16.697421658890875
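For reference, here is a quick sanity check of that arithmetic in plain Python, with no Spark involved:

values = [20, 5, 10]
n = len(values)                                        # 3
p = n * 2                                              # 6
result = (sum(x ** p for x in values) / n) ** (1 / p)
print(result)                                          # 16.697421658890875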
So the question is, how do I compute this with pyspark/graphframes? I have the following graph:
from pyspark.sql import SparkSession
from pyspark.sql.functions import sum as sqlsum
from graphframes import GraphFrame
from graphframes.lib import AggregateMessages as AM

spark = SparkSession.builder.appName('get-the-power').getOrCreate()

vertices = spark.createDataFrame([('1', 'A', 20),
                                  ('2', 'B', 5),
                                  ('3', 'C', 10)],
                                  ['id', 'name', 'value'])

edges = spark.createDataFrame([('1', '2'),
                               ('1', '3')],
                              ['src', 'dst'])

g = GraphFrame(vertices, edges)
I assume I'll need to aggregate the values from the children, and have been playing with message aggregation.
# Each edge sends the child's (dst) value to the parent (src) and back
# to the child itself; the messages are then summed per vertex.
agg = g.aggregateMessages(
    sqlsum(AM.msg).alias("totalValue"),
    sendToSrc=AM.dst['value'],
    sendToDst=AM.dst['value'])
agg.show()
This results in:
+---+----------+
| id|totalValue|
+---+----------+
|  3|        10|
|  1|        15|
|  2|         5|
+---+----------+
How do I replace totalValue (the sqlsum) with the power mean? Surely there is a way to do this with built-in Spark SQL functions from pyspark?
--- UPDATE ---
It seems like I can do this with a UDF.
import pyspark.sql.functions as func
from pyspark.sql.functions import array, col, collect_list, concat
from pyspark.sql.types import DoubleType

def power_mean(values):
    n = len(values)
    norm_factor = 1 / n
    p = n * 2
    return (norm_factor * sum([x ** p for x in values])) ** (1 / p)

udf_power_mean = func.udf(power_mean, returnType=DoubleType())
# Aggregate the values from the child vertices as before, but collect
# them into a list instead of summing them.
agg = g.aggregateMessages(
    collect_list(AM.msg).alias("totalValue"),
    sendToSrc=AM.dst['value'],
    sendToDst=None)
# `concat` the value for this vertex with its children's values.
# We end up with an `array<int>` that we then pass to `udf_power_mean`.
new_vertices = (agg.join(vertices, vertices.id == agg.id, "left")
                .select(vertices.id,
                        'name',
                        'value',
                        concat(array(col('value')), 'totalValue').alias("allValues"))
                .withColumn('totalScore', udf_power_mean(col('allValues')))
                .drop('allValues'))
new_vertices.show()
This produces:
+---+----+-----+------------------+
| id|name|value|        totalScore|
+---+----+-----+------------------+
|  1|   A|   20|16.697421658890875|
+---+----+-----+------------------+
Is there any way to do this without the UDF? Just plain Spark functions?
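One direction I have been eyeing, as a rough, untested sketch: on Spark 2.4+ the whole formula could perhaps be pushed into a SQL `aggregate` higher-order function over the `allValues` array, applied before the `drop('allValues')` step above. Here `with_values` is a hypothetical name for that intermediate DataFrame:

from pyspark.sql import functions as F

# Untested sketch: n is recomputed as size(allValues) and p as
# size(allValues) * 2 inside a single SQL expression, so no UDF is needed.
# `with_values` is assumed to be the joined DataFrame above, before the
# allValues column is dropped.
power_mean_expr = F.expr("""
    pow(
        aggregate(allValues,
                  cast(0 as double),
                  (acc, x) -> acc + pow(x, size(allValues) * 2))
        / size(allValues),
        1.0 / (size(allValues) * 2))
""")

no_udf = with_values.withColumn('totalScore', power_mean_expr).drop('allValues')

I have not verified that an outer column like allValues can be referenced inside the lambda here, so I am not sure this is equivalent.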