Cuadros de datos Spark UPSERT a tabla Postgres

Estoy usando Apache Spark DataFrames para unir dos fonts de datos y obtener el resultado como otro DataFrame. Quiero escribir el resultado en otra tabla de Postgres. Veo esta opción:

myDataFrame.write.jdbc(url, table, connectionProperties) 

Pero, lo que quiero hacer es UPSERT el dataframe en la tabla basada en la clave principal de la tabla. ¿Cómo se hace esto? Estoy usando Spark 1.6.0.

No es compatible. DataFrameWriter puede agregar o sobrescribir la tabla existente. Si su aplicación requiere una lógica más compleja, tendrá que lidiar con esto de forma manual.

Una opción es usar una acción ( foreach , foreachPartition ) con conexión JDBC estándar. Otra es escribir en un temporal y manejar el rest directamente en la base de datos.

KrisP tiene el derecho de eso. La mejor manera de hacer un postre no es a través de una statement preparada. Es importante tener en cuenta que este método se insertará uno a la vez con tantas particiones como la cantidad de trabajadores que tenga. Si quieres hacer esto en lote, también puedes

 import java.sql._ dataframe.coalesce("NUMBER OF WORKERS").mapPartitions((d) => Iterator(d)).foreach { batch => val dbc: Connection = DriverManager.getConnection("JDBCURL") val st: PreparedStatement = dbc.prepareStatement("YOUR PREPARED STATEMENT") batch.grouped("# Of Rows you want per batch").foreach { session => session.foreach { x => st.setDouble(1, x.getDouble(1)) st.addBatch() } st.executeBatch() } dbc.close() } 

Esto ejecutará lotes para cada trabajador y cerrará la conexión DB. Le da control sobre cuántos trabajadores, cuántos lotes y le permite trabajar dentro de esos límites.

Si va a hacerlo de forma manual y mediante la opción 1 mencionada por zero323, debería echar un vistazo al código fuente de Spark para la instrucción de inserción aquí

  def insertStatement(conn: Connection, table: String, rddSchema: StructType): PreparedStatement = { val columns = rddSchema.fields.map(_.name).mkString(",") val placeholders = rddSchema.fields.map(_ => "?").mkString(",") val sql = s"INSERT INTO $table ($columns) VALUES ($placeholders)" conn.prepareStatement(sql) } 

El PreparedStatement es parte de java.sql y tiene métodos como execute() y executeUpdate() . Todavía tiene que modificar el sql consecuencia, por supuesto.

Para insertar JDBC, puede usar

dataframe.write.mode(SaveMode.Append).jdbc(jdbc_url,table_name,connection_properties)

Además, Dataframe.write le proporciona un DataFrameWriter y tiene algunos métodos para insertar el dataframe.

def insertInto(tableName: String): Unit

Inserta el contenido del DataFrame en la tabla especificada. Requiere que el esquema del DataFrame sea el mismo que el esquema de la tabla.

Como inserta datos en una tabla existente, se ignorará el formato o las opciones.

http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.sql.DataFrameWriter

Aún no hay nada que actualice los registros individuales desde la chispa

    Intereting Posts