Airflow DAG Design

Apache Airflow DAG Directed Acyclic Graph Workflow Orchestration Low-Code No-Code Pipeline Tasks Dependencies Schedule Operator Python YAML Config Template

| Approach           | Code Level | เหมาะกับ       | Flexibility |
|--------------------|------------|----------------|-------------|
| Python DAG         | Full Code  | Developer      | สูงสุด       |
| Dynamic DAG (YAML) | Low Code   | Data Engineer  | สูง          |
| Template DAG       | Low Code   | Data Analyst   | ปานกลาง      |
| UI Builder         | No Code    | Business User  | ต่ำ          |

DAG Patterns

# === Airflow DAG Patterns ===

# from airflow import DAG
# from airflow.operators.python import PythonOperator
# from airflow.operators.bash import BashOperator
# from airflow.utils.dates import days_ago  # NOTE(review): deprecated since Airflow 2.x — prefer a fixed datetime/pendulum start_date
# from datetime import timedelta
#
# # Pattern 1: Sequential Pipeline
# with DAG(
# "etl_sequential",
# schedule_interval="0 2 * * *", # ทุกวัน 02:00
# start_date=days_ago(1),
# default_args={
# "retries": 3,
# "retry_delay": timedelta(minutes=5),
# "execution_timeout": timedelta(hours=1),
# },
# catchup=False,
# tags=["etl", "production"],
# ) as dag:
# extract = PythonOperator(
# task_id="extract",
# python_callable=extract_data,
# )
# transform = PythonOperator(
# task_id="transform",
# python_callable=transform_data,
# )
# load = PythonOperator(
# task_id="load",
# python_callable=load_data,
# )
# notify = BashOperator(
# task_id="notify",
# bash_command='echo "Pipeline complete"',
# )
# extract >> transform >> load >> notify
#
# # Pattern 2: Fan-out / Fan-in
# with DAG("parallel_processing", ...):
# start = PythonOperator(task_id="start", ...)
# process_a = PythonOperator(task_id="process_a", ...)
# process_b = PythonOperator(task_id="process_b", ...)
# process_c = PythonOperator(task_id="process_c", ...)
# merge = PythonOperator(task_id="merge", ...)
# start >> [process_a, process_b, process_c] >> merge
#
# # Pattern 3: Conditional Branching
# from airflow.operators.python import BranchPythonOperator
# def choose_branch(**ctx):
# if ctx["params"]["data_size"] > 1000000:
# return "heavy_processing"
# return "light_processing"
#
# branch = BranchPythonOperator(
# task_id="branch",
# python_callable=choose_branch,
# )

from dataclasses import dataclass
from typing import List

@dataclass
class DAGConfig:
 # Lightweight summary record for one Airflow DAG, used only by the
 # dashboard printout below. Field order is part of the positional
 # constructor contract — do not reorder.
 name: str  # DAG id, e.g. "etl_orders"
 pattern: str  # orchestration pattern label: "Sequential" / "Fan-out" / "Conditional"
 tasks: int  # number of tasks in the DAG
 schedule: str  # human-readable schedule description (not a cron expression)
 avg_duration: str  # typical runtime, free-form text, e.g. "5 min"
 status: str  # current state label, e.g. "Running" / "Success" / "Paused"

# Sample inventory of DAGs shown on the dashboard:
# (name, pattern, task count, schedule, avg duration, status)
_rows = [
    ("etl_orders", "Sequential", 4, "Every hour", "5 min", "Running"),
    ("ml_training", "Fan-out", 8, "Daily 03:00", "2 hours", "Success"),
    ("report_gen", "Conditional", 6, "Daily 06:00", "15 min", "Success"),
    ("data_quality", "Sequential", 3, "Every 30 min", "2 min", "Running"),
    ("backfill_hist", "Fan-out", 12, "Manual", "4 hours", "Paused"),
]
dags = [DAGConfig(*row) for row in _rows]

# Render a two-line summary per DAG to stdout.
print("=== DAG Dashboard ===")
for cfg in dags:
    print(f" [{cfg.status}] {cfg.name} ({cfg.pattern})")
    print(f" Tasks: {cfg.tasks} | Schedule: {cfg.schedule} | Duration: {cfg.avg_duration}")

Dynamic DAG Generation

# === Low-Code: Dynamic DAG from YAML ===

# dag_configs/etl_sales.yaml
# dag_id: etl_sales
# schedule: "0 * * * *"
# description: "Sales data ETL pipeline"
# tags: ["etl", "sales"]
# tasks:
# - id: extract_mysql
# type: python
# callable: extractors.mysql_extract
# params: {table: "orders", since: "yesterday"}
# - id: extract_api
# type: python
# callable: extractors.api_extract
# params: {endpoint: "/api/sales"}
# - id: transform
# type: python
# callable: transformers.clean_sales
# depends_on: [extract_mysql, extract_api]
# - id: load_warehouse
# type: python
# callable: loaders.load_bigquery
# params: {dataset: "analytics", table: "sales"}
# depends_on: [transform]

