skip to content
Site header image Dimas Wahyu Saputro

Scheduling, XComs, Debug — Airflow 3

Masih pada series Airflow 3, kali ini fokus pada Scheduling, kirim data antar tasks, dan bagaimana cara mengatasi ketika ada error atau hal yang tidak diinginkan.


DAGs Scheduling

Key Concepts

Beberapa konsep yang mungkin ditemui:

  1. start_date → Kapan DAG pertama akan dijalankan
  2. schedule → seberapa sering dijalankan, misalnya @daily, @monthly, @once, dll.
  3. catchup → menjalakan DAG yang ketinggalan (misalnya DAG di pause). Jadi DAG akan dijalankan antara start_date dan terakhir kali DAG di-trigger.
  4. backfill → menjalakan DAG pada tanggal yang lain atau menjalankan ulang

State pada DAG

Normalnya, pada satu waktu, hanya 6 DAGs yang bisa berjalan secara bersamaan.

Ketika DAG dijalankan, berikut adalah state yang terjadi. Jika satu saja task gagal, state DAG akan menjadi failure. Setiap DAG yang dijalankan akan memiliki beberapa elemen, misalnya state, dag id, logical date, start date, end date, duration, run id.

graph LR
  Queued --> Running --> Success
  Running --> Failure

Data Interval pada Airflow 3

Pada Airflow 3, ada pendekatan baru yang digunakan terkait data_intervals. Jadi, secara bawaan akan menggunakan nilai False. Sehingga, setiap DAG yang berjalan tidak membutuhkan interval waktu.

Jika data_intervals kita aktifkan, maka ketika DAG ingin berjalan, maka menunggu waktu interval_end terlebih dahulu.

Decorator Schedule

Best practice-nya gunakan decorator atau pendulum.duration

Ada masalah yang mungkin timbul ketika menggunakan decorator schedule, yaitu inkonsistensi. Misalkan kita menjadwalkan DAG setiap tiga hari, namun melintasi bulan. Maka, hal seperti di bawah bisa terjadi.

Untuk mengatasi hal ini, bisa menggunakan pendekatan duration

from pendulum import duration

@dag(..., schedule=duration(days=3)
def my_dag():
		...

XComs

Konsep Dasar

⚠️
Cocok untuk data kecil! Jangan terlalu sering melakukan push dan pull, karena datanya disimpan pada Airflow Meta Database. Lebih baik gabungkan jadi satu menjadi dict/json.

Salah satu cara untuk transfer data antar task adalah menggunakan XCom. XCom menggunakan konsep push and pull data menggunakan key dan value. Jadi, setiap kita melakukan push, ada beberapa metadata yang menempel, yaitu key, value, run_id, task_id, dag_id. Ketika ingin melakukan pull, kita hanya membutuhkan key dan task_id.

graph LR
  A ---|push| my_val
  B ---|pull| my_val

Implementasi XCom

Metode Eksplisit

from airflow.sdk import Context

@dag
def xcom_dag():

		@task
		def task_a(**context: Context):
				val = 42
				context['ti'].xcom_push(key='my_key', value=val)


		@task
		def task_b(**context: Context):
				context['ti'].xcom_pull(task_ids='task_a', key='my_key')
				
		
		task_a() >> task_b()
		
xcom_dag()

Lebih Sederhana

...				
		@task
		def task_a():
				val = 42
				return val # jadi, pendekatan ini setara dengan sebelumnya


		@task
		def task_b(val: int): # definisikan arg/param-nya, sesuaikan
				print(val)
		
		
		val = task_a()
		task_b(val)
		
...

Menggunakan ti-context

...
		@task
		def task_a(ti):
				val = 42
				ti.xcom_push(key='my_key', value=val)


		@task
		def task_b(ti):
				ti.xcom_pull(task_ids='task_a', key='my_key')
...

Multiple Pull

...
		@task
		def task_b(ti):
				vals = ti.xcom_pull(task_ids=['task_a', 'task_b'], key='my_key')
				print(vals)
...

Limitasi

XCOMs have limitations based on the database used, with SQLite allowing up to 2GB, Postgres up to 1GB, and MySQL up to 64MB.

Debug Airflow

DAGs Don’t Appear

⚠️
DAG biasanya akan diparser setiap 5 menit, untuk update files DAG sekitar 30 detik.

Pastikan DAG terletak pada dags folder. Pastikan juga nama file DAG tidak termasuk pada .airflowignore. Pastikan dag_id unik, jika tidak unik Airflow akan mengambil secara random.

Dependency Conflicts

Jika suatu task membutuhkan dependencies yang berbeda, bisa digunakan beberapa alternatif lain yang dapat memisahkan environments, misalnya KubernetesPodOperator, ExternalPythonOperator, PythonVirtualEnvOperator.

Improper DAG Behaviour

⚠️
Pastikan start_date bukan di masa depan. Pastikan juga end_date tidak di masa lalu.