Hari ini aku belajar Airflow lagi, Airflow 3. Versi kali ini membawa beberapa perubahan.
Mendefinisikan DAG
DAG Decorator
Cara ini cenderung lebih sederhana dan to the point. Ini best practice. Syntax-nya,
from airflow.sdk import dag
from pendulum import datetime
@dag(
schedule="@daily",
start_date=datetime(2025, 1, 1),
description="This dags does ...",
tags=["team_a", "team_b"],
max_consecutive_failed_dag_runs=3
)
def my_dag():
# define task here
pass
my_dag()
DAG Object
from airflow.sdk import task, DAG
from pendulum import datetime
with DAG(
schedule="@daily",
start_date=datetime(2025, 1, 1),
description=" This dags does..."
):
@task
def task_a():
pass
task_a()
Passing Object to Variable
Cara ini tidak direkomendasikan. Cara ini sudah terlalu tua.
D = DAG(
schedule="@daily",
start_date=datetime(2025, 1, 1),
description=" This dags does..."
)
@task(dag=d)
def my_dag():
pass
Python Operator dan Bash Operator
Ada dua pendekatan yang bisa digunakan, yaitu pendekatan PythonOperator
dan @task
.
PythonOperator
Cara ini membutuhkan lebih banyak effort dan lebih susah untuk dibaca.
from airflow.providers.standard.operators.python import PythonOperator
def _task_a(): # biasanya dibuat di atas definisi DAG
print("Hello from task A")
@dag(...)
def my_dag():
task_a = PythonOperator(
task_id="task_a", # samakan dengan nama var untuk best practice
python_callable=_task_a
)
my_dag()
@task
Cara ini lagi-lagi to the point. Kita tidak perlu membuat fungsi terpisah dan tidak perlu import module lain. Jadi, sebenarnya decorator @task
setara dengan PythonOperator.
@dag()
def my_dag():
@task
def task_a():
print("Hello from task A")
task_a()
my_dag()
@task.bash
Cara ini memungkinkan untuk melakukan eksekusi syntax di terminal. Contohnya,
@dag()
def my_dag():
@task.bash
def task_create_file():
return 'echo "Hi there!" >/tmp/dummy'
task_create_file()
my_dag()
Default Arguments
Bayangkan jika kita harus menulis kode yang redundan seperti berikut,
@dag('my_dag', start_date=datetime(2025, 1 , 1),
description='A simple tutorial DAG', tags=['data_science'],
schedule='@daily')
def my_dag():
task_a = PythonOperator(task_id='task_a', python_callable=print_a, retries=3)
task_b = PythonOperator(task_id='task_b', python_callable=print_b, retries=3)
task_c = PythonOperator(task_id='task_c', python_callable=print_c, retries=3)
task_d = PythonOperator(task_id='task_d', python_callable=print_d, retries=3)
task_e = PythonOperator(task_id='task_e', python_callable=print_e, retries=3)
chain(task_a, [task_b, task_c], [task_d, task_e])
Kita bisa menggunakan pendekatan yang lebih ciamik, yaitu menggunakan default_args
default_args = {
'retries': 3,
}
@dag('my_dag', start_date=datetime(2025, 1 , 1), default_args=default_args,
description='A simple tutorial DAG', tags=['data_science'],
schedule='@daily')
def my_dag():
task_a = PythonOperator(task_id='task_a', python_callable=print_a)
task_b = PythonOperator(task_id='task_b', python_callable=print_b)
task_c = PythonOperator(task_id='task_c', python_callable=print_c)
Task Dependencies
Normal
Gunakan right shift operator
, menjadi task_a() >> task_b() >> task_c() >> task_d()
graph LR task_a --> task_b --> task_c --> task_d
Bercabang Jenis Pertama
Jika ingin suatu task dikerjakan secara paralel, bisa menggunakan task_a() >> task_b() >> [task_c(), task_d()]
graph LR task_a --> task_b --> task_c task_b --> task_e
Bercabang Jenis Kedua
Dapat menggunakan pendekatan berikut,
a = task_a()
a >> task_b() >> task_c()
a >> task_d() >> task_e()
Nantinya akan menghasilkan seperti pada gambar
graph LR task_a --> task_b --> task_c task_a --> task_d --> task_e
Bisa juga menggunakan chain()
sehingga lebih sederhana, caranya,
from airflow.sdk import chain
chain(task_a() >> [task_b(), task_d()], [task_c(), task_e()])