I have an RDD of integers (i.e. RDD[Int]) and I would like to compute the following percentiles: [0th, 10th, 20th, ..., 90th, 100th]. What is the most efficient way to do that?
 
    
You can:
- Sort the dataset via rdd.sortBy()
- Compute the size of the dataset via rdd.count()
- Zip with index to facilitate percentile retrieval
- Retrieve the desired percentile via rdd.lookup(), e.g. for the 10th percentile: rdd.lookup(0.1 * size)
For example, to compute the median and the 99th percentile: getPercentiles(rdd, new double[]{0.5, 0.99}, size, numPartitions);
In Java 8, getPercentiles can be defined as:
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import scala.Tuple2;

public static double[] getPercentiles(JavaRDD<Double> rdd, double[] percentiles, long rddSize, int numPartitions) {
    double[] values = new double[percentiles.length];
    // Sort ascending, then index every element so that a percentile maps to a row id
    JavaRDD<Double> sorted = rdd.sortBy((Double d) -> d, true, numPartitions);
    JavaPairRDD<Long, Double> indexed = sorted.zipWithIndex().mapToPair((Tuple2<Double, Long> t) -> t.swap());
    for (int i = 0; i < percentiles.length; i++) {
        double percentile = percentiles[i];
        // Clamp so that the 100th percentile maps to the last element instead of running off the end
        long id = Math.min(rddSize - 1, (long) (rddSize * percentile));
        values[i] = indexed.lookup(id).get(0); // retrieve the value stored at that row id
    }
    return values;
}
Note that this requires sorting the dataset, which is O(n log n) and can be expensive on large datasets.
The other answer, which suggests simply computing a histogram, would not compute the percentiles correctly. Here is a counterexample: a dataset of 100 numbers, 99 of them being 0 and one of them being 1. You end up with all 99 zeros in the first bin and the single 1 in the last bin, with 8 empty bins in the middle.
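For illustration, here is a minimal Scala sketch of that counterexample, assuming sc is a live SparkContext:

val data = sc.parallelize(Seq.fill(99)(0.0) :+ 1.0)   // 99 zeros and a single 1
val (buckets, counts) = data.histogram(10)            // 10 buckets evenly spaced between min and max
// counts comes back as Array(99, 0, 0, 0, 0, 0, 0, 0, 0, 1), which says nothing about the deciles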
How about t-digest?
https://github.com/tdunning/t-digest
A new data structure for accurate on-line accumulation of rank-based statistics such as quantiles and trimmed means. The t-digest algorithm is also very parallel friendly making it useful in map-reduce and parallel streaming applications.
The t-digest construction algorithm uses a variant of 1-dimensional k-means clustering to produce a data structure that is related to the Q-digest. This t-digest data structure can be used to estimate quantiles or compute other rank statistics. The advantage of the t-digest over the Q-digest is that the t-digest can handle floating point values while the Q-digest is limited to integers. With small changes, the t-digest can handle any values from any ordered set that has something akin to a mean. The accuracy of quantile estimates produced by t-digests can be orders of magnitude more accurate than those produced by Q-digests in spite of the fact that t-digests are more compact when stored on disk.
In summary, the particularly interesting characteristics of the t-digest are that it
- has smaller summaries than Q-digest
- works on doubles as well as integers.
- provides part per million accuracy for extreme quantiles and typically <1000 ppm accuracy for middle quantiles
- is fast
- is very simple
- has a reference implementation that has > 90% test coverage
- can be used with map-reduce very easily because digests can be merged
It should be fairly easy to use the reference Java implementation from Spark.
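As a rough illustration, here is a hedged Scala sketch of the map-reduce pattern this enables. The helper name and compression value are illustrative, and it assumes the com.tdunning:t-digest artifact is on the classpath and that its digests serialize and merge via add(other), as in the reference implementation:

import com.tdunning.math.stats.TDigest
import org.apache.spark.rdd.RDD

// Build one digest per partition, merge them on the driver, then query quantiles
def quantilesViaTDigest(rdd: RDD[Double], probs: Seq[Double], compression: Double = 100.0): Seq[Double] = {
  val digest = rdd.mapPartitions { it =>
    val d = TDigest.createMergingDigest(compression)
    it.foreach(x => d.add(x))
    Iterator(d)
  }.reduce { (a, b) => a.add(b); a }   // add(other) merges two digests
  probs.map(p => digest.quantile(p))   // quantile() takes a fraction in [0, 1]
}

// e.g. quantilesViaTDigest(rdd.map(_.toDouble), (0 to 100 by 10).map(_ / 100.0))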
Actually there is a spark implementation of this by Erik Erlandson here: https://github.com/isarn/isarn-sketches-spark. It works great. The only catch I've found is that you can't save the TDigest object to parquet format. As long as you're just tossing a ton of data in and asking for some percentile results, though, it is what you're looking for :) – John Humphreys Jan 16 '18 at 19:56
I discovered this gist
https://gist.github.com/felixcheung/92ae74bc349ea83a9e29
that contains the following function:
  /**
   * compute percentile from an unsorted Spark RDD
   * @param data: input data set of Long integers
   * @param tile: percentile to compute (eg. 85 percentile)
   * @return value of input data at the specified percentile
   */
  def computePercentile(data: RDD[Long], tile: Double): Double = {
    // NIST method; data to be sorted in ascending order
    val r = data.sortBy(x => x)
    val c = r.count()
    if (c == 1) r.first()
    else {
      val n = (tile / 100d) * (c + 1d)
      val k = math.floor(n).toLong
      val d = n - k
      if (k <= 0) r.first()
      else {
        val index = r.zipWithIndex().map(_.swap)
        val last = c
        if (k >= c) {
          index.lookup(last - 1).head
        } else {
          index.lookup(k - 1).head + d * (index.lookup(k).head - index.lookup(k - 1).head)
        }
      }
    }
  }
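For the original question (deciles of an RDD[Int]), a hedged usage sketch might look like the following; the sample data is illustrative and the values are converted to Long to match the function's signature:

val data = sc.parallelize(1 to 10000).map(_.toLong)          // RDD[Long], as computePercentile expects
val deciles = (0 to 100 by 10).map(p => computePercentile(data, p))

Note that each call sorts the RDD again, so for many percentiles it is worth factoring out the sorted, indexed RDD and reusing it.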
 
    
If you don't mind converting your RDD to a DataFrame and using a Hive UDAF, you can use percentile. Assuming you've loaded HiveContext hiveContext into scope:
hiveContext.sql("SELECT percentile(x, array(0.1,0.2,0.3,0.4,0.5,0.6,0.7,0.8,0.9)) FROM yourDataFrame")
I found out about this Hive UDAF in this answer.
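If you are starting from the RDD itself, a minimal sketch of the setup might look like this (the column and table names are illustrative, and registerTempTable is the Spark 1.x API that pairs with HiveContext):

import hiveContext.implicits._

// Expose the RDD[Int] as a temporary table so the Hive percentile UDAF can see it
val df = rdd.toDF("x")
df.registerTempTable("yourDataFrame")
hiveContext.sql("SELECT percentile(x, array(0.0,0.1,0.2,0.3,0.4,0.5,0.6,0.7,0.8,0.9,1.0)) FROM yourDataFrame")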
Here is my Python implementation on Spark for calculating the percentile of an RDD containing the values of interest.
def percentile_threshold(ardd, percentile):
    assert 0 < percentile <= 100, "percentile should be larger than 0 and smaller than or equal to 100"
    # sort ascending, index every element, then look up the value at the requested rank
    return ardd.sortBy(lambda x: x).zipWithIndex().map(lambda x: (x[1], x[0])) \
            .lookup(int(np.ceil(ardd.count() / 100.0 * percentile - 1)))[0]
# Now test it out
import numpy as np
randlist = range(1,10001)
np.random.shuffle(randlist)
ardd = sc.parallelize(randlist)
print percentile_threshold(ardd,0.001)
print percentile_threshold(ardd,1)
print percentile_threshold(ardd,60.11)
print percentile_threshold(ardd,99)
print percentile_threshold(ardd,99.999)
print percentile_threshold(ardd,100)
# output:
# 1
# 100
# 6011
# 9900
# 10000
# 10000
Separately, I defined the following function to get the 10th to 100th percentile.
def get_percentiles(rdd, stepsize=10):
    percentiles = []
    rddcount100 = rdd.count() / 100.0
    sortedrdd = rdd.sortBy(lambda x: x).zipWithIndex().map(lambda x: (x[1], x[0]))
    for p in range(0, 101, stepsize):
        if p == 0:
            pass
            # I am not aware of a formal definition of the 0th percentile,
            # you can put a placeholder like this if you want
            # percentiles.append(sortedrdd.lookup(0)[0] - 1)
        elif p == 100:
            percentiles.append(sortedrdd.lookup(int(np.ceil(rddcount100 * 100 - 1)))[0])
        else:
            pv = sortedrdd.lookup(int(np.ceil(rddcount100 * p) - 1))[0]
            percentiles.append(pv)
    return percentiles
randlist = range(1,10001)
np.random.shuffle(randlist)
ardd = sc.parallelize(randlist)
get_percentiles(ardd, 10)
# [1000, 2000, 3000, 4000, 5000, 6000, 7000, 8000, 9000, 10000]
 
    
Shouldn't `ardd` be replaced by `rdd` in the `sortedrdd` definition in `get_percentiles`? As well as adding `import numpy as np`. It does not seem to work with `numpy 1.11.3` – Jorge Lavín Aug 24 '17 at 07:27
Convert your RDD into an RDD of Double, and then use the .histogram(10) action. See DoubleRDD ScalaDoc
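A minimal sketch of that call, assuming rdd is the question's RDD[Int]:

// 10 buckets evenly spaced between the min and max of the data, with per-bucket counts
val (buckets, counts) = rdd.map(_.toDouble).histogram(10)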
 
    
.histogram(bucketCount) doesn't calculate percentiles, it "computes a histogram of the data using bucketCount number of buckets *evenly spaced between the minimum and maximum* of the RDD" – Dmitry Apr 04 '16 at 21:11
If N percent is small, like 10 or 20%, then I would do the following (a sketch follows the list):
- Compute the size of the dataset via rdd.count() (or skip this if you already know it and take it as an argument).
- Rather than sorting the whole dataset, find the top(N) of each partition. For that, work out N = N% of rdd.count(), then sort each partition and take its top(N). Now you have a much smaller dataset to sort.
- rdd.sortBy
- zipWithIndex
- filter (index < topN)
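Here is a hedged Scala sketch of that idea; the names (scores, fraction, topN) are illustrative, and the per-partition pre-filter assumes each partition's candidate list fits in memory:

val total = scores.count()
val fraction = 0.10                       // e.g. we only care about the top 10%
val topN = (total * fraction).toLong

// Keep only the topN candidates of each partition, so the global sort is over a much smaller RDD
val candidates = scores.mapPartitions { it =>
  it.toArray.sorted(Ordering[Int].reverse).take(topN.toInt).iterator
}

val topValues = candidates
  .sortBy(x => x, ascending = false)
  .zipWithIndex()
  .filter { case (_, idx) => idx < topN }
  .keys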
 
    
Based on the answer given here, Median UDAF in Spark/Scala, I used a UDAF to compute percentiles over Spark windows (Spark 2.1):
First, an abstract generic UDAF that can be reused for other aggregations:
import org.apache.spark.sql.Row
import org.apache.spark.sql.expressions.{MutableAggregationBuffer, UserDefinedAggregateFunction}
import org.apache.spark.sql.types._
import scala.collection.mutable
import scala.collection.mutable.ArrayBuffer
abstract class GenericUDAF extends UserDefinedAggregateFunction {
  def inputSchema: StructType =
    StructType(StructField("value", DoubleType) :: Nil)
  def bufferSchema: StructType = StructType(
    StructField("window_list", ArrayType(DoubleType, false)) :: Nil
  )
  def deterministic: Boolean = true
  def initialize(buffer: MutableAggregationBuffer): Unit = {
    buffer(0) = new ArrayBuffer[Double]()
  }
  def update(buffer: MutableAggregationBuffer,input: org.apache.spark.sql.Row): Unit = {
    var bufferVal = buffer.getAs[mutable.WrappedArray[Double]](0).toBuffer
    bufferVal+=input.getAs[Double](0)
    buffer(0) = bufferVal
  }
  def merge(buffer1: MutableAggregationBuffer, buffer2: org.apache.spark.sql.Row): Unit = {
    buffer1(0) = buffer1.getAs[ArrayBuffer[Double]](0) ++ buffer2.getAs[ArrayBuffer[Double]](0)
  }
  def dataType: DataType
  def evaluate(buffer: Row): Any
}
Then the Percentile UDAF, customized for deciles:
import org.apache.spark.sql.Row
import org.apache.spark.sql.expressions.{MutableAggregationBuffer, UserDefinedAggregateFunction}
import org.apache.spark.sql.types._
import scala.collection.mutable
import scala.collection.mutable.ArrayBuffer
class DecilesUDAF extends GenericUDAF {
  override def dataType: DataType = ArrayType(DoubleType, false)
  override def evaluate(buffer: Row): Any = {
    val sortedWindow = buffer.getAs[mutable.WrappedArray[Double]](0).sorted.toBuffer
    val windowSize = sortedWindow.size
    if (windowSize == 0) return null
    if (windowSize == 1) return (0 to 10).map(_ => sortedWindow.head).toArray
    (0 to 10).map(i => sortedWindow(Math.min(windowSize-1, i*windowSize/10))).toArray
  }
}
The UDAF is then instantiated and called over a partitioned and ordered window:
val deciles = new DecilesUDAF()
df.withColumn("mt_deciles", deciles(col("mt")).over(myWindow))
You can then split the resulting array into multiple columns with getItem:
def splitToColumns(size: Int, splitCol:String)(df: DataFrame) = {
  (0 to size).foldLeft(df) {
    case (df_arg, i) => df_arg.withColumn("mt_decile_"+i, col(splitCol).getItem(i))
  }
}
df.transform(splitToColumns(10, "mt_deciles" ))
The UDAF is slower than native Spark functions, but as long as each grouped bag or each window is relatively small and fits into a single executor, it should be fine. The main advantage is using Spark parallelism. With little effort, this code could be extended to n-quantiles.
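As a hedged illustration of that last point, a variant parameterized by n might look like this (the class name and parameter are assumptions, not part of the original answer, reusing the same imports as the DecilesUDAF above):

// Hypothetical n-quantiles variant of the DecilesUDAF above
class QuantilesUDAF(n: Int) extends GenericUDAF {
  override def dataType: DataType = ArrayType(DoubleType, false)

  override def evaluate(buffer: Row): Any = {
    val sorted = buffer.getAs[mutable.WrappedArray[Double]](0).sorted
    val size = sorted.size
    if (size == 0) return null
    if (size == 1) return (0 to n).map(_ => sorted.head).toArray
    // pick the element whose rank corresponds to each i/n fraction of the window
    (0 to n).map(i => sorted(Math.min(size - 1, i * size / n))).toArray
  }
}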
I tested the deciles code using this function:
def testDecilesUDAF = {
    val window = W.partitionBy("user")
    val deciles = new DecilesUDAF()
    val schema = StructType(StructField("mt", DoubleType) :: StructField("user", StringType) :: Nil)
    val rows1 = (1 to 20).map(i => Row(i.toDouble, "a"))
    val rows2 = (21 to 40).map(i => Row(i.toDouble, "b"))
    val df = spark.createDataFrame(spark.sparkContext.makeRDD[Row](rows1++rows2), schema)
    df.withColumn("deciles", deciles(col("mt")).over(window))
      .transform(splitToColumns(10, "deciles" ))
      .drop("deciles")
      .show(100, truncate=false)
  }
First 3 lines of output:
+----+----+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+------------+
|mt  |user|mt_decile_0|mt_decile_1|mt_decile_2|mt_decile_3|mt_decile_4|mt_decile_5|mt_decile_6|mt_decile_7|mt_decile_8|mt_decile_9|mt_decile_10|
+----+----+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+------------+
|21.0|b   |21.0       |23.0       |25.0       |27.0       |29.0       |31.0       |33.0       |35.0       |37.0       |39.0       |40.0        |
|22.0|b   |21.0       |23.0       |25.0       |27.0       |29.0       |31.0       |33.0       |35.0       |37.0       |39.0       |40.0        |
|23.0|b   |21.0       |23.0       |25.0       |27.0       |29.0       |31.0       |33.0       |35.0       |37.0       |39.0       |40.0        |
 
    
Another alternative is to use top and last on an RDD of Double. For example: val percentile_99th_value = scores.top((count/100).toInt).last
This method is better suited to computing individual percentiles.
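Spelled out as a hedged, self-contained sketch (the helper name is an assumption, and scores is assumed to be an RDD[Double] as in the one-liner above):

import org.apache.spark.rdd.RDD

// p-th percentile (for large p) as the smallest value among the top (1 - p/100) fraction
def upperPercentile(scores: RDD[Double], p: Double): Double = {
  val count = scores.count()
  val n = math.max(1, ((1.0 - p / 100.0) * count).toInt)
  scores.top(n).last
}

// e.g. upperPercentile(scores, 99.0)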
 
    
Here is my easy approach:
val percentiles = Array(0.1, 0.2, 0.3, 0.4, 0.5, 0.6, 0.7, 0.8, 0.9, 1)
val accuracy = 1000000
df.stat.approxQuantile("score", percentiles, 1.0/accuracy)
output:
scala> df.stat.approxQuantile("score", percentiles, 1.0/accuracy)
res88: Array[Double] = Array(0.011044141836464405, 0.02022990956902504, 0.0317261666059494, 0.04638145491480827, 0.06498630344867706, 0.0892181545495987, 0.12161539494991302, 0.16825592517852783, 0.24740923941135406, 0.9188197255134583)
accuracy: the accuracy parameter (default: 10000) is a positive numeric literal which controls approximation accuracy at the cost of memory. A higher value of accuracy yields better accuracy; 1.0/accuracy is the relative error of the approximation.
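Since the question starts from an RDD[Int], a minimal sketch of getting to the DataFrame might be the following (spark is an assumed SparkSession, and the column name "score" simply matches the example above):

import spark.implicits._

// Convert the RDD[Int] to a single-column DataFrame, then ask for the 0th..100th percentiles
val df = rdd.map(_.toDouble).toDF("score")
val deciles = df.stat.approxQuantile("score", (0 to 100 by 10).map(_ / 100.0).toArray, 1.0 / accuracy)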
 
    
How would this look with groupBy? It does not seem to understand `stat`. – Xavier John Mar 23 '22 at 18:50
 
     
     
     
    