I'm running a simple Spark project on an EMR YARN cluster to:
- read a text file from S3 into an RDD[String]
- define a schema and convert that RDD into a DataFrame
I'm doing a mapPartitions on the RDD to convert the RDD[String] into an RDD[Row]; a rough sketch of how rdd1 is created follows.
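For context, rdd1 comes from something roughly like this (a sketch, not the exact code; the S3 path is a placeholder):

import org.apache.spark.rdd.RDD

// Read the raw log lines from S3 into an RDD[String].
// The bucket/key below is a placeholder, not the real path.
val rdd1: RDD[String] = spark.sparkContext.textFile("s3://my-bucket/path/to/access.log")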
My problem: I get a java.lang.NullPointerException and I can't figure out what's causing it.
The stack trace points to these two line numbers in my source code:
- the line with the rdd1.mapPartitions call
- within the anonymous function, the line with the match case that matches the regular expression
Here's the stack trace excerpt:
Caused by: java.lang.NullPointerException
    at packageA.Herewego$$anonfun$3.apply(Herewego.scala:107)
    at packageA.Herewego$$anonfun$3.apply(Herewego.scala:88)
    at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$23.apply(RDD.scala:801)
    at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$23.apply(RDD.scala:801)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
    at org.apache.spark.rdd.RDD$$anonfun$7.apply(RDD.scala:337)
    at org.apache.spark.rdd.RDD$$anonfun$7.apply(RDD.scala:335)
    at org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:1165)
    at org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:1156)
    at org.apache.spark.storage.BlockManager.doPut(BlockManager.scala:1091)
    at org.apache.spark.storage.BlockManager.doPutIterator(BlockManager.scala:1156)
    at org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:882)
    at org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:335)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:286)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
    at org.apache.spark.scheduler.Task.run(Task.scala:121)
    at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:402)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:408)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
What I've tried:
- The error occurs only when running in YARN cluster mode, not in local mode (in my IDE). This made me think that something isn't defined on the executors, so I moved the createrow function definition into the anonymous function (see the sketch below for what the earlier layout looked like) - it didn't work, though.
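To illustrate what I mean by "moved the def", the earlier layout was roughly like this (a simplified sketch, not the exact original code; the helper lived on the enclosing object, outside the closure, and the object name here is just illustrative):

import org.apache.spark.rdd.RDD
import org.apache.spark.sql.Row

object HerewegoSketch {
  // Helper defined at object level (outside the mapPartitions closure).
  def createrow(a: List[String]): Row = Row(a.head, a.tail.mkString(" "))

  def toRows(rdd1: RDD[String]): RDD[Row] =
    rdd1.mapPartitions { it =>
      // The closure references the object-level helper here.
      it.map(line => createrow(line.split(" ").toList))
    }
}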
Here's the current code block:
val rdd4: RDD[Row] = rdd1.mapPartitions((it: Iterator[String]) => {
    // Helper (now defined inside the closure): builds a Row of (ip, timestamp)
    // from the matched fields.
    def createrow(a: List[String]): Row = {
      val format = new java.text.SimpleDateFormat("dd/MMM/yyyy HH:mm:ss Z")
      val re1: Row = Row.apply(a.head)
      val d: Date = format.parse(a.tail.mkString(" "))
      val t = new Timestamp(d.getTime)
      val re2: Row = Row.apply(t)
      Row.merge(re1, re2)
    }
    var output: List[Row] = List()
    while (it.hasNext) {
      val data: String = it.next()
      // rx is a regex defined elsewhere in the object (not shown in this excerpt).
      val res = data match {
        case rx(ipadd, date, time) => createrow(List(ipadd, date, time))
        case _ => createrow(List("0.0.0.0", "00/Jan/0000", "00:00:00 0"))
      }
      output = output :+ res
    }
    output.toIterator
  }).persist(MEMORY_ONLY)
// Collect the RDD to the driver (the persist to memory is applied above)
  val tmp = rdd4.collect()
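For completeness, the schema definition and DataFrame conversion mentioned at the top happen after this. That part isn't in the excerpt, but it looks roughly like the following (a sketch with an assumed two-column schema of IP string plus timestamp):

import org.apache.spark.sql.types.{StructType, StructField, StringType, TimestampType}

// Hypothetical schema matching the two fields produced by createrow:
// an IP address string and a parsed timestamp.
val schema = StructType(Seq(
  StructField("ip", StringType, nullable = true),
  StructField("ts", TimestampType, nullable = true)
))
val df = spark.createDataFrame(rdd4, schema)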
Do I need to broadcast any variables or functions used within mapPartitions? My rough understanding of what that would look like is sketched below. Any pointers in the right direction would be much appreciated.
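In case it helps clarify the question, this is roughly how I imagine broadcasting the regex would work (a sketch only; bcRx is a hypothetical name, and I don't know whether this is the right fix):

// Sketch: broadcast the regex from the driver and read it back on the executors.
val bcRx = spark.sparkContext.broadcast(rx)

val rdd4b: RDD[Row] = rdd1.mapPartitions { it =>
  val localRx = bcRx.value   // fetched on the executor
  it.map {
    // Plain Rows here just to keep the sketch short; the real code would still call createrow.
    case localRx(ipadd, date, time) => Row(ipadd, date, time)
    case _                          => Row("0.0.0.0", "00/Jan/0000", "00:00:00 0")
  }
}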
