valid,invalid

関心を持てる事柄について

2020年の健康と体重と運動

ohbarye Advent Calendar 2020の3日目の記事です。


体重推移

  • 最大値: 69.0kg
  • 最小値: 64.0kg

f:id:ohbarye:20201229154559p:plain
"波形-バイオリズム-"

1~2月

週5で出社し、ランチはほぼ毎食外食していたので米をそれなりに摂取していた。 夕食はたいていサラダ。

通勤で歩いていたのと、オフィスでたまに懸垂する程度の運動。

結果は微増だが継続していたら2017年ぶりに70.0kgまで昇っていたかもしれない。

3~8月

ほぼ毎日自宅におり、自発的な運動ゼロ。

半年で体重が5.0kg落ちたが筋肉量が減っただけのような気がする。

外食をしなくなったので米の摂取量がかなり減少した。

9月~12月

引き続きほぼ毎日自宅にいるが、運動習慣を復活させた。

週2~4のペースで15分のランニングと合わせて以下を行っている。

半年ぶりの懸垂でだいぶ衰えを感じたが背中の使い方を思い出せた。

また、週4~7のペースで腹筋ローラーを続けている。膝をついた状態で上体を伸ばし切るところまでできるようになった。腹筋は成長が早い。

いずれも回数はあまり気にせず疲労を感じる程度まで。追い込むほどではないので筋肉痛になったりならなかったりする。

10月から蒸した温野菜 or 鍋を主食にしており、週4~6日は野菜350g/日取れていると思う。

野菜中心なので筋肥大のためのカロリーが足りてない気がするが体重が増えているので継続する。

総括

今年も炊飯器抜きの生活ができた。しかし、糖質を抑えていても運動が不足していると、体重減少は達成できても"健康"からは遠ざかっている感じがしたので運動習慣を再開できてよかった。

また、今年のMVPは家で眠らせていた蒸し器(フードスチーマー)で、これを発掘したおかげで野菜生活を取り戻せた。

任意の野菜を突っ込むだけで温野菜ができる。温野菜の味はオリーブオイルと塩だけでも十分だが、コロナ禍でブームとなったレトルトカレーやスパゲティソースを温野菜に絡めることで毎日異なる味を楽しめる。タンパク質はサラダチキンやゆで卵やローストビーフや納豆等を中心に摂取する。

そういえば外出も少なくコロナ対策も一通りやっていたおかげか、一度も風邪を引かなかったのも良い話。


振り返ってみて概ね良い感じに進捗しているかと思ったが毎年恒例の健康診断を受診し忘れていたことに気づいた…。

TWINBIRD フードスチーマー ホワイト SP-4137W

TWINBIRD フードスチーマー ホワイト SP-4137W

  • 発売日: 2010/03/25
  • メディア: ホーム&キッチン

複数のDocker Compose YAMLをマージして1つにする

複数のdocker-compose.ymlをマージして1つのYAMLにする方法です。


複数のアプリケーションが協調して動くようなシステム*1を開発していて、各レポジトリにdocker-compose.ymlが存在している状況を想定します。

ローカルで複数アプリを協調して稼働させるにはアプリケーションAのrepositoryに移動してdocker-compose upし、次はBに移動して…というのをやっていたのですが、どうせdocker-composeするのならレポジトリ構成に依存せず必要なアプリケーション全てに対してまとめて操作したいのでYAMLを統合してまとめて管理できるようにしました。

操作とはup, down, buildを始めとした諸々です。

docker-composeの関連知識

以下の知識を利用します。

  • docker-composeには環境変数COMPOSE_FILEで複数のファイルを渡すことができ、YAMLの内容はマージされる
    • docker-compose -f <filename>でもできる
  • docker-compose configコマンドでcompose fileを検証、評価できる
    • マージ結果を確認できる
COMPOSE_FILE=./auth_api.yml:./payment_api.yml:./payment_frontend.yml \
  docker-compose config > ./docker-compose.yml

複数ファイル指定時のマージは一番最初に指定したファイルに対して後続のファイルの設定をマージしていく挙動になります。

  • キーが無い場合:追加
  • キーがある場合:後で指定したファイルの内容で上書き
  • リストの場合:リストの要素を追加

主に、基底となるファイルを環境ごとに拡張して使いまわしたいときに使える (docker-compose.local.ymlでローカル用の設定を足すなど)