# dag_factory.py — Generate DAGs from YAML
# import yaml
# import glob
# from airflow import DAG
# from airflow.operators.python import PythonOperator
# from datetime import datetime
#
# def create_dag_from_config(config_path):
# with open(config_path) as f:
# config = yaml.safe_load(f)
#
# dag = DAG(
# dag_id=config["dag_id"],
# schedule_interval=config.get("schedule", None),
# description=config.get("description", ""),
# start_date=datetime(2024, 1, 1),
# catchup=False,
# tags=config.get("tags", []),
# )
#
# tasks = {}
# for task_cfg in config["tasks"]:
# callable_fn = import_callable(task_cfg["callable"])
# tasks[task_cfg["id"]] = PythonOperator(
# task_id=task_cfg["id"],
# python_callable=callable_fn,
# op_kwargs=task_cfg.get("params", {}),
# dag=dag,
# )
#
# # Set dependencies
# for task_cfg in config["tasks"]:
# for dep in task_cfg.get("depends_on", []):
# tasks[dep] >> tasks[task_cfg["id"]]
#
# return dag
#
# # Auto-discover and create DAGs
# for config_file in glob.glob("/opt/airflow/dag_configs/*.yaml"):
# dag = create_dag_from_config(config_file)
# globals()[dag.dag_id] = dag

import yaml

# Illustrative low-code DAG definition: the task list mirrors the YAML
# schema shown above (id / type / callable / depends_on).
_task_specs = [
    {"id": "extract", "type": "python", "callable": "extract_sales"},
    {"id": "transform", "type": "python", "depends_on": ["extract"]},
    {"id": "load", "type": "python", "depends_on": ["transform"]},
]

sample_config = {
    "dag_id": "etl_sales",
    "schedule": "0 * * * *",
    "tags": ["etl", "sales"],
    "tasks": _task_specs,
}

# Echo the config back as YAML so the reader sees the low-code format.
print("\nDynamic DAG Config (YAML):")
rendered = yaml.dump(sample_config, default_flow_style=False, allow_unicode=True)
print(rendered)

Testing และ Monitoring

# === DAG Testing & Monitoring ===

# Unit Test DAG
# import pytest
# from airflow.models import DagBag
#
# def test_dag_loaded():
# dagbag = DagBag()
# assert "etl_sales" in dagbag.dags
# assert dagbag.import_errors == {}
#
# def test_dag_structure():
# dagbag = DagBag()
# dag = dagbag.get_dag("etl_sales")
# assert len(dag.tasks) == 4
# assert dag.schedule_interval == "0 * * * *"
#
# def test_task_dependencies():
# dagbag = DagBag()
# dag = dagbag.get_dag("etl_sales")
# extract = dag.get_task("extract")
# transform = dag.get_task("transform")
# assert transform in extract.downstream_list

# CLI Commands
# airflow dags list
# airflow dags test etl_sales 2024-03-01
# airflow tasks test etl_sales extract 2024-03-01
# airflow dags trigger etl_sales

# Airflow DAG best practices: short Thai descriptions keyed by practice name.
best_practices = {
    "Idempotent": "รันซ้ำได้ผลเหมือนเดิม ใช้ UPSERT แทน INSERT",
    "Atomic": "แต่ละ Task ทำงานเดียว แยกย่อยไม่รวมกัน",
    "Small DAGs": "แยก DAG ย่อย ไม่รวมทุกอย่างใน DAG เดียว",
    "Retry": "ตั้ง retries=3, retry_delay=5min ทุก Task",
    "Timeout": "ตั้ง execution_timeout ป้องกัน Task ค้าง",
    "Alerts": "on_failure_callback แจ้ง Slack/Email เมื่อ Fail",
    "XCom": "ส่งข้อมูลเล็กๆระหว่าง Task ไม่ส่ง DataFrame ใหญ่",
    "Variables": "ใช้ Airflow Variables แทน Hardcode Config",
    "Connections": "ใช้ Airflow Connections เก็บ Credentials",
}

# Print one "[name]: description" line per practice.
print("DAG Best Practices:")
for title in best_practices:
    print(f" [{title}]: {best_practices[title]}")

# Example operational metrics for a monitoring panel (values are
# illustrative strings, not live measurements).
metrics = {
    "DAG Success Rate": "98.5%",
    "Avg Task Duration": "3.2 min",
    "Failed Tasks (24h)": "3",
    "Active DAGs": "15",
    "Queued Tasks": "2",
    "Scheduler Heartbeat": "OK",
}

print("\n\nMonitoring:")
for metric_name, metric_value in metrics.items():
    print(f" {metric_name}: {metric_value}")

เคล็ดลับ

  • YAML DAGs: ใช้ YAML Config สร้าง DAG อัตโนมัติ ลด Code ซ้ำ
  • Template: สร้าง Template DAG ให้ทีมใช้ แก้แค่ Config
  • Test: ทดสอบ DAG ก่อน Deploy ด้วย pytest
  • Idempotent: ทุก Task ต้อง Idempotent รันซ้ำได้
  • Monitor: ตั้ง Alert ทุก DAG แจ้งเมื่อ Fail

Apache Airflow คืออะไร

Open Source Workflow Orchestration Python DAG Task Dependencies Schedule Web UI Retry Operator Data Pipeline