I have an RDD of Breeze Vectors and want to calculate their average. My first approach is to use aggregate:
import org.apache.spark.{ SparkConf, SparkContext }
import org.apache.spark.rdd.RDD
import org.scalatest.{ BeforeAndAfterAll, FunSuite, Matchers, Suite }
import org.scalatest.prop.GeneratorDrivenPropertyChecks
import breeze.linalg.{ Vector => BreezeVector }
class CalculateMean extends FunSuite with Matchers with GeneratorDrivenPropertyChecks with SparkSpec {
  test("Calculate mean") {
    type U = (BreezeVector[Double], Int)
    type T = BreezeVector[Double]
    val rdd: RDD[T] = sc.parallelize(List(1.0, 2, 3, 4, 5, 6).map { x => BreezeVector(x, x * x) }, 2)
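    // Accumulator: a running (vector sum, element count) pair.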
    val zeroValue = (BreezeVector.zeros[Double](2), 0)
    val seqOp = (agg: U, x: T) => (agg._1 + x, agg._2 + 1)
    val combOp = (xs: U, ys: U) => (xs._1 + ys._1, xs._2 + ys._2)
    val mean = rdd.aggregate(zeroValue)(seqOp, combOp)
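    // Component-wise mean of (x, x*x) for x = 1..6: (3.5, 91/6 ≈ 15.1667)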
    println(mean._1 / mean._2.toDouble)
  }
}
/**
 * Set up and tear down the Spark context.
 */
trait SparkSpec extends BeforeAndAfterAll {
  this: Suite =>
  private val master = "local[2]"
  private val appName = this.getClass.getSimpleName
  private var _sc: SparkContext = _
  def sc: org.apache.spark.SparkContext = _sc
  val conf: SparkConf = new SparkConf()
    .setMaster(master)
    .setAppName(appName)
  override def beforeAll(): Unit = {
    super.beforeAll()
    _sc = new SparkContext(conf)
  }
  override def afterAll(): Unit = {
    if (_sc != null) {
      _sc.stop()
      _sc = null
    }
    super.afterAll()
  }
}
However, this algorithm may be numerically unstable (see https://stackoverflow.com/a/1346890/1037094).
How can I implement Knuth's algorithm for Breeze Vectors in Spark, and is rdd.aggregate the recommended way to do it?
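To make the question concrete, here is my rough sketch of how Knuth's running update (mean += (x - mean) / n) might translate to Breeze vectors with rdd.aggregate. The names (KnuthMean, MeanAcc) are my own, and the delta-based merge of two partial means in combOp is the part I am least sure about:
import org.apache.spark.rdd.RDD
import breeze.linalg.{ Vector => BreezeVector }
object KnuthMean {
  // State: (running mean vector, number of samples folded in so far).
  type MeanAcc = (BreezeVector[Double], Int)
  def mean(rdd: RDD[BreezeVector[Double]], dim: Int): BreezeVector[Double] = {
    val zeroValue: MeanAcc = (BreezeVector.zeros[Double](dim), 0)
    // Knuth's update, one element at a time: mean += (x - mean) / n
    val seqOp = (acc: MeanAcc, x: BreezeVector[Double]) => {
      val n = acc._2 + 1
      (acc._1 + (x - acc._1) / n.toDouble, n)
    }
    // Merge two partial means: meanA + (meanB - meanA) * nB / (nA + nB)
    val combOp = (a: MeanAcc, b: MeanAcc) => {
      val n = a._2 + b._2
      if (n == 0) a
      else (a._1 + (b._1 - a._1) * (b._2.toDouble / n.toDouble), n)
    }
    rdd.aggregate(zeroValue)(seqOp, combOp)._1
  }
}
The merge is meant to use the pairwise formula meanA + (meanB - meanA) * nB / (nA + nB), so intermediate values stay on the scale of the data instead of growing with a running sum.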