Cómo comprimir dos (o más) DataFrame en Spark

Tengo dos DataFrame a y b . a es como

 Column 1 | Column 2 abc | 123 cde | 23 

b es como

 Column 1 1 2 

Quiero comprimir a b (o incluso más) DataFrames, que se convierte en algo así como:

 Column 1 | Column 2 | Column 3 abc | 123 | 1 cde | 23 | 2 

¿Cómo puedo hacerlo?

La operación de este tipo no es compatible con una API de DataFrame. Es posible zip dos RDD, pero para hacerlo funcionar, tiene que coincidir tanto con el número de particiones como con el número de elementos por partición. Suponiendo que este es el caso:

 import org.apache.spark.sql.DataFrame import org.apache.spark.sql.Row import org.apache.spark.sql.types.{StructField, StructType, LongType} val a: DataFrame = sc.parallelize(Seq( ("abc", 123), ("cde", 23))).toDF("column_1", "column_2") val b: DataFrame = sc.parallelize(Seq(Tuple1(1), Tuple1(2))).toDF("column_3") // Merge rows val rows = a.rdd.zip(b.rdd).map{ case (rowLeft, rowRight) => Row.fromSeq(rowLeft.toSeq ++ rowRight.toSeq)} // Merge schemas val schema = StructType(a.schema.fields ++ b.schema.fields) // Create new data frame val ab: DataFrame = sqlContext.createDataFrame(rows, schema) 

Si no se cumplen las condiciones anteriores, la única opción que se le viene a la mente es agregar un índice y unirse:

 def addIndex(df: DataFrame) = sqlContext.createDataFrame( // Add index df.rdd.zipWithIndex.map{case (r, i) => Row.fromSeq(r.toSeq :+ i)}, // Create schema StructType(df.schema.fields :+ StructField("_index", LongType, false)) ) // Add indices val aWithIndex = addIndex(a) val bWithIndex = addIndex(b) // Join and clean val ab = aWithIndex .join(bWithIndex, Seq("_index")) .drop("_index") 

En la implementación de Dataframes de Scala, no hay una forma simple de concatenar dos dataframes en uno. Simplemente podemos evitar esta limitación agregando índices a cada fila de los marcos de datos. Entonces, podemos hacer una unión interna por estos índices. Este es mi código auxiliar de esta implementación:

 val a: DataFrame = sc.parallelize(Seq(("abc", 123), ("cde", 23))).toDF("column_1", "column_2") val aWithId: DataFrame = a.withColumn("id",monotonicallyIncreasingId) val b: DataFrame = sc.parallelize(Seq((1), (2))).toDF("column_3") val bWithId: DataFrame = b.withColumn("id",monotonicallyIncreasingId) aWithId.join(bWithId, "id") 

Un poco de lectura ligera: ¡comprueba cómo Python hace esto!

¿Qué pasa con SQL puro?

 SELECT room_name, sender_nickname, message_id, row_number() over (partition by room_name order by message_id) as message_index, row_number() over (partition by room_name, sender_nickname order by message_id) as user_message_index from messages order by room_name, message_id 

Sé que el OP estaba usando Scala, pero si, como yo, necesita saber cómo hacerlo en pyspark, intente con el código de Python a continuación. Al igual que la primera solución de @ zero323, se basa en RDD.zip() y, por lo tanto, fallará si ambos DataFrames no tienen el mismo número de particiones y el mismo número de filas en cada partición.

 from pyspark.sql import Row from pyspark.sql.types import StructType def zipDataFrames(left, right): CombinedRow = Row(*left.columns + right.columns) def flattenRow(row): left = row[0] right = row[1] combinedVals = [left[col] for col in left.__fields__] + [right[col] for col in right.__fields__] return CombinedRow(*combinedVals) zippedRdd = left.rdd.zip(right.rdd).map(lambda row: flattenRow(row)) combinedSchema = StructType(left.schema.fields + right.schema.fields) return zippedRdd.toDF(combinedSchema) joined = zipDataFrames(a, b)