aws-glue-libs をCI/CD で使いたい

概要

AWS Glue の自動テストで aws-glue-libs コンテナイメージを使いたいが high-uid-error になってしまった。それを回避した話。

前提

AWS Glue の開発を進めやすくするために、amazon から aws-glue-libs というコンテナが公開されている。

https://hub.docker.com/r/amazon/aws-glue-libs

このコンテナを使って、

  • スクリプトの開発
  • Zeppelin, Jupyter などによるNotebook 開発
  • テストの実行・開発

を行うことができる。

今回の事象

今回はglue とpySpark の組み合わせでETL タスクを実装しており、一部のtransform をpytest で書いていた。 local 環境ではdocker-compose でテストを走らせるようにしていて、base image に aws-glue-libs を使っているという形。 local 環境ではうまくテストが実行されたので、CircleCI で自動テストを行おうとした際に、 high-uid-error にぶつかった。

https://circleci.com/docs/2.0/high-uid-error/

問題

high-uid-error は、コンテナ内のファイルやディレクトリのUID/GID の値が指定の値域以上の値が指定されているケースに起こる。 ドキュメントには

The error is caused by a userns remapping failure. CircleCI runs Docker containers with userns enabled in order to securely run customers’ containers. The host machine is configured with a valid UID/GID for remapping. This UID/GID must be in the range of 0 - 65535.

とあり、コンテナ内のUID/GID は 0 ~ 65535 の範囲内にある必要がある。

原因

UID/GID が原因のようなので、コンテナを起動して ls -la などでファイルなどのオーナーを調べる。

# local 環境から amazon/aws-glue-libs:glue_libs_1.0.0_image_01 を起動する
$ docker run -it amazon/aws-glue-libs:glue_libs_1.0.0_image_01 /bin/bash 

# amazon/aws-glue-libs:glue_libs_1.0.0_image_01 内の操作
root@b3219a5e7e66:/# ls -la /home/
total 32
drwxr-xr-x  1 root       root  4096 Jul 21  2020 .
drwxr-xr-x  1 root       root  4096 Jul  5 05:13 ..
drwxr-xr-x  3 root       root  4096 Jul 15  2020 aws
drwxr-xr-x  5 root       root  4096 Jul 21  2020 aws-glue-libs
drwxr-xr-x  3 root       root  4096 Jul 21  2020 jupyter
drwxr-xr-x 25 root       root  4096 Jul 21  2020 livy
drwxr-xr-x 11 2049080342 staff 4096 Sep 17  2019 spark-2.4.3-bin-spark-2.4.3-bin-hadoop2.8
drwxr-xr-x 13 root       root  4096 Jul 21  2020 zeppelin

/home/spark-2.4.3-bin-spark-2.4.3-bin-hadoop2.8 のuid が 2049080342 になっていることが確認できた。

うまくいった対処

今回はCircleCI の代わりにGitHub Actions を使うことでクリアしました。 幸いにもテストを実行するdocker-compose file を用意していたので、そちらをGitHub Actions の中で呼ぶだけで済みました。

他に試みた対処

CircleCI の jobs の中で amazon/aws-glue-libs:glue_libs_1.0.0_image_01image として指定し、 command/home/spark-2.4.3-bin-spark-2.4.3-bin-hadoop2.8 のオーナーを root:root に変更してみた。 これはうまくいかなかった。 そもそも、CircleCI で該当のコンテナをマッピングするときにエラーが起きるので、コマンドまで到達せず、UID/GID を変更するまでに至らなかった。

Docker image を公開しました

CircleCI でも使えるように amazon/aws-glue-libs の Spark を root:root に変更しただけのDocker image とGitHub repository を公開しました。 興味がある人は使ってみてください!

Docker Hub

hub.docker.com

GitHub repository

github.com

Redshift で日付区間を縦持ちにする

今となっては横持ちのデータを縦持ちに展開する場合、もしくは縦持ちのデータを横持ちに変換する場合、
Airflow なりDataflow なりデータパイプラインで実現すると思います。
今回は検証をしたいためだけのため、横持ちのデータを縦持ちに展開することをSQLだけで頑張らないといけない機会があったので、
記録として残しておきます。
いつもどおり、誰かの参考、助けになれば嬉しいです。

