Chispa / Scala: llenado directo con la última observación

Usando Spark 1.4.0, Scala 2.10

He estado tratando de encontrar una forma de reenviar los valores nulos con la última observación conocida, pero no veo una manera fácil. Creo que esto es algo bastante común de hacer, pero no puedo encontrar un ejemplo que muestre cómo hacerlo.

Veo que las funciones para reenviar llenan el NaN con un valor, o funciones de retardo / adelanto para completar o desplazar datos por un desplazamiento, pero nada para recoger el último valor conocido.

Al mirar en línea, veo muchas preguntas y respuestas sobre lo mismo en R, pero no en Spark / Scala.

Estaba pensando en mapear durante un rango de fechas, filtrar los NaN de los resultados y elegir el último elemento, pero supongo que estoy confundido acerca de la syntax.

usando DataFrames, bash algo como

import org.apache.spark.sql.expressions.Window val sqlContext = new HiveContext(sc) var spec = Window.orderBy("Date") val df = sqlContext.read.format("com.databricks.spark.csv").option("header", "true").option("inferSchema", "true").load("test.csv") val df2 = df.withColumn("testForwardFill", (90 to 0).map(i=>lag(df.col("myValue"),i,0).over(spec)).filter(p=>p.getItem.isNotNull).last) 

pero eso no me lleva a ninguna parte.

La parte del filtro no funciona; la función de mapa devuelve una Secuencia de spark.sql.Columns, pero la función de filtro espera devolver un Booleano, por lo que necesito obtener un valor de la Columna para probar, pero parece que solo hay métodos de Columnas que devuelven una Columna.

¿Hay alguna manera de hacer esto más ‘simplemente’ en Spark?

Gracias por tu contribución

EDITAR:

ejemplo simple de entrada de muestra:

 2015-06-01,33 2015-06-02, 2015-06-03, 2015-06-04, 2015-06-05,22 2015-06-06, 2015-06-07, ... 

Rendimiento esperado:

 2015-06-01,33 2015-06-02,33 2015-06-03,33 2015-06-04,33 2015-06-05,22 2015-06-06,22 2015-06-07,22 

NOTA: 1) Tengo muchas columnas, muchas de las cuales tienen este patrón de datos faltantes, pero no en la misma fecha / hora. Si necesito, haré la transformación una columna a la vez.

EDITAR :

Siguiendo la respuesta de @ zero323, intenté de esta manera:

  import org.apache.spark.sql.Row import org.apache.spark.rdd.RDD val rows: RDD[Row] = df.orderBy($"Date").rdd def notMissing(row: Row): Boolean = { !row.isNullAt(1) } val toCarry: scala.collection.Map[Int,Option[org.apache.spark.sql.Row]] = rows.mapPartitionsWithIndex{ case (i, iter) => Iterator((i, iter.filter(notMissing(_)).toSeq.lastOption)) } .collectAsMap val toCarryBd = sc.broadcast(toCarry) def fill(i: Int, iter: Iterator[Row]): Iterator[Row] = { if (iter.contains(null)) iter.map(row => Row(toCarryBd.value(i).get(1))) else iter } val imputed: RDD[Row] = rows.mapPartitionsWithIndex{ case (i, iter) => fill(i, iter)} 

la variable de difusión termina como una lista de valores sin valores nulos. Eso es progreso, pero todavía no puedo hacer que la asignación funcione. pero no consigo nada, porque el índice i en el no se correlaciona con los datos originales, se asigna al subconjunto sin nulo.

¿Que me estoy perdiendo aqui?

EDIT y solución (como se deriva de la respuesta de @ zero323):

 import org.apache.spark.sql.expressions.Window val sqlContext = new HiveContext(sc) var spec = Window.partitionBy("id").orderBy("Date") val df = sqlContext.read.format("com.databricks.spark.csv").option("header", "true").option("inferSchema", "true").load("test.csv") val df2 = df.withColumn("test", coalesce((0 to 90).map(i=>lag(df.col("test"),i,0).over(spec)): _*)) 

Consulte la respuesta de zero323 a continuación para obtener más opciones si está utilizando RDD en lugar de DataFrames. La solución anterior puede no ser la más eficiente pero funciona para mí. Si está buscando optimizar, consulte la solución RDD.

