I got a "Task not serializable" error when running this code:
import org.apache.spark.{SparkContext, SparkConf}
import scala.collection.mutable.ArrayBuffer
object Task1 {
  def findHighestRatingUsers(movieRating: String): String = {
    val tokens = movieRating.split(",", -1)
    val movieTitle = tokens(0)
    val ratings = tokens.slice(1, tokens.size)
    val maxRating = ratings.max
    val userIds = ArrayBuffer[Int]()
    for (i <- 0 until ratings.length) {
      if (ratings(i) == maxRating) {
        userIds += (i + 1)
      }
    }
    movieTitle + "," + userIds.mkString(",")
  }
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("Task 1")
    val sc = new SparkContext(conf)
    val Lines = sc.textFile(args(0))
    val TitleAndMaxUserIds = Lines.map(findHighestRatingUsers)
      .saveAsTextFile(args(1))
  }
}
The error occurs at line:
val TitleAndMaxUserIds = Lines.map(findHighestRatingUsers)
      .saveAsTextFile(args(1))
I believe this is due to something in the function findHighestRatingUsers. Could somebody explain why this happens and how to fix it?
Here is the full exception:
Exception in thread "main" org.apache.spark.SparkException: Task not serializable
    at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:416)
    at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:406)
    at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:162)
    at org.apache.spark.SparkContext.clean(SparkContext.scala:2362)
    at org.apache.spark.rdd.RDD.$anonfun$map$1(RDD.scala:396)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
    at org.apache.spark.rdd.RDD.withScope(RDD.scala:388)
    at org.apache.spark.rdd.RDD.map(RDD.scala:395)
    at Task1$.main(Task1.scala:63)
    at Task1.main(Task1.scala)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.apache.spark.deploy.JavaMainApplication.start(SparkApplication.scala:52)
    at org.apache.spark.deploy.SparkSubmit.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:928)
    at org.apache.spark.deploy.SparkSubmit.doRunMain$1(SparkSubmit.scala:180)
    at org.apache.spark.deploy.SparkSubmit.submit(SparkSubmit.scala:203)
    at org.apache.spark.deploy.SparkSubmit.doSubmit(SparkSubmit.scala:90)
    at org.apache.spark.deploy.SparkSubmit$$anon$2.doSubmit(SparkSubmit.scala:1007)
    at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:1016)
    at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
Caused by: java.io.NotSerializableException: Task1$
Serialization stack:
    - object not serializable (class: Task1$, value: Task1$@3c770db4)
    - element of array (index: 0)
    - array (class [Ljava.lang.Object;, size 1)
    - field (class: java.lang.invoke.SerializedLambda, name: capturedArgs, type: class [Ljava.lang.Object;)
    - object (class java.lang.invoke.SerializedLambda, SerializedLambda[capturingClass=class Task1$, functionalInterfaceMethod=scala/Function1.apply:(Ljava/lang/Object;)Ljava/lang/Object;, implementation=invokeStatic Task1$.$anonfun$main$1:(LTask1$;Ljava/lang/String;)Ljava/lang/String;, instantiatedMethodType=(Ljava/lang/String;)Ljava/lang/String;, numCaptured=1])
    - writeReplace data (class: java.lang.invoke.SerializedLambda)
    - object (class Task1$$$Lambda$1023/20408451, Task1$$$Lambda$1023/20408451@4f59a516)
    at org.apache.spark.serializer.SerializationDebugger$.improveException(SerializationDebugger.scala:41)
    at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:47)
    at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:101)
    at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:413)
    ... 22 more
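Reading the serialization stack, it looks like the lambda captures the Task1$ object itself (capturedArgs contains Task1$, numCaptured=1). My understanding, which may be wrong, is that passing the method by name gets eta-expanded into a closure over the enclosing singleton, roughly like this sketch (not the actual generated code; `expanded` is just a name I made up):

// What I think the compiler effectively turns my map call into:
val expanded = Lines.map((line: String) => Task1.findHighestRatingUsers(line))
// The closure references the Task1$ singleton, so Spark's ClosureCleaner
// tries to Java-serialize Task1$, which does not extend Serializable.

Is that the right mental model?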
I checked the post "Difference between object and class in Scala" and tried using a separate top-level object to enclose the function:
import org.apache.spark.{SparkContext, SparkConf}
import scala.collection.mutable.ArrayBuffer
object Function{
    def _findHighestRatingUsers(movieRating: String): String = {
      val tokens = movieRating.split(",", -1)
      val movieTitle = tokens(0)
      val ratings = tokens.slice(1, tokens.size)
      val maxRating = ratings.max
      val userIds = ArrayBuffer[Int]()
      for (i <- 0 until ratings.length) {
        if (ratings(i) == maxRating) {
          userIds += (i + 1)
        }
      }
      movieTitle + "," + userIds.mkString(",")
    }
}
object Task1 {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("Task 1")
    val sc = new SparkContext(conf)
    val textFile = sc.textFile(args(0))
    val output = textFile.map(Function._findHighestRatingUsers)
      .saveAsTextFile(args(1))
  }
}
But I still got the same exception, along with a huge number of errors.
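While searching, I also came across the suggestion to use a function value (a val of type String => String) instead of a def, since a standalone function literal is itself a serializable object and should not drag the enclosing object along. A minimal sketch of that idea, which I have not verified:

object Function {
  // A function value is a self-contained serializable object, so
  // map(Function.findHighestRatingUsers) ships the function itself
  // rather than a method reference that captures the enclosing object.
  val findHighestRatingUsers: String => String = movieRating => {
    val tokens = movieRating.split(",", -1)
    val movieTitle = tokens(0)
    val ratings = tokens.slice(1, tokens.size)
    val maxRating = ratings.max
    val userIds = ratings.indices.filter(i => ratings(i) == maxRating).map(_ + 1)
    movieTitle + "," + userIds.mkString(",")
  }
}

I have not tried this variant yet, though.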
This time I tried putting object Function inside object Task1, like this:
import org.apache.spark.{SparkContext, SparkConf}
import scala.collection.mutable.ArrayBuffer
object Task1 {
    
  object Function{
    def _findHighestRatingUsers(movieRating: String): String = {
      val tokens = movieRating.split(",", -1)
      val movieTitle = tokens(0)
      val ratings = tokens.slice(1, tokens.size)
      val maxRating = ratings.max
      val userIds = ArrayBuffer[Int]()
      for (i <- 0 until ratings.length) {
        if (ratings(i) == maxRating) {
          userIds += (i + 1)
        }
      }
      movieTitle + "," + userIds.mkString(",")
    }
  }
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("Task 1")
    val sc = new SparkContext(conf)
    val textFile = sc.textFile(args(0))
    val output = textFile.map(Function._findHighestRatingUsers)
      .saveAsTextFile(args(1))
  }
}
And the problem was solved. But I still don't know why the nested object fixes it. Could somebody explain? Furthermore, there are a few points I am not sure about:
- What is the main function in Scala? Is it the entry point of the program?
- Why do we use an object to hold the main function?
- Could somebody give the common structure of a Scala program containing functions, classes, and other basic components? (I sketched my current understanding after this list.)
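For the last point, here is my rough current understanding of how a small Scala program is laid out; the names Movie, RatingFormatter, and Main are just made up for illustration, so please correct me if this is not idiomatic:

// A case class models plain immutable data
case class Movie(title: String, ratings: Seq[Int])

// A class bundles state and behaviour; instances are created with `new`
class RatingFormatter(separator: String) {
  def format(movie: Movie): String =
    movie.title + separator + movie.ratings.mkString(separator)
}

// An object is a singleton. An object with a main(args: Array[String])
// method (or one extending App) serves as the program's entry point.
object Main {
  def main(args: Array[String]): Unit = {
    val formatter = new RatingFormatter(",")
    println(formatter.format(Movie("Inception", Seq(5, 4, 5))))
  }
}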