以前にもHive でデータを縦持ちに展開するとかの記事を書いたのが懐かしい…

qiita.com

対象のデータとどう展開したいか

さて、本題に入っていきます。
RDBのデータとして以下のような期間を表現しているデータがあったとします。

SELECT
    'hoge' AS name,
    '2021-02-01'::DATE AS start_date,
    '2021-02-15'::DATE AS end_date

テーブルで表現するとこんな感じです。

name start_date end_date
hoge 2021-02-01 2021-02-15

start_date が開始の日時、 end_date が終了の日時を表現してます。

以下のように name カラムを start_date から end_date まで日を更新しながら一行ずつに展開していくことがゴールです。

name expand_date
hoge 2021-02-01 00:00:00
hoge 2021-02-02 00:00:00
hoge 2021-02-03 00:00:00
... ...
hoge 2021-02-13 00:00:00
hoge 2021-02-14 00:00:00
hoge 2021-02-15 00:00:00

どうやったか

SQL の cross join と Redshift の関数の generate_series , dateadd 関数を組み合わせて実現しました。

generate_series

generate_series 指定した区間とインターバルで連続的に数字を生成する関数です。
Redshift ではgenerate_series は使えないという記事をよくでてきますが、from 句で生成するケースでは使うことができます。

SELECT
  *
FROM
  generate_series(1, 4, 1);

このようなSQL を実行すると

generate_series
1
2
3
4

という結果が返ってきます。 関数としては、

generate_serise( start, end, interval )

で指定できて、start からend までinterval の間隔で閉区間の数列を生成してくれます。

CROSS JOIN

cross join は簡単に言えば2つのテーブルの行を全通り組み合わせるものです。
cross join で調べると資料がたくさん出てくるが、以下のQiita の記事がわかりやすいと思う。

qiita.com

dateadd

dateadd は複数のSQL で追加されている関数です。
Redshift の関数の仕様は以下のドキュメントを一読してください。

docs.aws.amazon.com

大まかに関数を説明すると、

DATEADD( datepart, interval, {date|time|timetz|timestamp} )

という関数で各引数は

  • datepart: year , month , day など加算したい部分を指定
  • interval: 加算数
  • {date|time|timetz|timestamp}: date や timestamp データ型のカラム

を表しています。

組み合わせていく

ではまず最初に最初の1行しかないテーブルとgenerate_serise 関数で生成したテーブルをcross join で組み合わせていきます。

WITH tmp_table_1 AS (
  SELECT
    'hoge' AS name,
    '2021-02-01'::DATE AS start_date,
    '2021-02-15'::DATE AS end_date
)
SELECT
  *
FROM
  tmp_table_1
CROSS JOIN (
  SELECT
    *
  FROM
    generate_series(1, 30, 1)
) tmp_table_2;

これにより

name start_date end_date
hoge 2021-02-01 2021-02-15

を30 行生成します。 生成したテーブルの部分抜粋は以下の通りです。

name start_date end_date generate_series
hoge 2021-02-01 2021-02-15 1
hoge 2021-02-01 2021-02-15 2
hoge 2021-02-01 2021-02-15 3
... ... ... ...
hoge 2021-02-01 2021-02-15 28
hoge 2021-02-01 2021-02-15 29
hoge 2021-02-01 2021-02-15 30

次にこの生成した generate_series の数字を使い、 dateadd していきます。
SQL としては以下の通りです。

WITH tmp_table_1 AS (
  SELECT
    'hoge' AS name,
    '2021-02-01'::DATE AS start_date,
    '2021-02-15'::DATE AS end_date
)
SELECT
  tmp_table_1.name,
  tmp_table_1.start_date,
  tmp_table_1.end_date,
  tmp_table_2.generate_series,
  dateadd(day,tmp_table_2.generate_series,tmp_table_1.start_date-1) AS expand_date
FROM
  tmp_table_1
CROSS JOIN (
  SELECT
    *
  FROM
    generate_series(1, 30, 1)
) tmp_table_2;

生成されたテーブルは

