Automáticamente y elegantemente aplana DataFrame en Spark SQL

Todas,

¿Existe una forma elegante y aceptada de aplanar una tabla Spark SQL (Parquet) con columnas que son de StructType nested StructType

Por ejemplo

Si mi esquema es:

 foo |_bar |_baz x y z 

¿Cómo lo selecciono en una forma tabular plana sin recurrir a ejecutar manualmente?

 df.select("foo.bar","foo.baz","x","y","z") 

En otras palabras, ¿cómo StructType el resultado del código anterior programáticamente dado solo un StructType y un DataFrame

La respuesta corta es que no hay una forma “aceptada” de hacerlo, pero puede hacerlo de manera muy elegante con una función recursiva que genere su statement de select(...) caminando por el DataFrame.schema .

La función recursiva debería devolver una Array[Column] . Cada vez que la función golpea un StructType , se llamaría a sí mismo y anexaría la Array[Column] devuelta a su propia Array[Column] .

Algo como:

 def flattenSchema(schema: StructType, prefix: String = null) : Array[Column] = { schema.fields.flatMap(f => { val colName = if (prefix == null) f.name else (prefix + "." + f.name) f.dataType match { case st: StructType => flattenSchema(st, colName) case _ => Array(col(colName)) } }) } 

Entonces lo usarías así:

 df.select(flattenSchema(df.schema):_*) 

Estoy mejorando mi respuesta anterior y estoy ofreciendo una solución a mi propio problema enunciado en los comentarios de la respuesta aceptada.

Esta solución aceptada crea una matriz de objetos Columna y la usa para seleccionar estas columnas. En Spark, si tiene un DataFrame nested, puede seleccionar la columna secundaria como esta: df.select("Parent.Child") y esto devuelve un DataFrame con los valores de la columna secundaria y se denomina Child . Pero si tiene nombres idénticos para los atributos de diferentes estructuras principales, perderá la información sobre el padre y puede terminar con nombres de columna idénticos y ya no puede acceder a ellos por su nombre ya que no son ambiguos.

Este fue mi problema

Encontré una solución a mi problema, tal vez también puede ayudar a alguien más. Llamé al flattenSchema separado:

 val flattenedSchema = flattenSchema(df.schema) 

y esto devolvió un objeto Array of Column. En lugar de usar esto en select() , que devolvería un DataFrame con columnas nombradas por el elemento secundario del último nivel, correlacioné los nombres de las columnas originales como cadenas, luego de seleccionar Parent.Child column, lo renombra como Parent.Child lugar de Child (también reemplacé los puntos con guiones bajos para mi conveniencia):

 val renamedCols = flattenedSchema.map(name => col(name.toString()).as(name.toString().replace(".","_"))) 

Y luego puede usar la función de selección como se muestra en la respuesta original:

 var newDf = df.select(renamedCols:_*) 

Solo quería compartir mi solución para Pyspark: es más o menos una traducción de la solución de @David Griffin, por lo que admite cualquier nivel de objetos nesteds.

 from pyspark.sql.types import StructType, ArrayType def flatten(schema, prefix=None): fields = [] for field in schema.fields: name = prefix + '.' + field.name if prefix else field.name dtype = field.dataType if isinstance(dtype, ArrayType): dtype = dtype.elementType if isinstance(dtype, StructType): fields += flatten(dtype, prefix=name) else: fields.append(name) return fields df.select(flattenSchema(df.schema)).show() 

También puede usar SQL para seleccionar columnas como sin formato.

  1. Obtenga el esquema original del dataframe
  2. Generar una cadena de SQL, navegando por el esquema
  3. Consulta tu dataframe original

Hice una implementación en Java: https://gist.github.com/ebuildy/3de0e2855498e5358e4eed1a4f72ea48

(También use el método recursivo, prefiero SQL, así que puede probarlo fácilmente a través de Spark-shell).

He estado utilizando líneas que dan como resultado un esquema plano con 5 columnas de barras, baz, x, y, z:

 df.select("foo.*", "x", "y", "z") 

