skip to content
Site header image Dimas Wahyu Saputro

DAG 101 — Airflow 3

Catatan mengenai DAG, Task, dan perintilannya pada Airflow 3

Last Updated:

Hari ini aku belajar Airflow lagi, Airflow 3. Versi kali ini membawa beberapa perubahan.

Mendefinisikan DAG

DAG Decorator

Cara ini cenderung lebih sederhana dan to the point. Ini best practice. Syntax-nya,

from airflow.sdk import dag
from pendulum import datetime 

@dag(
    schedule="@daily",
		start_date=datetime(2025, 1, 1),
		description="This dags does ...",
		tags=["team_a", "team_b"],
		max_consecutive_failed_dag_runs=3
)
def my_dag():
		# define task here
		pass

my_dag()

DAG Object

from airflow.sdk import task, DAG
from pendulum import datetime

with DAG(
		schedule="@daily",
		start_date=datetime(2025, 1, 1),
		description=" This dags does..."
):
		@task
		def task_a():
				pass

		task_a()

Passing Object to Variable

Cara ini tidak direkomendasikan. Cara ini sudah terlalu tua.

D = DAG(
		schedule="@daily",
		start_date=datetime(2025, 1, 1),
		description=" This dags does..."
)

@task(dag=d)
def my_dag():
		pass

Python Operator dan Bash Operator

Ada dua pendekatan yang bisa digunakan, yaitu pendekatan PythonOperator dan @task.

PythonOperator

Cara ini membutuhkan lebih banyak effort dan lebih susah untuk dibaca.

from airflow.providers.standard.operators.python import PythonOperator

def _task_a(): # biasanya dibuat di atas definisi DAG
		print("Hello from task A")

@dag(...)
def my_dag():
		
		task_a = PythonOperator(
				task_id="task_a", # samakan dengan nama var untuk best practice
				python_callable=_task_a
		)

my_dag()

@task

Cara ini lagi-lagi to the point. Kita tidak perlu membuat fungsi terpisah dan tidak perlu import module lain. Jadi, sebenarnya decorator @task setara dengan PythonOperator.

@dag()
def my_dag():

		@task
		def task_a(): 
				print("Hello from task A")

		task_a()

my_dag()

@task.bash

Cara ini memungkinkan untuk melakukan eksekusi syntax di terminal. Contohnya,

@dag()
def my_dag():

		@task.bash
		def task_create_file(): 
				return 'echo "Hi there!" >/tmp/dummy'

		task_create_file()

my_dag()

Default Arguments

Bayangkan jika kita harus menulis kode yang redundan seperti berikut,

@dag('my_dag', start_date=datetime(2025, 1 , 1),
         description='A simple tutorial DAG', tags=['data_science'],
         schedule='@daily')
def my_dag():
    
    task_a = PythonOperator(task_id='task_a', python_callable=print_a, retries=3)
    task_b = PythonOperator(task_id='task_b', python_callable=print_b, retries=3)
    task_c = PythonOperator(task_id='task_c', python_callable=print_c, retries=3)
    task_d = PythonOperator(task_id='task_d', python_callable=print_d, retries=3)
    task_e = PythonOperator(task_id='task_e', python_callable=print_e, retries=3)

    chain(task_a, [task_b, task_c], [task_d, task_e])

Kita bisa menggunakan pendekatan yang lebih ciamik, yaitu menggunakan default_args

default_args = {
    'retries': 3,
}

@dag('my_dag', start_date=datetime(2025, 1 , 1), default_args=default_args,
         description='A simple tutorial DAG', tags=['data_science'],
         schedule='@daily')
def my_dag():
    
    task_a = PythonOperator(task_id='task_a', python_callable=print_a)
    task_b = PythonOperator(task_id='task_b', python_callable=print_b)
    task_c = PythonOperator(task_id='task_c', python_callable=print_c)

Task Dependencies

Normal

Gunakan right shift operator, menjadi task_a() >> task_b() >> task_c() >> task_d()

graph LR
  task_a --> task_b --> task_c --> task_d

Bercabang Jenis Pertama

Jika ingin suatu task dikerjakan secara paralel, bisa menggunakan task_a() >> task_b() >> [task_c(), task_d()]

graph LR
  task_a --> task_b --> task_c
	task_b --> task_e

Bercabang Jenis Kedua

Dapat menggunakan pendekatan berikut,

a = task_a() 
a >> task_b() >> task_c()
a >> task_d() >> task_e()

Nantinya akan menghasilkan seperti pada gambar

graph LR
  task_a --> task_b --> task_c
	task_a --> task_d --> task_e

Bisa juga menggunakan chain() sehingga lebih sederhana, caranya,

from airflow.sdk import chain

chain(task_a() >> [task_b(), task_d()], [task_c(), task_e()])