Skip to content

KNADA Airflow

Apache Airflow er et verktøy for å orkestrere, skedulere og monitorere datapipelines. Web-grensesnittet til Airflow gir brukeren enkel tilgang til å lese logger fra de ulike stegene i pipelinen, trigge datapipelines manuelt og sjekke statistikk på tidligere kjøringer.

En datapipeline i Airflow, eller DAG (Directed Acyclic Graph), er et sett med oppgaver man ønsker å kjøre som beskriver rekkefølge og avhengigheter mellom oppgavene. Disse DAG-ene beskrives programmatisk i python filer og legges i et Github repo som periodisk synkroniseres med Airflow instansen. Nedenfor ser du en en grafisk representasjon av flyten i en DAG:

flowchart LR
    A(email_start) --> C(waiting_1)
    B(slack_start) --> C(waiting_1)
    C --> D(fetch_styrk)
    C --> E(fetch_nace)
    C --> F(fetch_pam)
    D --> G(waiting_2)
    E --> G(waiting_2)
    F --> G(waiting_2)
    G --> H(transform_styrk)
    G --> I(transform_nace)
    G --> J(transform_pam)
    H --> K(waiting_3)
    I --> K(waiting_3)
    J --> K(waiting_3)
    K --> L(slack_success)
    K --> M(email_success)

Kom i gang

NADA tilbyr team eller enkeltpersoner å sette opp Airflow instanser i KNADA gjennom Knorten.

For mer informasjon om Airflow, se Airflow docs

Vær oppmerksom på at alt av logger fra en Airflow task vil skrives til en bucket i knada-gcp prosjektet, og være tilgjengelig etterpå for Airflow og direkte for NADA som er admins i knada-gcp. Vær derfor forsiktig så ikke sensitiv informasjon skrives til stdout i koden som kjøres.

Github repo for DAGs

For å bruke Airflow i KNADA kreves det et Git-repo under navikt organisasjonen på Github.com som inneholder Python-filer med DAGer.

Hvert minutt vil Github repoet bli synkronisert til Airflow instansen, og Airflow vil lese DAGene definert i repoet.

Eksempler på DAGs repoer

Dataverk Airflow

Vi har laget et bibliotek med flere enkle operators for å lette jobben når man lager DAGer. Dette ligger ute på PyPi.org og er dokumentert der.

Foreløpig har vi fire operators, hvor alle støtter å klone et annet repo ved oppstart av en task, og installerer eksterne Python-avhengigheter via en requirements.txt fil i ditt repo.

  • Quarto operator: Forenkler jobben med å lage datafortellinger
  • Notebook operator: Lar deg kjøre en Jupyter notebook i Airflow
  • Python operator: Lar deg kjøre vilkårlig Python-script
  • Kubernetes operator: Vår base, som er en forenkling av den offisielle Kubernetes operator.

Konfigurasjon av Airflow

I KNADA er Airflow konfigurert til å bruke Kubernetes Executor. Dette innebærer at hver task i en Airflow DAG vil spinne opp og kjøre en egen worker i en separat Kubernetes pod. Det gjør at man står fritt til å selv spesifisere miljøet til Airflow-workeren.

Nedenfor har vi listet opp noen av de konfigurasjonen vi tror er nyttig å vite om.

Merk: Hovedcontaineren som worker-poden bruker vil alltid hete base, så dersom en ønsker å overskrive noe som gjelder spesifikt for denne containeren må man referere til den med navn som i eksemplene under.

Ressursbehov for Airflow

Dersom en ikke spesifiserer ressursbehov for Airflow taskene sine vil de kjøre med standard instillinger som er 512 MB minne, 0.5 vCPU, og 1Gi disk (ephemeral-storage). Dette kan man enkelt overstyre for alle operators gitt at de tar utgangspunkt i BaseOperator til Airflow. Under følger et eksempel på hvordan ressursbehov for en task endres til 2GB minne,2 vCPU, og 5Gi disk:

from airflow import DAG
from airflow.utils.dates import days_ago
from airflow.operators.python_operator import PythonOperator
from kubernetes import client as k8s

def myfunc():
    print("kjør task")