具体的なやり方

以下のようにrepository群が並んでいるとします。

# ディレクトリ構成
.
└── repos
    ├── auth_api
    │   └── docker-compose.yml
    ├── payment_api
    │   └── docker-compose.yml
    └── payment_frontend
        └── docker-compose.yml

これらを愚直にCOMPOSE_FILE=./repos/auth_api/docker-compose.yml:...と繋げていってもconfigコマンドは成功した風に完了します。

しかし、YAMLファイル内に記述した相対パスが各repositoryのルート基準ではなくCOMPOSE_FILE envに渡した最初のファイルのパス基準で評価されてしまいます。

具体例として、repos/payment_api/docker-compose.yml内に以下のような相対パスを書いていたとします。

services:
  payment_api:
    build: .

ここでCOMPOSE_FILE=./repos/auth_api/docker-compose.yml:./repos/payment_api/docker-compose.ymlというコマンドを実行すると、この相対パスは意図しないディレクトリとして評価されてしまいます。

services:
  payment_api:
    build:
      context: /absolute/path/to/repos/auth_api

これを避けるためには最終形態の特大YAMLを生成する前に各repositoryのYAMLを個別に評価し、YAML内の相対パス絶対パス表現に変換する必要があります。パスを予め静的に決定することはできないので動的に生成します。

そのためにこんなRubyスクリプトを書きました。

#!/usr/bin/env ruby

require 'yaml'
require 'pathname'
require 'fileutils'

root_dir = Pathname.new(__dir__)
repos = %w[
  auth_api
  payment_api
  payment_frontend
].map {|r| root_dir.join('repos', r) }

FileUtils.mkdir_p "tmp/config"

tmp_config_filenames = repos.each_with_object([]) do |repo, memo|
  compose_path = repo.join('docker-compose.yml')
  if compose_path.exist?
    memo << root_dir.join("tmp/config/#{repo.basename}.yml").tap do |tmp_config_filename|
      system "COMPOSE_FILE=#{compose_path} docker-compose config > #{tmp_config_filename}"
    end
  end
end

system "COMPOSE_FILE=#{tmp_config_filenames.join(':')} docker-compose config > ./docker-compose.yml"

見た通り、各repositoryのYAMLdocker-compose configで評価したものを一時ディレクトリに吐き出します。ここで生成されたYAMLたちの中ではパスはすべて絶対パスに変換されています。

この一時ディレクトリのファイルをCOMPOSE_FILEに与え、再度docker-compose configを実行するとパス問題が解決された大統一YAMLが生成されます。

場合によってはnetworksの調整やservice nameやport衝突やらが発生したりすると思いますが、各repositoryのYAMLを修正したり大統一YAMLをちょっと修正するようにスクリプトを直せばなんとかなります。なる。

これで勝利です。レポジトリが別れていてもすべてのserviceをまとめてdocker-composeで操作することができます。

補遺

docker-compose実行時のファイルを指定したエイリアスを登録しておくと各レポジトリ内で操作するときも大統一YAMLを参照できて便利です。

alias xc='docker-compose -f /Users/ohbarye/path/to/unified/docker-compose.yml'

また、対象のレポジトリ群を管理するレポジトリを作っておくと上述のようなスクリプトを置いたり、全レポジトリをまたぐ横断的な"なにか"をするときに非常に便利です*2

最近の事例だと、共通のGitHub Actionsをまとめて突っ込んだり、master branchをまとめてmain branchに変更するときに役立ちました。


ohbarye Advent Calendar 2020の2日目の記事でした。

*1:マイクロサービスかどうかは問わない

*2:この発想は、@mtsmfmが中心になって作ってくれたQallという仕組みにインスパイアされています。現在はQuipper社はmonorepoなので事情がupdateされています。

sentry-ravenでエラー通知するとrack envの中身が書き換わることがある

エラー検知・監視ツールであるところのSentryが提供するRubySDKsentry-ravenというgemがあります。

このgemを利用するとごくわずかなコードの記述をするだけでSentryに対してイベントを送信することができます。イベントにはユーザーが定義したカスタムタグや実行環境を含む多様な情報を含めることができ大変便利です。

begin
  some_dangerous_code!
rescue => ex
  Raven.capture_exception(ex)
end

# とか

