DataFrame-ified zipWithIndex

Estoy tratando de resolver el viejo problema de agregar un número de secuencia a un conjunto de datos. Estoy trabajando con DataFrames, y parece que no hay un DataFrame equivalente a RDD.zipWithIndex . Por otro lado, lo siguiente funciona más o menos como yo quiero:

 val origDF = sqlContext.load(...) val seqDF= sqlContext.createDataFrame( origDF.rdd.zipWithIndex.map(ln => Row.fromSeq(Seq(ln._2) ++ ln._1.toSeq)), StructType(Array(StructField("seq", LongType, false)) ++ origDF.schema.fields) ) 

En mi aplicación actual, origDF no se cargará directamente de un archivo; se creará al unir otros 2-3 DataFrames y contendrá más de 100 millones de filas.

¿Hay una mejor manera de hacer esto? ¿Qué puedo hacer para optimizarlo?

Desde Spark 1.6 hay una función llamada monotonically_increasing_id ()
Genera una nueva columna con un único índice monotónico de 64 bits para cada fila
Pero no es consecuencial, cada partición comienza un nuevo rango, por lo que debemos calcular cada desplazamiento de partición antes de usarlo.
Al tratar de proporcionar una solución “libre de rdd”, terminé con algunos collect (), pero solo recoge compensaciones, un valor por partición , por lo que no causará OOM

 def zipWithIndex(df: DataFrame, offset: Long = 1, indexName: String = "index") = { val dfWithPartitionId = df.withColumn("partition_id", spark_partition_id()).withColumn("inc_id", monotonically_increasing_id()) val partitionOffsets = dfWithPartitionId .groupBy("partition_id") .agg(count(lit(1)) as "cnt", first("inc_id") as "inc_id") .orderBy("partition_id") .select(sum("cnt").over(Window.orderBy("partition_id")) - col("cnt") - col("inc_id") + lit(offset) as "cnt" ) .collect() .map(_.getLong(0)) .toArray dfWithPartitionId .withColumn("partition_offset", udf((partitionId: Int) => partitionOffsets(partitionId), LongType)(col("partition_id"))) .withColumn(indexName, col("partition_offset") + col("inc_id")) .drop("partition_id", "partition_offset", "inc_id") } 

Esta solución no vuelve a empaquetar las filas originales y no reparticiona el enorme dataframe original, por lo que es bastante rápido en el mundo real: 200 GB de datos CSV (43 millones de filas con 150 columnas) leídos, indexados y empacados en parquet en 2 minutos en 240 núcleos
Después de probar mi solución, ejecuté la solución de Kirk Broadhurst y fue 20 segundos más lenta
Puede querer o no usar dfWithPartitionId.cache() , depende de la tarea

Lo siguiente fue publicado en nombre de David Griffin (editado fuera de cuestión).

El método dfZipWithIndex todo cantante y que todo lo baila. Puede establecer el desplazamiento inicial (que por defecto es 1), el nombre de la columna del índice (por defecto “id”) y colocar la columna en el anverso o el reverso:

 import org.apache.spark.sql.DataFrame import org.apache.spark.sql.types.{LongType, StructField, StructType} import org.apache.spark.sql.Row def dfZipWithIndex( df: DataFrame, offset: Int = 1, colName: String = "id", inFront: Boolean = true ) : DataFrame = { df.sqlContext.createDataFrame( df.rdd.zipWithIndex.map(ln => Row.fromSeq( (if (inFront) Seq(ln._2 + offset) else Seq()) ++ ln._1.toSeq ++ (if (inFront) Seq() else Seq(ln._2 + offset)) ) ), StructType( (if (inFront) Array(StructField(colName,LongType,false)) else Array[StructField]()) ++ df.schema.fields ++ (if (inFront) Array[StructField]() else Array(StructField(colName,LongType,false))) ) ) } 

Comenzando en Spark 1.5, se agregaron expresiones de Window a Spark. En lugar de tener que convertir el DataFrame a un RDD , ahora puede usar org.apache.spark.sql.expressions.row_number . Tenga en cuenta que el rendimiento para el dfZipWithIndex anterior es significativamente más rápido que el algoritmo siguiente. Pero lo estoy publicando porque:

  1. Alguien más va a tener la tentación de probar esto
  2. Tal vez alguien pueda optimizar las expresiones a continuación

En cualquier caso, esto es lo que funciona para mí:

 import org.apache.spark.sql.expressions._ df.withColumn("row_num", row_number.over(Window.partitionBy(lit(1)).orderBy(lit(1)))) 

