¿Cómo puedo pasar parámetros adicionales a UDF en SparkSql?

Quiero analizar las columnas de fecha en un DataFrame , y para cada columna de fecha, la resolución para la fecha puede cambiar (es decir, 2011/01/10 => 2011/01 si la resolución se establece en “Mes”).

Escribí el siguiente código:

 def convertDataFrame(dataframe: DataFrame, schema : Array[FieldDataType], resolution: Array[DateResolutionType]) : DataFrame = { import org.apache.spark.sql.functions._ val convertDateFunc = udf{(x:String, resolution: DateResolutionType) => SparkDateTimeConverter.convertDate(x, resolution)} val convertDateTimeFunc = udf{(x:String, resolution: DateResolutionType) => SparkDateTimeConverter.convertDateTime(x, resolution)} val allColNames = dataframe.columns val allCols = allColNames.map(name => dataframe.col(name)) val mappedCols = { for(i  convertDateFunc(allCols(i), resolution(i))) case FieldDataType.DateTime => convertDateTimeFunc(allCols(i), resolution(i)) case _ => allCols(i) } } } dataframe.select(mappedCols:_*) }} 

Sin embargo, no funciona. Parece que solo puedo pasar Column a UDF. Y me pregunto si será muy lento si convierto el DataFrame a RDD y si aplica la función en cada fila.

¿Alguien sabe la solución correcta? ¡Gracias!

Solo usa un poco de currying:

 def convertDateFunc(resolution: DateResolutionType) = udf((x:String) => SparkDateTimeConverter.convertDate(x, resolution)) 

y úsalo de la siguiente manera:

 case FieldDataType.Date => convertDateFunc(resolution(i))(allCols(i)) 

En una nota lateral, debería echar un vistazo a sql.functions.trunc y sql.functions.date_format . Estos deben al menos parte del trabajo sin usar UDF en absoluto.

Nota :

En Spark 2.2 o posterior, puede usar la función typedLit :

 import org.apache.spark.sql.functions.typedLit 

que admiten una gama más amplia de literales como Seq o Map .

Puede crear una Column literal para pasarla a un udf utilizando la función lit(...) definida en org.apache.spark.sql.functions

Por ejemplo:

 val takeRight = udf((s: String, i: Int) => s.takeRight(i)) df.select(takeRight($"stringCol", lit(1)))