Airflow webserver GUIの ▶️
再生アイコンからTrigger DAG
をクリックすると事前にDAG定義の内容で[DAG]が実行される。
ただ、以下のようなユースケースのために、実行するDAG(以下、DAG Run)にパラメータを渡したいことがある。
- バッチによってはパラメータ付きで手動実行したい
- JenkinsのParameterized Build的な機能がほしい
- 一時的なデータ投入・更新バッチのために毎回DAG定義を書くのではなく、汎用的なDAGを用意してパラメータで実行するコマンドを動的に切り替えできると便利
Conf option
DAG RunにJSON形式でパラメーターを渡す Conf というオプションがあり、様々なインタフェースから渡せる。
個人的には「2. GUIからパラメータ付き実行」が欲しかったもの。
1. CLIからパラメータ付き実行
CLIでは--conf
(-c
) optionからJSONを渡すことができる
-c, --conf JSON string that gets pickled into the DagRun’s conf attribute
Command Line Interface Reference — Airflow Documentation
airflow trigger_dag -c '{"key1":1, "key2":2}' dag_id
2. GUIからパラメータ付き実行
Trigger DAG
でなくAdd DAG Run
からもDAGの実行が可能。ヘッダのBrowse -> DAG Runs -> +
アイコンクリックで新規DAG Run作成画面に遷移する。
この画面にconf
フィールドが1.10.8から追加された。ただし、デフォルトのFlask Admin based UIではこの項目は表示されず、Flask AppBuilderベースのRBAC UIでなければ表示されない。
- Changelog: Changelog — Airflow Documentation
- Ticket: [AIRFLOW-5843] Add conf form when trigger DAG from the WEB. - ASF JIRA
- Pull request: [AIRFLOW-5843] Add conf option to Add DAG Run view by JCoder01 · Pull Request #7281 · apache/airflow · GitHub
- Related Question: Airflow: Trigger DAG via UI with Parameters/Config - Stack Overflow
3. REST API
Airflow WebServerはREST API経由でのDAG操作を受け付ける。Experimetalなので今後のversionで更新される可能性は高い。
$ curl 'http://localhost:8080/api/experimental/test' {"status":"OK"}
API経由でDAGの実行を指示することができ、conf
をbodyとして渡せばDAGにパラメータを渡せる。
REST API Reference — Airflow Documentation
$ curl -X POST \ http://localhost:8080/api/experimental/dags/<DAG_ID>/dag_runs \ -H 'Cache-Control: no-cache' \ -H 'Content-Type: application/json' \ -d '{"conf":"{\"key\":\"value\"}"}'
DAGファイルの記述
DAGファイル内において conf パラメータを参照できる箇所は限られている。環境変数のようにどこでも取得できてしまうとworkflow定義が動的に変わってしまうため制限しているようす。
Operatorへ渡す引数
Operatorへ渡す引数のいくつかは template 機能を活用することができるため、以下のように渡すことができる。
bash_task = BashOperator( task_id="bash_task", bash_command='echo "Here is the message: ' '{{ dag_run.conf["json_key"] }}" ', dag=dag, )
Jinja template による文字列展開を行っているため多少動的に書くこともできる。
各バッチを実行する command
、または environment
にこの template 記述を与えることでパラメータ付きの実行が可能になる。
定期実行の場合は conf の中身のJSONは空になるためDAGファイルまたはバッチ側にて、渡したいパラメータが空のケースをサポートする必要がある。
e.g. '{{ dag_run.conf["json_key"] if dag_run.conf["json_key"] else ""}}'
with DAG( ... user_defined_macros={ 'my_enum_value': '1', # デフォルト値を指定する場合はmacroとして登録しておく }, ) as dag override = { "containerOverrides": [{ "name": ECS_TASK_ID, "command": [ "bash", "-c", "bundle exec rake perfect_batch" ], "environment": [ { "name": "TARGET_ID", "value": '{{ dag_run.conf["TARGET_ID"] if dag_run.conf["TARGET_ID"] else ""}}' }, { "name": "MY_ENUM_VALUE", "value": '{{ dag_run.conf["MY_ENUM_VALUE"] if dag_run.conf["MY_ENUM_VALUE"] else my_enum_value }}' } ] }] } t2 = ECSOperator( overrides=override, ... )
Python Operator
Python Operatorで実行される関数の引数から参照できる。
def run_this_func(ds, **kwargs): print("Remotely received value of {} for key=message". format(kwargs['dag_run'].conf['message'])) run_this = PythonOperator( task_id='run_this', provide_context=True, python_callable=run_this_func, dag=dag, )
Custom Operator
自前で書いた Operator の中でも kwargs
を介して参照できそう(未検証)。
from airflow.models.baseoperator import BaseOperator from airflow.utils.decorators import apply_defaults class HelloOperator(BaseOperator): @apply_defaults def __init__( self, name: str, *args, **kwargs) -> None: super().__init__(*args, **kwargs) self.name = name def execute(self, context): message = "Hello {}".format(self.name) print(message) return message
環境
- Airflow 1.10.9