with DAG('min-dag', start_date=days_ago(1), schedule_interval=None) as dag:
    run_this = PythonOperator(
        task_id='test',
        python_callable=myfunc,
        executor_config={
           "pod_override": k8s.V1Pod(
               spec=k8s.V1PodSpec(
                   containers=[
                      k8s.V1Container(
                         name="base",
                         resources={
                           "requests": {
                               "cpu": "2",
                               "memory": "2Gi",
                               "ephemeral-storage": "5Gi"
                           }
                         }
                      )
                   ]
               )
           )
        },
        dag=dag
    )

Trafikk ut fra Airflow (aka allow list)

For å skallsikre Airflow har man muligheten til å skru på allow list for teamets tjenester. Dette innebærer for Airflow at man i hvert task må spesifisere hvilke eksterne tjenester (les: tjenester utenfor Airflow) man skal snakke med. Vi har stengt av muligheten for eksterne tjenester å snakke inn til Airflow.

I podene hvor Airflow tasken kjører blokkeres i utgangspunktet all trafikk ut, med følgende unntak:

  • github.com: Airflow vil alltid trenge å hente repoet som koden ligger i
  • storage.googleapis.com: Airflow skriver loggene til en Google Cloud Storage bucket
  • Knaudit: tjeneste som logger hvilke jobber som kjøres for hvert team til Oracle for DVH

Utover dette er man nødt til å eksplisitt spesifisere hvilke kilder man trenger å snakke med (gjelder både interne og eksterne addresser) for hver enkelt task i en DAG. Dette gjør man ved å legge på en allowlist annotasjon på pod ressursen med hostnavnet og porten på det man trenger å nå.

from airflow import DAG
from airflow.utils.dates import days_ago
from airflow.operators.python_operator import PythonOperator
from airflow.contrib.operators.slack_webhook_operator import SlackWebhookOperator
from kubernetes import client as k8s
import os
import logging

def myfunc():
    import requests
    res = requests.get("https://ssb.no/api")
    res.raise_for_status()

with DAG('min-dag', start_date=days_ago(1), schedule_interval=None) as dag:
    slack = SlackWebhookOperator(
        http_conn_id=None,
        task_id="slack-message",
        webhook_token=os.environ["SLACK_TOKEN"],
        message="start min-dag",
        channel="#minkanal",
        link_names=True,
        executor_config={
            "pod_override": k8s.V1Pod(
                metadata=k8s.V1ObjectMeta(annotations={"allowlist": "hooks.slack.com"})
            )
        }
    )

    run_this = PythonOperator(
        task_id='test',
        python_callable=myfunc,
        executor_config={
            "pod_override": k8s.V1Pod(
                metadata=k8s.V1ObjectMeta(annotations={"allowlist": "ssb.no,db.adeo.no:1521"})
            )
        },
    dag=dag)
    
    slack >> run_this

allowlist er en kommaseparert liste med hostnavn og port på formatet hostnavn:port. Dersom man ikke angir port vil vi bruke 443 som standardport. Når jobben er ferdig vil tilgangene bli fjernet.

Allow list ved bruk av Dataverk Airflow

Man kan også sette allowlist for operators som lages med dataverk-airflow som i eksempelet under.

from airflow import DAG
from airflow.utils.dates import days_ago
from dataverk_airflow import notebook_operator

with DAG('dag', start_date=days_ago(1), schedule_interval=None) as dag:
    task = notebook_operator(dag=dag,
                             name="knada-pod-operator",
                             repo="navikt/repo",
                             nb_path="notebooks/mynb.ipynb",
                             allowlist=["ssb.no", "db.adeo.no:1521"])

Egne Docker images for Airflow

Hvis du kun har behov for andre Python-biblioteker så anbefaler vi på det sterkeste at du bruker Dataverk Airflow og sender med en requirements.txt fil i stedet for å bygge ditt eget image.

I noen tilfeller har du kanskje flere avhengigheter enn det vi tilbyr i standard Airflow-oppsett. Da kan det å bygge sitt eget Docker image være en løsning. Du kan se hva vi tilbyr i våre images, og hvordan disse er bygget i navikt/knada-images. Våre Docker imager kommer med drivere for Oracle og Postgres, men inneholder ikke et stort utvalg av Python biblioteker.

Se her for å spesifisere eget image som brukes av standard Airflow workere. Se her for å spesifisere eget image som brukes av dataverk-airflow workere.

Har du behov for at hele Airflow instansen skal bruke ditt Docker image så spesifiseres det i Knorten.

