[Python] luigiで動的アウトプット(DB登録)

前回の残課題

  • Archiveは DB 登録する処理だから、outputMySqlTargetにしたほうがいい

どうやってoutputを DB 登録にするか、普段SQLAlchemyを使っているのでSQLAlchemyとの連携をやりたいと思う。

前回のコード

前回のコードは以下のものだ、outputがファイル出力になっているのでこれを DB 登録に対応させていく

import luigi

class Archive(luigi.Task):

def run(self):
pickup = Pickup()
yield pickup
with pickup.output().open('r') as f:
input_path = f.read()

filecopy = FileCopy(input_path)
yield filecopy
output_path = filecopy.output()

# ファイルアーカイブ処理

with self.output().open('w') as f:
f.write('Archive done')

def output(self):
return luigi.LocalTarget('archive-output-file')

参考にしたのは以下

データベースの準備

SQLAlchemy Migrateを使ってMySQLのデータベースを準備する。
テーブル定義はこんな感じ

from sqlalchemy import *
from migrate import *
from sqlalchemy.dialects.mysql import BIGINT, DATETIME, TEXT

meta = MetaData()
table = Table(
'archives', meta,
Column('id', BIGINT(unsigned=True), primary_key=True),
Column('filename', TEXT, nullable=False, unique=True),
Column('filepath', TEXT, nullable=False),
Column('created_at', DATETIME, nullable=False),
Column('updated_at', DATETIME, nullable=False),
mysql_charset='utf8')


def upgrade(migrate_engine):
meta.bind = migrate_engine
table.create()


def downgrade(migrate_engine):
meta.bind = migrate_engine
table.drop()

Dynamic dependencies

ベーシックなサンプルは以下の通りだ(テーブルは既にあるのでreflect = Trueのパターン)

class SQLATask(sqla.CopyToTable):
reflect = True
connection_string = "sqlite://"
table = "item_property"

def rows(self):
for row in [("item1" "property1"), ("item2", "property2")]:
yield row

しかしこのサンプルをそのまま適用することはできない…動的依存関係の解決の為にrun()を定義する必要があるからだ。

なので、ここではデータを登録するタスクデータを準備するタスクを分けるサンプルが参考になる。
データを準備するタスクとしてBuildRecordタスクを定義し、動的依存関係の解決はそのタスクのrun()で行うのだ。

import luigi
from luigi.contrib import sqla
from luigi.mock import MockTarget
from pathlib import Path
from datetime import datetime

class BuildRecord(luigi.Task):

def run(self):
pickup = Pickup()
yield pickup
with pickup.output().open('r') as f:
input_path = f.read()

filecopy = FileCopy(input_path)
yield filecopy
output_path = Path(filecopy.output().path)

with self.output().open('w') as f:
f.write('0\t{0}\t{1}\t{2}\t{3}'.format(
output_path.name, output_path.as_posix(), datetime.now(), datetime.now()))

def output(self):
return MockTarget("BuildRecord")


class Archive(sqla.CopyToTable):

reflect = True
connection_string = 'mysql://<user>:<password>@<server>/<db_name>?charset=utf8'
table = "archives"

def requires(self):
return BuildRecord()

これで DB 登録できるようなった!

基本的な流れは今まで通り、Archiveタスクを実行するとrequiresBuildRecordを呼び、BuildRecordrunでデータを作成、Archiveに戻って SQLAlchemy で DB 登録という形だ。

細かな注意点は以下の通りだ

  • 登録するデータは\tで区切った文字列で渡す。今回は 1 行のみだが、複数行渡す場合は\nで改行する。
  • 区切り文字\tcolumn_separatorオプションで変更可能
  • idAUTO INCREMENTだが、登録データを空白にするとカラムがずれるので0を指定してあげる。そうすると Insert する時にちゃんとAUTO INCREMENTしてくれる。

sqla.CopyToTableoutput

Archiveタスクはoutputとしてレコードを DB に登録することができたが、このレコードを削除したら次にタスクを回した時再度登録されるだろうか?

実際やってみると再登録はされない。それどころかArchive処理自体が走らなくなる。

sqla.CopyToTablecompleteをどうやって判定しているかという話になるが、update_idというメソッドで判定しており、内部でtask_idを参照している。task_idはタスクを走らせた時のログに出てくるArchive__fbe666d9feのような値で、<タスク>_<パラメータ>_<パラメータのmd5ハッシュ>(正確じゃないかもしれないけどこんな感じ)というフォーマットになっている。

つまりタスクとパラメータが同一ならキーが変わらないのでsqla.CopyToTableは走らない。
そしてこの情報はtable_updatesというテーブルが自動的に作成されて保存されている。

今回の要件だとタスクの ID よりもレコードの内容でcompleteを判定して欲しい。具体的にはユニークであるfilenameのカラムだ。
しかしArchiveタスクでupdate_idメソッドをオーバーライドしようとしても、completeを判定するタイミングでfilenameはわからない。ジレンマの再来だ。

残課題

  • update_idをオーバーライドして、レコード内容をもとにcomplete判定をするようにしたい。現状ではパラメータを渡さなければ一回限りの実行しかしないし、何かしらパラメータを渡しても無条件でrunが走る形になってしまう。

余談

サンプルでもMockTarget使ってる…MockTargetはユニットテスト用途と言われているけど、やっぱりoutputがファイルである必要性が無いタスクはMockTarget使ったほうがスマートだよね

実行環境

  • Windows 10
  • Python 3.6.3
  • luigi 2.7.2
  • SQLAlchemy 1.2.1
  • sqlalchemy-migrate 0.11.0