¿Cómo UPSERT (FUSIÓN, INSERTAR … EN ACTUALIZACIÓN DUPLICADA) en PostgreSQL?

Una pregunta muy frecuente aquí es cómo hacer un upsert, que es lo que MySQL llama INSERT ... ON DUPLICATE UPDATE y los soportes estándar como parte de la operación MERGE .

Dado que PostgreSQL no lo admite directamente (antes de la página 9.5), ¿cómo lo hace? Considera lo siguiente:

 CREATE TABLE testtable ( id integer PRIMARY KEY, somedata text NOT NULL ); INSERT INTO testtable (id, somedata) VALUES (1, 'fred'), (2, 'bob'); 

Ahora imagina que quieres “restablecer” las tuplas (2, 'Joe') , (3, 'Alan') , así que los nuevos contenidos de la tabla serían:

 (1, 'fred'), (2, 'Joe'), -- Changed value of existing tuple (3, 'Alan') -- Added new tuple 

De eso es de lo que habla la gente cuando habla de un upsert . Fundamentalmente, cualquier enfoque debe ser seguro en la presencia de múltiples transacciones que trabajan en la misma tabla , ya sea mediante el locking explícito, o de otra manera defenderse contra las condiciones de carrera resultantes.

Este tema se discute extensamente en Insertar, en la actualización duplicada en PostgreSQL? , pero eso se trata de alternativas a la syntax de MySQL, y ha crecido un poco de detalles no relacionados con el tiempo. Estoy trabajando en respuestas definitivas.

Estas técnicas también son útiles para “insertar si no existe, de lo contrario no hacer nada”, es decir, “insertar … en la clave duplicada ignorar”.

9.5 y más nuevo:

PostgreSQL 9.5 y soporte más nuevo INSERT ... ON CONFLICT UPDATE (y ON CONFLICT DO NOTHING ), es decir, insertar.

Comparación con ON DUPLICATE KEY UPDATE .

Explicación rápida

Para su uso, consulte el manual , específicamente la cláusula conflict_action en el diagtwig de syntax y el texto explicativo .

A diferencia de las soluciones para 9.4 y anteriores que se proporcionan a continuación, esta característica funciona con múltiples filas en conflicto y no requiere un locking exclusivo o un ciclo de rebash.

El compromiso de agregar la función está aquí y la discusión sobre su desarrollo está aquí .


Si tiene 9.5 y no necesita compatibilidad con versiones anteriores, puede dejar de leer ahora .


9.4 y más viejo:

PostgreSQL no tiene ninguna función incorporada UPSERT (o MERGE ), y hacerlo de manera eficiente ante el uso simultáneo es muy difícil.

Este artículo analiza el problema con detalles útiles .

En general, debe elegir entre dos opciones:

  • Operaciones individuales de inserción / actualización en un ciclo de rebash; o
  • Bloquear la mesa y fusionar lotes

Bucle de rebash de fila individual

El uso de descripciones de filas individuales en un ciclo de rebash es la opción razonable si desea que muchas conexiones intenten realizar inserciones al mismo tiempo.

La documentación de PostgreSQL contiene un procedimiento útil que te permitirá hacer esto en un bucle dentro de la base de datos . Protege contra actualizaciones perdidas e inserta razas, a diferencia de la mayoría de las soluciones ingenuas. Solo funcionará en modo READ COMMITTED y solo es seguro si es lo único que haces en la transacción. La función no funcionará correctamente si los disparadores o las claves únicas secundarias causan violaciones únicas.

Esta estrategia es muy ineficiente. Siempre que sea práctico, debe poner en cola de trabajo y hacer una inserción masiva como se describe a continuación en su lugar.

Muchos bashs de soluciones a este problema no consideran las reversiones, por lo que resultan en actualizaciones incompletas. Dos transacciones compiten entre sí; uno de ellos INSERT éxito s; el otro obtiene un error clave duplicado y realiza una UPDATE lugar. Los bloques UPDATE esperan a que INSERT repliegue o confirme. Cuando retrocede, la verificación de la condición de UPDATE coincide con cero filas, por lo tanto, aunque la UPDATE se haya confirmado, no ha realizado la actualización que esperaba. Debe verificar los recuentos de fila de resultados y volver a intentar donde sea necesario.

Algunas soluciones intentadas tampoco consideran las carreras SELECT. Si prueba lo obvio y simple:

 -- THIS IS WRONG. DO NOT COPY IT. It's an EXAMPLE. BEGIN; UPDATE testtable SET somedata = 'blah' WHERE id = 2; -- Remember, this is WRONG. Do NOT COPY IT. INSERT INTO testtable (id, somedata) SELECT 2, 'blah' WHERE NOT EXISTS (SELECT 1 FROM testtable WHERE testtable.id = 2); COMMIT; 

