valid,invalid

関心を持てる事柄について

AirflowでDAG実行時にGUI, CLI, REST APIからパラメータを渡す

Airflow webserver GUI▶️ 再生アイコンからTrigger DAGをクリックすると事前にDAG定義の内容で[DAG]が実行される。

ただ、以下のようなユースケースのために、実行するDAG(以下、DAG Run)にパラメータを渡したいことがある。

  • バッチによってはパラメータ付きで手動実行したい
    • JenkinsのParameterized Build的な機能がほしい
  • 一時的なデータ投入・更新バッチのために毎回DAG定義を書くのではなく、汎用的なDAGを用意してパラメータで実行するコマンドを動的に切り替えできると便利

Conf option

DAG RunにJSON形式でパラメーターを渡す Conf というオプションがあり、様々なインタフェースから渡せる。

個人的には「2. GUIからパラメータ付き実行」が欲しかったもの。

1. CLIからパラメータ付き実行

CLIでは--conf (-c) optionからJSONを渡すことができる

-c, --conf JSON string that gets pickled into the DagRun’s conf attribute

Command Line Interface Reference — Airflow Documentation

airflow trigger_dag -c '{"key1":1, "key2":2}' dag_id

For Apache Airflow, How can I pass the parameters when manually trigger DAG via CLI? - Stack Overflow

2. GUIからパラメータ付き実行

Trigger DAGでなくAdd DAG RunからもDAGの実行が可能。ヘッダのBrowse -> DAG Runs -> + アイコンクリックで新規DAG Run作成画面に遷移する。

f:id:ohbarye:20201018153053p:plain

この画面にconfフィールドが1.10.8から追加された。ただし、デフォルトのFlask Admin based UIではこの項目は表示されず、Flask AppBuilderベースのRBAC UIでなければ表示されない。

3. REST API

Airflow WebServerはREST API経由でのDAG操作を受け付ける。Experimetalなので今後のversionで更新される可能性は高い。

$ curl 'http://localhost:8080/api/experimental/test'
{"status":"OK"}

API経由でDAGの実行を指示することができ、confをbodyとして渡せばDAGにパラメータを渡せる。

REST API Reference — Airflow Documentation

 $ curl -X POST \
   http://localhost:8080/api/experimental/dags/<DAG_ID>/dag_runs \
   -H 'Cache-Control: no-cache' \
   -H 'Content-Type: application/json' \
   -d '{"conf":"{\"key\":\"value\"}"}'

DAGファイルの記述

DAGファイル内において conf パラメータを参照できる箇所は限られている。環境変数のようにどこでも取得できてしまうとworkflow定義が動的に変わってしまうため制限しているようす。

Operatorへ渡す引数

Operatorへ渡す引数のいくつかは template 機能を活用することができるため、以下のように渡すことができる。

bash_task = BashOperator(
    task_id="bash_task",
    bash_command='echo "Here is the message: '
                 '{{ dag_run.conf["json_key"] }}" ',
    dag=dag,
)

Jinja template による文字列展開を行っているため多少動的に書くこともできる。

f:id:ohbarye:20201018153512p:plain
template を利用できるオプションは API document に `(templated)` の記述がある
template 内で利用できるマクロ一覧はMacros reference — Airflow Documentationを参照

各バッチを実行する command、または environment にこの template 記述を与えることでパラメータ付きの実行が可能になる。

定期実行の場合は conf の中身のJSONは空になるためDAGファイルまたはバッチ側にて、渡したいパラメータが空のケースをサポートする必要がある。

e.g. '{{ dag_run.conf["json_key"] if dag_run.conf["json_key"] else ""}}'

with DAG(
    ...
    user_defined_macros={
        'my_enum_value': '1', # デフォルト値を指定する場合はmacroとして登録しておく
    },
) as dag

    override = {
        "containerOverrides": [{
            "name": ECS_TASK_ID,
            "command": [
                "bash",
                "-c",
                "bundle exec rake perfect_batch"
            ],
            "environment": [
                {
                    "name": "TARGET_ID",
                    "value": '{{ dag_run.conf["TARGET_ID"] if dag_run.conf["TARGET_ID"] else ""}}'
                },
                {
                    "name": "MY_ENUM_VALUE",
                    "value": '{{ dag_run.conf["MY_ENUM_VALUE"] if dag_run.conf["MY_ENUM_VALUE"] else my_enum_value }}'
                }
            ]
        }]
    }

    t2 = ECSOperator(
        overrides=override,
        ...
    )

Python Operator

Python Operatorで実行される関数の引数から参照できる。

def run_this_func(ds, **kwargs):
    print("Remotely received value of {} for key=message".
          format(kwargs['dag_run'].conf['message']))


run_this = PythonOperator(
    task_id='run_this',
    provide_context=True,
    python_callable=run_this_func,
    dag=dag,
)

Custom Operator

自前で書いた Operator の中でも kwargs を介して参照できそう(未検証)。

from airflow.models.baseoperator import BaseOperator
from airflow.utils.decorators import apply_defaults

class HelloOperator(BaseOperator):

    @apply_defaults
    def __init__(
            self,
            name: str,
            *args, **kwargs) -> None:
        super().__init__(*args, **kwargs)
        self.name = name

    def execute(self, context):
        message = "Hello {}".format(self.name)
        print(message)
        return message

環境

  • Airflow 1.10.9

参考