この記事のまとめ:
- Apache Airflow のワークフローのタスクを定義するオペレーターの一つである HTTP Operator の使い方を紹介します。
背景
私が Apache Airflow を使用する際、Docker コンテナーで Airflow 自身を立ち上げて使用するのですが、処理させたいタスクの実行が Airflow のコンテナー内で行われることが気持ち悪かったので、処理させたいタスクを別のコンテナーで実行するために、処理用のコンテナーを REST API を持つコンテナーとして立ち上げ、 Airflow からは REST API でそのコンテナーにキックできるようにしようとしました。
HTTP Operator の使い方
Airflow のタスクは Operator と Sensor を使って定義することができ、HTTP Operator は タスクのアクションとして HTTP システムのエンドポイント(もしくは URI )を呼び出すことができます。
公式の GitHub で HTTP Operator のサンプルコードが公開されていますが、本記事では公式のサンプルコードには書いていない使い方を説明していこうと思います。
サンプルコード
サンプルとして用意した DAG の定義ファイルは下記の通りです。
HTTP Operator としてデフォルトで用意されている SimpleHttpOperator クラスを使用します。この Operator を使うにあたって、重要なコンストラクターとして渡しているパラメーターについて見ていきます。
http_conn_id と endpoint
http_conn_id
と endpoint
はセットになっており、この2つでリクエスト先の URI を指定します。
Airflow では、サービスごとのホストアドレスがあらかじめ定義されており、それを利用することで Operator を作成するごとにアクセス先のすべての URI を記載するのでなく、エンドポイントのみで指定しようとしています。つまり、http_conn_id
で定義されるホストアドレス + endpoint
で URI が決まるということです。
SimpleHttpOperator では http_operator.py において、デフォルト値として、http_conn_id =
['](http://http_conn_id: str = 'http_default',)``http_default``'
として設定されています。また、 http_default
は、db.py において、 https://google.com
として定義されていますので、公式の HTTP Operator のサンプルコードではすべて Google の API を使用しているということがわかります。
今回のサンプルコードでは Google 以外の API を使用するために、http_conn_id=``''
とすることで http_conn_id
で決定されるホストアドレスを空文字にし、endpoint
で URI を定義しています。
data と headers
これらは HTTP リクエストのヘッダーと JSON リクエストとして送る JSON データを指定します。なお、 SimpleHttpOperator ではデフォルトで HTTP メソッドとして POST が設定されています。
response_check
Airflow のタスクが無事完了したかどうかの判定を response_check
で行います。判定方法は lambda 式で行うか、定義した関数を呼び出して行うかを選べます。何も指定しない場合、(ソースコード上)無条件でタスクの実行は成功したことになるはずです。
公式のサンプルコードでは lambda 式を使用していますので、本サンプルコードでは response_check_example
という関数を定義して使用しています。
この関数は単純に HTTP レスポンスの JSON ファイルの中身を見て、status
という項目が success
かどうかを判定しています。なお、HTTP レスポンスの JSON ファイルの中身がそのような構成になっている前提です。このように簡単にレスポンスの中身を見た判定が行えます。
log_response
このパラメーターを True
にしておくと Airflow の Web GUI 上でログとして HTTP レスポンスの中身を出力してくれるようになります。
ここまでで HTTP Operator の使い方の概要はわかったのではないでしょうか?
もし、もっと処理を工夫したいとなると、SimpleHttpOperator を使うのではなく、SimpleHttpOperator を継承させて新たなクラスを定義すると良いかもしれませんのでぜひチャレンジしてみてください。
今回は以上です。 最後まで読んでいただき、ありがとうございます。
関連記事
Apache Airflow入門 Apache Airflow初心者がAirflowのアーキテクチャを理解し、チュートリアルを動かすまでの手順をまとめています。
Airflowのユーザー認証の設定方法 Apache AirflowのWeb UIにパスワードユーザー認証の設定方法をまとめています。
コメントを投稿