前回の残課題
Archive
は DB 登録する処理だから、output
はMySqlTargetにしたほうがいい
どうやってoutput
を DB 登録にするか、普段SQLAlchemy
を使っているのでSQLAlchemy
との連携をやりたいと思う。
前回のコード
前回のコードは以下のものだ、output
がファイル出力になっているのでこれを DB 登録に対応させていく
import luigi |
参考にしたのは以下
データベースの準備
SQLAlchemy Migrateを使ってMySQL
のデータベースを準備する。
テーブル定義はこんな感じ
from sqlalchemy import * |
Dynamic dependencies
ベーシックなサンプルは以下の通りだ(テーブルは既にあるのでreflect = True
のパターン)
class SQLATask(sqla.CopyToTable): |
しかしこのサンプルをそのまま適用することはできない…動的依存関係の解決の為にrun()
を定義する必要があるからだ。
なので、ここではデータを登録するタスクとデータを準備するタスクを分けるサンプルが参考になる。
データを準備するタスクとしてBuildRecord
タスクを定義し、動的依存関係の解決はそのタスクのrun()
で行うのだ。
import luigi |
これで DB 登録できるようなった!
基本的な流れは今まで通り、Archive
タスクを実行するとrequires
でBuildRecord
を呼び、BuildRecord
のrun
でデータを作成、Archive
に戻って SQLAlchemy で DB 登録という形だ。
細かな注意点は以下の通りだ
- 登録するデータは
\t
で区切った文字列で渡す。今回は 1 行のみだが、複数行渡す場合は\n
で改行する。 - 区切り文字
\t
はcolumn_separator
オプションで変更可能 id
はAUTO INCREMENTだが、登録データを空白にするとカラムがずれるので0
を指定してあげる。そうすると Insert する時にちゃんとAUTO INCREMENTしてくれる。
sqla.CopyToTable
のoutput
Archive
タスクはoutput
としてレコードを DB に登録することができたが、このレコードを削除したら次にタスクを回した時再度登録されるだろうか?
実際やってみると再登録はされない。それどころかArchive
処理自体が走らなくなる。
sqla.CopyToTable
のcomplete
をどうやって判定しているかという話になるが、update_id
というメソッドで判定しており、内部でtask_id
を参照している。task_id
はタスクを走らせた時のログに出てくるArchive__fbe666d9fe
のような値で、<タスク>_<パラメータ>_<パラメータのmd5ハッシュ>
(正確じゃないかもしれないけどこんな感じ)というフォーマットになっている。
つまりタスクとパラメータが同一ならキーが変わらないのでsqla.CopyToTable
は走らない。
そしてこの情報はtable_updates
というテーブルが自動的に作成されて保存されている。
今回の要件だとタスクの ID よりもレコードの内容でcomplete
を判定して欲しい。具体的にはユニークであるfilename
のカラムだ。
しかしArchive
タスクでupdate_id
メソッドをオーバーライドしようとしても、complete
を判定するタイミングでfilename
はわからない。ジレンマの再来だ。
残課題
update_id
をオーバーライドして、レコード内容をもとにcomplete
判定をするようにしたい。現状ではパラメータを渡さなければ一回限りの実行しかしないし、何かしらパラメータを渡しても無条件でrun
が走る形になってしまう。
余談
サンプルでもMockTarget
使ってる…MockTarget
はユニットテスト用途と言われているけど、やっぱりoutput
がファイルである必要性が無いタスクはMockTarget
使ったほうがスマートだよね
実行環境
- Windows 10
- Python 3.6.3
- luigi 2.7.2
- SQLAlchemy 1.2.1
- sqlalchemy-migrate 0.11.0