最近luigiというワークフローフレームワークで Python のバッチ処理を書いている。
この luigi の特徴として複数のタスクで依存関係を持たせることができ、以下のような流れで依存関係を解決する。
- タスクは完了すると
outputとしてファイルを出力する。 - タスクは起動の条件となる前提タスク(
requires)がある場合、前提タスクが先に走る。 - 前提タスクの
outputが既に存在する場合は、前提タスクが走らずにタスクが起動する。
つまりoutputのファイルでタスクの動きを制御しているのだ。
今回やりたかったこと
今回やりたかったことは以下 3 点
- ターゲットフォルダからファイルを 1 個自動で選ぶ
- バックアップフォルダへコピーする
- コピー先のパスを DB に登録する
これだけ、内容自体はよくある処理で、個別に実装する分にはイージーだ。
それぞれをPickup,FileCopy,Archiveという名前でタスクを作成する。
- ターゲットフォルダからファイルを 1 個自動で選ぶ ->
Pickup - バックアップフォルダへコピーする ->
FileCopy - コピー先のパスを DB に登録する ->
Archive
処理の流れはPickup->FileCopy->Archiveなんだけど、依存関係を遡って処理していくので最初に動かすタスクはArchiveになる。
こんな感じで流れる
Archiveのoutputを確認、ファイルがあれば何もせず終了、無ければ次へArchiveのrequiresを確認、あれば依存タスク(FileCopy)へFileCopyのoutputを確認、ファイルがあればArchiveだけrunして終了、無ければ次へFileCopyのrequiresを確認、あれば依存タスク(Pickup)へPickupのoutputを確認、ファイルがあればFileCopy->Archiveの順でrunして終了、無ければ次へPickupのrequiresを確認、依存タスクが無ければPickup->FileCopy->Archiveの順でrunして終了
コードはこんな感じになるだろう
#!/usr/bin/env python |
output をどうするか
上記のコードでoutputで出力するファイルをどうするか考える
ArchiveとPickupのようにファイル名が決まっている場合(requiresで呼ばれたタイミングでは決まっている場合)は問題ないが、FileCopyのように動的にファイルを決めたい場合(requires時点ではまだファイル名が決定できない場合)は途端に難しくなる。
上記のコードではArchiveから依存タスクを遡っていって、実際はPickupが最初にrunされる。この時に処理するファイルが自動でピックアップされる(っていう仕様)ので、ここで初めてコピー・DB 登録されるファイルが決まるのである。
しかし luigi では依存タスクの解決をする時点でファイル名は定まっていなければならない。ArchiveがFileCopyに依存しているとチェックする一番最初の箇所で、FileCopyのoutputは何というファイルなのかを求められてしまうのだ。
処理されるファイル名が動的に決定するというパターンは決してレアケースではなく、むしろありふれた処理だと思う。しかし luigi ではタスクの依存性を解決していく中でファイル名を動的に取得するという動きができない。
依存性を解決する為にファイル名が必要、ファイル名を決める為には依存性の解決が必要、ルイージのジレンマだ。
ではどうするの?
そもそもrequiresとoutputで連鎖的に依存関係を解決していく方法では、outputの出力ファイルはrequires時点で決定していなければならない」という制約があることを理解することだ。
今回のように動的にoutputを定めたい場合はrequiresを用いらずに動的に依存関係を解決する方法が luigi から提供されている。
Dynamic dependencies を使う
Dynamic dependencies(動的依存関係)を使うとrunした際に動的に依存関係を解決できる。
このサンプルコードも併せて確認するとよい
書き換えたコードが以下のものだ
#!/usr/bin/env python |
requiresが全てのタスクから消えて、依存関係を示す処理がArchiveのrunに取り込まれていることがわかる。
このタスクを実施すると以下の流れで実行される。
Archiveのoutputを確認、ファイルがあれば何もせず終了、無ければ次へArchiveのrunを実行Pickupタスクを作成Pickupのoutputを確認、ファイルが無ければrunを実行(yield pickup部分)Pickupのoutputを引数として、FileCopyタスクを作成FileCopyのoutputを確認、ファイルが無ければrunを実行(yield filecopy部分)FileCopyのoutputを引数として、ファイルアーカイブ処理を実施Archiveのoutputを出力して終了
Archiveのrunがちょっとごちゃっとなってしまったけど、目的は達成できた!
ただArchiveからみた依存関係はPickupもFileCopyも同じレベルに変わったことは注意したい。PickupとFileCopyの間に依存関係はないのだ。
だからFileCopyのoutputが存在していても、無関係にPickupは毎回実行される。
まあ、FileCopyは動的に処理したいのだからPickupに依存してなくていい、望んでいた形ではある。
残課題
Archiveは DB 登録する処理だから、outputはMySqlTargetにしたほうがいいPickupは毎回無条件に実施するのであればタスクでなくてよかったかも?以下のコードはその場で処理してしまっていいかもしれない
pickup = Pickup() |
実行環境
- Windows 10
- Python 3.6.3
- luigi 2.7.2