ai

Airflow DAG Design Low Code No Code — ออกแบบ DAG สำหรับ Low-Code

Airflow DAG Design Low Code No Code — ออกแบบ DAG สำหรับ Low-Code

Airflow DAG Design

Airflow DAG Design Low Code No Code — ออกแบบ DAG สำหรับ Low-Code

Apache Airflow DAG Directed Acyclic Graph Workflow Orchestration Low-Code No-Code Pipeline Tasks Dependencies Schedule Operator Python YAML Config Template

ApproachCode LevelเหมาะกับFlexibility
Python DAGFull CodeDeveloperสูงสุด
Dynamic DAG (YAML)Low CodeData Engineerสูง
Template DAGLow CodeData Analystปานกลาง
UI BuilderNo CodeBusiness Userต่ำ

DAG Patterns

=== Airflow DAG Patterns ===

อ่านเพิ่ม: Docker Compose ตัวอย่าง Config สำหรับ Self-hosted Apps · ดูรายละเอียด Docker Compose ตัวอย่าง Config สำหรับ Self-hosted Apps · อ่านเพิ่ม: AWS Iam คืออะไร — คู่มือ IT Infrastructure 2026 — คู่มือฉบับ

from airflow import DAG

from airflow.operators.python import PythonOperator

from airflow.operators.bash import BashOperator

from airflow.utils.dates import days_ago

from datetime import timedelta

# Pattern 1: Sequential Pipeline

with DAG(

"etl_sequential",

schedule_interval="0 2 * * *", # ทุกวัน 02:00

start_date=days_ago(1),

default_args={

"retries": 3,

"retry_delay": timedelta(minutes=5),

"execution_timeout": timedelta(hours=1),

},

catchup=False,

tags=["etl", "production"],

) as dag:

extract = PythonOperator(

task_id="extract",

python_callable=extract_data,

)

transform = PythonOperator(

task_id="transform",

python_callable=transform_data,

)

เนื้อหาเกี่ยวข้อง — MLOps Pipeline สำหรับมือใหม่ Step by Step

load = PythonOperator(

task_id="load",

python_callable=load_data,

)

notify = BashOperator(

task_id="notify",

bash_command='echo "Pipeline complete"',

)

แนะนำเพิ่มเติม — แหล่งความรู้ Forex iCafeForex

extract >> transform >> load >> notify

# Pattern 2: Fan-out / Fan-in

with DAG("parallel_processing", ...):

start = PythonOperator(task_id="start", ...)

process_a = PythonOperator(task_id="process_a", ...)

process_b = PythonOperator(task_id="process_b", ...)

process_c = PythonOperator(task_id="process_c", ...)

merge = PythonOperator(task_id="merge", ...)

start >> [process_a, process_b, process_c] >> merge

# Pattern 3: Conditional Branching

from airflow.operators.python import BranchPythonOperator

def choose_branch(**ctx):

if ctx["params"]["data_size"] > 1000000:

return "heavy_processing"

return "light_processing"

branch = BranchPythonOperator(

task_id="branch",

python_callable=choose_branch,

)

from dataclasses import dataclass

from typing import List

@dataclass

class DAGConfig:

name: str

เนื้อหาเกี่ยวข้อง — บทความที่เกี่ยวข้อง: VanEck คือ — รู้จักบริษัทจัดการกองทุน ETF

pattern: str

tasks: int

schedule: str

avg_duration: str

status: str

dags = [

DAGConfig("etl_orders", "Sequential", 4, "Every hour", "5 min", "Running"),

DAGConfig("ml_training", "Fan-out", 8, "Daily 03:00", "2 hours", "Success"),

DAGConfig("report_gen", "Conditional", 6, "Daily 06:00", "15 min", "Success"),

DAGConfig("data_quality", "Sequential", 3, "Every 30 min", "2 min", "Running"),

DAGConfig("backfill_hist", "Fan-out", 12, "Manual", "4 hours", "Paused"),

]

print("=== DAG Dashboard ===")

for d in dags:

print(f" [{d.status}] {d.name} ({d.pattern})")

print(f" Tasks: {d.tasks} | Schedule: {d.schedule} | Duration: {d.avg_duration}")

แนะนำเพิ่มเติม — ติดตาม XM Signal

Dynamic DAG Generation

=== Low-Code: Dynamic DAG from YAML ===

dag_configs/etl_sales.yaml

dag_id: etl_sales

schedule: "0 * * * *"

description: "Sales data ETL pipeline"

tags: ["etl", "sales"]

tasks:

  • id: extract_mysql

type: python

callable: extractors.mysql_extract

params: {table: "orders", since: "yesterday"}

  • id: extract_api

type: python

callable: extractors.api_extract

params: {endpoint: "/api/sales"}

เนื้อหาเกี่ยวข้อง — แนะนำให้อ่าน Kotlin Ktor Edge Deployment — คู่มือฉบับสมบูรณ์ 2026

  • id: transform

type: python

callable: transformers.clean_sales

depends_on: [extract_mysql, extract_api]

  • id: load_warehouse

type: python

callable: loaders.load_bigquery

params: {dataset: "analytics", table: "sales"}

depends_on: [transform]

dag_factory.py — Generate DAGs from YAML

import yaml

import glob

from airflow import DAG

from airflow.operators.python import PythonOperator

from datetime import datetime

def create_dag_from_config(config_path):

with open(config_path) as f:

Airflow DAG Design Low Code No Code — ออกแบบ DAG สำหรับ Low-Code

config = yaml.safe_load(f)

