¿Cómo definir el esquema para el tipo personalizado en Spark SQL?

El siguiente código de ejemplo intenta colocar algunos objetos de caso en un dataframe. El código incluye la definición de una jerarquía de objetos de caso y una clase de caso que usa este rasgo:

import org.apache.spark.{SparkContext, SparkConf} import org.apache.spark.sql.SQLContext sealed trait Some case object AType extends Some case object BType extends Some case class Data( name : String, t: Some) object Example { def main(args: Array[String]) : Unit = { val conf = new SparkConf() .setAppName( "Example" ) .setMaster( "local[*]") val sc = new SparkContext(conf) val sqlContext = new SQLContext(sc) import sqlContext.implicits._ val df = sc.parallelize( Seq( Data( "a", AType), Data( "b", BType) ), 4).toDF() df.show() } } 

Al ejecutar el código, desafortunadamente encuentro la siguiente excepción:

 java.lang.UnsupportedOperationException: Schema for type Some is not supported 

Preguntas

  • ¿Existe la posibilidad de agregar o definir un esquema para ciertos tipos (aquí escriba Some )?
  • ¿Existe otro enfoque para representar este tipo de enumeraciones?
    • Traté de usar Enumeration directamente, pero también sin éxito. (vea abajo)

Código para la Enumeration :

 object Some extends Enumeration { type Some = Value val AType, BType = Value } 

Gracias por adelantado. Espero que el mejor enfoque no sea usar cadenas en su lugar.

Spark 2.0.0+ :

UserDefinedType se ha hecho privado en Spark 2.0.0 y, por ahora, no tiene un reemplazo UserDefinedType con Dataset .

Ver: SPARK-14155 (Hide UserDefinedType en Spark 2.0)

La mayoría de las veces, el Dataset estático puede servir como reemplazo. Hay un Jira SPARK-7768 pendiente para hacer que la API UDT vuelva a ser pública con la versión 2.4 de destino.

Consulte también ¿Cómo almacenar objetos personalizados en Dataset?

Spark <2.0.0

¿Existe la posibilidad de agregar o definir un esquema para ciertos tipos (aquí escriba Some)?

Supongo que la respuesta depende de cuánto lo necesites. Parece que es posible crear un UserDefinedType pero requiere acceso a DeveloperApi y no es exactamente sencillo ni está bien documentado.

 import org.apache.spark.sql.types._ @SQLUserDefinedType(udt = classOf[SomeUDT]) sealed trait Some case object AType extends Some case object BType extends Some class SomeUDT extends UserDefinedType[Some] { override def sqlType: DataType = IntegerType override def serialize(obj: Any) = { obj match { case AType => 0 case BType => 1 } } override def deserialize(datum: Any): Some = { datum match { case 0 => AType case 1 => BType } } override def userClass: Class[Some] = classOf[Some] } 

Probablemente deberías anular hashCode y también equals igual.

Su contraparte PySpark puede verse así:

 from enum import Enum, unique from pyspark.sql.types import UserDefinedType, IntegerType class SomeUDT(UserDefinedType): @classmethod def sqlType(self): return IntegerType() @classmethod def module(cls): return cls.__module__ @classmethod def scalaUDT(cls): # Required in Spark < 1.5 return 'net.zero323.enum.SomeUDT' def serialize(self, obj): return obj.value def deserialize(self, datum): return {x.value: x for x in Some}[datum] @unique class Some(Enum): __UDT__ = SomeUDT() AType = 0 BType = 1 

En Spark <1.5 Python UDT requiere un emparejado de Scala UDT, pero parece que ya no es el caso en 1.5.

Para un UDT simple como puede usar tipos simples (por ejemplo IntegerType lugar de Struct completo).