Cómo derretir Spark DataFrame?

¿Hay un equivalente de la función Pandas Melt en Apache Spark en PySpark o al menos en Scala?

Estaba ejecutando un conjunto de datos de muestra hasta ahora en python y ahora quiero usar Spark para todo el conjunto de datos.

Gracias por adelantado.

No hay una función incorporada (si trabajas con soporte de SQL y Hive habilitado, puedes usar la función de stack , pero no está expuesto en Spark y no tiene una implementación nativa) pero es trivial desarrollar el tuyo. Importaciones requeridas:

 from pyspark.sql.functions import array, col, explode, lit, struct from pyspark.sql import DataFrame from typing import Iterable 

Ejemplo de implementación:

 def melt( df: DataFrame, id_vars: Iterable[str], value_vars: Iterable[str], var_name: str="variable", value_name: str="value") -> DataFrame: """Convert :class:`DataFrame` from wide to long format.""" # Create array> _vars_and_vals = array(*( struct(lit(c).alias(var_name), col(c).alias(value_name)) for c in value_vars)) # Add to the DataFrame and explode _tmp = df.withColumn("_vars_and_vals", explode(_vars_and_vals)) cols = id_vars + [ col("_vars_and_vals")[x].alias(x) for x in [var_name, value_name]] return _tmp.select(*cols) 

Y algunas pruebas (basadas en los docámenes de Pandas ):

 import pandas as pd pdf = pd.DataFrame({'A': {0: 'a', 1: 'b', 2: 'c'}, 'B': {0: 1, 1: 3, 2: 5}, 'C': {0: 2, 1: 4, 2: 6}}) pd.melt(pdf, id_vars=['A'], value_vars=['B', 'C']) 
  A variable value 0 a B 1 1 b B 3 2 c B 5 3 a C 2 4 b C 4 5 c C 6 
 sdf = spark.createDataFrame(pdf) melt(sdf, id_vars=['A'], value_vars=['B', 'C']).show() 
 +---+--------+-----+ | A|variable|value| +---+--------+-----+ | a| B| 1| | a| C| 2| | b| B| 3| | b| C| 4| | c| B| 5| | c| C| 6| +---+--------+-----+ 

Nota: Para usar con las versiones heredadas de Python, elimine las anotaciones de tipo.

Me encontré con esta pregunta en mi búsqueda de una implementación de melt en Spark para Scala.

Publicando mi puerto de Scala en caso de que alguien también tropiece con esto.

 import org.apache.spark.sql.functions._ import org.apache.spark.sql.{DataFrame} /** Extends the [[org.apache.spark.sql.DataFrame]] class * * @param df the data frame to melt */ implicit class DataFrameFunctions(df: DataFrame) { /** Convert [[org.apache.spark.sql.DataFrame]] from wide to long format. * * melt is (kind of) the inverse of pivot * melt is currently (02/2017) not implemented in spark * * @see reshape packe in R (https://cran.r-project.org/web/packages/reshape/index.html) * @see this is a scala adaptation of http://stackoverflow.com/questions/41670103/pandas-melt-function-in-apache-spark * * @todo method overloading for simple calling * * @param id_vars the columns to preserve * @param value_vars the columns to melt * @param var_name the name for the column holding the melted columns names * @param value_name the name for the column holding the values of the melted columns * */ def melt( id_vars: Seq[String], value_vars: Seq[String], var_name: String = "variable", value_name: String = "value") : DataFrame = { // Create array> val _vars_and_vals = array((for (c <- value_vars) yield { struct(lit(c).alias(var_name), col(c).alias(value_name)) }): _*) // Add to the DataFrame and explode val _tmp = df.withColumn("_vars_and_vals", explode(_vars_and_vals)) val cols = id_vars.map(col _) ++ { for (x <- List(var_name, value_name)) yield { col("_vars_and_vals")(x).alias(x) }} return _tmp.select(cols: _*) } } 

Como no soy tan avanzado considerando Scala , estoy seguro de que hay margen de mejora.

Cualquier comentario es bienvenido