dag = DAG(

dag_id=config["dag_id"],

schedule_interval=config.get("schedule", None),

description=config.get("description", ""),

start_date=datetime(2024, 1, 1),

catchup=False,

tags=config.get("tags", []),

)

tasks = {}

for task_cfg in config["tasks"]:

callable_fn = import_callable(task_cfg["callable"])

tasks[task_cfg["id"]] = PythonOperator(

task_id=task_cfg["id"],

python_callable=callable_fn,

op_kwargs=task_cfg.get("params", {}),

เนื้อหาเกี่ยวข้อง — Distributed Tracing Testing Strategy QA

dag=dag,

)

# Set dependencies

for task_cfg in config["tasks"]:

for dep in task_cfg.get("depends_on", []):

tasks[dep] >> tasks[task_cfg["id"]]

return dag

# Auto-discover and create DAGs

for config_file in glob.glob("/opt/airflow/dag_configs/*.yaml"):

dag = create_dag_from_config(config_file)

globals()[dag.dag_id] = dag

import yaml

sample_config = {

"dag_id": "etl_sales",

"schedule": "0 * * * *",

"tags": ["etl", "sales"],

"tasks": [

{"id": "extract", "type": "python", "callable": "extract_sales"},

{"id": "transform", "type": "python", "depends_on": ["extract"]},

{"id": "load", "type": "python", "depends_on": ["transform"]},

],

}

print("\nDynamic DAG Config (YAML):")

print(yaml.dump(sample_config, default_flow_style=False, allow_unicode=True))

Testing และ Monitoring

# === DAG Testing & Monitoring ===



# Unit Test DAG

# import pytest

# from airflow.models import DagBag

#

# def test_dag_loaded():

# dagbag = DagBag()

# assert "etl_sales" in dagbag.dags

# assert dagbag.import_errors == {}

#

# def test_dag_structure():

# dagbag = DagBag()

# dag = dagbag.get_dag("etl_sales")

# assert len(dag.tasks) == 4

# assert dag.schedule_interval == "0 * * * *"

#

# def test_task_dependencies():

# dagbag = DagBag()

# dag = dagbag.get_dag("etl_sales")

# extract = dag.get_task("extract")

# transform = dag.get_task("transform")

# assert transform in extract.downstream_list



# CLI Commands

# airflow dags list

# airflow dags test etl_sales 2024-03-01

# airflow tasks test etl_sales extract 2024-03-01

# airflow dags trigger etl_sales



best_practices = {

 "Idempotent": "รันซ้ำได้ผลเหมือนเดิม ใช้ UPSERT แทน INSERT",

 "Atomic": "แต่ละ Task ทำงานเดียว แยกย่อยไม่รวมกัน",

 "Small DAGs": "แยก DAG ย่อย ไม่รวมทุกอย่างใน DAG เดียว",

 "Retry": "ตั้ง retries=3, retry_delay=5min ทุก Task",

 "Timeout": "ตั้ง execution_timeout ป้องกัน Task ค้าง",

 "Alerts": "on_failure_callback แจ้ง Slack/Email เมื่อ Fail",

 "XCom": "ส่งข้อมูลเล็กๆระหว่าง Task ไม่ส่ง DataFrame ใหญ่",

 "Variables": "ใช้ Airflow Variables แทน Hardcode Config",

 "Connections": "ใช้ Airflow Connections เก็บ Credentials",

}



print("DAG Best Practices:")

for practice, desc in best_practices.items():

 print(f" [{practice}]: {desc}")



# Monitoring Metrics

metrics = {

 "DAG Success Rate": "98.5%",

 "Avg Task Duration": "3.2 min",

 "Failed Tasks (24h)": "3",

 "Active DAGs": "15",

 "Queued Tasks": "2",

 "Scheduler Heartbeat": "OK",

}



print(f"\n\nMonitoring:")

for k, v in metrics.items():

 print(f" {k}: {v}")

เคล็ดลับ

  • YAML DAGs: ใช้ YAML Config สร้าง DAG อัตโนมัติ ลด Code ซ้ำ
  • Template: สร้าง Template DAG ให้ทีมใช้ แก้แค่ Config
  • Test: ทดสอบ DAG ก่อน Deploy ด้วย pytest
  • Idempotent: ทุก Task ต้อง Idempotent รันซ้ำได้
  • Monitor: ตั้ง Alert ทุก DAG แจ้งเมื่อ Fail

Apache Airflow คืออะไร

Open Source Workflow Orchestration Python DAG Task Dependencies Schedule Web UI Retry Operator Data Pipeline

DAG คืออะไร

Directed Acyclic Graph กราฟทิศทางไม่วนรอบ Workflow Tasks ลำดับ Extract Transform Load Operator Dependencies Schedule

Low-Code No-Code สำหรับ Airflow คืออะไร

ลด Code Dynamic DAG YAML JSON Config UI-based Template DAGs Data Analyst ไม่ถนัด Python Astronomer Cloud Composer

DAG Best Practices มีอะไรบ้าง

Idempotent Atomic Small DAGs Retry Timeout Alerts XCom Variables Connections Testing Documentation

สรุป

Apache Airflow DAG Design Low-Code No-Code YAML Dynamic DAG Template Sequential Fan-out Branching Best Practices Idempotent Retry Timeout Testing Monitoring Pipeline Orchestration

XM Legend · เทรดเดอร์ & ผู้สอน Forex 13 ปี

ผู้ก่อตั้ง SiamCafe ตั้งแต่ปี 1997 · เทรดเดอร์สาย Forex มากกว่า 13 ปี ได้รับการยกย่องเป็น XM Legend · แบ่งปันความรู้ Forex, ไอที, AI และการเทรด จากประสบการณ์จริงในตลาดจริง