luego, cuando dos corren a la vez, hay varios modos de falla. Uno es el problema ya discutido con una nueva verificación de actualización. Otro es donde ambos UPDATE al mismo tiempo, igualando cero filas y continuando. Luego ambos hacen la prueba EXISTS , que ocurre antes del INSERT . Ambos obtienen cero filas, por lo que ambos hacen el INSERT . Uno falla con un error clave duplicado.

Es por eso que necesita un ciclo de reintentar. Puede pensar que puede evitar errores clave duplicados o pérdidas de actualizaciones con SQL inteligente, pero no puede. Debe verificar los recuentos de filas o manejar los errores duplicados de las claves (según el enfoque elegido) y volver a intentarlo.

Por favor, no lance su propia solución para esto. Al igual que con la cola de mensajes, es probable que esté mal.

Postre a granel con cerradura

Algunas veces desea hacer un almacenamiento masivo, donde tiene un nuevo conjunto de datos que desea fusionar en un conjunto de datos existente anterior. Esto es mucho más eficiente que los envíos de fila individuales y debe preferirse siempre que sea práctico.

En este caso, generalmente sigue el siguiente proceso:

  • CREATE una tabla TEMPORARY

  • COPY o bulk-inserta los nuevos datos en la tabla temporal

  • LOCK la tabla de destino IN EXCLUSIVE MODE . Esto permite a otras transacciones SELECT , pero no hacer ningún cambio en la tabla.

  • Realice una UPDATE ... FROM registros existentes utilizando los valores en la tabla temporal;

  • Realice un INSERT de filas que aún no existen en la tabla de destino;

  • COMMIT , liberando el locking.

Por ejemplo, para el ejemplo dado en la pregunta, usando INSERT para completar la tabla temporal:

 BEGIN; CREATE TEMPORARY TABLE newvals(id integer, somedata text); INSERT INTO newvals(id, somedata) VALUES (2, 'Joe'), (3, 'Alan'); LOCK TABLE testtable IN EXCLUSIVE MODE; UPDATE testtable SET somedata = newvals.somedata FROM newvals WHERE newvals.id = testtable.id; INSERT INTO testtable SELECT newvals.id, newvals.somedata FROM newvals LEFT OUTER JOIN testtable ON (testtable.id = newvals.id) WHERE testtable.id IS NULL; COMMIT; 

Lectura relacionada

  • Página wiki de UPSERT
  • UPSERTisms en Postgres
  • Insertar, en la actualización duplicada en PostgreSQL?
  • http://petereisentraut.blogspot.com/2010/05/merge-syntax.html
  • Upsert con una transacción
  • ¿SELECCIONAR o INSERTAR en una función es propenso a las condiciones de carrera?
  • SQL MERGE en la wiki de PostgreSQL
  • La manera más idiomática de implementar UPSERT en Postgresql hoy en día

¿Qué hay de MERGE ?

El MERGE estándar de SQL realmente tiene una semántica de concurrencia mal definida y no es adecuado para la inserción sin bloquear primero una tabla.

Es una statement OLAP realmente útil para la fusión de datos, pero en realidad no es una solución útil para el upsert-safe upsert. Hay muchos consejos para las personas que usan otros DBMS para usar MERGE para los postres, pero en realidad es incorrecto.

Otros DBs:

  • INSERT ... ON DUPLICATE KEY UPDATE en MySQL
  • MERGE de MS SQL Server (pero vea más arriba sobre los problemas de MERGE )
  • MERGE de Oracle (pero vea arriba sobre los problemas de MERGE )

Estoy tratando de contribuir con otra solución para el problema de inserción única con las versiones anteriores a 9.5 de PostgreSQL. La idea es simplemente intentar realizar primero la inserción y, en caso de que el registro ya esté presente, actualizarla:

 do $$ begin insert into testtable(id, somedata) values(2,'Joe'); exception when unique_violation then update testtable set somedata = 'Joe' where id = 2; end $$; 

Tenga en cuenta que esta solución solo se puede aplicar si no hay eliminaciones de filas de la tabla .

No sé acerca de la eficiencia de esta solución, pero me parece lo suficientemente razonable.

Aquí hay algunos ejemplos para insert ... on conflict ... ( pg 9.5+ ):

  • Insertar, en conflicto – no hacer nada .
    insert into dummy(id, name, size) values(1, 'new_name', 3) on conflict do nothing;

  • Insertar, en conflicto: actualizar , especificar objective de conflicto por columna .
    insert into dummy(id, name, size) values(1, 'new_name', 3) on conflict(id) do update set name = 'new_name', size = 3;

  • Insertar, en caso de conflicto: actualice , especifique el destino del conflicto a través del nombre de la restricción .
    insert into dummy(id, name, size) values(1, 'new_name', 3) on conflict on constraint dummy_pkey do update set name = 'new_name', size = 4;

 WITH UPD AS (UPDATE TEST_TABLE SET SOME_DATA = 'Joe' WHERE ID = 2 RETURNING ID), INS AS (SELECT '2', 'Joe' WHERE NOT EXISTS (SELECT * FROM UPD)) INSERT INTO TEST_TABLE(ID, SOME_DATA) SELECT * FROM INS 

