Adjunte una columna al Marco de Datos en Apache Spark 1.3

¿Es posible y cuál sería el método más eficiente para agregar una columna al Marco de Datos?

Más específicamente, la columna puede servir como Id. De fila para el Marco de datos existente.

En un caso simplificado, leyendo de archivo y no tokenizing, puedo pensar en algo como a continuación (en Scala), pero se completa con errores (en la línea 3), y de todos modos no se ve como la mejor ruta posible:

var dataDF = sc.textFile("path/file").toDF() val rowDF = sc.parallelize(1 to DataDF.count().toInt).toDF("ID") dataDF = dataDF.withColumn("ID", rowDF("ID")) 

Ha pasado un tiempo desde que publiqué la pregunta y parece que a otras personas les gustaría obtener una respuesta también. Debajo está lo que encontré.

Así que la tarea original era agregar una columna con identificadores de fila (básicamente, una secuencia 1 to numRows ) a cualquier dataframe dado, por lo que se puede rastrear el orden / presencia de filas (por ejemplo, cuando muestreas). Esto puede lograrse por algo en esta línea:

 sqlContext.textFile(file). zipWithIndex(). map(case(d, i)=>i.toString + delimiter + d). map(_.split(delimiter)). map(s=>Row.fromSeq(s.toSeq)) 

En cuanto al caso general de agregar cualquier columna a cualquier dataframe:

Lo “más cercano” a esta funcionalidad en Spark API es withColumn y withColumnRenamed . De acuerdo con los documentos de Scala , el primero devuelve un nuevo DataFrame agregando una columna . En mi opinión, esta es una definición un poco confusa e incompleta. Ambas funciones solo pueden funcionar en this dataframe, es decir, con dos marcos de datos df1 y df2 con columna col :

 val df = df1.withColumn("newCol", df1("col") + 1) // -- OK val df = df1.withColumn("newCol", df2("col") + 1) // -- FAIL 

Por lo tanto, a menos que pueda transformar una columna en un dataframe existente a la forma que necesita, no puede usar withColumn o withColumnRenamed para withColumnRenamed columnas arbitrarias (independiente u otros marcos de datos).

Como se comentó anteriormente, la solución alternativa puede ser utilizar una join , esto sería bastante complicado, aunque posible, adjuntar las claves únicas como las anteriores con zipWithIndex para que ambos marcos de datos o columnas funcionen. Aunque la eficiencia es …

Está claro que agregar una columna al dataframe no es una funcionalidad fácil para el entorno distribuido y puede que no haya un método muy eficiente y ordenado para eso. Pero creo que todavía es muy importante tener esta funcionalidad básica disponible, incluso con advertencias de rendimiento.

No estoy seguro de si funciona en la chispa 1.3 pero en la chispa 1.5 que uso conColumna:

 import sqlContext.implicits._ import org.apache.spark.sql.functions._ df.withColumn("newName",lit("newValue")) 

Utilizo esto cuando necesito usar un valor que no está relacionado con las columnas existentes del dataframe

Esto es similar a la respuesta de @NehaM pero más simple

Tomé la ayuda de la respuesta anterior. Sin embargo, me parece incompleto si queremos cambiar un DataFrame y las API actuales son un poco diferentes en Spark 1.6 . zipWithIndex() devuelve una Tuple de (Row, Long) que contiene cada fila e índice correspondiente. Podemos usarlo para crear una nueva Row acuerdo a nuestras necesidades.

 val rdd = df.rdd.zipWithIndex() .map(indexedRow => Row.fromSeq(indexedRow._2.toString +: indexedRow._1.toSeq)) val newstructure = StructType(Seq(StructField("Row number", StringType, true)).++(df.schema.fields)) sqlContext.createDataFrame(rdd, newstructure ).show 

Espero que esto sea útil.

Puede usar row_number con la función Window como se muestra a continuación para obtener la identificación distinta para cada fila en un dataframe.

 df.withColumn("ID", row_number() over Window.orderBy("any column name in the dataframe")) 

También puede usar monotonically_increasing_id para lo mismo que

 df.withColumn("ID", monotonically_increasing_id()) 

Y hay algunas otras formas también.