I am trying to implement a custom UDT and be able to reference it from Spark SQL (as explained in the Spark SQL whitepaper, section 4.4.2).
The real example is to have a custom UDT backed by an off-heap data structure using Cap'n Proto, or similar.
For this posting, I have made up a contrived example. I know that I could just use Scala case classes and not have to do any work at all, but that isn't my goal.
For example, I have a Person containing several attributes and I want to be able to run `SELECT person.first_name FROM person`. However, I'm running into the error `Can't extract value from person#1`, and I'm not sure why.
Here is the full source (also available at https://github.com/andygrove/spark-sql-udt)
package com.theotherandygrove
import org.apache.spark.sql.types._
import org.apache.spark.sql.{Row, SQLContext}
import org.apache.spark.{SparkConf, SparkContext}
/**
 * Demo driver: builds a two-row DataFrame whose `person` column is typed by
 * [[MockPersonUDT]], registers it as the temp table `person`, and prints the
 * rows returned by a SQL query that reads attributes of that column.
 */
object Example {
  /**
   * Runs the demo on a local Spark master and prints each result row.
   *
   * @param arg command-line arguments (not used)
   */
  def main(arg: Array[String]): Unit = {
    val conf = new SparkConf()
      .setAppName("Example")
      .setMaster("local[*]")
    val sc = new SparkContext(conf)
    try {
      val sqlContext = new SQLContext(sc)
      val schema = StructType(List(
        StructField("person_id", DataTypes.IntegerType, nullable = true),
        StructField("person", new MockPersonUDT, nullable = true)))
      // load initial RDD
      val rdd = sc.parallelize(List(
        MockPersonImpl(1),
        MockPersonImpl(2)
      ))
      // convert to RDD[Row]; the mock person's number doubles as person_id
      val rowRdd = rdd.map(person => Row(person.getAge, person))
      // convert to DataFrame (RDD + Schema)
      val dataFrame = sqlContext.createDataFrame(rowRdd, schema)
      // register as a table
      dataFrame.registerTempTable("person")
      // Select a single attribute of the UDT-typed column, filtered by another.
      // NOTE(review): the query refers to `first_name`, while MockPersonUDT.sqlType
      // names the field `firstName` -- confirm which spelling is intended.
      val results = sqlContext.sql("SELECT person.first_name FROM person WHERE person.age < 100")
      val people = results.collect
      // foreach, not map: printing is a side effect and no result collection is needed
      people.foreach(println)
    } finally {
      // release the local Spark context even when the query fails
      sc.stop()
    }
  }
}
/**
 * Read-only view of a person, exposed through Java-bean style getters.
 */
trait MockPerson {
  /** Age of the person, as a boxed integer. */
  def getAge: Integer

  /** Given name of the person. */
  def getFirstName: String

  /** Family name of the person. */
  def getLastName: String

  /** State the person is associated with. */
  def getState: String
}
/**
 * Spark SQL user-defined type that maps [[MockPerson]] values to and from
 * their SQL-side representation.
 *
 * NOTE(review): `sqlType` declares a four-field struct, but `serialize` and
 * `deserialize` exchange a single `Integer` (the age). The stored value
 * therefore does not have the declared shape -- confirm the row representation
 * expected by the Spark version in use and align the two.
 */
class MockPersonUDT extends UserDefinedType[MockPerson] {
  /** Declared SQL layout: first name, last name, age and state, all non-nullable. */
  override def sqlType: DataType = StructType(List(
    StructField("firstName", StringType, nullable=false),
    StructField("lastName", StringType, nullable=false),
    StructField("age", IntegerType, nullable=false),
    StructField("state", StringType, nullable=false)
  ))

  /** The user-facing class handled by this UDT. */
  override def userClass: Class[MockPerson] = classOf[MockPerson]

  /**
   * Reduces a person to its age, the only value needed to rebuild the mock.
   *
   * Accepts any [[MockPerson]] implementation, since only `getAge` is read.
   *
   * @param obj the value to serialize; expected to be a [[MockPerson]]
   * @return the person's age
   * @throws IllegalArgumentException if `obj` is not a [[MockPerson]]
   */
  override def serialize(obj: Any): Any = obj match {
    case person: MockPerson => person.getAge
    case other =>
      throw new IllegalArgumentException(s"Cannot serialize $other as MockPerson")
  }

  /**
   * Rebuilds a mock person from the age produced by `serialize`.
   *
   * @param datum the stored value; cast to `Integer`
   * @return a [[MockPersonImpl]] built from that number
   */
  override def deserialize(datum: Any): MockPerson = MockPersonImpl(datum.asInstanceOf[Integer])
}
/**
 * Mock [[MockPerson]] whose attributes are all derived from a single number.
 *
 * @param n returned as the age and appended to the first- and last-name prefixes
 */
@SQLUserDefinedType(udt = classOf[MockPersonUDT])
@SerialVersionUID(123L)
case class MockPersonImpl(n: Integer) extends MockPerson with Serializable {
  /** "First" followed by `n`. */
  override def getFirstName: String = s"First$n"

  /** "Last" followed by `n`. */
  override def getLastName: String = s"Last$n"

  /** The number this mock was built from. */
  override def getAge: Integer = n

  /** Always "AK". */
  override def getState: String = "AK"
}
If I simply run `SELECT person FROM person`, the query works. I just can't reference the attributes in SQL, even though they are defined in the schema.