I'm trying to build a Spark Structured Streaming job that consumes Kafka messages encoded in Protocol Buffers. Here is what I have tried over the last few days:
    import org.apache.spark.sql.streaming.{OutputMode, StreamingQuery, Trigger}
    import spark.implicits._

    def parseLine(str: Array[Byte]): ProtoSchema = ProtoSchema.parseFrom(str)
    val storageLoc: String = "/tmp/avl/output"
    val checkpointLoc: String = "/tmp/avl/checkpoint"
    val dfStreamReader: DataFrame = spark
      .readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", brokers)
      .option("failOnDataLoss", value = false)
      .option("subscribe", topics)
      .load()
    val dfStreamReaderValues: Dataset[Array[Byte]] = dfStreamReader.map(row => row.getAs[Array[Byte]]("value"))
    val rddProtoSchema: Dataset[ProtoSchema] = dfStreamReaderValues.map(str => parseLine(str))
    val dfRaw: DataFrame = spark.sqlContext.protoToDataFrame(rddProtoSchema.rdd)
    val streamWriterAirline: StreamingQuery = dfRaw.writeStream
      .format("parquet")
      .option("path", storageLoc)
      .option("checkpointLocation", checkpointLoc)
      .outputMode(OutputMode.Append())
      .trigger(Trigger.ProcessingTime("2 seconds"))
      .start()
    spark.streams.awaitAnyTermination(20000)
With scalapb I managed to decode a binary proto file and convert it to a DataFrame in batch mode. But with streaming, I get this exception at compile time on the parsing line:
    val rddProtoSchema: Dataset[ProtoSchema] = dfStreamReaderValues.map(str => parseLine(str))
    >>>>>
    scala.ScalaReflectionException: <none> is not a term
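For comparison, the batch-mode flow that does work for me looks roughly like this (a minimal sketch: the local `SparkSession` setup and the sample file path are placeholders, `ProtoSchema` is my scalapb-generated message class, and `protoToDataFrame` comes from the sparksql-scalapb import):

```scala
import java.nio.file.{Files, Paths}
import org.apache.spark.sql.{DataFrame, SparkSession}
import scalapb.spark._  // sparksql-scalapb: adds protoToDataFrame on SQLContext

// Local session for illustration only
val spark = SparkSession.builder().master("local[*]").appName("proto-batch").getOrCreate()

// Parse one binary-encoded ProtoSchema message from disk
val bytes: Array[Byte] = Files.readAllBytes(Paths.get("/tmp/avl/sample.bin"))
val msg: ProtoSchema = ProtoSchema.parseFrom(bytes)

// Convert an RDD of parsed messages into a DataFrame
val dfBatch: DataFrame = spark.sqlContext.protoToDataFrame(spark.sparkContext.parallelize(Seq(msg)))
dfBatch.printSchema()
dfBatch.show()
```

This batch path runs fine for me; only the streaming variant above fails.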
Can anyone give me a hint?