if record.unknown?
  Raven.capture_message("Found suspisious data")
end

上述のようなコードを見た/書いた方も多いと思います。しかしながら特定条件下では上記のような Raven.capture_*メソッドの呼び出しの内部でRack envの一部を書き換えてしまう ことがあります。

なお、本件が意図した挙動なのか不明だったので本体にレポートしまして、トリアージ待ちです。本件の行く末が気になる方はissueをウォッチしていただければと思います。

github.com

また、sentry-ravenはすでにアクティブな開発が停止しており、sentry-rubysentry-railsに移行するようアナウンスがされています。僕が遭遇したプロジェクトでもこの問題による不具合を契機としてmigrationを実施しています。

Migration Guide for Ruby | Sentry Documentation

環境

  • rack 2.1.1
  • sentry-raven 3.1.1

事象の説明

以下の条件をすべて満たすときRack envのenv["rack.request.form_hash"]のHashのうち、2で該当したkeyのvalueがマスク用の文字列 ("********") に破壊的に書き換えられます。

  1. Rack::Request#form_data?trueとなる
    • Content-Type"application/x-www-form-urlencoded""multipart/form-data"であるとき
    • 画像アップロードでハマった
  2. リクエストのform dataのkeyが以下のいずれかに一致・マッチする
Raven.configure do |config|
  config.sanitize_fields = %w[quite_sensitive_field]
end

後継のsentry-rubyでは起きない?

はい。

sentry-ravenが持つdata sanitization等を行う機構をprocessorと呼ぶのですが、この機構は後継のsentry-rubyや周辺のgemには存在しません。ユーザーが自前でhookを書いたり、SentryのGUIからfilter ruleを登録するようになっています。

何が問題か

Raven.capture_*に副作用がないと思っていると痛い目を見る…。

具体的な例を挙げます。

RailsActionParameterインスタンスを作るより手前のRack middlewareでRaven.capture_*を呼び出した場合、paramsの中身の一部が"********"になり、クライアントから送信されたform dataを参照できません。

もしくはRailsのcontrollerやmodelでRaven.capture_*を呼び出し、そのあとでenv["rack.request.form_hash"]を見るようなことがあれば同様にマスクされた文字列になっています。

ActionParameterインスタンスが保持するform dataはコピーされたものなのでRailsアプリケーション内でparamsを見るふつうの(?)コードを書いていれば途中でRavenを使っても問題ないでしょう。

発見の経緯

そんなことするか?って思いますかね。まぁたしかにenvを直接触ることはほぼ無いと思いますが、rack middlewareのどこかでうっかりSentryに通知することはあるんじゃないでしょうか。というか、あった。

僕がハマったパターンは「committeeCommittee::Middleware::RequestValidation middlewareでOpenAPI定義に違反するリクエストが来たらSentryに記録する(けど後続の処理はそのまま行う)」という使い方でした。

最初はユーザーが不正なリクエストをして400になっただけと思ったけど、開発環境でも事象が再現すること、および僕が変更したOpenAPIの定義をrevertすると再現しないことを同僚が発見してくれたのでバグだと気づくことができました。

sentry-ravenの実装の詳細

Rack::Request#form_data?trueのときRaven::RackRack::Request#POSTを呼び出し、書き換える対象のHashへの参照を得ています。

 # https://github.com/getsentry/sentry-ruby/blob/82e1ffe711af287ddc23e8517bdb8275beff94d5/sentry-raven/lib/raven/integrations/rack.rb#L87-L89
 def read_data_from(request) 
   if request.form_data? 
     request.POST 

後のRaven::Event#to_hashというメソッドで実行環境の情報をHash化します。このときも同一のポインタをそのまま渡しています。

# https://github.com/getsentry/sentry-ruby/blob/82e1ffe711af287ddc23e8517bdb8275beff94d5/sentry-raven/lib/raven/client.rb#L74-L75
def encode(event) 
   hash = @processors.reduce(event.to_hash) { |a, e| e.process(a) } 

そしてRaven::Processor::SanitizeDataというデータマスクを行うclassにそのHashが渡るのですが、その中でHash#merge!を実行して破壊的に更新をしています。

