Technology

Airflow DAG Design Low Code No Code

airflow dag design low code no code
Airflow DAG Design Low Code No Code | SiamCafe Blog
2026-04-26· อ. บอม — SiamCafe.net· 9,987 คำ

Airflow DAG Design

Apache Airflow DAG Directed Acyclic Graph Workflow Orchestration Low-Code No-Code Pipeline Tasks Dependencies Schedule Operator Python YAML Config Template

Approach | Code Level | เหมาะกับ | Flexibility
Python DAG | Full Code | Developer | สูงสุด
Dynamic DAG (YAML) | Low Code | Data Engineer | สูง
Template DAG | Low Code | Data Analyst | ปานกลาง
UI Builder | No Code | Business User | ต่ำ

DAG Patterns

# === Airflow DAG Patterns ===

# from airflow import DAG
# from airflow.operators.python import PythonOperator
# from airflow.operators.bash import BashOperator
# from airflow.utils.dates import days_ago  # NOTE(review): deprecated in Airflow 2.x — prefer a fixed pendulum/datetime start_date; confirm target Airflow version
# from datetime import timedelta
#
# # Pattern 1: Sequential Pipeline
# with DAG(
#     "etl_sequential",
#     schedule_interval="0 2 * * *",  # ทุกวัน 02:00
#     start_date=days_ago(1),
#     default_args={
#         "retries": 3,
#         "retry_delay": timedelta(minutes=5),
#         "execution_timeout": timedelta(hours=1),
#     },
#     catchup=False,
#     tags=["etl", "production"],
# ) as dag:
#     extract = PythonOperator(
#         task_id="extract",
#         python_callable=extract_data,
#     )
#     transform = PythonOperator(
#         task_id="transform",
#         python_callable=transform_data,
#     )
#     load = PythonOperator(
#         task_id="load",
#         python_callable=load_data,
#     )
#     notify = BashOperator(
#         task_id="notify",
#         bash_command='echo "Pipeline complete"',
#     )
#     extract >> transform >> load >> notify
#
# # Pattern 2: Fan-out / Fan-in
# with DAG("parallel_processing", ...):
#     start = PythonOperator(task_id="start", ...)
#     process_a = PythonOperator(task_id="process_a", ...)
#     process_b = PythonOperator(task_id="process_b", ...)
#     process_c = PythonOperator(task_id="process_c", ...)
#     merge = PythonOperator(task_id="merge", ...)
#     start >> [process_a, process_b, process_c] >> merge
#
# # Pattern 3: Conditional Branching
# from airflow.operators.python import BranchPythonOperator
# def choose_branch(**ctx):
#     if ctx["params"]["data_size"] > 1000000:
#         return "heavy_processing"
#     return "light_processing"
#
# branch = BranchPythonOperator(
#     task_id="branch",
#     python_callable=choose_branch,
# )

from dataclasses import dataclass
from typing import List

@dataclass
class DAGConfig:
    """Static metadata describing one Airflow DAG on the demo dashboard."""
    name: str          # dag_id as shown in the Airflow UI
    pattern: str       # topology label: Sequential / Fan-out / Conditional
    tasks: int         # number of tasks in the DAG
    schedule: str      # human-readable schedule description
    avg_duration: str  # typical wall-clock runtime, as display text
    status: str        # last known state: Running / Success / Paused

# Demo inventory, kept as plain tuples and expanded into DAGConfig records.
_DAG_ROWS = [
    ("etl_orders", "Sequential", 4, "Every hour", "5 min", "Running"),
    ("ml_training", "Fan-out", 8, "Daily 03:00", "2 hours", "Success"),
    ("report_gen", "Conditional", 6, "Daily 06:00", "15 min", "Success"),
    ("data_quality", "Sequential", 3, "Every 30 min", "2 min", "Running"),
    ("backfill_hist", "Fan-out", 12, "Manual", "4 hours", "Paused"),
]
dags = [DAGConfig(*row) for row in _DAG_ROWS]

print("=== DAG Dashboard ===")
for cfg in dags:
    print(f"  [{cfg.status}] {cfg.name} ({cfg.pattern})")
    print(f"    Tasks: {cfg.tasks} | Schedule: {cfg.schedule} | Duration: {cfg.avg_duration}")

Dynamic DAG Generation

# === Low-Code: Dynamic DAG from YAML ===

# dag_configs/etl_sales.yaml
# dag_id: etl_sales
# schedule: "0 * * * *"
# description: "Sales data ETL pipeline"
# tags: ["etl", "sales"]
# tasks:
#   - id: extract_mysql
#     type: python
#     callable: extractors.mysql_extract
#     params: {table: "orders", since: "yesterday"}
#   - id: extract_api
#     type: python
#     callable: extractors.api_extract
#     params: {endpoint: "/api/sales"}
#   - id: transform
#     type: python
#     callable: transformers.clean_sales
#     depends_on: [extract_mysql, extract_api]
#   - id: load_warehouse
#     type: python
#     callable: loaders.load_bigquery
#     params: {dataset: "analytics", table: "sales"}
#     depends_on: [transform]

