Airflow DAG Design
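Apache Airflow เป็นเครื่องมือ Workflow Orchestration ที่นิยาม Pipeline เป็น DAG (Directed Acyclic Graph) ซึ่งประกอบด้วย Tasks, Dependencies และ Schedule โดยแต่ละ Task ทำงานผ่าน Operator บทความนี้เปรียบเทียบแนวทางตั้งแต่การเขียน Python เต็มรูปแบบไปจนถึง Low-Code/No-Code ด้วย YAML Config และ Template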
| Approach | Code Level | เหมาะกับ | Flexibility |
|---|---|---|---|
| Python DAG | Full Code | Developer | สูงสุด |
| Dynamic DAG (YAML) | Low Code | Data Engineer | สูง |
| Template DAG | Low Code | Data Analyst | ปานกลาง |
| UI Builder | No Code | Business User | ต่ำ |
DAG Patterns
# === Airflow DAG Patterns ===
# from airflow import DAG
# from airflow.operators.python import PythonOperator
# from airflow.operators.bash import BashOperator
# from datetime import datetime, timedelta
#
# # Pattern 1: Sequential Pipeline
# with DAG(
# "etl_sequential",
# schedule_interval="0 2 * * *", # ทุกวัน 02:00
# start_date=datetime(2024, 1, 1), # ใช้ start_date แบบคงที่ (days_ago ถูก deprecate แล้ว)
# default_args={
# "retries": 3,
# "retry_delay": timedelta(minutes=5),
# "execution_timeout": timedelta(hours=1),
# },
# catchup=False,
# tags=["etl", "production"],
# ) as dag:
# extract = PythonOperator(
# task_id="extract",
# python_callable=extract_data,
# )
# transform = PythonOperator(
# task_id="transform",
# python_callable=transform_data,
# )
# load = PythonOperator(
# task_id="load",
# python_callable=load_data,
# )
# notify = BashOperator(
# task_id="notify",
# bash_command='echo "Pipeline complete"',
# )
# extract >> transform >> load >> notify
#
# # Pattern 2: Fan-out / Fan-in
# with DAG("parallel_processing", ...):
# start = PythonOperator(task_id="start", ...)
# process_a = PythonOperator(task_id="process_a", ...)
# process_b = PythonOperator(task_id="process_b", ...)
# process_c = PythonOperator(task_id="process_c", ...)
# merge = PythonOperator(task_id="merge", ...)
# start >> [process_a, process_b, process_c] >> merge
#
# # Pattern 3: Conditional Branching
# from airflow.operators.python import BranchPythonOperator
# def choose_branch(**ctx):
# if ctx["params"]["data_size"] > 1000000:
# return "heavy_processing"
# return "light_processing"
#
# branch = BranchPythonOperator(
# task_id="branch",
# python_callable=choose_branch,
# )
from dataclasses import dataclass
from typing import List
@dataclass
class DAGConfig:
    """One row of the DAG dashboard: descriptive facts about a single DAG.

    Every field is display data only; nothing is parsed or validated here.
    """

    name: str  # DAG identifier shown in the dashboard, e.g. "etl_orders"
    pattern: str  # pipeline shape label, e.g. "Sequential" or "Fan-out"
    tasks: int  # number of tasks in the DAG
    schedule: str  # human-readable schedule, e.g. "Every hour"
    avg_duration: str  # human-readable average run time, e.g. "5 min"
    status: str  # state label, e.g. "Running", "Success", "Paused"
# Sample inventory of DAGs rendered by the dashboard loop below.
dags = [
    DAGConfig("etl_orders", "Sequential", 4, "Every hour", "5 min", "Running"),
    DAGConfig("ml_training", "Fan-out", 8, "Daily 03:00", "2 hours", "Success"),
    DAGConfig("report_gen", "Conditional", 6, "Daily 06:00", "15 min", "Success"),
    DAGConfig("data_quality", "Sequential", 3, "Every 30 min", "2 min", "Running"),
    DAGConfig("backfill_hist", "Fan-out", 12, "Manual", "4 hours", "Paused"),
]

# Two lines per DAG: status/name/pattern first, then task count, schedule
# and average duration. Both prints read `d`, so both belong to the loop body.
print("=== DAG Dashboard ===")
for d in dags:
    print(f" [{d.status}] {d.name} ({d.pattern})")
    print(f" Tasks: {d.tasks} | Schedule: {d.schedule} | Duration: {d.avg_duration}")
Dynamic DAG Generation
# === Low-Code: Dynamic DAG from YAML ===
# dag_configs/etl_sales.yaml
# dag_id: etl_sales
# schedule: "0 * * * *"
# description: "Sales data ETL pipeline"
# tags: ["etl", "sales"]
# tasks:
# - id: extract_mysql
# type: python
# callable: extractors.mysql_extract
# params: {table: "orders", since: "yesterday"}
# - id: extract_api
# type: python
# callable: extractors.api_extract
# params: {endpoint: "/api/sales"}
# - id: transform
# type: python
# callable: transformers.clean_sales
# depends_on: [extract_mysql, extract_api]
# - id: load_warehouse
# type: python
# callable: loaders.load_bigquery
# params: {dataset: "analytics", table: "sales"}
# depends_on: [transform]
# dag_factory.py — Generate DAGs from YAML
# import yaml
# import glob
# from airflow import DAG
# from airflow.operators.python import PythonOperator
# from datetime import datetime
#
# def create_dag_from_config(config_path):
# with open(config_path) as f:
# config = yaml.safe_load(f)
#
# dag = DAG(
# dag_id=config["dag_id"],
# schedule_interval=config.get("schedule", None),
# description=config.get("description", ""),
# start_date=datetime(2024, 1, 1),
# catchup=False,
# tags=config.get("tags", []),
# )
#
# tasks = {}
# for task_cfg in config["tasks"]:
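# # NOTE: import_callable is a helper not shown here; it must resolve a dotted
# # path such as "extractors.mysql_extract" to a function (e.g. with importlib)
# callable_fn = import_callable(task_cfg["callable"])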
# tasks[task_cfg["id"]] = PythonOperator(
# task_id=task_cfg["id"],
# python_callable=callable_fn,
# op_kwargs=task_cfg.get("params", {}),
# dag=dag,
# )
#
# # Set dependencies
# for task_cfg in config["tasks"]:
# for dep in task_cfg.get("depends_on", []):
# tasks[dep] >> tasks[task_cfg["id"]]
#
# return dag
#
# # Auto-discover and create DAGs
# for config_file in glob.glob("/opt/airflow/dag_configs/*.yaml"):
# dag = create_dag_from_config(config_file)
# globals()[dag.dag_id] = dag
import yaml
# In-memory counterpart of a dag_configs/*.yaml file; it is dumped below to
# show what the YAML form of a DAG definition looks like.
sample_config = dict(
    dag_id="etl_sales",
    schedule="0 * * * *",
    tags=["etl", "sales"],
    tasks=[
        dict(id="extract", type="python", callable="extract_sales"),
        dict(id="transform", type="python", depends_on=["extract"]),
        dict(id="load", type="python", depends_on=["transform"]),
    ],
)

# Dump with default_flow_style=False and allow_unicode=True, as passed below.
print("\nDynamic DAG Config (YAML):")
print(
    yaml.dump(
        sample_config,
        default_flow_style=False,
        allow_unicode=True,
    )
)
Testing และ Monitoring
# === DAG Testing & Monitoring ===
# Unit Test DAG
# import pytest
# from airflow.models import DagBag
#
# def test_dag_loaded():
# dagbag = DagBag()
# assert "etl_sales" in dagbag.dags
# assert dagbag.import_errors == {}
#
# def test_dag_structure():
# dagbag = DagBag()
# dag = dagbag.get_dag("etl_sales")
# assert len(dag.tasks) == 4
# assert dag.schedule_interval == "0 * * * *"
#
# def test_task_dependencies():
# dagbag = DagBag()
# dag = dagbag.get_dag("etl_sales")
# extract = dag.get_task("extract_mysql")
# transform = dag.get_task("transform")
# assert transform in extract.downstream_list
# CLI Commands
# airflow dags list
# airflow dags test etl_sales 2024-03-01
# airflow tasks test etl_sales extract_mysql 2024-03-01
# airflow dags trigger etl_sales
# Checklist of DAG design practices: practice name -> short explanation.
# The explanations are user-facing text and are printed verbatim.
best_practices = {
    "Idempotent": "รันซ้ำได้ผลเหมือนเดิม ใช้ UPSERT แทน INSERT",
    "Atomic": "แต่ละ Task ทำงานเดียว แยกย่อยไม่รวมกัน",
    "Small DAGs": "แยก DAG ย่อย ไม่รวมทุกอย่างใน DAG เดียว",
    "Retry": "ตั้ง retries=3, retry_delay=5min ทุก Task",
    "Timeout": "ตั้ง execution_timeout ป้องกัน Task ค้าง",
    "Alerts": "on_failure_callback แจ้ง Slack/Email เมื่อ Fail",
    "XCom": "ส่งข้อมูลเล็กๆระหว่าง Task ไม่ส่ง DataFrame ใหญ่",
    "Variables": "ใช้ Airflow Variables แทน Hardcode Config",
    "Connections": "ใช้ Airflow Connections เก็บ Credentials",
}

# One line per practice, in insertion order.
print("DAG Best Practices:")
for practice, desc in best_practices.items():
    print(f" [{practice}]: {desc}")
# Monitoring metrics: metric label -> value, all stored as display strings.
metrics = {
    "DAG Success Rate": "98.5%",
    "Avg Task Duration": "3.2 min",
    "Failed Tasks (24h)": "3",
    "Active DAGs": "15",
    "Queued Tasks": "2",
    "Scheduler Heartbeat": "OK",
}

# The two leading newlines separate this section from the output above it.
print("\n\nMonitoring:")
for k, v in metrics.items():
    print(f" {k}: {v}")
เคล็ดลับ
- YAML DAGs: ใช้ YAML Config สร้าง DAG อัตโนมัติ ลด Code ซ้ำ
- Template: สร้าง Template DAG ให้ทีมใช้ แก้แค่ Config
- Test: ทดสอบ DAG ก่อน Deploy ด้วย pytest
- Idempotent: ทุก Task ต้อง Idempotent รันซ้ำได้
- Monitor: ตั้ง Alert ทุก DAG แจ้งเมื่อ Fail
Apache Airflow คืออะไร
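Apache Airflow เป็นเครื่องมือ Open Source สำหรับ Workflow Orchestration ที่เขียน Workflow เป็น DAG ด้วย Python กำหนด Task, Dependencies และ Schedule ได้ มี Web UI สำหรับติดตามงาน รองรับ Retry และมี Operator หลากหลายสำหรับสร้าง Data Pipeline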
DAG คืออะไร
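DAG ย่อมาจาก Directed Acyclic Graph คือกราฟมีทิศทางที่ไม่วนรอบ ใช้แทน Workflow โดยกำหนดลำดับของ Tasks เช่น Extract → Transform → Load แต่ละ Task สร้างจาก Operator เชื่อมกันด้วย Dependencies และรันตาม Schedule ที่กำหนด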
Low-Code No-Code สำหรับ Airflow คืออะไร
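Low-Code/No-Code สำหรับ Airflow คือแนวทางลดการเขียน Code เช่น สร้าง Dynamic DAG จาก YAML หรือ JSON Config ใช้เครื่องมือแบบ UI-based หรือ Template DAGs เหมาะกับ Data Analyst ที่ไม่ถนัด Python โดยมีแพลตฟอร์มที่เกี่ยวข้องอย่าง Astronomer และ Cloud Composer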
DAG Best Practices มีอะไรบ้าง
Best Practices ที่สำคัญ ได้แก่ Idempotent, Atomic, Small DAGs, Retry, Timeout, Alerts, XCom, Variables และ Connections รวมถึง Testing และ Documentation
สรุป
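การออกแบบ Apache Airflow DAG แบบ Low-Code/No-Code ทำได้ด้วย Dynamic DAG จาก YAML และ Template DAG รูปแบบที่ใช้บ่อยคือ Sequential, Fan-out และ Branching ควรยึด Best Practices เช่น Idempotent, Retry และ Timeout ควบคู่กับ Testing และ Monitoring เพื่อให้ Pipeline Orchestration เชื่อถือได้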