# https://github.com/getsentry/sentry-ruby/blob/82e1ffe711af287ddc23e8517bdb8275beff94d5/sentry-raven/lib/raven/processor/sanitizedata.rb#L44-L50
 def sanitize_hash_value(key, value) 
   if key =~ sensitive_fields 
     STRING_MASK 
   elsif value.frozen? 
     value.merge(value) { |k, v| process v, k } 
   else 
     value.merge!(value) { |k, v| process v, k } 

デバグ

さらっと書きましたが最初は「OpenAPI定義を変更したらリクエストパラメータがマスクされるようになった」ように見えたのでわけがわかりませんでした。

Raven configのsanitize_fieldを変更すると再現しない、Sentryへの通知をやめると再現しない、といったあたりまで同僚が絞り込んでくれたので、僕の方ではsentry-ravenのコードを読みながらステップ実行していってようやく原因にあたりがつきました。

最小の再現プロジェクトを作る

100%再現するバグなら勝利は約束されるので、最小の構成で再現するプロジェクトを作ってみました。

github.com

Raven::Rackをかませたrack appを作り、その中でRaven.capture_*を呼ぶ。その前後でrack envの中身が変わっていることを確認する。 味の秘密は、それだけ。

require 'rack'
require 'sentry-raven'

Raven.configure do |config|
  config.dsn = ENV['SENTRY_DSN']
  config.sanitize_fields = %w[name]
end

class SentryRavenSanitizeDemo
  def call(env)
    r = Rack::Request.new(env)

    puts "=========Before========="
    puts r.POST
    puts env[Rack::RACK_REQUEST_FORM_HASH]
    before = env[Rack::RACK_REQUEST_FORM_HASH].dup

    begin
      raise "foo"
    rescue => ex
      Raven.capture_exception(ex)
    end

    puts "=========After========="
    puts r.POST
    puts env[Rack::RACK_REQUEST_FORM_HASH]
    after = env[Rack::RACK_REQUEST_FORM_HASH].dup

    [200, {"Content-Type" => "text/plain"}, ["before: #{before}, after: #{after}"]]
  end
end

use Raven::Rack
run SentryRavenSanitizeDemo.new

rackupで起動し、curlでform dataをPOSTしてみると…

$ curl -X POST http://localhost:9292 -F 'name=foo' -F 'age=32'

before: {"name"=>"foo", "age"=>"32"}, after: {"name"=>"********", "age"=>"32"}%

BINGOです。sanitize_fieldをいじったりRaven.capture_exception(ex)を呼ばなければ再現しなくなることもすぐに確認できました。

結論

  • 特定条件下では上記のような Raven.capture_*メソッドの呼び出しの内部でRack envの一部を書き換えてしまう ことがある
  • sentry-ravenはすでにアクティブな開発が停止しているのでsentry-rubysentry-railsに移行しよう

ohbarye Advent Calendar 2020の1日目は@ohbaryeが担当しました。

GitHub Actionsで特定のbranchでのworkflow失敗のみ通知する

GitHub ActionsでCI/CDを組んでいるとき、特定のbranchのみ失敗を通知させたいことがある。たとえば「develop / main branchだけworkflow上でのテストの失敗を通知したい」というようなケース。

結論

jobs.<job_id>.ifでその通りに条件を指定すればよい。

failure()失敗時、かつcontains('refs/heads/develop refs/heads/main', github.ref)

rtCamp/action-slack-notifyを使っているが何でもよい。Slack関連のactions、100個ぐらい乱立している)

# .github/workflows/failure_notification_example.yml
name: Failure notification
on:
  push:
jobs:
  job:
    runs-on: ubuntu-18.04
    steps:
      - name: Run command
        run: |
          foobar # わざと失敗させている
      - name: Notify
        uses: rtCamp/action-slack-notify@master
        if: ${{ failure() && contains('refs/heads/develop refs/heads/main', github.ref) }}
        env:
          SLACK_WEBHOOK: ${{ secrets.SLACK_WEBHOOK }}
          SLACK_COLOR: '#A30100'
          SLACK_TITLE: '"事故"る奴は…"不運-ハードラック-"と”踊-ダンス-"っちまったんだよ…'
          SLACK_MESSAGE: 'See ${{ github.event.repository.url }}/actions/runs/${{ github.run_id }}'
          SLACK_FOOTER: ''
          MSG_MINIMAL: true

f:id:ohbarye:20201103001250p:plain
実行例