# dag_factory.py — Generate DAGs from YAML
# import yaml
# import glob
# from airflow import DAG
# from airflow.operators.python import PythonOperator
# from datetime import datetime
#
# def create_dag_from_config(config_path):
#     with open(config_path) as f:
#         config = yaml.safe_load(f)
#
#     dag = DAG(
#         dag_id=config["dag_id"],
#         schedule_interval=config.get("schedule", None),
#         description=config.get("description", ""),
#         start_date=datetime(2024, 1, 1),
#         catchup=False,
#         tags=config.get("tags", []),
#     )
#
#     tasks = {}
#     for task_cfg in config["tasks"]:
#         callable_fn = import_callable(task_cfg["callable"])
#         tasks[task_cfg["id"]] = PythonOperator(
#             task_id=task_cfg["id"],
#             python_callable=callable_fn,
#             op_kwargs=task_cfg.get("params", {}),
#             dag=dag,
#         )
#
#     # Set dependencies
#     for task_cfg in config["tasks"]:
#         for dep in task_cfg.get("depends_on", []):
#             tasks[dep] >> tasks[task_cfg["id"]]
#
#     return dag
#
# # Auto-discover and create DAGs
# for config_file in glob.glob("/opt/airflow/dag_configs/*.yaml"):
#     dag = create_dag_from_config(config_file)
#     globals()[dag.dag_id] = dag

import yaml

def _task(task_id, **extra):
    """Build one task entry for the low-code DAG config (type defaults to python)."""
    return {"id": task_id, "type": "python", **extra}

# Minimal low-code pipeline definition, as it would appear in a YAML config file.
sample_config = {
    "dag_id": "etl_sales",
    "schedule": "0 * * * *",
    "tags": ["etl", "sales"],
    "tasks": [
        _task("extract", callable="extract_sales"),
        _task("transform", depends_on=["extract"]),
        _task("load", depends_on=["transform"]),
    ],
}

print("\nDynamic DAG Config (YAML):")
print(yaml.dump(sample_config, default_flow_style=False, allow_unicode=True))

Testing และ Monitoring

# === DAG Testing & Monitoring ===

# Unit Test DAG
# import pytest
# from airflow.models import DagBag
#
# def test_dag_loaded():
#     dagbag = DagBag()
#     assert "etl_sales" in dagbag.dags
#     assert dagbag.import_errors == {}
#
# def test_dag_structure():
#     dagbag = DagBag()
#     dag = dagbag.get_dag("etl_sales")
#     assert len(dag.tasks) == 4
#     assert dag.schedule_interval == "0 * * * *"
#
# def test_task_dependencies():
#     dagbag = DagBag()
#     dag = dagbag.get_dag("etl_sales")
#     extract = dag.get_task("extract")
#     transform = dag.get_task("transform")
#     assert transform in extract.downstream_list

# CLI Commands
# airflow dags list
# airflow dags test etl_sales 2024-03-01
# airflow tasks test etl_sales extract 2024-03-01
# airflow dags trigger etl_sales

# Operational guidelines for production DAGs (descriptions in Thai, as published).
best_practices = {
    "Idempotent": "รันซ้ำได้ผลเหมือนเดิม ใช้ UPSERT แทน INSERT",
    "Atomic": "แต่ละ Task ทำงานเดียว แยกย่อยไม่รวมกัน",
    "Small DAGs": "แยก DAG ย่อย ไม่รวมทุกอย่างใน DAG เดียว",
    "Retry": "ตั้ง retries=3, retry_delay=5min ทุก Task",
    "Timeout": "ตั้ง execution_timeout ป้องกัน Task ค้าง",
    "Alerts": "on_failure_callback แจ้ง Slack/Email เมื่อ Fail",
    "XCom": "ส่งข้อมูลเล็กๆระหว่าง Task ไม่ส่ง DataFrame ใหญ่",
    "Variables": "ใช้ Airflow Variables แทน Hardcode Config",
    "Connections": "ใช้ Airflow Connections เก็บ Credentials",
}

print("DAG Best Practices:")
for name, tip in best_practices.items():
    print(f"  [{name}]: {tip}")

# Monitoring Metrics — snapshot of scheduler/DAG health figures (demo values,
# stored as display strings rather than numbers).
metrics = {
    "DAG Success Rate": "98.5%",
    "Avg Task Duration": "3.2 min",
    "Failed Tasks (24h)": "3",
    "Active DAGs": "15",
    "Queued Tasks": "2",
    "Scheduler Heartbeat": "OK",
}

# Plain string literal: the original used an f-string with no placeholders.
print("\n\nMonitoring:")
for k, v in metrics.items():
    print(f"  {k}: {v}")

เคล็ดลับ

Apache Airflow คืออะไร

Open Source Workflow Orchestration Python DAG Task Dependencies Schedule Web UI Retry Operator Data Pipeline

DAG คืออะไร

Directed Acyclic Graph กราฟทิศทางไม่วนรอบ Workflow Tasks ลำดับ Extract Transform Load Operator Dependencies Schedule

Low-Code No-Code สำหรับ Airflow คืออะไร

ลด Code Dynamic DAG YAML JSON Config UI-based Template DAGs Data Analyst ไม่ถนัด Python Astronomer Cloud Composer

DAG Best Practices มีอะไรบ้าง

Idempotent Atomic Small DAGs Retry Timeout Alerts XCom Variables Connections Testing Documentation

สรุป

Apache Airflow DAG Design Low-Code No-Code YAML Dynamic DAG Template Sequential Fan-out Branching Best Practices Idempotent Retry Timeout Testing Monitoring Pipeline Orchestration

📖 บทความที่เกี่ยวข้อง

Airflow DAG Design Code Review Best Practice — อ่านบทความ →
Airflow DAG Design Infrastructure as Code — อ่านบทความ →
WiFi 6E Design Low Code No Code — อ่านบทความ →
OSPF Area Design Low Code No Code — อ่านบทความ →
REST API Design Low Code No Code — อ่านบทความ →

📚 ดูบทความทั้งหมด →