I have to do the following tasks on a dataset using Apache Spark with Scala as the programming language:
- Read the dataset from HDFS. A few sample lines look like this:
deviceid,bytes,eventdate
15590657,246620,20150630
14066921,1907,20150621
14066921,1906,20150626
6522013,2349,20150626
6522013,2525,20150613
- Group the data by device id. This gives us, for each deviceid, a collection of (bytes, eventdate) pairs.
- For each device, sort the records by eventdate. We now have an ordered sequence of bytes per device.
- Pick the last 30 days of bytes from this ordered sequence.
- Find the moving average of bytes for the last date, using a window of 30 days.
- Find the standard deviation of bytes for the last date, using the same 30-day window.
- Return two values per device: (mean - k*stddev) and (mean + k*stddev), assuming k = 3.
I am using Apache Spark 1.3.0. The actual dataset is much wider, and the job ultimately has to run on a billion rows.
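The per-device statistics in the steps above can be sketched as a plain Scala function, independent of Spark, so it is easy to unit-test off-cluster; the name computeBounds and the default k = 3.0 are my own:

```scala
object DeviceStats {
  // Given one device's (bytes, eventdate) pairs: sort by eventdate,
  // keep at most the last 30 entries, and return the pair
  // (mean - k*stddev, mean + k*stddev) over that window.
  def computeBounds(records: Iterable[(Long, Int)], k: Double = 3.0): (Double, Double) = {
    val last30 = records.toSeq.sortBy(_._2).takeRight(30).map(_._1.toDouble)
    val mean = last30.sum / last30.size
    // population standard deviation over the 30-day window
    val variance = last30.map(b => math.pow(b - mean, 2)).sum / last30.size
    val stddev = math.sqrt(variance)
    (mean - k * stddev, mean + k * stddev)
  }
}
```

In the Spark job this function would be applied per device after the groupBy, e.g. via mapValues.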
Here is the data structure for the dataset.
package com.testing
case class DeviceAggregates(
    device_id: Int,
    bytes: Long,
    eventdate: Int
) extends Ordered[DeviceAggregates] {
  def compare(that: DeviceAggregates): Int = {
    eventdate - that.eventdate
  }
}
object DeviceAggregates {
  def parseLogLine(logline: String): DeviceAggregates = {
    val c = logline.split(",")
    DeviceAggregates(c(0).toInt, c(1).toLong, c(2).toInt)
  }
}
The DeviceAnalyzer class looks like this:
package com.testing
import com.testing.DeviceAggregates
import org.apache.spark.{SparkContext, SparkConf}
import scala.util.Sorting
object DeviceAnalyzer {
  def main(args: Array[String]) {
    val sparkConf = new SparkConf().setAppName("Device Statistics Analyzer")
    val sc = new SparkContext(sparkConf)
    val logFile = args(0)
    val deviceAggregateLogs = sc.textFile(logFile).map(DeviceAggregates.parseLogLine).cache()
    val deviceIdsMap = deviceAggregateLogs.groupBy(_.device_id)
    deviceIdsMap.foreach { case (deviceId, records) =>
      // I am stuck here !!
    }
    sc.stop()
  }
}
But I am stuck with the actual implementation of this algorithm beyond this point.
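To see the intended flow end to end, here is the same pipeline on plain Scala collections; the object name, the sample input in the test, and the helper structure are all illustrative. In the Spark job, sc.textFile(...).map(parseLogLine).groupBy(_.device_id) plays the role of the local groupBy below, and the per-device lambda would go into mapValues (returning an RDD to save) rather than the foreach placeholder:

```scala
object PipelineSketch {
  val k = 3.0

  // Parse CSV lines, group by device id, then per device: sort by eventdate,
  // keep the last 30 entries, and compute (mean - k*stddev, mean + k*stddev).
  def run(lines: Seq[String]): Map[Int, (Double, Double)] = {
    val parsed = lines.map { l =>
      val c = l.split(",")
      (c(0).toInt, c(1).toLong, c(2).toInt) // (device_id, bytes, eventdate)
    }
    parsed.groupBy(_._1).map { case (deviceId, recs) =>
      val last30 = recs.sortBy(_._3).takeRight(30).map(_._2.toDouble)
      val mean = last30.sum / last30.size
      val stddev = math.sqrt(last30.map(b => math.pow(b - mean, 2)).sum / last30.size)
      deviceId -> (mean - k * stddev, mean + k * stddev)
    }
  }
}
```

Using mapValues instead of foreach keeps the result as a distributed dataset, which matters at a billion rows; foreach runs on the executors purely for side effects and returns nothing to the driver.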
