MySQLではINSERT ... ON DUPLICATE UPDATE
と呼ばれ、標準ではMERGE
操作の一部としてサポートされています。
PostgreSQLでは(pg 9.5以前では)直接サポートされていませんが、どのようにすればよいのでしょうか?以下のように考えてください:
CREATE TABLE testtable (
id integer PRIMARY KEY,
somedata text NOT NULL
);
INSERT INTO testtable (id, somedata) VALUES
(1, 'fred'),
(2, 'bob');
タプル(2, 'Joe')
, (3, 'Alan')
をupsert"したいとします:
(1, 'fred'),
(2, 'Joe'), -- Changed value of existing tuple
(3, 'Alan') -- Added new tuple
これが、upsert
について議論するときに人々が話していることである。重要なことは、どのようなアプローチであっても、明示的なロッキングを使用するか、あるいは結果として発生するレースコンディションを防御することで、同じテーブル*上で複数のトランザクションが動作していても安全でなければならないということです。
このトピックは、https://stackoverflow.com/q/1109061/398670 で広範囲に議論されていますが、MySQL 構文の代替案に関するもので、時間の経過とともに無関係な詳細がかなり増えています。確定的な答えに取り組んでいます。
これらのテクニックは、"insert if not exists, otherwise do nothing""、つまり、"insert ... on duplicate key ignore" "にも有効です。
INSERT ... ON CONFLICT UPDATE
(および ON CONFLICT DO NOTHING
)がサポートされています。
ON DUPLICATE KEY UPDATE`との比較。
簡単な説明。
使用法はマニュアル - 特に構文図のconflict_action節と説明テキストを参照してください。
以下に示す9.4以前の解決策とは異なり、この機能は複数の競合する行に対して動作し、排他ロックや再試行ループを必要としません。
この機能を追加したコミットはこちらで、この機能の開発に関する議論はこちらです。PostgreSQLには組み込みのUPSERT
(またはMERGE
)機能がありません。
この記事ではこの問題について詳しく説明しています。
一般的には、2つのオプションから選択しなければならない:
多くの接続が同時に挿入を行おうとする場合、再試行ループで個々の行の挿入を行うのが合理的です。
PostgreSQLのドキュメントには、これをデータベース内のループで実行する便利なプロシジャがあります。ほとんどの単純な解決策とは異なり、更新の失敗や挿入の競合を防ぎます。このプロシージャはREAD COMMITTED
モードでのみ動作し、トランザクション内で行うことがこのプロシージャのみである場合にのみ安全です。この関数は、トリガやセカンダリユニークキーがユニーク違反の原因となる場合、正しく動作しません。
この方法は非常に非効率的です。現実的であればいつでも、作業をキューに入れ、代わりに以下に説明するような一括アップサートを行うべきである。
この問題に対する多くの解決策はロールバックを考慮していないため、不完全な更新になってしまう。2つのトランザクションが互いに競合し、一方はINSERT
に成功し、もう一方は重複キーエラーになり、代わりにUPDATE
を行う。UPDATEは
INSERTがロールバックするかコミットするのを待ってブロックする。そのため、
UPDATE`がコミットしても、実際には期待したアップサートは行われていません。結果の行数をチェックし、必要に応じて再試行する必要がある。
また、SELECTレースを考慮していない解決策もあります。明らかで単純な方法を試すと
-- THIS IS WRONG. DO NOT COPY IT. It's an EXAMPLE.
BEGIN;
UPDATE testtable
SET somedata = 'blah'
WHERE id = 2;
-- Remember, this is WRONG. Do NOT COPY IT.
INSERT INTO testtable (id, somedata)
SELECT 2, 'blah'
WHERE NOT EXISTS (SELECT 1 FROM testtable WHERE testtable.id = 2);
COMMIT;
を試した場合、2つ同時に実行するといくつかの失敗モードがあります。1つは、すでに説明した更新再チェックの問題です。もう1つは、両方が同時にUPDATE
を実行し、0行にマッチして続行する場合です。このテストはINSERT
の前に行われます。どちらもゼロ行を得たので、どちらもINSERT
を行う。一方は重複キーエラーで失敗する。
これが、再試行ループが必要な理由である。巧妙なSQLを使えば、重複キーエラーや更新の失敗を防げると思うかもしれませんが、そうではありません。行数をチェックするか、重複キーエラーを処理して(選択したアプローチによりますが)再試行する必要があります。
このために独自のソリューションを開発しないでください。メッセージキューイングと同様、それはおそらく間違っています。
新しいデータセットを古い既存のデータセットにマージするような一括アップサートを行いたいことがあります。これは個々の行のアップサートに比べて非常に効率的であり、実用的であればいつでも行うべきである。 この場合、一般的には以下のような処理を行います:
テーブルを
CREATE` する。COPY
または一括挿入する。IN EXCLUSIVE MODE
で LOCK
する。これにより、他のトランザクションは SELECT
することはできるが、テーブルに変更を加えることはできない。UPDATE ...FROM
を実行する;INSERT
を行う;COMMIT
を行い、ロックを解放する。
例えば、質問で与えられた例では、多値の INSERT
を使用して temp テーブルにデータを入力します:BEGIN;
CREATE TEMPORARY TABLE newvals(id integer, somedata text);
INSERT INTO newvals(id, somedata) VALUES (2, 'Joe'), (3, 'Alan');
LOCK TABLE testtable IN EXCLUSIVE MODE;
UPDATE testtable
SET somedata = newvals.somedata
FROM newvals
WHERE newvals.id = testtable.id;
INSERT INTO testtable
SELECT newvals.id, newvals.somedata
FROM newvals
LEFT OUTER JOIN testtable ON (testtable.id = newvals.id)
WHERE testtable.id IS NULL;
COMMIT;
MERGE
](http://wiki.postgresql.org/wiki/SQL_MERGE)SQL 標準の MERGE
は並行性の定義が不十分で、テーブルをロックせずに UPSERT を行うには適していません。
MERGEはOLAPのデータマージには非常に便利な文ですが、並行処理で安全なアップサートを行うには役に立ちません。他のDBMSを使用している人に、アップサートに
MERGE` を使用するようにというアドバイスがたくさんありますが、実はそれは間違っています。
INSERT ... ON DUPLICATE KEY UPDATE
](http://dev.mysql.com/doc/refman/5.7/en/insert-on-duplicate.html)MERGE
](http://technet.microsoft.com/en-us/library/bb510625.aspx) (ただし MERGE
の問題については上記を参照)。MERGE
from Oracle (ただし MERGE
の問題については上記を参照)PostgreSQLの9.5以前のバージョンでの単一挿入の問題に対する別の解決策を提供しようとしています。そのアイデアは、まず挿入を行い、レコードが既に存在する場合は更新を行うというものです:
do $$
begin
insert into testtable(id, somedata) values(2,'Joe');
exception when unique_violation then
update testtable set somedata = 'Joe' where id = 2;
end $$;
この解決策は、テーブルの行の削除がない場合のみ適用できることに注意してください。
この解決策の効率については知りませんが、十分に合理的だと思われます。
insertの例をいくつか示します。 ... 紛争について。 ...
( pg 9.5 + ):
-紛争時に挿入-何もしない。
ダミー(id、name、size)値(1、 'new_name'、3)に挿入します。 紛争については何もしません。
。
-競合時に挿入-更新を行い、列で競合ターゲットを指定します。
ダミー(id、name、size)値(1、 'new_name'、3)に挿入します。 紛争(id)について。 設定名= 'new_name'を更新し、サイズ= 3;
を更新します。
-競合時に挿入-更新を行い、制約名を介して競合ターゲットを指定します。
ダミー(id、name、size)値(1、 'new_name'、3)に挿入します。 制約dummy_pkeyの競合について。 設定名= 'new_name'を更新し、サイズ= 4;
を更新します。
上記の大きな投稿はPostgresバージョンの多くの異なるSQLアプローチをカバーしているため(質問のように9.5以外)、Postgres 9.5を使用している場合は、SQLAlchemyでそれを行う方法を追加したいと思います。 独自のupsertを実装する代わりに、SQLAlchemyの機能(SQLAlchemy 1.1で追加された)を使用することもできます。 個人的には、可能であればこれらを使用することをお勧めします。 利便性のためだけでなく、PostgreSQLが発生する可能性のあるレース条件を処理できるためです。
昨日私が与えた別の回答からのクロス投稿(https://stackoverflow.com/a/44395983/2156909)。
SQLAlchemyは、 on_conflict_do_update()
と on_conflict_do_nothing()
の2つのメソッドで ON CONFLICT
をサポートしています。
ドキュメントからのコピー:
from sqlalchemy.dialects.postgresql import insert
stmt = insert(my_table).values(user_email='[email protected]', data='inserted data')
stmt = stmt.on_conflict_do_update(
index_elements=[my_table.c.user_email],
index_where=my_table.c.user_email.like('%@gmail.com'),
set_=dict(data=stmt.excluded.data)
)
conn.execute(stmt)
http://docs.sqlalchemy.org/en/latest/dialects/postgresql.html?highlight = conflict#insert-on-conflict-upsert。
この質問が閉じられたので、SQLAlchemyを使用してどのように実行するかについてここに投稿します。 再帰を介して、レース条件と検証エラーに対抗するために、バルクインサートまたは更新を再試行します。
まず輸入。
import itertools as it
from functools import partial
from operator import itemgetter
from sqlalchemy.exc import IntegrityError
from app import session
from models import Posts
これで、いくつかのヘルパー機能が機能します。
def chunk(content, chunksize=None):
"""Groups data into chunks each with (at most) `chunksize` items.
https://stackoverflow.com/a/22919323/408556
"""
if chunksize:
i = iter(content)
generator = (list(it.islice(i, chunksize)) for _ in it.count())
else:
generator = iter([content])
return it.takewhile(bool, generator)
def gen_resources(records):
"""Yields a dictionary if the record's id already exists, a row object
otherwise.
"""
ids = {item[0] for item in session.query(Posts.id)}
for record in records:
is_row = hasattr(record, 'to_dict')
if is_row and record.id in ids:
# It's a row but the id already exists, so we need to convert it
# to a dict that updates the existing record. Since it is duplicate,
# also yield True
yield record.to_dict(), True
elif is_row:
# It's a row and the id doesn't exist, so no conversion needed.
# Since it's not a duplicate, also yield False
yield record, False
elif record['id'] in ids:
# It's a dict and the id already exists, so no conversion needed.
# Since it is duplicate, also yield True
yield record, True
else:
# It's a dict and the id doesn't exist, so we need to convert it.
# Since it's not a duplicate, also yield False
yield Posts(**record), False
そして最後にupsert関数。
def upsert(data, chunksize=None):
for records in chunk(data, chunksize):
resources = gen_resources(records)
sorted_resources = sorted(resources, key=itemgetter(1))
for dupe, group in it.groupby(sorted_resources, itemgetter(1)):
items = [g[0] for g in group]
if dupe:
_upsert = partial(session.bulk_update_mappings, Posts)
else:
_upsert = session.add_all
try:
_upsert(items)
session.commit()
except IntegrityError:
# A record was added or deleted after we checked, so retry
#
# modify accordingly by adding additional exceptions, e.g.,
# except (IntegrityError, ValidationError, ValueError)
db.session.rollback()
upsert(items)
except Exception as e:
# Some other error occurred so reduce chunksize to isolate the
# offending row(s)
db.session.rollback()
num_items = len(items)
if num_items > 1:
upsert(items, num_items // 2)
else:
print('Error adding record {}'.format(items[0]))
使用方法は次のとおりです。
>>> data = [
... {'id': 1, 'text': 'updated post1'},
... {'id': 5, 'text': 'updated post5'},
... {'id': 1000, 'text': 'new post1000'}]
...
>>> upsert(data)
これが[bulk_save_objects
][3]を超える利点は、挿入時に関係、エラーチェックなどを処理できることです(バルク操作とは異なります)。
[3]:http://docs.sqlalchemy.org/en/latest/orm/session_api.html?highlight = bulk_save_objects#sqlalchemy.orm.session.Session.bulk_save_objects。