[Python] luigiで動的アウトプット

最近luigiというワークフローフレームワークで Python のバッチ処理を書いている。

この luigi の特徴として複数のタスクで依存関係を持たせることができ、以下のような流れで依存関係を解決する。

  • タスクは完了するとoutputとしてファイルを出力する。
  • タスクは起動の条件となる前提タスク(requires)がある場合、前提タスクが先に走る。
  • 前提タスクのoutputが既に存在する場合は、前提タスクが走らずにタスクが起動する。

つまりoutputのファイルでタスクの動きを制御しているのだ。

今回やりたかったこと

今回やりたかったことは以下 3 点

  • ターゲットフォルダからファイルを 1 個自動で選ぶ
  • バックアップフォルダへコピーする
  • コピー先のパスを DB に登録する

これだけ、内容自体はよくある処理で、個別に実装する分にはイージーだ。

それぞれをPickup,FileCopy,Archiveという名前でタスクを作成する。

  • ターゲットフォルダからファイルを 1 個自動で選ぶ -> Pickup
  • バックアップフォルダへコピーする -> FileCopy
  • コピー先のパスを DB に登録する -> Archive

処理の流れはPickup->FileCopy->Archiveなんだけど、依存関係を遡って処理していくので最初に動かすタスクはArchiveになる。

こんな感じで流れる

  1. Archiveoutputを確認、ファイルがあれば何もせず終了、無ければ次へ
  2. Archiverequiresを確認、あれば依存タスク(FileCopy)へ
  3. FileCopyoutputを確認、ファイルがあればArchiveだけrunして終了、無ければ次へ
  4. FileCopyrequiresを確認、あれば依存タスク(Pickup)へ
  5. Pickupoutputを確認、ファイルがあればFileCopy->Archiveの順でrunして終了、無ければ次へ
  6. Pickuprequiresを確認、依存タスクが無ければPickup->FileCopy->Archiveの順でrunして終了

コードはこんな感じになるだろう

#!/usr/bin/env python
# coding: utf-8

import luigi
import shutil


class Pickup(luigi.Task):

def run(self):
# ファイルピックアップ処理
with self.output().open('w') as f:
f.write('<ピックアップ結果のファイルパス>')

def output(self):
return luigi.LocalTarget('pickup-output-file')


class FileCopy(luigi.Task):

def requires(self):
return Pickup()

def run(self):
with self.input().open('r') as f:
input_path = f.read()
shutil.copy2(input_path, '<アウトプットパス>')

def output(self):
return luigi.LocalTarget('<アウトプットパス>')


class Archive(luigi.Task):

def requires(self):
return FileCopy()

def run(self):
# ファイルアーカイブ処理
with self.output().open('w') as f:
f.write('Archive done')

def output(self):
return luigi.LocalTarget('archive-output-file')

output をどうするか

上記のコードでoutputで出力するファイルをどうするか考える

ArchivePickupのようにファイル名が決まっている場合(requiresで呼ばれたタイミングでは決まっている場合)は問題ないが、FileCopyのように動的にファイルを決めたい場合(requires時点ではまだファイル名が決定できない場合)は途端に難しくなる。

上記のコードではArchiveから依存タスクを遡っていって、実際はPickupが最初にrunされる。この時に処理するファイルが自動でピックアップされる(っていう仕様)ので、ここで初めてコピー・DB 登録されるファイルが決まるのである。
しかし luigi では依存タスクの解決をする時点でファイル名は定まっていなければならない。ArchiveFileCopyに依存しているとチェックする一番最初の箇所で、FileCopyoutputは何というファイルなのかを求められてしまうのだ。

処理されるファイル名が動的に決定するというパターンは決してレアケースではなく、むしろありふれた処理だと思う。しかし luigi ではタスクの依存性を解決していく中でファイル名を動的に取得するという動きができない。

依存性を解決する為にファイル名が必要、ファイル名を決める為には依存性の解決が必要、ルイージのジレンマだ。

ではどうするの?

そもそもrequiresoutputで連鎖的に依存関係を解決していく方法では、outputの出力ファイルはrequires時点で決定していなければならない」という制約があることを理解することだ。
今回のように動的にoutputを定めたい場合はrequiresを用いらずに動的に依存関係を解決する方法が luigi から提供されている。

Dynamic dependencies を使う

Dynamic dependencies(動的依存関係)を使うとrunした際に動的に依存関係を解決できる。

このサンプルコードも併せて確認するとよい

書き換えたコードが以下のものだ

#!/usr/bin/env python
# coding: utf-8

import luigi
import shutil


class Pickup(luigi.Task):

def run(self):
# ファイルピックアップ処理
with self.output().open('w') as f:
f.write('<ピックアップ結果のファイルパス>')

def output(self):
return luigi.LocalTarget('pickup-output-file')


class FileCopy(luigi.Task):
input_path = luigi.Parameter()

def run(self):
shutil.copy2(self.input_path, '<アウトプットパス>')

def output(self):
return luigi.LocalTarget('<アウトプットパス>')


class Archive(luigi.Task):

def run(self):

pickup = Pickup()
yield pickup
with pickup.output().open('r') as f:
input_path = f.read()

filecopy = FileCopy(input_path)
yield filecopy
output_path = filecopy.output()

# ファイルアーカイブ処理

with self.output().open('w') as f:
f.write('Archive done')

def output(self):
return luigi.LocalTarget('archive-output-file')

requiresが全てのタスクから消えて、依存関係を示す処理がArchiverunに取り込まれていることがわかる。

このタスクを実施すると以下の流れで実行される。

  1. Archiveoutputを確認、ファイルがあれば何もせず終了、無ければ次へ
  2. Archiverunを実行
  3. Pickupタスクを作成
  4. Pickupoutputを確認、ファイルが無ければrunを実行(yield pickup部分)
  5. Pickupoutputを引数として、FileCopyタスクを作成
  6. FileCopyoutputを確認、ファイルが無ければrunを実行(yield filecopy部分)
  7. FileCopyoutputを引数として、ファイルアーカイブ処理を実施
  8. Archiveoutputを出力して終了

Archiverunがちょっとごちゃっとなってしまったけど、目的は達成できた!

ただArchiveからみた依存関係はPickupFileCopyも同じレベルに変わったことは注意したい。PickupFileCopyの間に依存関係はないのだ。
だからFileCopyoutputが存在していても、無関係にPickupは毎回実行される。
まあ、FileCopyは動的に処理したいのだからPickupに依存してなくていい、望んでいた形ではある。

残課題

  • Archiveは DB 登録する処理だから、outputMySqlTargetにしたほうがいい
  • Pickupは毎回無条件に実施するのであればタスクでなくてよかったかも?以下のコードはその場で処理してしまっていいかもしれない
pickup = Pickup()
yield pickup
with pickup.output().open('r') as f:
input_path = f.read()

実行環境

  • Windows 10
  • Python 3.6.3
  • luigi 2.7.2