En cuanto a explode : normalmente reservo explode para aplanar una lista. Por ejemplo, si tiene una idList columna que es una lista de cadenas, podría hacer:

 df.withColumn("flattenedId", functions.explode(col("idList"))) .drop("idList") 

Eso dará como resultado un nuevo Dataframe con una columna llamada flatchedId (ya no es una lista)

Aquí hay una función que está haciendo lo que quiere y que puede tratar con múltiples columnas anidadas que contienen columnas con el mismo nombre, con un prefijo:

 from pyspark.sql import functions as F def flatten_df(nested_df): flat_cols = [c[0] for c in nested_df.dtypes if c[1][:6] != 'struct'] nested_cols = [c[0] for c in nested_df.dtypes if c[1][:6] == 'struct'] flat_df = nested_df.select(flat_cols + [F.col(nc+'.'+c).alias(nc+'_'+c) for nc in nested_cols for c in nested_df.select(nc+'.*').columns]) return flat_df 

Antes de:

 root |-- x: string (nullable = true) |-- y: string (nullable = true) |-- foo: struct (nullable = true) | |-- a: float (nullable = true) | |-- b: float (nullable = true) | |-- c: integer (nullable = true) |-- bar: struct (nullable = true) | |-- a: float (nullable = true) | |-- b: float (nullable = true) | |-- c: integer (nullable = true) 

Después:

 root |-- x: string (nullable = true) |-- y: string (nullable = true) |-- foo_a: float (nullable = true) |-- foo_b: float (nullable = true) |-- foo_c: integer (nullable = true) |-- bar_a: float (nullable = true) |-- bar_b: float (nullable = true) |-- bar_c: integer (nullable = true) 

DataFrame#flattenSchema un método DataFrame#flattenSchema al proyecto open source spark-daria .

Así es cómo puede usar la función con su código.

 import com.github.mrpowers.spark.daria.sql.DataFrameExt._ df.flattenSchema().show() +-------+-------+---------+----+---+ |foo.bar|foo.baz| x| y| z| +-------+-------+---------+----+---+ | this| is|something|cool| ;)| +-------+-------+---------+----+---+ 

También puede especificar diferentes delimitadores de nombre de columna con el método flattenSchema() .

 df.flattenSchema(delimiter = "_").show() +-------+-------+---------+----+---+ |foo_bar|foo_baz| x| y| z| +-------+-------+---------+----+---+ | this| is|something|cool| ;)| +-------+-------+---------+----+---+ 

Este parámetro delimitador es sorprendentemente importante. Si está planificando su esquema para cargar la tabla en Redshift, no podrá usar puntos como delimitador.

Aquí está el fragmento de código completo para generar esta salida.

 val data = Seq( Row(Row("this", "is"), "something", "cool", ";)") ) val schema = StructType( Seq( StructField( "foo", StructType( Seq( StructField("bar", StringType, true), StructField("baz", StringType, true) ) ), true ), StructField("x", StringType, true), StructField("y", StringType, true), StructField("z", StringType, true) ) ) val df = spark.createDataFrame( spark.sparkContext.parallelize(data), StructType(schema) ) df.flattenSchema().show() 

El código subyacente es similar al código de David Griffin (en caso de que no desee agregar la dependencia de spark-daria a su proyecto).

 object StructTypeHelpers { def flattenSchema(schema: StructType, delimiter: String = ".", prefix: String = null): Array[Column] = { schema.fields.flatMap(structField => { val codeColName = if (prefix == null) structField.name else prefix + "." + structField.name val colName = if (prefix == null) structField.name else prefix + delimiter + structField.name structField.dataType match { case st: StructType => flattenSchema(schema = st, delimiter = delimiter, prefix = colName) case _ => Array(col(codeColName).alias(colName)) } }) } } object DataFrameExt { implicit class DataFrameMethods(df: DataFrame) { def flattenSchema(delimiter: String = ".", prefix: String = null): DataFrame = { df.select( StructTypeHelpers.flattenSchema(df.schema, delimiter, prefix): _* ) } } }