前回の残課題
Archiveは DB 登録する処理だから、outputはMySqlTargetにしたほうがいい
どうやってoutputを DB 登録にするか、普段SQLAlchemyを使っているのでSQLAlchemyとの連携をやりたいと思う。
前回のコード
前回のコードは以下のものだ、outputがファイル出力になっているのでこれを DB 登録に対応させていく
import luigi |
参考にしたのは以下
データベースの準備
SQLAlchemy Migrateを使ってMySQLのデータベースを準備する。
テーブル定義はこんな感じ
from sqlalchemy import * |
Dynamic dependencies
ベーシックなサンプルは以下の通りだ(テーブルは既にあるのでreflect = Trueのパターン)
class SQLATask(sqla.CopyToTable): |
しかしこのサンプルをそのまま適用することはできない…動的依存関係の解決の為にrun()を定義する必要があるからだ。
なので、ここではデータを登録するタスクとデータを準備するタスクを分けるサンプルが参考になる。
データを準備するタスクとしてBuildRecordタスクを定義し、動的依存関係の解決はそのタスクのrun()で行うのだ。
import luigi |
これで DB 登録できるようなった!
基本的な流れは今まで通り、Archiveタスクを実行するとrequiresでBuildRecordを呼び、BuildRecordのrunでデータを作成、Archiveに戻って SQLAlchemy で DB 登録という形だ。
細かな注意点は以下の通りだ
- 登録するデータは
\tで区切った文字列で渡す。今回は 1 行のみだが、複数行渡す場合は\nで改行する。 - 区切り文字
\tはcolumn_separatorオプションで変更可能 idはAUTO INCREMENTだが、登録データを空白にするとカラムがずれるので0を指定してあげる。そうすると Insert する時にちゃんとAUTO INCREMENTしてくれる。
sqla.CopyToTableのoutput
Archiveタスクはoutputとしてレコードを DB に登録することができたが、このレコードを削除したら次にタスクを回した時再度登録されるだろうか?
実際やってみると再登録はされない。それどころかArchive処理自体が走らなくなる。
sqla.CopyToTableのcompleteをどうやって判定しているかという話になるが、update_idというメソッドで判定しており、内部でtask_idを参照している。task_idはタスクを走らせた時のログに出てくるArchive__fbe666d9feのような値で、<タスク>_<パラメータ>_<パラメータのmd5ハッシュ>(正確じゃないかもしれないけどこんな感じ)というフォーマットになっている。
つまりタスクとパラメータが同一ならキーが変わらないのでsqla.CopyToTableは走らない。
そしてこの情報はtable_updatesというテーブルが自動的に作成されて保存されている。
今回の要件だとタスクの ID よりもレコードの内容でcompleteを判定して欲しい。具体的にはユニークであるfilenameのカラムだ。
しかしArchiveタスクでupdate_idメソッドをオーバーライドしようとしても、completeを判定するタイミングでfilenameはわからない。ジレンマの再来だ。
残課題
update_idをオーバーライドして、レコード内容をもとにcomplete判定をするようにしたい。現状ではパラメータを渡さなければ一回限りの実行しかしないし、何かしらパラメータを渡しても無条件でrunが走る形になってしまう。
余談
サンプルでもMockTarget使ってる…MockTargetはユニットテスト用途と言われているけど、やっぱりoutputがファイルである必要性が無いタスクはMockTarget使ったほうがスマートだよね
実行環境
- Windows 10
- Python 3.6.3
- luigi 2.7.2
- SQLAlchemy 1.2.1
- sqlalchemy-migrate 0.11.0