Respuesta inicial (una suposición de serie de tiempo única):

En primer lugar intente evitar las funciones de ventana si no puede proporcionar la cláusula PARTITION BY . Mueve los datos a una sola partición, por lo que la mayoría de las veces simplemente no es factible.

Lo que puede hacer es llenar los vacíos en RDD usando mapPartitionsWithIndex . Como no proporcionó un ejemplo de datos o resultados esperados, considere esto como un pseudocódigo, no como un verdadero progtwig de Scala:

  • primero permite ordenar DataFrame por fecha y convertir a RDD

     import org.apache.spark.sql.Row import org.apache.spark.rdd.RDD val rows: RDD[Row] = df.orderBy($"Date").rdd 
  • A continuación, busquemos la última observación no nula por partición

     def notMissing(row: Row): Boolean = ??? val toCarry: scala.collection.Map[Int,Option[org.apache.spark.sql.Row]] = rows .mapPartitionsWithIndex{ case (i, iter) => Iterator((i, iter.filter(notMissing(_)).toSeq.lastOption)) } .collectAsMap 
  • y convierte este Map para transmitir

     val toCarryBd = sc.broadcast(toCarry) 
  • finalmente mapear sobre particiones una vez más llenando los vacíos:

     def fill(i: Int, iter: Iterator[Row]): Iterator[Row] = { // If it is the beginning of partition and value is missing // extract value to fill from toCarryBd.value // Remember to correct for empty / only missing partitions // otherwise take last not-null from the current partition } val imputed: RDD[Row] = rows .mapPartitionsWithIndex{ case (i, iter) => fill(i, iter) } 
  • finalmente convertir de nuevo a DataFrame

Editar (series / series temporales por datos de grupo):

El diablo está en el detalle. Si sus datos están divididos después de todo, entonces se puede resolver todo un problema usando groupBy . Supongamos que simplemente particiones por columna “v” de tipo T y Date es una marca de tiempo entera:

 def fill(iter: List[Row]): List[Row] = { // Just go row by row and fill with last non-empty value ??? } val groupedAndSorted = df.rdd .groupBy(_.getAs[T]("k")) .mapValues(_.toList.sortBy(_.getAs[Int]("Date"))) val rows: RDD[Row] = groupedAndSorted.mapValues(fill).values.flatMap(identity) val dfFilled = sqlContext.createDataFrame(rows, df.schema) 

De esta forma puede llenar todas las columnas al mismo tiempo.

¿Se puede hacer esto con DataFrames en lugar de convertir de ida y vuelta en RDD?

Depende, aunque es poco probable que sea eficiente. Si la brecha máxima es relativamente pequeña, puede hacer algo como esto:

 import org.apache.spark.sql.functions._ import org.apache.spark.sql.expressions.{WindowSpec, Window} import org.apache.spark.sql.Column val maxGap: Int = ??? // Maximum gap between observations val columnsToFill: List[String] = ??? // List of columns to fill val suffix: String = "_" // To disambiguate between original and imputed // Take lag 1 to maxGap and coalesce def makeCoalesce(w: WindowSpec)(magGap: Int)(suffix: String)(c: String) = { // Generate lag values between 1 and maxGap val lags = (1 to maxGap).map(lag(col(c), _)over(w)) // Add current, coalesce and set alias coalesce(col(c) +: lags: _*).alias(s"$c$suffix") } // For each column you want to fill nulls apply makeCoalesce val lags: List[Column] = columnsToFill.map(makeCoalesce(w)(maxGap)("_")) // Finally select val dfImputed = df.select($"*" :: lags: _*) 

Se puede ajustar fácilmente para usar diferentes espacios máximos por columna.

Una forma más sencilla de lograr un resultado similar en la última versión de Spark es usar la last con ignoreNulls :

 import org.apache.spark.sql.functions._ import org.apache.spark.sql.expressions.Window val w = Window.partitionBy($"k").orderBy($"Date") .rowsBetween(Window.unboundedPreceding, -1) df.withColumn("value", coalesce($"value", last($"value", true).over(w))) 

Si bien es posible eliminar la cláusula partitionBy y aplicar este método globalmente, sería prohibitivamente costoso con grandes conjuntos de datos.