Inglewood’s Tech Blog

株式会社イングリウッドの開発/技術に関することを紹介

KubernetesクラスターでAirflowを建てる話

イントロ

お疲れ様です。ゾルタンです。
イングリウッドでは現在全社のデータ基盤を構築しています。 「エンジニアが本質作業に集中しやすいように」のコンセプトのもと外部ツールのデータ集約は基本的にTroccoに任せようとしていますが、対応していないツールや方式など自前構築がどうしても必須な部分が存在します。 拡張性も運用性も高い方式としてAirflowをK8sで選択したので、今回のブログポストでは具体的で便利なセットアップ方法を紹介します。

Airflowとは

ワークフローのスケジューリング・実行・モニタリングを行うオープンソースなソフトウェアです。 スケーラビリティ、ダイナミックなワークフロー構築、インテグレーション機能の豊富さが魅力です。 オフィシャルサイト

やりたいこと

GCPKubernetesクラスターをスクラッチから構築し、Airflowを建てる。 AirflowはDAGファイルをGitHubから定期的に引っ張ってくれる機能があるので、GitHubを連携する。実行されるジョブですが、Dockerイメージをビルドし、GCPのコンテナレジストリ(gcr.io)にプッシュし、AirflowはGCRからコンテナイメージを引っ張って実行する仕組みを作る。

必要な環境

  • GCPプロジェクト
  • docker
  • kubectl
  • helm
  • python

手順

  1. GCPK8sクラスターを作る
  2. helmでAirflowをセットアップする
  3. Airflowの権限まわりの設定を行う
  4. シンプルなdockerコンテナを用意する
  5. Airflowにテスト用DAGファイルを追加する

GCPK8sクラスターを作る

TL;DR

gcloud beta container --project "your-project-name" clusters create "your-cluster-name" \
--zone "asia-northeast1-a" --no-enable-basic-auth --cluster-version "1.19.9-gke.1900" \
--release-channel "regular" --machine-type "e2-medium" \
--image-type "COS_CONTAINERD" --disk-type "pd-standard" \
--disk-size "100" --metadata disable-legacy-endpoints=true \
--scopes "https://www.googleapis.com/auth/devstorage.read_only","https://www.googleapis.com/auth/logging.write","https://www.googleapis.com/auth/monitoring","https://www.googleapis.com/auth/servicecontrol","https://www.googleapis.com/auth/service.management.readonly","https://www.googleapis.com/auth/trace.append" \
--num-nodes "1" --enable-stackdriver-kubernetes \
--enable-ip-alias --network "projects/your-project-name/global/networks/default" \
--subnetwork "projects/your-project-name/regions/asia-northeast1/subnetworks/default" \
--no-enable-intra-node-visibility --default-max-pods-per-node "110" \
--enable-autoscaling --min-nodes "1" --max-nodes "10" \
--no-enable-master-authorized-networks \
--addons HorizontalPodAutoscaling,HttpLoadBalancing,GcePersistentDiskCsiDriver \
--enable-autoupgrade --enable-autorepair \
--max-surge-upgrade 1 --max-unavailable-upgrade 0 \
--enable-shielded-nodes --node-locations "asia-northeast1-a"

説明

上記のコマンドは "asia-northeast1-a" リージョンでe2-medium(2 vCPU, 4GBメモリ)のマシンタイプで、デフォルトノードプールに1ノードから10ノードの間でオートスケーリングするクラスターを作ります。(自分で実行してみたい場合、"your-project-name"と"your-cluster-name"を変えてください。)

追記

GCPのUIから作る際に、デフォルトでは "asia-northeast1-b" リージョンが選択されています。もし作るクラスターにGPUを付けたい場合、1-bリージョンではGPUを提供していないため、最初から1-aにしています。(以前失敗したことがあります)

GUIからクラスターが無事作られたことを確認できます。

f:id:varadiz:20210615163130p:plain

helmでAirflowをセットアップする

クラスターにAirflowをデプロイするとき、helmを使うと便利です。(helmはK8s向けのパッケージマネージャーです。) いくつかのバリエーションが出ていますが、僕の経験ではBitnamiが提供しているチャートが一番早く・簡単にデプロイできます。

Docs: artifacthub.io

コマンドはこちらですが、叩く前にいくつかTODOがあります。

  • GitHubでアクセストークンを作る Repoのリード権限があれば充分です。
  • Fernetキーを作る Generate a Fernet key for Airflow
  • DAGファイル専用のGitHubレポを作る レポには空の「dags」フォルダを用意してあげて下さい。

Airflowの誕生

準備が整ったら、下記のコマンドで一発です。