name start_date end_date generate_series expand_date
hoge 2021-02-01 2021-02-15 1 2021-02-01 00:00:00
hoge 2021-02-01 2021-02-15 2 2021-02-02 00:00:00
hoge 2021-02-01 2021-02-15 3 2021-02-03 00:00:00
... ... ... ...
hoge 2021-02-01 2021-02-15 28 2021-02-28 00:00:00
hoge 2021-02-01 2021-02-15 29 2021-03-01 00:00:00
hoge 2021-02-01 2021-02-15 30 2021-03-02 00:00:00

のようになり、うまく日付が生成されていることがわかります。
しかし end_date 期間以上の日付が生成されています。
なので、最後に end_date で期間を絞りつつ、カラムも絞って目的のテーブルに整形していきます。
今回は nameexpand_date に制限します。

SQLとしては以下の通りです。

WITH tmp_table_1 AS (
  SELECT
    'hoge' AS name,
    '2021-02-01'::DATE AS start_date,
    '2021-02-15'::DATE AS end_date
)
SELECT
  tmp_table_1.name,
  dateadd(day,tmp_table_2.generate_series,tmp_table_1.start_date-1) AS expand_date
FROM
  tmp_table_1
CROSS JOIN (
  SELECT
    *
  FROM
    generate_series(1, 30, 1)
) tmp_table_2
WHERE expand_date <= end_date;

そして生成されたテーブルは以下の通りです。

name expand_date
hoge 2021-02-01 00:00:00
hoge 2021-02-02 00:00:00
hoge 2021-02-03 00:00:00
... ...
hoge 2021-02-13 00:00:00
hoge 2021-02-14 00:00:00
hoge 2021-02-15 00:00:00

目的の縦持ちに展開したテーブルを生成することができました。
expand_date のフォーマットを変えたい場合は、 to_char 関数とかを使えばフォーマットできます。

ネックになっているところ

イケてないところとしては、生成する行数が start_dateend_date とで一致していないところだと思っています。
必要以上のレコードを生成してフィルターしているのをもう少しスマートにできればなという感触です。

もっとスマートなやり方をご存知の方がいればコメントをお願いします…!

所感

一応SQLだけで目的のテーブルを作成することができました。
序文に書いた通り、本来はデータパイプラインなどで前処理を実装するほうが確実に実装できると思います。
ですが、このようなやり方もあるということを発見できただけでも良かったかなと思います。

parquet の中身を見る @mac

久しぶりにparquet file を扱う機会があり、中身を見る必要があったので、やり方を残しておく。

$ brew install parquet-tools

以前のブログでツールはなるべくビルドしたいと書いたが、今回は brew でインストールした。

Java の環境などはセットアップしていなかったので、brew install のときに出力されたパスも合わせて設定した。

ぼくはzsh を使っているので、今回は .zshrc に出力した。

# jdk のpath をzshrc に設定
echo 'export PATH="/usr/local/opt/openjdk/bin:$PATH"' >> ~/.zshrc
echo 'export CPPFLAGS="-I/usr/local/opt/openjdk/include"' >> ~/.zshrc

その後は、reload すると parquet-tools が使えるようになった。

Airflow doc_md のすゝめ

この記事はZenn.devで書いた記事です。

Summary

Airflow のdag とtask には、doc_mdというものがあるよ。 これを使えばタスクの定義や目的をみんな大好きMarkdown形式で記述することができるよ。 今回は、Airflow==1.10.12を使ってるけど、Airflow<=1.10.1から使えるからGoogle Cloud Composer でも使えるよ。

What's Airflow

Airflow は元々はAirbnb が開発していたタスクスケジューラOSSでした。 今は、Apache コミュニティの一つのOSSとしての位置づけになっています。 また、マネージドサービスとしてはGCP のCloudComposerがあります。

仕組みなどは他の記事などを参照されたいが、簡単に要点を以下にまとめてみました。

  • それぞれのタスクはOperatorと呼ばれる処理を実行するもの、Sensorと呼ばれるなにかのイベントを検知するものがある
  • 上記のインスタンスを利用して、処理をTaskとして実装
  • そのTaskをつなぎ合わせてDAG(有向非巡回グラフ)として表現
  • タスクの依存関係の管理やGUIでのリトライなどのメリットがある

タスクのドキュメントはどう管理するの?

これまでいくつかのプロジェクトに関わり、Airflowが採用されるケースを見てきましたし、 巷の事例紹介でもよく分析基盤のタスクスケジューラとしてAirflowを採用しているケースを聞くことが増えてきました。 その中で個人的にタスクの役割やダグ全体が処理する内容はどうやって管理するのがベストなのだろう?という疑問がありました。