Probado en Postgresql 9.3

SQLAlchemy upsert para Postgres> = 9.5

Como la publicación anterior cubre muchos enfoques SQL diferentes para las versiones de Postgres (no solo no 9.5 como en la pregunta), me gustaría agregar cómo hacerlo en SQLAlchemy si está utilizando Postgres 9.5. En lugar de implementar su propio upsert, también puede usar las funciones de SQLAlchemy (que se agregaron en SQLAlchemy 1.1). Personalmente, recomendaría usar estos, si es posible. No solo por conveniencia, sino también porque permite a PostgreSQL manejar cualquier condición de carrera que pueda ocurrir.

Publicación cruzada de otra respuesta que di ayer ( https://stackoverflow.com/a/44395983/2156909 )

SQLAlchemy es compatible con ON CONFLICT ahora con dos métodos on_conflict_do_update() y on_conflict_do_nothing() :

Copia de la documentación:

 from sqlalchemy.dialects.postgresql import insert stmt = insert(my_table).values(user_email='a@b.com', data='inserted data') stmt = stmt.on_conflict_do_update( index_elements=[my_table.c.user_email], index_where=my_table.c.user_email.like('%@gmail.com'), set_=dict(data=stmt.excluded.data) ) conn.execute(stmt) 

http://docs.sqlalchemy.org/en/latest/dialects/postgresql.html?highlight=conflict#insert-on-conflict-upsert

Dado que esta pregunta se cerró, estoy publicando aquí cómo lo hace con SQLAlchemy. A través de la recursión, reintenta una inserción o actualización masiva para combatir las condiciones de carrera y los errores de validación.

Primero las importaciones

 import itertools as it from functools import partial from operator import itemgetter from sqlalchemy.exc import IntegrityError from app import session from models import Posts 

Ahora un par de funciones auxiliares

 def chunk(content, chunksize=None): """Groups data into chunks each with (at most) `chunksize` items. https://stackoverflow.com/a/22919323/408556 """ if chunksize: i = iter(content) generator = (list(it.islice(i, chunksize)) for _ in it.count()) else: generator = iter([content]) return it.takewhile(bool, generator) def gen_resources(records): """Yields a dictionary if the record's id already exists, a row object otherwise. """ ids = {item[0] for item in session.query(Posts.id)} for record in records: is_row = hasattr(record, 'to_dict') if is_row and record.id in ids: # It's a row but the id already exists, so we need to convert it # to a dict that updates the existing record. Since it is duplicate, # also yield True yield record.to_dict(), True elif is_row: # It's a row and the id doesn't exist, so no conversion needed. # Since it's not a duplicate, also yield False yield record, False elif record['id'] in ids: # It's a dict and the id already exists, so no conversion needed. # Since it is duplicate, also yield True yield record, True else: # It's a dict and the id doesn't exist, so we need to convert it. # Since it's not a duplicate, also yield False yield Posts(**record), False 

Y finalmente la función upsert

 def upsert(data, chunksize=None): for records in chunk(data, chunksize): resources = gen_resources(records) sorted_resources = sorted(resources, key=itemgetter(1)) for dupe, group in it.groupby(sorted_resources, itemgetter(1)): items = [g[0] for g in group] if dupe: _upsert = partial(session.bulk_update_mappings, Posts) else: _upsert = session.add_all try: _upsert(items) session.commit() except IntegrityError: # A record was added or deleted after we checked, so retry # # modify accordingly by adding additional exceptions, eg, # except (IntegrityError, ValidationError, ValueError) db.session.rollback() upsert(items) except Exception as e: # Some other error occurred so reduce chunksize to isolate the # offending row(s) db.session.rollback() num_items = len(items) if num_items > 1: upsert(items, num_items // 2) else: print('Error adding record {}'.format(items[0])) 

Así es como lo usas

 >>> data = [ ... {'id': 1, 'text': 'updated post1'}, ... {'id': 5, 'text': 'updated post5'}, ... {'id': 1000, 'text': 'new post1000'}] ... >>> upsert(data) 

La ventaja que esto tiene sobre bulk_save_objects es que puede manejar relaciones, comprobación de errores, etc. al insertar (a diferencia de las operaciones masivas ).

    Intereting Posts