SparkSQL: ¿cómo tratar con valores nulos en la función definida por el usuario?

Dada la Tabla 1 con una columna “x” de tipo String. Quiero crear la Tabla 2 con una columna “y” que es una representación entera de las cadenas de fechas dadas en “x”.

Esencial es mantener valores null en la columna “y”.

Tabla 1 (Dataframe df1):

 +----------+ | x| +----------+ |2015-09-12| |2015-09-13| | null| | null| +----------+ root |-- x: string (nullable = true) 

Tabla 2 (Dataframe df2):

 +----------+--------+ | x| y| +----------+--------+ | null| null| | null| null| |2015-09-12|20150912| |2015-09-13|20150913| +----------+--------+ root |-- x: string (nullable = true) |-- y: integer (nullable = true) 

Mientras que la función definida por el usuario (udf) para convertir valores de la columna “x” a los de la columna “y” es:

 val extractDateAsInt = udf[Int, String] ( (d:String) => d.substring(0, 10) .filterNot( "-".toSet) .toInt ) 

y trabajos, que tratan con valores nulos no es posible.

Aunque puedo hacer algo como

 val extractDateAsIntWithNull = udf[Int, String] ( (d:String) => if (d != null) d.substring(0, 10).filterNot( "-".toSet).toInt else 1 ) 

No he encontrado forma de “producir” valores null través de udfs (por supuesto, como Int s no puede ser null ).

Mi solución actual para la creación de df2 (Tabla 2) es la siguiente:

 // holds data of table 1 val df1 = ... // filter entries from df1, that are not null val dfNotNulls = df1.filter(df1("x") .isNotNull) .withColumn("y", extractDateAsInt(df1("x"))) .withColumnRenamed("x", "right_x") // create df2 via a left join on df1 and dfNotNull having val df2 = df1.join( dfNotNulls, df1("x") === dfNotNulls("right_x"), "leftouter" ).drop("right_x") 

Preguntas :

  • La solución actual parece engorrosa (y probablemente no es eficiente). ¿Hay una mejor manera?
  • @ Spark-developers: ¿Hay un tipo NullableInt planeado / disponible, de modo que el siguiente udf sea posible (vea el extracto del código)?

Extracto del código

 val extractDateAsNullableInt = udf[NullableInt, String] ( (d:String) => if (d != null) d.substring(0, 10).filterNot( "-".toSet).toInt else null ) 

Aquí es donde la Option es útil:

 val extractDateAsOptionInt = udf((d: String) => d match { case null => None case s => Some(s.substring(0, 10).filterNot("-".toSet).toInt) }) 

o para hacerlo un poco más seguro en general:

 import scala.util.Try val extractDateAsOptionInt = udf((d: String) => Try( d.substring(0, 10).filterNot("-".toSet).toInt ).toOption) 

Todo el mérito recae en Dmitriy Selivanov que ha señalado esta solución como una edición (¿falta?) Aquí .

La alternativa es manejar null fuera de la UDF:

 import org.apache.spark.sql.functions.{lit, when} import org.apache.spark.sql.types.IntegerType val extractDateAsInt = udf( (d: String) => d.substring(0, 10).filterNot("-".toSet).toInt ) df.withColumn("y", when($"x".isNull, lit(null)) .otherwise(extractDateAsInt($"x")) .cast(IntegerType) ) 

Scala en realidad tiene una buena función de fábrica, Option (), que puede hacer esto aún más conciso:

 val extractDateAsOptionInt = udf((d: String) => Option(d).map(_.substring(0, 10).filterNot("-".toSet).toInt)) 

Internamente, el método de aplicación del objeto Opción solo hace la comprobación nula por usted:

 def apply[A](x: A): Option[A] = if (x == null) None else Some(x) 

Código suplementario

Con la buena respuesta de @ zero323, creé el siguiente código, para tener funciones definidas por el usuario disponibles que manejen valores nulos como se describe. Espero, es útil para otros!

 /** * Set of methods to construct [[org.apache.spark.sql.UserDefinedFunction]]s that * handle `null` values. */ object NullableFunctions { import org.apache.spark.sql.functions._ import scala.reflect.runtime.universe.{TypeTag} import org.apache.spark.sql.UserDefinedFunction /** * Given a function A1 => RT, create a [[org.apache.spark.sql.UserDefinedFunction]] such that * * if fnc input is null, None is returned. This will create a null value in the output Spark column. * * if A1 is non null, Some( f(input) will be returned, thus creating f(input) as value in the output column. * @param f function from A1 => RT * @tparam RT return type * @tparam A1 input parameter type * @return a [[org.apache.spark.sql.UserDefinedFunction]] with the behaviour describe above */ def nullableUdf[RT: TypeTag, A1: TypeTag](f: Function1[A1, RT]): UserDefinedFunction = { udf[Option[RT],A1]( (i: A1) => i match { case null => None case s => Some(f(i)) }) } /** * Given a function A1, A2 => RT, create a [[org.apache.spark.sql.UserDefinedFunction]] such that * * if on of the function input parameters is null, None is returned. * This will create a null value in the output Spark column. * * if both input parameters are non null, Some( f(input) will be returned, thus creating f(input1, input2) * as value in the output column. * @param f function from A1 => RT * @tparam RT return type * @tparam A1 input parameter type * @tparam A2 input parameter type * @return a [[org.apache.spark.sql.UserDefinedFunction]] with the behaviour describe above */ def nullableUdf[RT: TypeTag, A1: TypeTag, A2: TypeTag](f: Function2[A1, A2, RT]): UserDefinedFunction = { udf[Option[RT], A1, A2]( (i1: A1, i2: A2) => (i1, i2) match { case (null, _) => None case (_, null) => None case (s1, s2) => Some((f(s1,s2))) } ) } }