Which is better? Python docstring, Other document tool and doc_md...

Pythonコードの場合、Pythonのdocstringとして管理するというのが一つの回答かもしれません。 しかし、これでは

  • タスクがやっていることはコードを見に行かないといけない
  • AirflowとしてGUIを提供しているのにその旨味を活かせない

このような点が挙げられるかと思います。 また、ドキュメントツールが別途あるからそれを使えばいいじゃん。という意見もあるかもしれません。 ドキュメントツールにまとめることは確かに素晴らしいことです。 しかし、開発途上にあるツールをドキュメントにまとめ更新し続けるのは難しく、だんだんと開発のスピードを言い訳にドキュメントの更新がおろそかになると思います。 そこでAirflowで提供されているdoc_mdを紹介したいと思います。

What's doc_md?

doc_mdMarkdown形式でdagやtaskのノートを記述することができ、記述した内容をGraph ViewTask Instance上に表示することができます。 上記のリンクに飛ぶと気づきますが、doc_md以外にもplain textで記述できる方法やjson形式、yaml形式などお好きな方法で記述する方法があります。 そして、このdoc attributeを使う最大のメリットは、 コードの更新と同時にドキュメントの更新が指摘・確認できる ことにあると思います。 例えば、パイプラインを修正するPRにドキュメントを更新したコミットがない場合はPRで指摘すればよいので、特段開発する側としても、別ツールになっているドキュメントを更新するよりも億劫にならずに済むと思われます。

どうやって書くの?

今回は、GitHubにあげられているairflow-examples/dags/example_python_operator.pyをベースにdoc_mdを追加したいと思います。 追加した内容は以下のような内容になります。

from __future__ import print_function
from builtins import range
from airflow.operators import PythonOperator
from airflow.models import DAG
from airflow.utils.dates import days_ago
from datetime import datetime, timedelta

import time
from pprint import pprint

seven_days_ago = datetime.combine(
        datetime.today() - timedelta(7), datetime.min.time())

args = {
    'owner': 'Airflow',
    'start_date': days_ago(2),
}

dag = DAG(
    dag_id='example_docmd_python_operator', default_args=args,
    schedule_interval=None)

dag.doc_md = """
# What this dag will do?
This dag is for an example workflow using a PythonOperator.

## Do you want to share any details?
First of all, it prints dags' context and ds.<br>
And then, it will run 10 sleeping tasks in parallel.<br>
Note that sleeping time is set randomly, from 0 to 9 seconds.
"""


def my_sleeping_function(random_base):
    '''This is a function that will run within the DAG execution'''
    time.sleep(random_base)


def print_context(ds, **kwargs):
    pprint(kwargs)
    print(ds)
    return 'Whatever you return gets printed in the logs'


run_this = PythonOperator(
    task_id='print_the_context',
    provide_context=True,
    python_callable=print_context,
    dag=dag)


run_this.doc_md = """
# What this task will do?
This task simply print the dags' context and ds.

## Do you want to share any details?
Nothing for detail.
"""

for i in range(10):
    '''
    Generating 10 sleeping task, sleeping from 0 to 9 seconds
    respectively
    '''
    task = PythonOperator(
        task_id='sleep_for_'+str(i),
        python_callable=my_sleeping_function,
        op_kwargs={'random_base': float(i)/10},
        dag=dag)

    task.doc_md = """
# What this task will do?
This task will sleep after `print_the_context` task.

## Do you want to share any details?
Sleep time are set randomly, from 0 to 9 seconds.
"""

    task.set_upstream(run_this)

どんな感じに描画されるのか?

Dag の場合

Graph ViewTree Viewに遷移すると描画されているのが確認できます。 dag.doc_md に記述された内容が表示されています。 ここに説明を記述することで、task_idだけでは表現できない、リトライのことやDAGの目的などを書くことができます。

dag の描画

Task の場合

dag のときとは異なり、Task の場合は、Task Instance Detailsのページに遷移すると描画されているのが確認できます。 Attribute: doc_mdという枠が表示されているので、その領域にノートの内容が描画されています。

print_the_context task の描画