helm install --create-namespace airflow --namespace airflow \
--set airflow.auth.username=YOUR_USERNAME \
--set airflow.auth.password=YOUR_PASSWORD \
--set airflow.auth.forcePassword=true \
--set airflow.auth.fernetKey=YOUR_FERNET_KEY \
--set airflow.cloneDagFilesFromGit.enabled=true \
--set airflow.cloneDagFilesFromGit.repository=https://[USER]:[TOKEN]@github.com/[GITHUB_USER]/[REPO] \
--set airflow.cloneDagFilesFromGit.branch=master \
--set airflow.cloneDagFilesFromGit.interval=60 \
--set airflow.baseUrl=http://127.0.0.1:8080 \
bitnami/airflow

GCPGUIでも確認できます。

f:id:varadiz:20210615164408p:plain

注意

ちゃんと立ち上がるまで時間が掛かります。「Unschedulable」などのステータスだったり、とても心配になりますが、数分経つとステータスが全部OKになります。逆に5分後もステータスがおかしかったら、Podのログを確認して、トラブルシューティングしてみて下さい。

追記

  • ネームスペースをAirflowにしています。好きな値を付けてください。
  • ワークロード、サービス、ストレージ、シークレットなど色々が作れます。もしhelm upgrade, helm purgeなどで変えたい・やり直したい場合、ストレージの方は一度削除した方がいいです。(Airflowのバックエンドが使っているPostgresデータがストレージに保存されていて、やり直したい場合はストレージを削除しないといろいろと謎のエラーが出始めるので要注意です。)

Airflowが立ち上がったら、ターミナルでkubectlのport-forwardingを使ってブラウザーlocalhost:8080からAirflowがみれます。

kubectl port-forward --namespace airflow svc/airflow 8080:8080

Airflowの権限まわりの設定を行う

AirflowからGCRにアクセスする必要があるので、これから権限まわりの設定を行う。 まずはGCPのIAMタブからサービスアカウントを作り、JSONキーをダウンロードします。サービスアカウントに権限を与えすぎないように心がけましょう。

次、ダウンロードしたJSONを使って、「gcr-json-key」という名前でK8sのシークレットを作ります。

kubectl create secret docker-registry gcr-json-key \
--docker-server=https://gcr.io \
--docker-username=_json_key \
--docker-password="$(cat ./YOUR_SERVICE_ACCOUNT.json)"

Dockerイメージを引っ張るときはgcr-json-keyを使うようにアップデート掛けます。

kubectl patch serviceaccount default -p '{"imagePullSecrets": [{"name": "gcr-json-key"}]}'

シンプルなdockerコンテナを用意する

テストということで、Nodeのシンプルなコンテナを用意します。

package.json

{
  "name": "test",
  "version": "1.0.0",
  "description": "",
  "main": "index.js",
  "scripts": {
    "test": "echo \"Error: no test specified\" && exit 1"
  },
 "author": "",
 "license": "ISC"
}

index.js

console.log('Hello', 'World')

Dockerfile

FROM node:12-alpine
WORKDIR /app
COPY package*.json ./
RUN npm i
COPY . ./
CMD node index.js

Dockerイメージをビルドし、GCRにプッシュします。

docker build -t gcr.io/YOUR_PROJECT/test .
docker push gcr.io/YOUR_PROJECT/test

AirflowにテストDAGを追加

GitHubレポのdagsフォルダに下記の内容でファイルを入れます。

from airflow import DAG
from datetime import datetime, timedelta
from airflow.contrib.operators.kubernetes_pod_operator import KubernetesPodOperator

default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'retries': 0,
    'retry_delay': timedelta(minutes=60),
}

dag = DAG('test-dag',
          default_args=default_args,
          start_date=datetime(2021, 6, 15, 4, 0),
          description='DAG for testing if airflow works properly',
          schedule_interval="*/10 * * * *",
          tags=["test", "ステージング"])

config = {}

test_task = KubernetesPodOperator(namespace='airflow',
                                  image="gcr.io/ig-common/test:latest",
                                  image_pull_secrets='gcr-json-key',
                                  cmds=["node", "index.js"],
                                  arguments=[""],
                                  is_delete_operator_pod=True,
                                  in_cluster=True,
                                  secrets=[],
                                  env_vars=config,
                                  name="test-task",
                                  task_id="test-task",
                                  get_logs=True,
                                  dag=dag)

test_task

test_taskのところで、GCRに以前プッシュしたイメージを使うように指定しています。 イメージを引っ張る際に、以前作ったシークレット「gcr-json-key」が使われます。 AirflowがDockerイメージをGCRから引っ張って、実行後にPodを削除してくれます。

できたDAGがAirflowでみれます。 f:id:varadiz:20210615173828p:plain

まとめ

AirflowをK8sで建てました。DAGファイルはGitHubから引っ張られる仕組みで、ソースコード管理が非常に簡単です。 AirflowがGCRにあるDockerイメージを引っ張ってきて、コンテナを実行してくれます。 データパイプラインに求めていることを全部クリアして、K8s・Airflow・Docker・gcr.io・GitHubの便利な組み合わせだと思いますので、是非実装してみて下さい。