Columna de cadena Split Spark Dataframe en múltiples columnas

He visto a varias personas sugiriendo que Dataframe.explode es una forma útil de hacerlo, pero resulta en más filas que el dataframe original, que no es lo que quiero en absoluto. Simplemente quiero hacer el dataframe equivalente al muy simple:

 rdd.map(lambda row: row + [row.my_str_col.split('-')]) 

que toma algo que se parece a:

 col1 | my_str_col -----+----------- 18 | 856-yygrm 201 | 777-psgdg 

y lo convierte en esto:

 col1 | my_str_col | _col3 | _col4 -----+------------+-------+------ 18 | 856-yygrm | 856 | yygrm 201 | 777-psgdg | 777 | psgdg 

Soy consciente de pyspark.sql.functions.split() , pero da como resultado una columna de matriz anidada en lugar de dos columnas de nivel superior como yo quiero.

Idealmente, quiero que estas nuevas columnas sean nombradas también.

pyspark.sql.functions.split() es el enfoque correcto aquí; simplemente debe aplanar la columna anidada ArrayType en varias columnas de nivel superior. En este caso, donde cada matriz solo contiene 2 elementos, es muy fácil. Simplemente use Column.getItem() para recuperar cada parte de la matriz como una columna en sí misma:

 split_col = pyspark.sql.functions.split(df['my_str_col'], '-') df = df.withColumn('NAME1', split_col.getItem(0)) df = df.withColumn('NAME2', split_col.getItem(1)) 

El resultado será:

 col1 | my_str_col | NAME1 | NAME2 -----+------------+-------+------ 18 | 856-yygrm | 856 | yygrm 201 | 777-psgdg | 777 | psgdg 

No estoy seguro de cómo resolvería esto en un caso general en el que las matrices anidadas no tuvieran el mismo tamaño de Fila a Fila.

Aquí hay una solución para el caso general que no implica la necesidad de conocer la longitud de la matriz antes de tiempo, utilizando collect o usar udf s. Desafortunadamente, esto solo funciona para la versión de spark 2.1 y superior, porque requiere la función posexplode .

Supongamos que tiene el siguiente DataFrame:

 df = spark.createDataFrame( [ [1, 'A, B, C, D'], [2, 'E, F, G'], [3, 'H, I'], [4, 'J'] ] , ["num", "letters"] ) df.show() #+---+----------+ #|num| letters| #+---+----------+ #| 1|A, B, C, D| #| 2| E, F, G| #| 3| H, I| #| 4| J| #+---+----------+ 

Divida la columna de letters y luego use posexplode para explotar la matriz resultante junto con la posición en la matriz. Luego use pyspark.sql.functions.expr para tomar el elemento en el índice pos en esta matriz.

 import pyspark.sql.functions as f df.select( "num", f.split("letters", ", ").alias("letters"), f.posexplode(f.split("letters", ", ")).alias("pos", "val") )\ .show() #+---+------------+---+---+ #|num| letters|pos|val| #+---+------------+---+---+ #| 1|[A, B, C, D]| 0| A| #| 1|[A, B, C, D]| 1| B| #| 1|[A, B, C, D]| 2| C| #| 1|[A, B, C, D]| 3| D| #| 2| [E, F, G]| 0| E| #| 2| [E, F, G]| 1| F| #| 2| [E, F, G]| 2| G| #| 3| [H, I]| 0| H| #| 3| [H, I]| 1| I| #| 4| [J]| 0| J| #+---+------------+---+---+ 

Ahora creamos dos columnas nuevas a partir de este resultado. El primero es el nombre de nuestra nueva columna, que será una concatenación de letter y el índice en la matriz. La segunda columna será el valor en el índice correspondiente en la matriz. Obtenemos este último aprovechando la funcionalidad de pyspark.sql.functions.expr que nos permite usar valores de columnas como parámetros .

 df.select( "num", f.split("letters", ", ").alias("letters"), f.posexplode(f.split("letters", ", ")).alias("pos", "val") )\ .drop("val")\ .select( "num", f.concat(f.lit("letter"),f.col("pos").cast("string")).alias("name"), f.expr("letters[pos]").alias("val") )\ .show() #+---+-------+---+ #|num| name|val| #+---+-------+---+ #| 1|letter0| A| #| 1|letter1| B| #| 1|letter2| C| #| 1|letter3| D| #| 2|letter0| E| #| 2|letter1| F| #| 2|letter2| G| #| 3|letter0| H| #| 3|letter1| I| #| 4|letter0| J| #+---+-------+---+ 

Ahora podemos simplemente groupBy num y pivot DataFrame. Juntando todo eso, obtenemos:

 df.select( "num", f.split("letters", ", ").alias("letters"), f.posexplode(f.split("letters", ", ")).alias("pos", "val") )\ .drop("val")\ .select( "num", f.concat(f.lit("letter"),f.col("pos").cast("string")).alias("name"), f.expr("letters[pos]").alias("val") )\ .groupBy("num").pivot("name").agg(f.first("val"))\ .show() #+---+-------+-------+-------+-------+ #|num|letter0|letter1|letter2|letter3| #+---+-------+-------+-------+-------+ #| 1| A| B| C| D| #| 3| H| I| null| null| #| 2| E| F| G| null| #| 4| J| null| null| null| #+---+-------+-------+-------+-------+