sleep_for_ task の描画

まとめ

今回はAirflow に付随するdoc attribute を紹介しました。 パイプラインはどうしても煩雑になる傾向があるので、この機能を使って可読性とパイプラインの責務をスッキリまとめ簡潔な開発が維持できるようにしていきたいです。

Rust インストール

Rust をインストールして、色々合わせてインストールされたので、それぞれの役割を書いていきます。

Rust インストール

先日の記事でもRust のインストールを紹介したが、rust-lang.org にインストール方法を説明するページがあるので、そちらを参照されたい。

toohsk.hateblo.jp

www.rust-lang.org

今回は以下のコマンドでインストールした。

$ curl --proto '=https' --tlsv1.2 -sSf https://sh.rustup.rs | sh

そうするとインストールの方法に関して、以下のような出力がだされる。

1) Proceed with installation (default)
2) Customize installation
3) Cancel installation
>

そのままインストールを進める場合は1、インストールをカスタマイズする場合は2、インストールをキャンセルしたい場合は3といったところか?
今回は1でそのままインストールを進める。

そうすると、基本的にインストールが完了される。
bash なり zsh なりのprofile ファイルにパスが自動的に設定されます。
source コマンドでprofile をリロードします。

$ source ~/.zshrc

合わせてインストールされたツールたち

Rust をインストールすると2つのツールが合わせてインストールされます。

  • rustup
  • cargo

の2つです。

何が違うの??

rustupcargo の役割の違いをメモして起きます。

Rustup: the Rust installer and version management tool

rustup の役割は、Rust のインストールページにありますが、Rustそのものを管理するmanagement toolです。
Rust の魅力はいくつかあると思いますが、その中の一つに更新の感覚とその頻度があるのではないでしょうか?
Rust が更新され、update したい場合は

$ rustup update

で行うみたいです。(まだやったことないですが笑)

Cargo: the Rust build tool and package manager

一方、 cargo の役割は、自分で書いたコードのbuildや実行、あるいは他者が書いたtool(Rust の世界では crates と呼ぶみたいです。)のインストールに使うみたいです。
コマンドとしては、

$ cargo build    # build the source code
$ cargo run      # run the source code
$ cargo test     # run the test code
$ cargo install  # install published crates

があるみたいです。

とりあえず、今日はインストールのときに気になったことをブログにしてみました。
どなたかの助けになれば幸いです。

ターミナルの環境改善活動をしてみた

先日、同僚とモブプログラミングをしていてrgコマンドを使ってgrepに近いことをしていてとても便利そうだったので、導入しようとおもったので、記事にしました。

巷を賑わすRust 製のコマンドたち

最近、自分のターミナルを豊かにするアンテナを貼っていなかったのであまり知らなかったのですが、Rust 製のコマンドが流行ってるみたいですね。
今は、仕事の都合上Python を触ることが多いのですが、違う言語にも触れたいと思っていたので、少しRust に興味を持ったので、少しそういう記事を増やしても行くかもしれません。
ひとまず、今回の主題に入っていこうと思います。

コマンドの導入方法

今回インストールする環境は、macOS Catalina になります。
まあMac といえばこういうツールに関してはbrewでインストールするというのもありなのですが、個人的に開発で使う環境ということもあり、
何をbrew でいれたか、何をコードからビルドしたかわからなくなるのがいやなので、
最近はbrew から入れないようにしています。
なので、Rust 製のコマンドに関してはコードからのビルド or cargoでbuild のどちらかで進めようと思います。

Rust 環境を整備する

今後Rust を勉強しようとおもったのでRust の環境を今回は導入していこうと思います。
rust-lang.org にインストール方法を説明するページがあります。
コマンド一撃でセットアップできます。

ripgrep をインストールする

ripgrep のリポジトリは v こちら。

github.com

インストールの方法はREADMEにかかれており、各環境での手軽なインストールする方法が載ってますので、環境や自分のポリシーに合わせてインストールしてください。

コードからビルドする

Rust programmer 向けにcargo を使いインストールする方法とコードからビルドする方法が提案されています。

cargo 経由でインストールする

cargo を使ってインストールする場合は、

$ cargo install ripgrep

とすれば良いみたいです。
ただ、注意点としてはRust のバージョンが1.34.0 以降である必要があるとのことです。

