As you already found out yourself, a local Person class doesn't have a TypeTag, but it does have a WeakTypeTag (and a ClassTag). Let's try to define an Encoder for such a class.
The naive approach, constructing a TypeTag manually, doesn't work. Related questions:
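To make the starting point concrete, here is a minimal sketch that needs only scala-reflect on the classpath (no Spark). The object name LocalTagsDemo and the method tags are made up for illustration. It summons a WeakTypeTag and a ClassTag for a class defined inside a method, which is the situation where typeTag[Person] is rejected by the compiler.

```scala
import scala.reflect.classTag
import scala.reflect.runtime.universe._

// Hypothetical demo object, not part of the original code.
object LocalTagsDemo {
  // Returns (type as seen by the WeakTypeTag, runtime class name, is the symbol a free type?).
  def tags(): (String, String, Boolean) = {
    case class Person(id: Long, name: String) // local: defined inside a method

    // typeTag[Person] does not compile here: "No TypeTag available for Person".
    val weak = weakTypeTag[Person]
    val cls  = classTag[Person].runtimeClass
    (weak.tpe.toString, cls.getName, internal.isFreeType(weak.tpe.typeSymbol))
  }

  def main(args: Array[String]): Unit = {
    val (tpeName, className, free) = tags()
    println(s"WeakTypeTag sees: $tpeName (free type: $free)")
    println(s"ClassTag sees:    $className")
  }
}
```

The WeakTypeTag only knows the class as a free type, while the ClassTag carries the actual JVM class. That difference is what the attempts in the rest of this answer run into.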
How to create a TypeTag manually?
In scala 2.12, why none of the TypeTag created in runtime is serializable?
Scala Spark Encoders.product[X] (where X is a case class) keeps giving me "No TypeTag available for X" error
Spark: DF.as[Type] fails to compile
implicit def ttag[A: WeakTypeTag]: TypeTag[A] = {
val ttag = null // hiding implicit by name
val wttagImpl = weakTypeTag[A].asInstanceOf[WeakTypeTag[A] {val mirror: Mirror; val tpec: TypeCreator}]
TypeTag[A](wttagImpl.mirror, wttagImpl.tpec)
}
java.lang.NoClassDefFoundError: no Java class corresponding to Person found
https://gist.github.com/DmytroMitin/41b7439d2e504e37f29b02e3500d24b1
The result is similar for
def typeToTypeTag[T](
tpe: Type,
mirror: api.Mirror[universe.type]
): TypeTag[T] = {
TypeTag(mirror, new TypeCreator {
def apply[U <: api.Universe with Singleton](m: api.Mirror[U]) = {
assert(m eq mirror, s"TypeTag[$tpe] defined in $mirror cannot be migrated to $m.")
tpe.asInstanceOf[U#Type]
}
})
}
implicit def ttag[T: WeakTypeTag]: TypeTag[T] = {
val ttag = null
typeToTypeTag(weakTypeOf[T], mirror)
}
java.lang.NoClassDefFoundError: no Java class corresponding to Person found
https://gist.github.com/DmytroMitin/c7a24abf1ff1011a1c87aa9d161d6395
implicit val personTtag: TypeTag[Person] = {
val personTtag = null
tb.eval(q"org.apache.spark.sql.catalyst.ScalaReflection.universe.typeTag[${weakTypeOf[Person]}]")
.asInstanceOf[TypeTag[Person]]
}
scala.tools.reflect.ToolBoxError: reflective toolbox failed due to unresolved free type variables
https://gist.github.com/DmytroMitin/6e35c0332f845fcd227d35ec49d4122f
This is how Spark defines Encoder[T] for a T that has a TypeTag:
implicit def newProductEncoder[T <: Product : TypeTag]: Encoder[T] = Encoders.product[T]
object Encoders {
def product[T <: Product : TypeTag]: Encoder[T] = ExpressionEncoder()
}
object ExpressionEncoder {
def apply[T : TypeTag](): ExpressionEncoder[T] = {
val mirror = ScalaReflection.mirror
val tpe = typeTag[T].in(mirror).tpe
val cls = mirror.runtimeClass(tpe)
val serializer = ScalaReflection.serializerForType(tpe)
val deserializer = ScalaReflection.deserializerForType(tpe)
new ExpressionEncoder[T](
serializer,
deserializer,
ClassTag[T](cls)
)
}
}
Let's try to modify it for a T that has a WeakTypeTag and a ClassTag:
implicit def apply[T: WeakTypeTag /*: ClassTag*/]: Encoder[T] = {
val tpe = weakTypeTag[T].in(mirror).tpe
val cls = mirror.runtimeClass(tpe)
val serializer = ScalaReflection.serializerForType(tpe)
val deserializer = ScalaReflection.deserializerForType(tpe)
new ExpressionEncoder[T](
serializer,
deserializer,
ClassTag[T](cls)
)
}
java.lang.NoClassDefFoundError: no Java class corresponding to Person found
https://gist.github.com/DmytroMitin/b58848fa6575b6fab0e9b8285095cc60
// (*)
implicit def apply[T/*: WeakTypeTag*/ : ClassTag]: Encoder[T] = {
val tpe = mirror.classSymbol(classTag[T].runtimeClass).toType
val serializer = ScalaReflection.serializerForType(tpe)
val deserializer = ScalaReflection.deserializerForType(tpe)
new ExpressionEncoder[T](
serializer,
deserializer,
classTag[T]
)
}
org.apache.spark.SparkException: Task not serializable
Caused by: java.io.NotSerializableException: Main
https://gist.github.com/DmytroMitin/0c86933f96e136d44fff555295ce01dd
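This kind of failure can be reproduced without Spark. The sketch below is an assumption about the mechanism, not something confirmed for the gist above: a function literal that reads a member of its enclosing instance captures that instance, so Java serialization of the function also has to serialize the enclosing object. The names Plain, Marked and SerializationDemo are hypothetical.

```scala
import java.io.{ByteArrayOutputStream, NotSerializableException, ObjectOutputStream}

// Hypothetical classes for illustration only.
class Plain {
  val offset = 1
  def closure: Int => Int = x => x + offset // reads a member, so it captures `this`
}

class Marked extends Serializable {
  val offset = 1
  def closure: Int => Int = x => x + offset // same closure, serializable owner
}

object SerializationDemo {
  // True if Java serialization accepts the object graph rooted at `obj`.
  def canSerialize(obj: AnyRef): Boolean =
    try {
      val out = new ObjectOutputStream(new ByteArrayOutputStream())
      out.writeObject(obj)
      out.close()
      true
    } catch {
      case _: NotSerializableException => false
    }

  def main(args: Array[String]): Unit = {
    println(canSerialize(new Plain().closure))  // owner is not Serializable
    println(canSerialize(new Marked().closure)) // owner is Serializable
  }
}
```

If the same thing happens in the Spark job, marking the enclosing class as Serializable is the smallest change that lets the task closure be shipped to executors.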
So finally let's make Main extend Serializable
+---+----+
| id|name|
+---+----+
| 0| 0|
| 1| 1|
| 2| 2|
| 3| 3|
| 4| 4|
| 5| 5|
| 6| 6|
| 7| 7|
| 8| 8|
| 9| 9|
+---+----+
https://gist.github.com/DmytroMitin/0e9b0bd2ed6237a4a1e1c40d620a9d88
So (*) is a correct Encoder.
This doesn't seem to work with a generic local Person:
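The step that makes (*) work is going from the runtime class to a reflection Type instead of going through the WeakTypeTag. A minimal sketch of just that step, outside Spark, might look like this. It assumes a plain runtime mirror built from the current class loader rather than Spark's ScalaReflection.mirror, and the names ClassToTypeDemo, typeFromClassTag and localPersonType are made up.

```scala
import scala.reflect.{ClassTag, classTag}
import scala.reflect.runtime.universe._

// Hypothetical demo object, not part of the original code.
object ClassToTypeDemo {
  // Assumption: a mirror over this class's loader stands in for Spark's mirror.
  private val mirror = runtimeMirror(getClass.getClassLoader)

  // Recover a Type from the runtime class carried by a ClassTag.
  def typeFromClassTag[T: ClassTag]: Type =
    mirror.classSymbol(classTag[T].runtimeClass).toType

  def localPersonType(): Type = {
    case class Person(id: Long, name: String) // local class
    typeFromClassTag[Person]
  }

  def main(args: Array[String]): Unit = {
    val tpe = localPersonType()
    println(tpe)
    println(internal.isFreeType(tpe.typeSymbol)) // the symbol now comes from a real class
  }
}
```

Because the symbol is obtained from a java.lang.Class, it is a class symbol rather than a free type, which is what runtimeClass and the serializer derivation need.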
case class Person[T](id: Long, name: String, t: T)
java.lang.UnsupportedOperationException: No Encoder found for Person$1
https://gist.github.com/DmytroMitin/69496ce257fc9a3a7a5fbd004c52dcc0
scala.ScalaReflectionException: free type Person is not a class
https://gist.github.com/DmytroMitin/07bfe954dca677f0a39c06779b94280e
For a generic local class, the encoder should use both WeakTypeTag (for the type arguments) and ClassTag (for the type constructor):
implicit def apply[T: WeakTypeTag : ClassTag]: Encoder[T] = {
val tpe0 = weakTypeTag[T].in(mirror).tpe
val typeArgs = tpe0/*.dealias*/.typeArgs
val tpe = mirror.classSymbol(classTag[T].runtimeClass).toType
val tpe1 = appliedType(tpe.typeConstructor, typeArgs)
val serializer = ScalaReflection.serializerForType(tpe1)
val deserializer = ScalaReflection.deserializerForType(tpe1)
new ExpressionEncoder[T](
serializer,
deserializer,
classTag[T]
)
}
https://gist.github.com/DmytroMitin/08c8f21ffb1427bfa15dd21fbdfb77fa
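The encoder above combines two sources of information: the type constructor recovered from the runtime class and the type arguments read from the statically known type. A small sketch of that recombination with scala-reflect only, using a hypothetical top-level Box class in place of the local Person, could be:

```scala
import scala.reflect.runtime.universe._

// Hypothetical class used only for this illustration.
case class Box[A, B](a: A, b: B)

object AppliedTypeDemo {
  // Assumption: a plain runtime mirror stands in for Spark's mirror.
  private val mirror = runtimeMirror(getClass.getClassLoader)

  // Type constructor taken from the runtime class, arguments taken from elsewhere.
  def rebuild(cls: Class[_], typeArgs: List[Type]): Type = {
    val tycon = mirror.classSymbol(cls).toType.typeConstructor
    appliedType(tycon, typeArgs)
  }

  def main(args: Array[String]): Unit = {
    val typeArgs = typeOf[Box[String, Int]].typeArgs // List(String, Int)
    val rebuilt  = rebuild(classOf[Box[_, _]], typeArgs)
    println(rebuilt)
    println(rebuilt =:= typeOf[Box[String, Int]])
  }
}
```

The type arguments are passed through unchanged, so if one of them is itself a free type (a nested local class), the rebuilt type still contains a free type.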
However, this still doesn't work for a generic local class whose type argument is itself a generic local class:
val df: Dataset[Person[Person[Int]]] =
spark.range(10).map(i => Person(i, i.toString, Person(i, i.toString, i.toInt)))
scala.ScalaReflectionException: free type Person is not a class
https://gist.github.com/DmytroMitin/5bceb2b81f2391c5c312a045edb827a8
Improved version of the encoder:
case class Application(tycon: ClassTag[_], targs: List[Application])
class DeepClassTag[T](val classTags: Application)
object DeepClassTag {
def apply[T: DeepClassTag]: DeepClassTag[T] = implicitly[DeepClassTag[T]]
implicit def deepClassTag0[A: ClassTag]: DeepClassTag[A] =
new DeepClassTag(Application(classTag[A], List()))
implicit def deepClassTag11[A[_], B1](implicit tycon: ClassTag[A[_]], dct1: DeepClassTag[B1]): DeepClassTag[A[B1]] =
new DeepClassTag(Application(tycon, List(dct1.classTags)))
implicit def deepClassTag12[A[_,_], B1, B2](implicit tycon: ClassTag[A[_,_]], dct1: DeepClassTag[B1], dct2: DeepClassTag[B2]): DeepClassTag[A[B1, B2]] =
new DeepClassTag(Application(tycon, List(dct1.classTags, dct2.classTags)))
// ...
implicit def deepClassTag2[A[_[_]], B1[_]](implicit tycon: ClassTag[A[B1]], dct1: DeepClassTag[B1[_]]): DeepClassTag[A[B1]] =
new DeepClassTag(Application(tycon, List(dct1.classTags)))
// ...
}
def improveStaticType[T: WeakTypeTag : DeepClassTag]: Type =
improveDynamicType(weakTypeOf[T], DeepClassTag[T].classTags)
def improveDynamicType(tpe: Type, classTags: Application): Type = {
val newTycon = improveFreeType(tpe, classTags.tycon.runtimeClass)
val targs = tpe.dealias.typeArgs
assert(targs.length == classTags.targs.length, s"( $targs ).length == ( ${classTags.targs} ).length")
val newArgs = targs.zip(classTags.targs).map((improveDynamicType _).tupled)
appliedType(newTycon, newArgs)
}
def improveFreeType(tpe: Type, cls: Class[_]): Type =
if (internal.isFreeType(tpe.typeSymbol)) {
val typeArgs = tpe.dealias.typeArgs
val typeConstructor = mirror.classSymbol(cls).toType.typeConstructor
appliedType(typeConstructor, typeArgs)
} else tpe
implicit def enc[T: WeakTypeTag : ClassTag : DeepClassTag]: Encoder[T] = {
val tpe = improveStaticType[T]
val serializer = ScalaReflection.serializerForType(tpe)
val deserializer = ScalaReflection.deserializerForType(tpe)
new ExpressionEncoder[T](
serializer,
deserializer,
classTag[T]
)
}
https://gist.github.com/DmytroMitin/56044515e031fcf1e977ab213013861d
DeepClassTag doesn't seem to work with higher-kinded classes:
https://gist.github.com/DmytroMitin/6388a437507e8389f30230e08382d9ff
An improved version, which still doesn't always work (there are too many shapes of type constructors):
https://gist.github.com/DmytroMitin/2625ee20695404c6fc118ab8680808f2
Instead of manually defining type-class instances for each shape of type constructor, the type class DeepClassTag can be defined with a macro as follows:
import scala.language.experimental.macros
import scala.reflect.ClassTag
import scala.reflect.macros.whitebox
case class Application(tycon: ClassTag[_], targs: List[Application])
class DeepClassTag[T](val classTags: Application)
object DeepClassTag {
def apply[T: DeepClassTag]: DeepClassTag[T] = implicitly[DeepClassTag[T]]
implicit def mkDeepClassTag[T]/*(implicit tCtag: ClassTag[T])*/: DeepClassTag[T] =
macro DeepClassTagMacros.mkDeepClassTagImpl[T]
}
class DeepClassTagMacros(val c: whitebox.Context) {
import c.universe._
def findInstance[TC[_]](tpe: Type)(implicit wttag: WeakTypeTag[TC[_]]): Tree =
c.inferImplicitValue(
appliedType(weakTypeOf[TC[_]].typeConstructor, tpe),
silent = false
)
def mkDeepClassTagImpl[T: WeakTypeTag]/*(tCtag: c.Tree)*/ : Tree = {
val T = weakTypeOf[T]
val tCtag = findInstance[ClassTag](T)
val targCtags = T.dealias.typeArgs.map(arg => {
val argInst = findInstance[DeepClassTag](arg)
q"$argInst.classTags"
})
val targClassTags = q"_root_.scala.List.apply[Application](..$targCtags)"
q"new DeepClassTag[$T](Application($tCtag, $targClassTags))"
}
}
(Is it working?)
My PR to Spark to support local classes: https://github.com/apache/spark/pull/38740