I'm using Spark 2.1's Structured Streaming to read from a Kafka topic whose messages are Avro-encoded binary.
Thus, after setting up the DataFrame:
val messages = spark
.readStream
.format("kafka")
.options(kafkaConf)
.option("subscribe", config.getString("kafka.topic"))
.load()
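For reference, kafkaConf is just a plain Map[String, String] of connection settings (hypothetical values below; my real ones come from the application config):
val kafkaConf = Map(
  "kafka.bootstrap.servers" -> "localhost:9092", // hypothetical broker address
  "startingOffsets" -> "earliest"
)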
If I print the schema of this DataFrame (messages.printSchema()), I get the following:
root
|-- key: binary (nullable = true)
|-- value: binary (nullable = true)
|-- topic: string (nullable = true)
|-- partition: integer (nullable = true)
|-- offset: long (nullable = true)
|-- timestamp: long (nullable = true)
|-- timestampType: integer (nullable = true)
This question should be orthogonal to the problem of Avro decoding, but let's assume I want to somehow convert the value content of the messages DataFrame into a Dataset[BusinessObject], via a function Array[Byte] => BusinessObject. For the sake of completeness, the function may just be (using avro4s):
import java.io.ByteArrayInputStream
import com.sksamuel.avro4s.AvroInputStream

case class BusinessObject(userId: String, eventId: String)

def fromAvro(bytes: Array[Byte]): BusinessObject =
  AvroInputStream.binary[BusinessObject](new ByteArrayInputStream(bytes)).iterator.next
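As a quick sanity check outside Spark, the round trip behaves as expected (a sketch, assuming the avro4s 1.x writer API that matches the reader above):
import java.io.ByteArrayOutputStream
import com.sksamuel.avro4s.AvroOutputStream

val baos = new ByteArrayOutputStream()
val out  = AvroOutputStream.binary[BusinessObject](baos)
out.write(BusinessObject("user-1", "event-1")) // hypothetical values
out.close()

assert(fromAvro(baos.toByteArray) == BusinessObject("user-1", "event-1"))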
Of course, as miguno says in this related question, I cannot just apply the transformation with a DataFrame.map(), because I need to provide an implicit Encoder for BusinessObject.
That can be defined as:
implicit val myEncoder: Encoder[BusinessObject] = org.apache.spark.sql.Encoders.kryo[BusinessObject]
Now, perform the map:
val transformedMessages: Dataset[BusinessObject] = messages.map(row => fromAvro(row.getAs[Array[Byte]]("value")))
But if I query the new schema, I get the following:
root
|-- value: binary (nullable = true)
And I think that does not make any sense: the Dataset should use the Product properties of the BusinessObject case class and derive the correct schema from its fields.
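To illustrate what I expect: in a clean scope (without the Kryo encoder), the implicitly derived product encoder gives exactly the schema I'm after:
import spark.implicits._

Seq(BusinessObject("u1", "e1")).toDS().printSchema()
// root
//  |-- userId: string (nullable = true)
//  |-- eventId: string (nullable = true)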
I've seen some examples of Spark SQL using .schema(StructType) in the reader, but I cannot do that here, not just because I'm using readStream, but because I actually have to transform the column before I can operate on those fields.
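For clarity, this is the kind of batch example I mean (hypothetical file path); it fixes the schema of the source itself, which doesn't help when the interesting bytes still have to be decoded first:
import org.apache.spark.sql.types.{StringType, StructField, StructType}

val explicitSchema = StructType(Seq(
  StructField("userId", StringType, nullable = true),
  StructField("eventId", StringType, nullable = true)
))

val fromJson = spark.read.schema(explicitSchema).json("/data/events") // hypothetical path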
I am hoping to tell the Spark SQL engine that the transformedMessages Dataset schema is a StructType with the case class's fields.
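To make that concrete, this is a sketch of the shape I'm after, assuming the implicit product encoder from spark.implicits._ could replace the Kryo one (I haven't verified this on a streaming Dataset):
import spark.implicits._ // with the Kryo encoder removed from scope

val typed: Dataset[BusinessObject] =
  messages
    .select($"value".as[Array[Byte]]) // keep only the Avro payload
    .map(fromAvro)                    // decode into the case class

typed.printSchema()
// hoped-for output:
// root
//  |-- userId: string (nullable = true)
//  |-- eventId: string (nullable = true)
Is there a way to make transformedMessages expose that schema?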