これで特定branchでのworkflow失敗に気付くことができる。

説明

GitHub Actionsのstep単位での実行をconditionalに制御するにはjobs.<job_id>.ifを使う。

まず、Job status check functionsで成否に基づく分岐が可能。

jobs:
  job:
    steps:
      - name: Anything
         run: what you want
      - name: conditional step
        if: failure() # 失敗時のみ
#       if: success()   成功時のみ
#       if: always()    常に

また、${{ <expression> }}のexpressionで参照可能なcontextを使い、任意の条件で絞り込める。

branchによる絞り込みであれば

github.ref == 'refs/heads/develop' || github.ref == 'refs/heads/main'

でもいいし

contains('
  refs/heads/develop
  refs/heads/main
', github.ref)

でもいい。

その他、syntax, expressionに関するドキュメントを見ればifの条件だけでなくmessageに埋め込める便利情報がたくさんある。

Airflow webserverのUIがなぜ2種類存在するのか

AirflowでDAG実行時にGUI, CLI, REST APIからパラメータを渡す - valid,invalid件の調査をしていて知ったのだが、Airflow webserverのUIには2種類のUIが存在する。

利用できる機能に差分があるうえに、Airflow本体のリリースフローも初見ではわからず、ややこしかったので整理する。

2種類のUI

1. flask-admin based web UI

1系ではデフォルトで使用されるUI。名前の通りflask-adminベースで作られている。

2. FAB based web UI

Roll Based Access Controlを提供しているのでドキュメントではRBAC UI と表記されることもある。 突然の略称のため名前からはわかりづらいがFlask AppBuilderベースで作られている。

1.10.0以降で利用可能 (changelog) であり、airflow.cfg[webserver] rbac = Trueを設定するとこちらになる。

なお、flask-admin based web UIはLegacy UIと表され、2系ではFAB based web UIのみの提供となる予定とのこと。


一部の機能は現時点で最新である1.10.12でも FAB-based web UI にしか存在しない。 同じバージョンでもviewに差分がある例↓(GUI から DAG Run に conf を渡すフィールドがFAB-basedの方にしか存在しない)

歴史的経緯

Improving Airflow UI Securityで明晰に語られている。

2015年のAirflowはセキュリティに関する機能が少なかった

誰もがメタデータDBを見られる、グローバルに共有されているオブジェクトを編集できる、DAG を開始または停止したり失敗した TaskInstance を成功マークにしたり、その逆をしたりすることができる…といった具合。

Nginxなど別の機構でGUIへのアクセスを制御することはできたがかなり不便だったのでRBAC UIに取り組み始めた。

Airflow webserverはFlask Adminで構築されており、RBAC UIを実装するために4つの可能性のあるアプローチが検討されていた

  1. Django への移行
    • ビルトインのユーザー認証システムと、より成熟した拡張機能のエコシステムを備えている
    • 移行作業の量が利益に比べて限定的となる
  2. クラッチ
    • Airflow に合わせてカスタマイズされた RBAC システムを構築することができ、この機能を時間をかけて進化させる柔軟性が得られる
    • 利用できる資産を利用しないので多大なコスト
    • 想定しているRBACセキュリティモデルはかなり汎用的なものであるため、ゼロからの実装は魅力的でなかった
  3. Flask extension
    • Flask-RBACやFlask-Principalなど、RBACを扱うために特別に書かれたflask拡張機能
    • 拡張機能はいずれも事前に定義されたロールを用いる必要があるが、Airflowではconfigurableであってほしい
  4. Flask-Appbuilderに切り替える
    • Flask-AppBuilder (FAB) は Flask-Admin に似た小さいフレームワーク
    • ビルトインの RBAC システムがconfigurable
      • セッション管理、様々な認証バックエンドとの統合ができる
    • Apache-Superset] で使用されていて実績がある

最終的にFABが採用された。メンテナは機能を統合して Airflow をshipすることに集中し、FABがほとんどを処理してくれることを期待した。

実際の移行作業

どちらもDjangoインスパイアのフレームワークであり、慣習も踏襲されていたので良かった。ORMのSQLAlchemyサポートを両者ともしてたのでモデル層は全く手を入れずに済んだ。

たくさん手を入れたのはViewであり、いくつかの機能が足りなかったぶんはFABにパッチを送った。

レポジトリ戦略

