Airflow doc_md のすゝめ

この記事はZenn.devで書いた記事です。

Summary

Airflow のdag とtask には、doc_mdというものがあるよ。 これを使えばタスクの定義や目的をみんな大好きMarkdown形式で記述することができるよ。 今回は、Airflow==1.10.12を使ってるけど、Airflow<=1.10.1から使えるからGoogle Cloud Composer でも使えるよ。

What's Airflow

Airflow は元々はAirbnb が開発していたタスクスケジューラOSSでした。 今は、Apache コミュニティの一つのOSSとしての位置づけになっています。 また、マネージドサービスとしてはGCP のCloudComposerがあります。

仕組みなどは他の記事などを参照されたいが、簡単に要点を以下にまとめてみました。

  • それぞれのタスクはOperatorと呼ばれる処理を実行するもの、Sensorと呼ばれるなにかのイベントを検知するものがある
  • 上記のインスタンスを利用して、処理をTaskとして実装
  • そのTaskをつなぎ合わせてDAG(有向非巡回グラフ)として表現
  • タスクの依存関係の管理やGUIでのリトライなどのメリットがある

タスクのドキュメントはどう管理するの?

これまでいくつかのプロジェクトに関わり、Airflowが採用されるケースを見てきましたし、 巷の事例紹介でもよく分析基盤のタスクスケジューラとしてAirflowを採用しているケースを聞くことが増えてきました。 その中で個人的にタスクの役割やダグ全体が処理する内容はどうやって管理するのがベストなのだろう?という疑問がありました。

Which is better? Python docstring, Other document tool and doc_md...

Pythonコードの場合、Pythonのdocstringとして管理するというのが一つの回答かもしれません。 しかし、これでは

  • タスクがやっていることはコードを見に行かないといけない
  • AirflowとしてGUIを提供しているのにその旨味を活かせない

このような点が挙げられるかと思います。 また、ドキュメントツールが別途あるからそれを使えばいいじゃん。という意見もあるかもしれません。 ドキュメントツールにまとめることは確かに素晴らしいことです。 しかし、開発途上にあるツールをドキュメントにまとめ更新し続けるのは難しく、だんだんと開発のスピードを言い訳にドキュメントの更新がおろそかになると思います。 そこでAirflowで提供されているdoc_mdを紹介したいと思います。

What's doc_md?

doc_mdMarkdown形式でdagやtaskのノートを記述することができ、記述した内容をGraph ViewTask Instance上に表示することができます。 上記のリンクに飛ぶと気づきますが、doc_md以外にもplain textで記述できる方法やjson形式、yaml形式などお好きな方法で記述する方法があります。 そして、このdoc attributeを使う最大のメリットは、 コードの更新と同時にドキュメントの更新が指摘・確認できる ことにあると思います。 例えば、パイプラインを修正するPRにドキュメントを更新したコミットがない場合はPRで指摘すればよいので、特段開発する側としても、別ツールになっているドキュメントを更新するよりも億劫にならずに済むと思われます。

どうやって書くの?

今回は、GitHubにあげられているairflow-examples/dags/example_python_operator.pyをベースにdoc_mdを追加したいと思います。 追加した内容は以下のような内容になります。

from __future__ import print_function
from builtins import range
from airflow.operators import PythonOperator
from airflow.models import DAG
from airflow.utils.dates import days_ago
from datetime import datetime, timedelta

import time
from pprint import pprint

seven_days_ago = datetime.combine(
        datetime.today() - timedelta(7), datetime.min.time())

args = {
    'owner': 'Airflow',
    'start_date': days_ago(2),
}

dag = DAG(
    dag_id='example_docmd_python_operator', default_args=args,
    schedule_interval=None)

dag.doc_md = """
# What this dag will do?
This dag is for an example workflow using a PythonOperator.

## Do you want to share any details?
First of all, it prints dags' context and ds.<br>
And then, it will run 10 sleeping tasks in parallel.<br>
Note that sleeping time is set randomly, from 0 to 9 seconds.
"""


def my_sleeping_function(random_base):
    '''This is a function that will run within the DAG execution'''
    time.sleep(random_base)


def print_context(ds, **kwargs):
    pprint(kwargs)
    print(ds)
    return 'Whatever you return gets printed in the logs'


run_this = PythonOperator(
    task_id='print_the_context',
    provide_context=True,
    python_callable=print_context,
    dag=dag)


run_this.doc_md = """
# What this task will do?
This task simply print the dags' context and ds.

## Do you want to share any details?
Nothing for detail.
"""

for i in range(10):
    '''
    Generating 10 sleeping task, sleeping from 0 to 9 seconds
    respectively
    '''
    task = PythonOperator(
        task_id='sleep_for_'+str(i),
        python_callable=my_sleeping_function,
        op_kwargs={'random_base': float(i)/10},
        dag=dag)

    task.doc_md = """
# What this task will do?
This task will sleep after `print_the_context` task.

## Do you want to share any details?
Sleep time are set randomly, from 0 to 9 seconds.
"""

    task.set_upstream(run_this)

どんな感じに描画されるのか?

Dag の場合

Graph ViewTree Viewに遷移すると描画されているのが確認できます。 dag.doc_md に記述された内容が表示されています。 ここに説明を記述することで、task_idだけでは表現できない、リトライのことやDAGの目的などを書くことができます。

dag の描画

Task の場合

dag のときとは異なり、Task の場合は、Task Instance Detailsのページに遷移すると描画されているのが確認できます。 Attribute: doc_mdという枠が表示されているので、その領域にノートの内容が描画されています。

print_the_context task の描画

sleep_for_ task の描画

まとめ

今回はAirflow に付随するdoc attribute を紹介しました。 パイプラインはどうしても煩雑になる傾向があるので、この機能を使って可読性とパイプラインの責務をスッキリまとめ簡潔な開発が維持できるようにしていきたいです。