Tenga en cuenta que utilizo lit(1) tanto para el particionamiento como para el orden, esto hace que todo esté en la misma partición, y parece conservar el orden original del DataFrame , pero supongo que es lo que lo ralentiza.

Lo probé en un DataFrame 4 columnas con 7.000.000 filas y la diferencia de velocidad es significativa entre esto y el dfZipWithIndex anterior (como dije, las funciones de RDD son mucho, mucho más rápidas).

Versión PySpark:

 from pyspark.sql.types import LongType, StructField, StructType def dfZipWithIndex (df, offset=1, colName="rowId"): ''' Enumerates dataframe rows is native order, like rdd.ZipWithIndex(), but on a dataframe and preserves a schema :param df: source dataframe :param offset: adjustment to zipWithIndex()'s index :param colName: name of the index column ''' new_schema = StructType( [StructField(colName,LongType(),True)] # new added field in front + df.schema.fields # previous schema ) zipped_rdd = df.rdd.zipWithIndex() new_rdd = zipped_rdd.map(lambda (row,rowId): ([rowId +offset] + list(row))) return spark.createDataFrame(new_rdd, new_schema) 

También creó una jira para agregar esta funcionalidad en Spark de forma nativa: https://issues.apache.org/jira/browse/SPARK-23074

@Evgeny, tu solución es interesante. Tenga en cuenta que hay un error cuando tiene particiones vacías (la matriz no tiene estos índices de partición, al menos me está pasando esto con la chispa 1.6), así que convertí la matriz en un Mapa (partitionId -> offsets).

Además, saqué las fonts de monotonically_increasing_id para tener “inc_id” comenzando desde 0 en cada partición.

Aquí hay una versión actualizada:

 import org.apache.spark.sql.catalyst.expressions.LeafExpression import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.types.LongType import org.apache.spark.sql.catalyst.expressions.Nondeterministic import org.apache.spark.sql.catalyst.expressions.codegen.GeneratedExpressionCode import org.apache.spark.sql.catalyst.expressions.codegen.CodeGenContext import org.apache.spark.sql.types.DataType import org.apache.spark.sql.DataFrame import org.apache.spark.sql.functions._ import org.apache.spark.sql.Column import org.apache.spark.sql.expressions.Window case class PartitionMonotonicallyIncreasingID() extends LeafExpression with Nondeterministic { /** * From org.apache.spark.sql.catalyst.expressions.MonotonicallyIncreasingID * * Record ID within each partition. By being transient, count's value is reset to 0 every time * we serialize and deserialize and initialize it. */ @transient private[this] var count: Long = _ override protected def initInternal(): Unit = { count = 1L // notice this starts at 1, not 0 as in org.apache.spark.sql.catalyst.expressions.MonotonicallyIncreasingID } override def nullable: Boolean = false override def dataType: DataType = LongType override protected def evalInternal(input: InternalRow): Long = { val currentCount = count count += 1 currentCount } override def genCode(ctx: CodeGenContext, ev: GeneratedExpressionCode): String = { val countTerm = ctx.freshName("count") ctx.addMutableState(ctx.JAVA_LONG, countTerm, s"$countTerm = 1L;") ev.isNull = "false" s""" final ${ctx.javaType(dataType)} ${ev.value} = $countTerm; $countTerm++; """ } } object DataframeUtils { def zipWithIndex(df: DataFrame, offset: Long = 0, indexName: String = "index") = { // from https://stackoverflow.com/questions/30304810/dataframe-ified-zipwithindex) val dfWithPartitionId = df.withColumn("partition_id", spark_partition_id()).withColumn("inc_id", new Column(PartitionMonotonicallyIncreasingID())) // collect each partition size, create the offset pages val partitionOffsets: Map[Int, Long] = dfWithPartitionId .groupBy("partition_id") .agg(max("inc_id") as "cnt") // in each partition, count(inc_id) is equal to max(inc_id) (I don't know which one would be faster) .select(col("partition_id"), sum("cnt").over(Window.orderBy("partition_id")) - col("cnt") + lit(offset) as "cnt") .collect() .map(r => (r.getInt(0) -> r.getLong(1))) .toMap def partition_offset(partitionId: Int): Long = partitionOffsets(partitionId) val partition_offset_udf = udf(partition_offset _) // and re-number the index dfWithPartitionId .withColumn("partition_offset", partition_offset_udf(col("partition_id"))) .withColumn(indexName, col("partition_offset") + col("inc_id")) .drop("partition_id") .drop("partition_offset") .drop("inc_id") } }