最近luigiというワークフローフレームワークで Python のバッチ処理を書いている。
この luigi の特徴として複数のタスクで依存関係を持たせることができ、以下のような流れで依存関係を解決する。
- タスクは完了すると
output
としてファイルを出力する。 - タスクは起動の条件となる前提タスク(
requires
)がある場合、前提タスクが先に走る。 - 前提タスクの
output
が既に存在する場合は、前提タスクが走らずにタスクが起動する。
つまりoutput
のファイルでタスクの動きを制御しているのだ。
今回やりたかったこと
今回やりたかったことは以下 3 点
- ターゲットフォルダからファイルを 1 個自動で選ぶ
- バックアップフォルダへコピーする
- コピー先のパスを DB に登録する
これだけ、内容自体はよくある処理で、個別に実装する分にはイージーだ。
それぞれをPickup
,FileCopy
,Archive
という名前でタスクを作成する。
- ターゲットフォルダからファイルを 1 個自動で選ぶ ->
Pickup
- バックアップフォルダへコピーする ->
FileCopy
- コピー先のパスを DB に登録する ->
Archive
処理の流れはPickup
->FileCopy
->Archive
なんだけど、依存関係を遡って処理していくので最初に動かすタスクはArchive
になる。
こんな感じで流れる
Archive
のoutput
を確認、ファイルがあれば何もせず終了、無ければ次へArchive
のrequires
を確認、あれば依存タスク(FileCopy
)へFileCopy
のoutput
を確認、ファイルがあればArchive
だけrun
して終了、無ければ次へFileCopy
のrequires
を確認、あれば依存タスク(Pickup
)へPickup
のoutput
を確認、ファイルがあればFileCopy
->Archive
の順でrun
して終了、無ければ次へPickup
のrequires
を確認、依存タスクが無ければPickup
->FileCopy
->Archive
の順でrun
して終了
コードはこんな感じになるだろう
#!/usr/bin/env python |
output をどうするか
上記のコードでoutput
で出力するファイルをどうするか考える
Archive
とPickup
のようにファイル名が決まっている場合(requires
で呼ばれたタイミングでは決まっている場合)は問題ないが、FileCopy
のように動的にファイルを決めたい場合(requires
時点ではまだファイル名が決定できない場合)は途端に難しくなる。
上記のコードではArchive
から依存タスクを遡っていって、実際はPickup
が最初にrun
される。この時に処理するファイルが自動でピックアップされる(っていう仕様)ので、ここで初めてコピー・DB 登録されるファイルが決まるのである。
しかし luigi では依存タスクの解決をする時点でファイル名は定まっていなければならない。Archive
がFileCopy
に依存しているとチェックする一番最初の箇所で、FileCopy
のoutput
は何というファイルなのかを求められてしまうのだ。
処理されるファイル名が動的に決定するというパターンは決してレアケースではなく、むしろありふれた処理だと思う。しかし luigi ではタスクの依存性を解決していく中でファイル名を動的に取得するという動きができない。
依存性を解決する為にファイル名が必要、ファイル名を決める為には依存性の解決が必要、ルイージのジレンマだ。
ではどうするの?
そもそもrequires
とoutput
で連鎖的に依存関係を解決していく方法では、output
の出力ファイルはrequires
時点で決定していなければならない」という制約があることを理解することだ。
今回のように動的にoutput
を定めたい場合はrequires
を用いらずに動的に依存関係を解決する方法が luigi から提供されている。
Dynamic dependencies を使う
Dynamic dependencies(動的依存関係)を使うとrun
した際に動的に依存関係を解決できる。
このサンプルコードも併せて確認するとよい
書き換えたコードが以下のものだ
#!/usr/bin/env python |
requires
が全てのタスクから消えて、依存関係を示す処理がArchive
のrun
に取り込まれていることがわかる。
このタスクを実施すると以下の流れで実行される。
Archive
のoutput
を確認、ファイルがあれば何もせず終了、無ければ次へArchive
のrun
を実行Pickup
タスクを作成Pickup
のoutput
を確認、ファイルが無ければrun
を実行(yield pickup
部分)Pickup
のoutput
を引数として、FileCopy
タスクを作成FileCopy
のoutput
を確認、ファイルが無ければrun
を実行(yield filecopy
部分)FileCopy
のoutput
を引数として、ファイルアーカイブ処理を実施Archive
のoutput
を出力して終了
Archive
のrun
がちょっとごちゃっとなってしまったけど、目的は達成できた!
ただArchive
からみた依存関係はPickup
もFileCopy
も同じレベルに変わったことは注意したい。Pickup
とFileCopy
の間に依存関係はないのだ。
だからFileCopy
のoutput
が存在していても、無関係にPickup
は毎回実行される。
まあ、FileCopy
は動的に処理したいのだからPickup
に依存してなくていい、望んでいた形ではある。
残課題
Archive
は DB 登録する処理だから、output
はMySqlTargetにしたほうがいいPickup
は毎回無条件に実施するのであればタスクでなくてよかったかも?以下のコードはその場で処理してしまっていいかもしれない
pickup = Pickup() |
実行環境
- Windows 10
- Python 3.6.3
- luigi 2.7.2