DAGs Scheduling
Key Concepts
Beberapa konsep yang mungkin ditemui:
-
start_date
→ Kapan DAG pertama akan dijalankan -
schedule
→ seberapa sering dijalankan, misalnya @daily, @monthly, @once, dll. -
catchup
→ menjalakan DAG yang ketinggalan (misalnya DAG di pause). Jadi DAG akan dijalankan antara start_date dan terakhir kali DAG di-trigger. -
backfill
→ menjalakan DAG pada tanggal yang lain atau menjalankan ulang
State pada DAG
Normalnya, pada satu waktu, hanya 6 DAGs yang bisa berjalan secara bersamaan.
Ketika DAG dijalankan, berikut adalah state yang terjadi. Jika satu saja task gagal, state DAG akan menjadi failure. Setiap DAG yang dijalankan akan memiliki beberapa elemen, misalnya state, dag id, logical date, start date, end date, duration, run id.
graph LR Queued --> Running --> Success Running --> Failure
Data Interval pada Airflow 3
Pada Airflow 3, ada pendekatan baru yang digunakan terkait data_intervals
. Jadi, secara bawaan akan menggunakan nilai False
. Sehingga, setiap DAG yang berjalan tidak membutuhkan interval waktu.
Jika data_intervals
kita aktifkan, maka ketika DAG ingin berjalan, maka menunggu waktu interval_end
terlebih dahulu.
Decorator Schedule
Best practice-nya gunakandecorator
ataupendulum.duration
Ada masalah yang mungkin timbul ketika menggunakan decorator schedule, yaitu inkonsistensi. Misalkan kita menjadwalkan DAG setiap tiga hari, namun melintasi bulan. Maka, hal seperti di bawah bisa terjadi.
Untuk mengatasi hal ini, bisa menggunakan pendekatan duration
from pendulum import duration
@dag(..., schedule=duration(days=3)
def my_dag():
...
XComs
Konsep Dasar
Salah satu cara untuk transfer data antar task adalah menggunakan XCom. XCom menggunakan konsep push and pull data menggunakan key dan value. Jadi, setiap kita melakukan push, ada beberapa metadata yang menempel, yaitu key, value, run_id, task_id, dag_id
. Ketika ingin melakukan pull, kita hanya membutuhkan key dan task_id
.
graph LR A ---|push| my_val B ---|pull| my_val
Implementasi XCom
Metode Eksplisit
from airflow.sdk import Context
@dag
def xcom_dag():
@task
def task_a(**context: Context):
val = 42
context['ti'].xcom_push(key='my_key', value=val)
@task
def task_b(**context: Context):
context['ti'].xcom_pull(task_ids='task_a', key='my_key')
task_a() >> task_b()
xcom_dag()
Lebih Sederhana
...
@task
def task_a():
val = 42
return val # jadi, pendekatan ini setara dengan sebelumnya
@task
def task_b(val: int): # definisikan arg/param-nya, sesuaikan
print(val)
val = task_a()
task_b(val)
...
Menggunakan ti-context
...
@task
def task_a(ti):
val = 42
ti.xcom_push(key='my_key', value=val)
@task
def task_b(ti):
ti.xcom_pull(task_ids='task_a', key='my_key')
...
Multiple Pull
...
@task
def task_b(ti):
vals = ti.xcom_pull(task_ids=['task_a', 'task_b'], key='my_key')
print(vals)
...
Limitasi
XCOMs have limitations based on the database used, with SQLite allowing up to 2GB, Postgres up to 1GB, and MySQL up to 64MB.
Debug Airflow
DAGs Don’t Appear
Pastikan DAG terletak pada dags
folder. Pastikan juga nama file DAG tidak termasuk pada .airflowignore
. Pastikan dag_id
unik, jika tidak unik Airflow akan mengambil secara random.
Dependency Conflicts
Jika suatu task membutuhkan dependencies yang berbeda, bisa digunakan beberapa alternatif lain yang dapat memisahkan environments, misalnya KubernetesPodOperator
, ExternalPythonOperator
, PythonVirtualEnvOperator
.
Improper DAG Behaviour
start_date
bukan di masa depan. Pastikan juga end_date
tidak di masa lalu.