もともと別レポジトリで概念実証のために開発していた。

https://github.com/wepay/airflow-webserver 今は404

おかげで検証や開発は高速で進んだが、Airflowの早い進化に追従するには本体にマージしなければいけない。代替UIのための別レポジトリを維持するコストもかかるし、Airflow本体と誤認されたり、間違ってインストールされたりしたくないため。

互換性

Airflowは下位互換性を重視するため、ユーザーが新しいUIに移行するための十分な時間を確保したかった。レガシーUIを完全に廃する前に、短期間は両方のUIを並行して利用できるようにすることを決めた。

  • 既存の UI 関連のコードはすべて www/ ディレクトリに置く
  • コードベースがif文だらけにならないように、UI関連の変更はすべてwww_rbac/という新しいディレクトリで行い、RBAC専用の別のFlaskアプリを作成した

リリース

RBAC UIは2018-08にAirflow 1.10.0でリリースされた

Changelog — Airflow Documentation

airflow/UPDATING.md at master · apache/airflow · GitHub

今後

Airflow 2.xからはレガシーUIは提供しなくなるため、1系を使っているユーザーにもrbac = Trueで試すことを推奨している。

airflow/UPDATING.md at master · apache/airflow · GitHub

Airflowをベースにしている[Google Clound Composer]ではRBAC UIをサポートしていないため、一部機能は2系になるまで使えないはず。Clound IAMでおおむね権限を制御できるかもしれないがDAGレベルの制御は難しそうかも(触ったことないので不明)。


masterairflow/www/views.pyにmergeされているし、shipされたはずの機能がなぜ手元で表示されないのか…?と悩んでいたら、flask-admin based web UIだった、というハマりがあったので謎が解けてよかった。ちょうど移行の過渡期にAirflowに触り始めてしまっただけだった。

大きなOSSの内部大改装は作業だけでなくストラテジを練ったり広報したりも含めてものすごい大変というのが垣間見えた。Airflowメンテナが、www以下への変更をリリース前にwww_rbacへcherry-pickしていてかなり大変そうだったので早く2系が公開されて負担が減っていくとよいと思う。

AirflowでDAG実行時にGUI, CLI, REST APIからパラメータを渡す

Airflow webserver GUI▶️ 再生アイコンからTrigger DAGをクリックすると事前にDAG定義の内容で[DAG]が実行される。

ただ、以下のようなユースケースのために、実行するDAG(以下、DAG Run)にパラメータを渡したいことがある。

  • バッチによってはパラメータ付きで手動実行したい
    • JenkinsのParameterized Build的な機能がほしい
  • 一時的なデータ投入・更新バッチのために毎回DAG定義を書くのではなく、汎用的なDAGを用意してパラメータで実行するコマンドを動的に切り替えできると便利

Conf option

DAG RunにJSON形式でパラメーターを渡す Conf というオプションがあり、様々なインタフェースから渡せる。

個人的には「2. GUIからパラメータ付き実行」が欲しかったもの。

1. CLIからパラメータ付き実行

CLIでは--conf (-c) optionからJSONを渡すことができる

-c, --conf JSON string that gets pickled into the DagRun’s conf attribute

Command Line Interface Reference — Airflow Documentation

airflow trigger_dag -c '{"key1":1, "key2":2}' dag_id

For Apache Airflow, How can I pass the parameters when manually trigger DAG via CLI? - Stack Overflow

2. GUIからパラメータ付き実行

Trigger DAGでなくAdd DAG RunからもDAGの実行が可能。ヘッダのBrowse -> DAG Runs -> + アイコンクリックで新規DAG Run作成画面に遷移する。

f:id:ohbarye:20201018153053p:plain

この画面にconfフィールドが1.10.8から追加された。ただし、デフォルトのFlask Admin based UIではこの項目は表示されず、Flask AppBuilderベースのRBAC UIでなければ表示されない。

3. REST API

Airflow WebServerはREST API経由でのDAG操作を受け付ける。Experimetalなので今後のversionで更新される可能性は高い。

$ curl 'http://localhost:8080/api/experimental/test'
{"status":"OK"}

API経由でDAGの実行を指示することができ、confをbodyとして渡せばDAGにパラメータを渡せる。

