Airflow DAG Design
| Approach | Code Level | Best For | Flexibility |
|---|---|---|---|
| Python DAG | Full Code | Developers | Highest |
| Dynamic DAG (YAML) | Low Code | Data Engineers | High |
| Template DAG | Low Code | Data Analysts | Medium |
| UI Builder | No Code | Business Users | Low |
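For the full-code approach, Airflow 2.0+ also offers the TaskFlow API, which infers task dependencies and XCom passing from ordinary function calls; a minimal sketch, commented out like the other Airflow examples here:
# from airflow.decorators import dag, task
# from datetime import datetime
#
# @dag(schedule_interval="@daily", start_date=datetime(2024, 1, 1), catchup=False)
# def etl_taskflow():
#     @task
#     def extract():
#         return {"rows": 42}  # return value becomes an XCom automatically
#
#     @task
#     def load(data):
#         print(f"loaded {data['rows']} rows")
#
#     load(extract())  # the call chain defines the dependency: extract >> load
#
# etl_taskflow()  # instantiate the DAG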
DAG Patterns
# === Airflow DAG Patterns ===
# from airflow import DAG
# from airflow.operators.python import PythonOperator
# from airflow.operators.bash import BashOperator
# from datetime import datetime, timedelta  # days_ago() is deprecated; use a static start_date
#
# # Pattern 1: Sequential Pipeline
# with DAG(
# "etl_sequential",
#     schedule_interval="0 2 * * *",  # daily at 02:00
#     start_date=datetime(2024, 1, 1),  # static start date instead of deprecated days_ago()
# default_args={
# "retries": 3,
# "retry_delay": timedelta(minutes=5),
# "execution_timeout": timedelta(hours=1),
# },
# catchup=False,
# tags=["etl", "production"],
# ) as dag:
# extract = PythonOperator(
# task_id="extract",
# python_callable=extract_data,
# )
# transform = PythonOperator(
# task_id="transform",
# python_callable=transform_data,
# )
# load = PythonOperator(
# task_id="load",
# python_callable=load_data,
# )
# notify = BashOperator(
# task_id="notify",
# bash_command='echo "Pipeline complete"',
# )
# extract >> transform >> load >> notify
#
# # Pattern 2: Fan-out / Fan-in
# with DAG("parallel_processing", ...):
# start = PythonOperator(task_id="start", ...)
# process_a = PythonOperator(task_id="process_a", ...)
# process_b = PythonOperator(task_id="process_b", ...)
# process_c = PythonOperator(task_id="process_c", ...)
# merge = PythonOperator(task_id="merge", ...)
# start >> [process_a, process_b, process_c] >> merge
#
# # Pattern 3: Conditional Branching
# from airflow.operators.empty import EmptyOperator
# from airflow.operators.python import BranchPythonOperator
#
# def choose_branch(**ctx):
#     if ctx["params"]["data_size"] > 1_000_000:
#         return "heavy_processing"
#     return "light_processing"
#
# branch = BranchPythonOperator(
#     task_id="branch",
#     python_callable=choose_branch,
# )
# heavy = PythonOperator(task_id="heavy_processing", ...)
# light = PythonOperator(task_id="light_processing", ...)
# # the join must tolerate whichever branch was skipped
# join = EmptyOperator(task_id="join", trigger_rule="none_failed_min_one_success")
# branch >> [heavy, light] >> join
from dataclasses import dataclass
from typing import List
@dataclass
class DAGConfig:
name: str
pattern: str
tasks: int
schedule: str
avg_duration: str
status: str
dags = [
DAGConfig("etl_orders", "Sequential", 4, "Every hour", "5 min", "Running"),
DAGConfig("ml_training", "Fan-out", 8, "Daily 03:00", "2 hours", "Success"),
DAGConfig("report_gen", "Conditional", 6, "Daily 06:00", "15 min", "Success"),
DAGConfig("data_quality", "Sequential", 3, "Every 30 min", "2 min", "Running"),
DAGConfig("backfill_hist", "Fan-out", 12, "Manual", "4 hours", "Paused"),
]
print("=== DAG Dashboard ===")
for d in dags:
print(f" [{d.status}] {d.name} ({d.pattern})")
print(f" Tasks: {d.tasks} | Schedule: {d.schedule} | Duration: {d.avg_duration}")
Dynamic DAG Generation
# === Low-Code: Dynamic DAG from YAML ===
# dag_configs/etl_sales.yaml
# dag_id: etl_sales
# schedule: "0 * * * *"
# description: "Sales data ETL pipeline"
# tags: ["etl", "sales"]
# tasks:
# - id: extract_mysql
# type: python
# callable: extractors.mysql_extract
# params: {table: "orders", since: "yesterday"}
# - id: extract_api
# type: python
# callable: extractors.api_extract
# params: {endpoint: "/api/sales"}
# - id: transform
# type: python
# callable: transformers.clean_sales
# depends_on: [extract_mysql, extract_api]
# - id: load_warehouse
# type: python
# callable: loaders.load_bigquery
# params: {dataset: "analytics", table: "sales"}
# depends_on: [transform]
# dag_factory.py — Generate DAGs from YAML
# import glob
# import yaml
# from datetime import datetime
# from importlib import import_module
# from airflow import DAG
# from airflow.operators.python import PythonOperator
#
# def import_callable(dotted_path):
#     """Resolve a dotted path like 'extractors.mysql_extract' to its function."""
#     module_path, fn_name = dotted_path.rsplit(".", 1)
#     return getattr(import_module(module_path), fn_name)
#
# def create_dag_from_config(config_path):
# with open(config_path) as f:
# config = yaml.safe_load(f)
#
# dag = DAG(
# dag_id=config["dag_id"],
# schedule_interval=config.get("schedule", None),
# description=config.get("description", ""),
# start_date=datetime(2024, 1, 1),
# catchup=False,
# tags=config.get("tags", []),
# )
#
# tasks = {}
# for task_cfg in config["tasks"]:
# callable_fn = import_callable(task_cfg["callable"])
# tasks[task_cfg["id"]] = PythonOperator(
# task_id=task_cfg["id"],
# python_callable=callable_fn,
# op_kwargs=task_cfg.get("params", {}),
# dag=dag,
# )
#
# # Set dependencies
# for task_cfg in config["tasks"]:
# for dep in task_cfg.get("depends_on", []):
# tasks[dep] >> tasks[task_cfg["id"]]
#
# return dag
#
# # Auto-discover and create DAGs
# for config_file in glob.glob("/opt/airflow/dag_configs/*.yaml"):
# dag = create_dag_from_config(config_file)
# globals()[dag.dag_id] = dag
import yaml
sample_config = {
"dag_id": "etl_sales",
"schedule": "0 * * * *",
"tags": ["etl", "sales"],
"tasks": [
{"id": "extract", "type": "python", "callable": "extract_sales"},
{"id": "transform", "type": "python", "depends_on": ["extract"]},
{"id": "load", "type": "python", "depends_on": ["transform"]},
],
}
print("\nDynamic DAG Config (YAML):")
print(yaml.dump(sample_config, default_flow_style=False, allow_unicode=True))
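To show what the factory's depends_on wiring resolves to without installing Airflow, here is a runnable sketch that topologically orders the tasks in sample_config (Kahn's algorithm):
from collections import deque

def execution_order(tasks):
    """Topologically sort task ids by their depends_on edges (Kahn's algorithm)."""
    indegree = {t["id"]: len(t.get("depends_on", [])) for t in tasks}
    downstream = {t["id"]: [] for t in tasks}
    for t in tasks:
        for dep in t.get("depends_on", []):
            downstream[dep].append(t["id"])
    ready = deque(tid for tid, deg in indegree.items() if deg == 0)
    order = []
    while ready:
        tid = ready.popleft()
        order.append(tid)
        for nxt in downstream[tid]:
            indegree[nxt] -= 1
            if indegree[nxt] == 0:
                ready.append(nxt)
    return order

print("Execution order:", " >> ".join(execution_order(sample_config["tasks"])))
# -> Execution order: extract >> transform >> load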
Testing and Monitoring
# === DAG Testing & Monitoring ===
# Unit Test DAG
# import pytest
# from airflow.models import DagBag
#
# def test_dag_loaded():
# dagbag = DagBag()
# assert "etl_sales" in dagbag.dags
# assert dagbag.import_errors == {}
#
# def test_dag_structure():
# dagbag = DagBag()
# dag = dagbag.get_dag("etl_sales")
# assert len(dag.tasks) == 4
# assert dag.schedule_interval == "0 * * * *"
#
# def test_task_dependencies():
# dagbag = DagBag()
# dag = dagbag.get_dag("etl_sales")
# extract = dag.get_task("extract")
# transform = dag.get_task("transform")
# assert transform in extract.downstream_list
# CLI Commands
# airflow dags list
# airflow dags test etl_sales 2024-03-01
# airflow tasks test etl_sales extract 2024-03-01
# airflow dags trigger etl_sales
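Since Airflow 2.5, a DAG can also be debugged in-process by calling dag.test() at the bottom of the DAG file; a minimal sketch:
# if __name__ == "__main__":
#     dag.test()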
best_practices = {
    "Idempotent": "Re-runs must produce the same result; use UPSERT instead of INSERT",
    "Atomic": "Each task does exactly one thing; split work, don't combine it",
    "Small DAGs": "Split into focused DAGs instead of one DAG that does everything",
    "Retry": "Set retries=3 and retry_delay=5min on every task",
    "Timeout": "Set execution_timeout to prevent hung tasks",
    "Alerts": "Use on_failure_callback to notify Slack/Email on failure (sketched below)",
    "XCom": "Pass only small payloads between tasks, never large DataFrames",
    "Variables": "Use Airflow Variables instead of hardcoded config",
    "Connections": "Store credentials in Airflow Connections, not in code",
}
print("DAG Best Practices:")
for practice, desc in best_practices.items():
print(f" [{practice}]: {desc}")
# Monitoring Metrics
metrics = {
"DAG Success Rate": "98.5%",
"Avg Task Duration": "3.2 min",
"Failed Tasks (24h)": "3",
"Active DAGs": "15",
"Queued Tasks": "2",
"Scheduler Heartbeat": "OK",
}
print(f"\n\nMonitoring:")
for k, v in metrics.items():
print(f" {k}: {v}")
Tips
- YAML DAGs: generate DAGs automatically from YAML configs to cut duplicate code
- Template: build a template DAG for the team so they only edit config (see the sketch below)
- Test: test DAGs with pytest before deploying
- Idempotent: every task must be idempotent and safe to re-run
- Monitor: set alerts on every DAG to notify on failure
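The Template tip, sketched: a parameterized factory that team members call with config values only (create_etl_dag and run_etl are illustrative names, not Airflow APIs):
# from airflow import DAG
# from airflow.operators.python import PythonOperator
# from datetime import datetime
#
# def create_etl_dag(source: str, schedule: str) -> DAG:
#     """Template DAG: the team changes only source and schedule."""
#     with DAG(f"etl_{source}", schedule_interval=schedule,
#              start_date=datetime(2024, 1, 1), catchup=False) as dag:
#         PythonOperator(task_id="run", python_callable=run_etl,  # run_etl: user-defined ETL entrypoint
#                        op_kwargs={"source": source})
#     return dag
#
# globals()["etl_orders"] = create_etl_dag("orders", "@hourly")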