luigi のタスクのoutputを DB 登録とする時、前回までではSQLAlchemyとの連携を使用した。
その中でタスクのcomplete判定は以下のようになっている。
complete判定用にtable_updatesテーブルを DB 上に持つ。このテーブルは最初に参照された時に自動生成される。- 判定は
table_updatesテーブルにupdate_idをキーとしたレコードの有無で判断する。あれば実行済みだ。 update_idはデフォルトでは<タスクネームスペース>.<タスク名>_<パラメータ(の一部)>_<パラメータのmd5ハッシュ>をキーとして使用する。completeがFalseの場合、データを対象テーブルに Insert する。
ただしこのデフォルト設定にはいくつか問題点がある。
update_idが固定
固定というわけではないが、タスクとパラメータが同一ならcompleteはTrueを返してしまい、runが走らずに依存元タスクへ処理が戻ってしまう。
これの対応は簡単で、update_idをオーバーライドして自分で定義してしまえばいい。前回の例ならば「ファイルパス」をキーとしてしまえばよい。
def update_id(self): |
Insert が失敗する可能性がある
update_idは実際に登録するデータのキーと紐づいているわけではないので、completeがFalseになったとしてもデータ登録がDuplicate Key Errorになる可能性がある。ここはupdate_idをプライマリーキーの値にする(もしくはユニークとなる値)よう自力で設定するしかない。
Insert ではなく Update したい
今回の本題がこれ
データを Insert してDuplicate Key Errorになる時は、そもそも Insert ではなく Update したいという場合だ。(いわゆるUpsert)
MySQL のON DUPLICATE KEY UPDATE機能を SQLAlchemy から呼び出して対応してみたい。
sqla.CopyToTable のソースコードを確認すると、Insert 処理をしているのはcopyメソッドであることがわかる。このメソッドのコメントに以下のようなことが書いてある。
A task that needs row updates instead of insertions should overload this method.
Update にしたければオーバーロードしろと
オーバーライドじゃないのか…?と思いつつ実装してみる
オーバーライド後
前回のコードに適用するとこんな感じだ
from sqlalchemy.dialects.mysql import insert |
オーバーライド前はこのようなコードだったので、一行ずつ比較して見てみたい。
# luigi/contrib/sqla.py |
1 行目
bound_cols = dict((c, sqlalchemy.bindparam("_" + c.key)) for c in table_bound.columns) |
投入されるデータ(ins_rows)はカラム名に_がついているので、そのままではカラム名不一致でテーブルに投入できない。その為、カラム名の対応表として上記の dict を作成している。
この行は特にいじらず、このままにしておく。
2 行目
ins = table_bound.insert().values(bound_cols) |
valuesにカラム名の対応表(1 行目で作成した dict)を設定することで、パラメータを更新している。しかしこのままでは MySQL に対応していないのでsqlalchemy.dialects.mysqlのinsertを利用して再定義する。
from sqlalchemy.dialects.mysql import insert |
するとon_duplicate_key_updateメソッドが使用できるので、Duplicate Keyの時に Update をするカラムを設定することができる。
3 行目
conn.execute(ins, ins_rows) |
ここは単純に SQL を流している。以下のように書き換えてあげよう。
conn.execute(on_duplicate_key, ins_rows) |
以上でUpsertを実装することができた。
いままでUpsertって使ったことがなかったけど、これって luigi だけじゃなくて普通の SQLAlchemy でも使えるね
MySQL 以外にも、PostgreSQL や SQLite でもできるらしいが、今回はここまで
参考
実行環境
- Windows 10
- Python 3.6.3
- luigi 2.7.2
- SQLAlchemy 1.2.1