REST API Reference — Airflow Documentation

 $ curl -X POST \
   http://localhost:8080/api/experimental/dags/<DAG_ID>/dag_runs \
   -H 'Cache-Control: no-cache' \
   -H 'Content-Type: application/json' \
   -d '{"conf":"{\"key\":\"value\"}"}'

DAGファイルの記述

DAGファイル内において conf パラメータを参照できる箇所は限られている。環境変数のようにどこでも取得できてしまうとworkflow定義が動的に変わってしまうため制限しているようす。

Operatorへ渡す引数

Operatorへ渡す引数のいくつかは template 機能を活用することができるため、以下のように渡すことができる。

bash_task = BashOperator(
    task_id="bash_task",
    bash_command='echo "Here is the message: '
                 '{{ dag_run.conf["json_key"] }}" ',
    dag=dag,
)

Jinja template による文字列展開を行っているため多少動的に書くこともできる。

f:id:ohbarye:20201018153512p:plain
template を利用できるオプションは API document に `(templated)` の記述がある
template 内で利用できるマクロ一覧はMacros reference — Airflow Documentationを参照

各バッチを実行する command、または environment にこの template 記述を与えることでパラメータ付きの実行が可能になる。

定期実行の場合は conf の中身のJSONは空になるためDAGファイルまたはバッチ側にて、渡したいパラメータが空のケースをサポートする必要がある。

e.g. '{{ dag_run.conf["json_key"] if dag_run.conf["json_key"] else ""}}'

with DAG(
    ...
    user_defined_macros={
        'my_enum_value': '1', # デフォルト値を指定する場合はmacroとして登録しておく
    },
) as dag

    override = {
        "containerOverrides": [{
            "name": ECS_TASK_ID,
            "command": [
                "bash",
                "-c",
                "bundle exec rake perfect_batch"
            ],
            "environment": [
                {
                    "name": "TARGET_ID",
                    "value": '{{ dag_run.conf["TARGET_ID"] if dag_run.conf["TARGET_ID"] else ""}}'
                },
                {
                    "name": "MY_ENUM_VALUE",
                    "value": '{{ dag_run.conf["MY_ENUM_VALUE"] if dag_run.conf["MY_ENUM_VALUE"] else my_enum_value }}'
                }
            ]
        }]
    }

    t2 = ECSOperator(
        overrides=override,
        ...
    )

Python Operator

Python Operatorで実行される関数の引数から参照できる。

def run_this_func(ds, **kwargs):
    print("Remotely received value of {} for key=message".
          format(kwargs['dag_run'].conf['message']))


run_this = PythonOperator(
    task_id='run_this',
    provide_context=True,
    python_callable=run_this_func,
    dag=dag,
)

Custom Operator

自前で書いた Operator の中でも kwargs を介して参照できそう(未検証)。

from airflow.models.baseoperator import BaseOperator
from airflow.utils.decorators import apply_defaults

class HelloOperator(BaseOperator):

    @apply_defaults
    def __init__(
            self,
            name: str,
            *args, **kwargs) -> None:
        super().__init__(*args, **kwargs)
        self.name = name

    def execute(self, context):
        message = "Hello {}".format(self.name)
        print(message)
        return message

環境

  • Airflow 1.10.9

参考

Airflow 1.10.6ではweb serverのREST APIがデフォルトで全公開されている

問題

Airflow webserverはREST APIを公開しており、DAGの参照や作成などが可能となっている。

# 疎通確認
$ curl 'https://your.domain/api/experimental/test'
{"status":"OK"}

1.10.6のデフォルトの設定では破壊的操作を含むAPIを認証なしで全公開するinsecureな作りになっていた。

経過

本体でも危険性に気づき、[AIRFLOW-6027] Disable API access by default by mik-laj · Pull Request #6625 · apache/airflow · GitHubdefaultの挙動が修正された。

また、1.10.11でshipされたChange default auth for experimental backend to deny_all by ashb · Pull Request #9611 · apache/airflow · GitHubにより、airflow.cfgの設定がデフォルトでdeny_allとなるように修正された。

推奨される設定

API機能を利用しないAirflow webserverであればairflow.cfgで当機能をオフにしておくのが望ましい。

 [api]
 # How to authenticate users of the API
 -auth_backend = airflow.api.auth.backend.default
 +auth_backend = airflow.api.auth.backend.deny_all

/api/experimental というエンドポイント名、気概を感じる

参考