Claves principales con Apache Spark

Estoy teniendo una conexión JDBC con Apache Spark y PostgreSQL y quiero insertar algunos datos en mi base de datos. Cuando uso el modo de DataFrame.Row necesito especificar el id para cada DataFrame.Row . ¿Hay alguna forma de que Spark cree claves primarias?

Scala :

Si todo lo que necesita son números únicos, puede usar zipWithUniqueId y recrear DataFrame. Primero algunas importaciones y datos ficticios:

 import sqlContext.implicits._ import org.apache.spark.sql.Row import org.apache.spark.sql.types.{StructType, StructField, LongType} val df = sc.parallelize(Seq( ("a", -1.0), ("b", -2.0), ("c", -3.0))).toDF("foo", "bar") 

Extraiga el esquema para un uso posterior:

 val schema = df.schema 

Agregar campo de identificación:

 val rows = df.rdd.zipWithUniqueId.map{ case (r: Row, id: Long) => Row.fromSeq(id +: r.toSeq)} 

Crear DataFrame:

 val dfWithPK = sqlContext.createDataFrame( rows, StructType(StructField("id", LongType, false) +: schema.fields)) 

Lo mismo en Python :

 from pyspark.sql import Row from pyspark.sql.types import StructField, StructType, LongType row = Row("foo", "bar") row_with_index = Row(*["id"] + df.columns) df = sc.parallelize([row("a", -1.0), row("b", -2.0), row("c", -3.0)]).toDF() def make_row(columns): def _make_row(row, uid): row_dict = row.asDict() return row_with_index(*[uid] + [row_dict.get(c) for c in columns]) return _make_row f = make_row(df.columns) df_with_pk = (df.rdd .zipWithUniqueId() .map(lambda x: f(*x)) .toDF(StructType([StructField("id", LongType(), False)] + df.schema.fields))) 

Si prefiere un número consecutivo, puede reemplazar zipWithUniqueId con zipWithIndex pero es un poco más caro.

Directamente con DataFrame API :

(Universal Scala, Python, Java, R con casi la misma syntax)

Anteriormente, me he perdido la función monotonicallyIncreasingId , que debería funcionar bien siempre y cuando no requiera números consecutivos:

 import org.apache.spark.sql.functions.monotonicallyIncreasingId df.withColumn("id", monotonicallyIncreasingId).show() // +---+----+-----------+ // |foo| bar| id| // +---+----+-----------+ // | a|-1.0|17179869184| // | b|-2.0|42949672960| // | c|-3.0|60129542144| // +---+----+-----------+ 

Si bien es útil, monotonicallyIncreasingId no es determinista. No solo los identificadores pueden ser diferentes de la ejecución a la ejecución, pero sin trucos adicionales no se pueden utilizar para identificar filas cuando las operaciones posteriores contienen filtros.

Nota :

También es posible usar la función de ventana rowNumber :

 from pyspark.sql.window import Window from pyspark.sql.functions import rowNumber w = Window().orderBy() df.withColumn("id", rowNumber().over(w)).show() 

Desafortunadamente:

Ventana WARN: ¡No se ha definido ninguna partición para el funcionamiento de la ventana! Mover todos los datos a una sola partición, esto puede causar una grave degradación del rendimiento.

Entonces, a menos que tenga una forma natural de dividir sus datos y garantizar la singularidad, no es particularmente útil en este momento.

 from pyspark.sql.functions import monotonically_increasing_id df.withColumn("id", monotonically_increasing_id()).show() 

Tenga en cuenta que el segundo argumento de df.withColumn es monotonically_increasing_id () no monotonically_increasing_id.

Encontré la siguiente solución relativamente sencilla para el caso en que zipWithIndex () es el comportamiento deseado, es decir, para aquellos que desean enteros consecutivos.

En este caso, estamos usando pyspark y confiando en la comprensión del diccionario para asignar el objeto original de la fila a un nuevo diccionario que se ajusta a un nuevo esquema que incluye el índice único.

 # read the initial dataframe without index dfNoIndex = sqlContext.read.parquet(dataframePath) # Need to zip together with a unique integer # First create a new schema with uuid field appended newSchema = StructType([StructField("uuid", IntegerType(), False)] + dfNoIndex.schema.fields) # zip with the index, map it to a dictionary which includes new field df = dfNoIndex.rdd.zipWithIndex()\ .map(lambda (row, id): {k:v for k, v in row.asDict().items() + [("uuid", id)]})\ .toDF(newSchema)