コードからビルドする

コードからビルドするには、git clone でコードを引っ張ってくるところからスタートします。

$ git clone https://github.com/BurntSushi/ripgrep
$ cd ripgrep
$ cargo build --release

cargo build とすると必要なcrates をdownload し、compile してくれます。
crates とはRust の世界でpackage を指す言葉のようです。
build 自体は多少時間がかかりましたが、無事成功しましたので、実行できるか確認してみます。

$ ./target/release/rg --version
ripgrep 12.1.1 (rev 11c7b2ae17)
-SIMD -AVX (compiled)
+SIMD +AVX (runtime)

成功しているみたいです。
ただ、コードからビルドした場合は、./target 以下にバイナリが作られるだけなので、パスを通す必要があります。
今回、自分は/usr/local/binシンボリックリンクを貼るようにしました。

$ rg --version
ripgrep 12.1.1 (rev 11c7b2ae17)
-SIMD -AVX (compiled)
+SIMD +AVX (runtime)

コマンドとして呼び出すことができました!

プライバシーポリシーを設定しました

こんにちは管理人のtoohskです。
下記、「プライバシーポリシー」に関して記載致しましたので、ご一読願います。

当サイトへのコメントについて 免責事項 プライバシーポリシーの変更について

当サイトに掲載されている広告について

当サイトでは、第三者配信の広告サービス(Googleアドセンスもしもアフィリエイト)を利用しています。
このような広告配信事業者は、ユーザーの興味に応じた商品やサービスの広告を表示するため、当サイトや他サイトへのアクセスに関する情報 『Cookie』(氏名、住所、メール アドレス、電話番号は含まれません) を使用することがあります。 またGoogleアドセンスに関して、このプロセスの詳細やこのような情報が広告配信事業者に使用されないようにする方法については、こちらをクリックしてください。

当サイトが使用しているアクセス解析ツールについて

当サイトでは、Googleによるアクセス解析ツール「Googleアナリティクス」を利用しています。
このGoogleアナリティクスはトラフィックデータの収集のためにCookieを使用しています。このトラフィックデータは匿名で収集されており、個人を特定するものではありません。

この機能はCookieを無効にすることで収集を拒否することが出来ますので、お使いのブラウザの設定をご確認ください。
この規約に関して、詳しくはこちら、またはこちらをクリックしてください。

当サイトへのコメントについて

当サイトでは、スパム・荒らしへの対応として、コメントの際に使用されたIPアドレスを記録しています。
これはブログの標準機能としてサポートされている機能で、スパム・荒らしへの対応以外にこのIPアドレスを使用することはありません。

また、メールアドレスとURLの入力に関しては、任意となっております。 全てのコメントは管理人であるtoohskが事前にその内容を確認し、承認した上での掲載となりますことをあらかじめご了承下さい。

加えて、次の各号に掲げる内容を含むコメントは管理人の裁量によって承認せず、削除する事があります。

  • 特定の自然人または法人を誹謗し、中傷するもの。
  • 極度にわいせつな内容を含むもの。
  • 禁制品の取引に関するものや、他者を害する行為の依頼など、法律によって禁止されている物品、行為の依頼や斡旋などに関するもの。
  • その他、公序良俗に反し、または管理人によって承認すべきでないと認められるもの。

免責事項

当サイトで掲載している画像の著作権・肖像権等は各権利所有者に帰属致します。
権利を侵害する目的ではございません。記事の内容や掲載画像等に問題がございましたら、各権利所有者様本人が直接メールでご連絡下さい。確認後、対応させて頂きます。

当サイトからリンクやバナーなどによって他のサイトに移動された場合、移動先サイトで提供される情報、サービス等について一切の責任を負いません。

当サイトのコンテンツ・情報につきまして、可能な限り正確な情報を掲載するよう努めておりますが、誤情報が入り込んだり、情報が古くなっていることもございます。

当サイトに掲載された内容によって生じた損害等の一切の責任を負いかねますのでご了承ください。

プライバシーポリシーの変更について

当サイトは、個人情報に関して適用される日本の法令を遵守するとともに、本ポリシーの内容を適宜見直しその改善に努めます。

修正された最新のプライバシーポリシーは常に本ページにて開示されます。

運営者:toohsk