NB: Hvis du bygger image lokalt på en nyere Mac så er det viktig at du bygger imaget for riktig plattform. Legg til --platform linux/amd64 i docker build kommandoen.

Vi tillater ikke at Airflow worker containere kjører med root privilegier. Dersom du bygger ditt eget image må dette imaget ha en bruker airflow med uid 50000.

Overstyring av default worker image

Skal man bygge eget image som skal overstyre standard worker imaget er man nødt til å ha apache-airflow installert. Dette vil man få dersom man tar utgangspunkt i enten vårt base image eller det offisielle airflow imaget. I tillegg vil også nødvendig airflow bruker med uid 50000 være opprettet i miljøet. Dersom man ikke tar utgangspunkt i et av disse imagene må man selv installere Apache Airflow i imaget samt opprette airflow-brukeren.

Under følger et eksempel på hvordan å overstyre imaget som Airflow worker containeren bruker med imaget du selv har bygget:

from airflow import DAG
from airflow.utils.dates import days_ago
from airflow.operators.python_operator import PythonOperator
from kubernetes import client as k8s

def myfunc():
    print("kjør task")

with DAG('min-dag', start_date=days_ago(1), schedule_interval=None) as dag:
    run_this = PythonOperator(
        task_id='test',
        python_callable=myfunc,
        executor_config={
           "pod_override": k8s.V1Pod(
               spec=k8s.V1PodSpec(
                   containers=[
                      k8s.V1Container(
                         name="base",
                         image="ghcr.io/navikt/mitt-airflow-image:v1"
                      )
                   ]
               )
           )
        },
        dag=dag
    )

Overstyring av dataverk-airflow image

Skal man bygge eget image for å overstyre dataverk-airflow sitt standard image anbefaler vi å ta utgangspunkt i et av våre dataverk-airflow imager for den ønskede python versjonen. Dette imaget vil bygge på det offisielle python imaget samt inneholde drivere for oracle, postgres, i tillegg til quarto.

Dersom du bygger eget image og ønsker å bruke quarto_operator fra dataverk-airflow så har dette biblioteket en avhengighet til kommandolinjeverktøyet knatch og må derfor også installeres i ditt image.

Under følger et eksempel på hvordan å overstyre imaget som dataverk-airflow containeren bruker med imaget du selv har bygget:

from airflow import DAG
from airflow.utils.dates import days_ago
from dataverk_airflow import notebook_operator

with DAG('dag', start_date=days_ago(1), schedule_interval=None) as dag:
    task = notebook_operator(dag=dag,
                             name="knada-pod-operator",
                             repo="navikt/repo",
                             nb_path="notebooks/mynb.ipynb",
                             image="ghcr.io/navikt/mitt-airflow-image:v1")

API-tilgang til Airflow

Har man behov for at en ekstern tjeneste skal snakke med API-et til Airflow trenger Nada å konfigurere noe på "baksiden" og lage en service bruker for dere. Ta kontakt i Slack#nada, så fikser vi dette for dere. Vi vil ta opprette en service account for deres Airflow, og lage en ekstern adresse (som tilgangsstyres med Cloud Armor).

Et typisk scenario for dette er å la IWS styre jobber i Airflow. Akkurat dette scenarioet er også dokumentert i Confluence/Analytisk Plattform.

Audit logs av tasks

Som et risikoreduserendetiltak logger vi hvem som kjører hvilke jobber i KNADA ned til Datavarehus. dette er for at Datavarehus skal ha bedre kontroll på hvem som snakker med de. Selve tjenesten heter knaudit, og det er kun Datavarehus og NADA som har tilgang til disse loggene.

Eksempel på hva vi logger:

{"commit_sha1":["d19dcf695f043c6eff6b0cc2478b58d45299ca97"],"hostname":["mycsvdag-starting-fc8dfe28afae414da33a5d2a57db85d1"],"run_id":["scheduled__2023-05-03T05:30:00+00:00"],"timestamp":["2023-05-03T05:35:11.000Z"],"git_repo":["github.com/navikt/test-team-dag"],"ip":["321.312.312.321"],"namespace":["team-test-ncnv"],"task_id":["starting"],"git_branch":["main"],"dag_id":["MyCSVDAG"],"triggered_by":["airflow"]}

Hvis det er en manuell kjøring så vil triggered_by være satt til NAV-ident for den innlogget Airflow-bruker.