Airflow DAG Design Low Code No Code — ออกแบบ DAG สำหรับ Low-Code
Airflow DAG Design

Apache Airflow ใช้ DAG (Directed Acyclic Graph) สำหรับ Workflow Orchestration โดยกำหนด Tasks, Dependencies, Schedule และ Operator ของ Pipeline ด้วย Python หรือใช้แนวทาง Low-Code/No-Code ผ่าน YAML Config และ Template
| Approach | Code Level | เหมาะกับ | Flexibility |
|---|---|---|---|
| Python DAG | Full Code | Developer | สูงสุด |
| Dynamic DAG (YAML) | Low Code | Data Engineer | สูง |
| Template DAG | Low Code | Data Analyst | ปานกลาง |
| UI Builder | No Code | Business User | ต่ำ |
DAG Patterns
=== Airflow DAG Patterns ===
อ่านเพิ่ม: Docker Compose ตัวอย่าง Config สำหรับ Self-hosted Apps · ดูรายละเอียด Docker Compose ตัวอย่าง Config สำหรับ Self-hosted Apps · อ่านเพิ่ม: AWS Iam คืออะไร — คู่มือ IT Infrastructure 2026 — คู่มือฉบับ
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.operators.bash import BashOperator
from airflow.utils.dates import days_ago
from datetime import timedelta
# Pattern 1: Sequential Pipeline
# A fixed start_date replaces the original days_ago(1): Airflow recommends a
# static start_date, and days_ago() is deprecated in recent Airflow releases.
from datetime import datetime

# NOTE(review): extract_data / transform_data / load_data are not defined in
# this snippet -- import or define them before this DAG is parsed.
with DAG(
    "etl_sequential",
    schedule_interval="0 2 * * *",  # every day at 02:00
    start_date=datetime(2024, 1, 1),
    default_args={
        # Applied to every task in this DAG.
        "retries": 3,
        "retry_delay": timedelta(minutes=5),
        "execution_timeout": timedelta(hours=1),
    },
    catchup=False,  # do not back-fill runs between start_date and now
    tags=["etl", "production"],
) as dag:
    extract = PythonOperator(
        task_id="extract",
        python_callable=extract_data,
    )
    transform = PythonOperator(
        task_id="transform",
        python_callable=transform_data,
    )
    load = PythonOperator(
        task_id="load",
        python_callable=load_data,
    )
    notify = BashOperator(
        task_id="notify",
        bash_command='echo "Pipeline complete"',
    )

    # Strictly sequential: each task starts only after the previous one.
    extract >> transform >> load >> notify
# Pattern 2: Fan-out / Fan-in
# The callables below (start_pipeline, process_partition_*, merge_results)
# are placeholders -- point them at your own functions, as in Pattern 1.
from datetime import datetime

with DAG(
    "parallel_processing",
    schedule_interval=None,  # manual trigger only; set a cron string to schedule
    start_date=datetime(2024, 1, 1),
    catchup=False,
):
    start = PythonOperator(task_id="start", python_callable=start_pipeline)
    process_a = PythonOperator(task_id="process_a", python_callable=process_partition_a)
    process_b = PythonOperator(task_id="process_b", python_callable=process_partition_b)
    process_c = PythonOperator(task_id="process_c", python_callable=process_partition_c)
    merge = PythonOperator(task_id="merge", python_callable=merge_results)

    # Fan-out: the three process_* tasks run in parallel after start.
    # Fan-in: merge waits for all of them (default trigger rule all_success).
    start >> [process_a, process_b, process_c] >> merge
# Pattern 3: Conditional Branching
from airflow.operators.python import BranchPythonOperator
# data_size values strictly above this go to the heavy branch.
HEAVY_DATA_THRESHOLD = 1_000_000


def choose_branch(**ctx):
    """Return the task_id that BranchPythonOperator should follow.

    Args:
        **ctx: Airflow task context; ``ctx["params"]["data_size"]`` is read.

    Returns:
        ``"heavy_processing"`` when data_size exceeds HEAVY_DATA_THRESHOLD,
        otherwise ``"light_processing"``.

    Raises:
        KeyError: if ``params`` or ``params["data_size"]`` is missing.
    """
    data_size = ctx["params"]["data_size"]
    if data_size > HEAVY_DATA_THRESHOLD:
        return "heavy_processing"
    return "light_processing"
# BranchPythonOperator calls choose_branch and continues only with the task
# whose task_id it returns ("heavy_processing" or "light_processing"); those
# tasks must be wired directly downstream of this operator.
branch = BranchPythonOperator(task_id="branch", python_callable=choose_branch)
from dataclasses import dataclass
from typing import List
@dataclass
class DAGConfig:
    """Summary of one DAG for the demo dashboard printout."""

    name: str  # DAG ID, e.g. "etl_orders"
    pattern: str  # DAG shape, e.g. "Sequential", "Fan-out", "Conditional"
    tasks: int  # number of tasks in the DAG
    schedule: str  # human-readable schedule, e.g. "Daily 03:00"
    avg_duration: str  # human-readable average run time, e.g. "5 min"
    status: str  # display status, e.g. "Running", "Success", "Paused"
# Static sample data for the dashboard demo.
dags: List[DAGConfig] = [
    DAGConfig("etl_orders", "Sequential", 4, "Every hour", "5 min", "Running"),
    DAGConfig("ml_training", "Fan-out", 8, "Daily 03:00", "2 hours", "Success"),
    DAGConfig("report_gen", "Conditional", 6, "Daily 06:00", "15 min", "Success"),
    DAGConfig("data_quality", "Sequential", 3, "Every 30 min", "2 min", "Running"),
    DAGConfig("backfill_hist", "Fan-out", 12, "Manual", "4 hours", "Paused"),
]

print("=== DAG Dashboard ===")
for d in dags:
    print(f" [{d.status}] {d.name} ({d.pattern})")
    print(f" Tasks: {d.tasks} | Schedule: {d.schedule} | Duration: {d.avg_duration}")
แนะนำเพิ่มเติม — ติดตาม XM Signal
Dynamic DAG Generation
=== Low-Code: Dynamic DAG from YAML ===
dag_configs/etl_sales.yaml
# Read by create_dag_from_config (dag_factory.py): each entry under `tasks`
# becomes a PythonOperator; `callable` is a dotted import path, `params` are
# passed as op_kwargs, and `depends_on` lists upstream task ids.
dag_id: etl_sales
schedule: "0 * * * *"
description: "Sales data ETL pipeline"
tags: ["etl", "sales"]
tasks:
  - id: extract_mysql
    type: python
    callable: extractors.mysql_extract
    params: {table: "orders", since: "yesterday"}
  - id: extract_api
    type: python
    callable: extractors.api_extract
    params: {endpoint: "/api/sales"}
  - id: transform
    type: python
    callable: transformers.clean_sales
    depends_on: [extract_mysql, extract_api]
  - id: load_warehouse
    type: python
    callable: loaders.load_bigquery
    params: {dataset: "analytics", table: "sales"}
    depends_on: [transform]
dag_factory.py — Generate DAGs from YAML
import yaml
import glob
from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime
def _import_callable(dotted_path):
    """Return the object named by a dotted path such as "pkg.module.func".

    Raises:
        ValueError: if ``dotted_path`` has no module part (no dot).
        ImportError: if the module cannot be imported.
        AttributeError: if the module has no attribute with that name.
    """
    import importlib

    module_path, _, attr_name = dotted_path.rpartition(".")
    if not module_path:
        raise ValueError(
            f"callable must be a dotted path like 'module.func', got {dotted_path!r}"
        )
    return getattr(importlib.import_module(module_path), attr_name)


def create_dag_from_config(config_path):
    """Build an Airflow DAG from a YAML config file.

    Recognised top-level keys: ``dag_id`` (required), ``schedule``,
    ``description``, ``tags`` and ``tasks`` (required). Each task is a
    mapping with ``id`` and ``callable`` (dotted import path), plus optional
    ``type`` (only "python" is supported), ``params`` (passed as
    ``op_kwargs``) and ``depends_on`` (list of upstream task ids).

    Args:
        config_path: Path to the YAML file.

    Returns:
        The constructed ``DAG``.

    Raises:
        ValueError: if the file is not a YAML mapping, a task has an
            unsupported ``type``, or ``depends_on`` names an unknown task.
        KeyError: if a required key is missing.
    """
    with open(config_path, encoding="utf-8") as f:
        # safe_load, never yaml.load: configs may be edited by non-developers.
        config = yaml.safe_load(f)
    if not isinstance(config, dict):
        raise ValueError(f"{config_path}: expected a YAML mapping at the top level")

    dag = DAG(
        dag_id=config["dag_id"],
        schedule_interval=config.get("schedule", None),
        description=config.get("description", ""),
        start_date=datetime(2024, 1, 1),
        catchup=False,
        tags=config.get("tags", []),
    )

    tasks = {}
    for task_cfg in config["tasks"]:
        task_id = task_cfg["id"]
        task_type = task_cfg.get("type", "python")
        if task_type != "python":
            raise ValueError(
                f"{config_path}: task {task_id!r} has unsupported type {task_type!r}"
            )
        tasks[task_id] = PythonOperator(
            task_id=task_id,
            python_callable=_import_callable(task_cfg["callable"]),
            op_kwargs=task_cfg.get("params") or {},
            dag=dag,
        )

    # Wire dependencies only after every task exists, so depends_on may
    # reference tasks declared later in the file.
    for task_cfg in config["tasks"]:
        for dep in task_cfg.get("depends_on") or []:
            if dep not in tasks:
                raise ValueError(
                    f"{config_path}: task {task_cfg['id']!r} depends on unknown task {dep!r}"
                )
            tasks[dep] >> tasks[task_cfg["id"]]

    return dag
# Auto-discover and create DAGs: one DAG per YAML file. Each DAG is bound to
# a module-level name because Airflow collects DAG objects from module
# globals. Files are processed in sorted order so registration is stable.
for config_file in sorted(glob.glob("/opt/airflow/dag_configs/*.yaml")):
    dag = create_dag_from_config(config_file)
    globals()[dag.dag_id] = dag
import yaml
# Sample config in the same shape as dag_configs/*.yaml. Every task carries a
# dotted "callable" path, which create_dag_from_config requires (the module
# paths here are illustrative).
sample_config = {
    "dag_id": "etl_sales",
    "schedule": "0 * * * *",
    "tags": ["etl", "sales"],
    "tasks": [
        {"id": "extract", "type": "python", "callable": "extractors.extract_sales"},
        {
            "id": "transform",
            "type": "python",
            "callable": "transformers.clean_sales",
            "depends_on": ["extract"],
        },
        {
            "id": "load",
            "type": "python",
            "callable": "loaders.load_bigquery",
            "depends_on": ["transform"],
        },
    ],
}
print("\nDynamic DAG Config (YAML):")
# sort_keys=False keeps the key order above (id first), like the YAML files.
print(yaml.dump(sample_config, default_flow_style=False, allow_unicode=True, sort_keys=False))
Testing และ Monitoring
# === DAG Testing & Monitoring ===
# Unit Test DAG
# import pytest
# from airflow.models import DagBag
#
# def test_dag_loaded():
#     dagbag = DagBag()
#     assert "etl_sales" in dagbag.dags
#     assert dagbag.import_errors == {}
#
# def test_dag_structure():
#     dagbag = DagBag()
#     dag = dagbag.get_dag("etl_sales")
#     assert len(dag.tasks) == 4
#     assert dag.schedule_interval == "0 * * * *"
#
# def test_task_dependencies():
#     dagbag = DagBag()
#     dag = dagbag.get_dag("etl_sales")
#     transform = dag.get_task("transform")
#     # etl_sales has no task named "extract"; transform depends on both extractors
#     for upstream_id in ("extract_mysql", "extract_api"):
#         assert transform in dag.get_task(upstream_id).downstream_list
# CLI Commands
# airflow dags list
# airflow dags test etl_sales 2024-03-01
# airflow tasks test etl_sales extract 2024-03-01
# airflow dags trigger etl_sales
# DAG design checklist: practice name -> short description (in Thai).
best_practices = {
    "Idempotent": "รันซ้ำได้ผลเหมือนเดิม ใช้ UPSERT แทน INSERT",
    "Atomic": "แต่ละ Task ทำงานเดียว แยกย่อยไม่รวมกัน",
    "Small DAGs": "แยก DAG ย่อย ไม่รวมทุกอย่างใน DAG เดียว",
    "Retry": "ตั้ง retries=3, retry_delay=5min ทุก Task",
    "Timeout": "ตั้ง execution_timeout ป้องกัน Task ค้าง",
    "Alerts": "on_failure_callback แจ้ง Slack/Email เมื่อ Fail",
    "XCom": "ส่งข้อมูลเล็กๆระหว่าง Task ไม่ส่ง DataFrame ใหญ่",
    "Variables": "ใช้ Airflow Variables แทน Hardcode Config",
    "Connections": "ใช้ Airflow Connections เก็บ Credentials",
}

print("DAG Best Practices:")
for practice, desc in best_practices.items():
    print(f" [{practice}]: {desc}")
# Monitoring Metrics (static sample values for the demo printout)
metrics = {
    "DAG Success Rate": "98.5%",
    "Avg Task Duration": "3.2 min",
    "Failed Tasks (24h)": "3",
    "Active DAGs": "15",
    "Queued Tasks": "2",
    "Scheduler Heartbeat": "OK",
}

print("\n\nMonitoring:")
for name, value in metrics.items():
    print(f" {name}: {value}")
เคล็ดลับ
- YAML DAGs: ใช้ YAML Config สร้าง DAG อัตโนมัติ ลด Code ซ้ำ
- Template: สร้าง Template DAG ให้ทีมใช้ แก้แค่ Config
- Test: ทดสอบ DAG ก่อน Deploy ด้วย pytest
- Idempotent: ทุก Task ต้อง Idempotent รันซ้ำได้
- Monitor: ตั้ง Alert ทุก DAG แจ้งเมื่อ Fail
Apache Airflow คืออะไร
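Apache Airflow เป็นเครื่องมือ Open Source สำหรับ Workflow Orchestration ที่เขียน DAG ด้วย Python กำหนด Task Dependencies และ Schedule ได้ มี Web UI สำหรับติดตามงาน รองรับการ Retry และมี Operator หลากหลายสำหรับสร้าง Data Pipeline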
DAG คืออะไร
DAG ย่อมาจาก Directed Acyclic Graph คือกราฟที่มีทิศทางและไม่วนรอบ ใช้แทน Workflow ที่ประกอบด้วย Tasks ตามลำดับ เช่น Extract → Transform → Load แต่ละ Task คือ Operator ที่มี Dependencies และรันตาม Schedule ที่กำหนด
Low-Code No-Code สำหรับ Airflow คืออะไร
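แนวทางที่ช่วยลดการเขียน Code เช่น Dynamic DAG ที่สร้างจาก YAML/JSON Config, เครื่องมือแบบ UI-based และ Template DAGs ทำให้ Data Analyst ที่ไม่ถนัด Python สร้าง Pipeline ได้เอง นอกจากนี้ยังมีบริการ Airflow แบบ Managed เช่น Astronomer และ Cloud Composer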
DAG Best Practices มีอะไรบ้าง
Idempotent, Atomic, Small DAGs, Retry, Timeout, Alerts, XCom, Variables, Connections, Testing และ Documentation
สรุป
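การออกแบบ Apache Airflow DAG แบบ Low-Code/No-Code ใช้ YAML สร้าง Dynamic DAG และ Template ร่วมกับ Pattern แบบ Sequential, Fan-out และ Branching โดยยึด Best Practices อย่าง Idempotent, Retry, Timeout, Testing และ Monitoring เพื่อให้ Pipeline Orchestration ทำงานได้อย่างเชื่อถือได้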





