Technology

Airflow DAG Design Pub Sub Architecture

airflow dag design pub sub architecture
Airflow DAG Design Pub Sub Architecture | SiamCafe Blog
2025-11-26· อ. บอม — SiamCafe.net· 10,387 คำ

Airflow DAG กับ Pub/Sub

Airflow DAG กำหนดลำดับ Tasks แต่ละ Task มี Dependencies Airflow จัดการ Schedule Retry Monitoring Pub/Sub ช่วยทำ Event-driven Trigger DAG อัตโนมัติ

DAG Design Patterns Sequential Parallel Branch Dynamic Sensor-triggered ออกแบบให้เหมาะกับ Data Pipeline

DAG Design Patterns

# === Airflow DAG Design Patterns ===
# pip install apache-airflow apache-airflow-providers-google

from datetime import datetime, timedelta
from airflow import DAG
from airflow.decorators import task, dag
from airflow.operators.python import PythonOperator, BranchPythonOperator
from airflow.operators.empty import EmptyOperator
from airflow.utils.task_group import TaskGroup
from airflow.models.baseoperator import chain

# Operator defaults inherited by every task in the DAGs below unless a
# task overrides them explicitly.
default_args = dict(
    owner="data-team",
    depends_on_past=False,
    email_on_failure=True,
    email=["alerts@company.com"],
    retries=2,                              # retry each failed task twice
    retry_delay=timedelta(minutes=5),       # wait between retries
    execution_timeout=timedelta(hours=2),   # kill tasks running longer than this
)

# 1. Sequential Pattern
@dag(
    schedule="0 6 * * *",
    start_date=datetime(2024, 1, 1),
    default_args=default_args,
    catchup=False,
    tags=["etl", "sequential"],
)
def sequential_pipeline():
    """Linear ETL DAG: extract -> transform -> load.

    TaskFlow return values become XComs, so passing a task's result into
    the next task both moves the data and defines the dependency edge.
    """

    @task()
    def extract():
        # Simulated extraction; metadata flows downstream via XCom.
        return {"rows": 10000, "source": "api"}

    @task()
    def transform(data):
        return {"rows": data["rows"], "cleaned": True}

    @task()
    def load(data):
        print(f"Loaded {data['rows']} rows")

    data = extract()
    cleaned = transform(data)
    load(cleaned)

# Fix: a @dag-decorated function only builds (and registers) the DAG when it
# is invoked at module import time; without this call the scheduler never
# discovers the DAG.
sequential_pipeline()

# 2. Parallel Pattern
@dag(
    schedule="0 7 * * *",
    start_date=datetime(2024, 1, 1),
    default_args=default_args,
    catchup=False,
    tags=["etl", "parallel"],
)
def parallel_pipeline():
    """Fan-out/fan-in DAG: three independent extracts run in parallel,
    then merge -> load.

    The three extract tasks share no dependencies, so the scheduler may
    run them concurrently; merge() depends on all three via its arguments.
    """

    @task()
    def extract_orders():
        return {"source": "orders", "rows": 5000}

    @task()
    def extract_customers():
        return {"source": "customers", "rows": 2000}

    @task()
    def extract_products():
        return {"source": "products", "rows": 500}

    @task()
    def merge(orders, customers, products):
        # Fan-in point: waits for every upstream extract.
        total = orders["rows"] + customers["rows"] + products["rows"]
        return {"total_rows": total}

    @task()
    def load(data):
        print(f"Loaded {data['total_rows']} total rows")

    # Parallel extraction -> Merge -> Load
    orders = extract_orders()
    customers = extract_customers()
    products = extract_products()
    merged = merge(orders, customers, products)
    load(merged)

# Fix: the @dag factory must be called at import time to register the DAG.
parallel_pipeline()

# 3. Branch Pattern
@dag(
    schedule="0 8 * * *",
    start_date=datetime(2024, 1, 1),
    default_args=default_args,
    catchup=False,
    tags=["etl", "branch"],
)
def branch_pipeline():
    """Conditional DAG: a branch task picks exactly one processing path.

    The returned string must match the task_id of the chosen downstream
    task; the other branch is skipped by the scheduler.
    """

    @task.branch()
    def check_data_size():
        # NOTE(review): row_count is hard-coded for the demo; in a real
        # pipeline this would be read from XCom or a metadata store.
        row_count = 15000
        if row_count > 10000:
            return "process_large"
        return "process_small"

    @task()
    def process_large():
        print("Processing with Spark")

    @task()
    def process_small():
        print("Processing with Pandas")

    # trigger_rule override: the default "all_success" would skip notify()
    # because one branch upstream is always skipped.
    @task(trigger_rule="none_failed_min_one_success")
    def notify():
        print("Pipeline completed")

    branch = check_data_size()
    large = process_large()
    small = process_small()
    done = notify()

    branch >> [large, small] >> done

# Fix: the @dag factory must be called at import time to register the DAG.
branch_pipeline()

# 4. TaskGroup Pattern
@dag(
    schedule="0 9 * * *",
    start_date=datetime(2024, 1, 1),
    default_args=default_args,
    catchup=False,
    tags=["etl", "taskgroup"],
)
def taskgroup_pipeline():
    """DAG using TaskGroups to visually/logically group related tasks.

    Task ids inside a group are prefixed with the group name
    (e.g. "extract.extract_api"). Dependencies are set at group level:
    start >> extract_group >> transform_group >> end.
    """

    start = EmptyOperator(task_id="start")

    with TaskGroup("extract") as extract_group:
        @task()
        def extract_api():
            return "api_data"

        @task()
        def extract_db():
            return "db_data"

        # Calling the decorated functions instantiates the tasks inside
        # the group; they have no dependency on each other.
        extract_api()
        extract_db()

    with TaskGroup("transform") as transform_group:
        @task()
        def clean():
            return "cleaned"

        @task()
        def enrich():
            return "enriched"

        clean() >> enrich()

    end = EmptyOperator(task_id="end")
    start >> extract_group >> transform_group >> end

# Fix: the @dag factory must be called at import time to register the DAG.
taskgroup_pipeline()

# One-line recap of each pattern demonstrated above.
_pattern_summaries = (
    "1. Sequential: extract -> transform -> load",
    "2. Parallel: extract_a | extract_b -> merge -> load",
    "3. Branch: check -> large_path / small_path -> notify",
    "4. TaskGroup: group related tasks together",
)
print("DAG Design Patterns:")
for _summary in _pattern_summaries:
    print(f"  {_summary}")

Pub/Sub Event-driven Triggers

# pubsub_triggers.py — Event-driven DAG Triggers
from dataclasses import dataclass, field
from typing import List, Dict, Callable
from datetime import datetime
from enum import Enum

class EventType(Enum):
    """Kinds of external events that can trigger a pipeline run."""

    FILE_ARRIVAL = "file_arrival"
    API_WEBHOOK = "api_webhook"
    DB_CHANGE = "db_change"
    SCHEDULE = "schedule"
    MANUAL = "manual"

@dataclass
class TriggerEvent:
    """A single inbound trigger event.

    An empty ``timestamp`` means "stamp at creation time": __post_init__
    fills it with the current local time in ISO-8601 format.
    """

    event_type: EventType
    source: str
    payload: Dict
    timestamp: str = ""

    def __post_init__(self):
        if self.timestamp == "":
            self.timestamp = datetime.now().isoformat()

class EventDrivenOrchestrator:
    """Routes TriggerEvents to registered handlers and keeps a history."""

    def __init__(self):
        # event type -> callables invoked for that type, in registration order
        self.handlers: Dict[EventType, List[Callable]] = {}
        # every event passed to process_event(), in arrival order
        self.events_processed: List[TriggerEvent] = []

    def register_handler(self, event_type: EventType, handler: Callable):
        """Subscribe *handler* to events of *event_type*."""
        self.handlers.setdefault(event_type, []).append(handler)

    def process_event(self, event: TriggerEvent):
        """Log, dispatch to all matching handlers, then record the event."""
        print(f"\n  Event: {event.event_type.value} from {event.source}")
        print(f"    Payload: {event.payload}")

        for callback in self.handlers.get(event.event_type, []):
            callback(event)

        self.events_processed.append(event)

    def dashboard(self):
        """Print a summary of registered handlers and processed events."""
        divider = "=" * 55
        print(f"\n{divider}")
        print("Event-driven Orchestrator Dashboard")
        print(divider)
        handler_total = sum(len(hs) for hs in self.handlers.values())
        print(f"  Registered Handlers: {handler_total}")
        print(f"  Events Processed: {len(self.events_processed)}")

        counts: Dict[str, int] = {}
        for ev in self.events_processed:
            key = ev.event_type.value
            counts[key] = counts.get(key, 0) + 1

        print("\n  Events by Type:")
        for type_name, total in counts.items():
            print(f"    {type_name}: {total}")

# Airflow Sensors สำหรับ Event-driven
# from airflow.providers.google.cloud.sensors.pubsub import PubSubPullSensor
# from airflow.providers.amazon.aws.sensors.s3 import S3KeySensor
# from airflow.sensors.external_task import ExternalTaskSensor
#
# # Google Pub/Sub Sensor
# wait_for_event = PubSubPullSensor(
#     task_id="wait_for_pubsub",
#     project_id="my-project",
#     subscription="my-subscription",
#     max_messages=1,
#     ack_messages=True,
#     mode="reschedule",  # ประหยัด Worker Slot
# )
#
# # S3 File Arrival Sensor
# wait_for_file = S3KeySensor(
#     task_id="wait_for_s3_file",
#     bucket_name="data-lake",
#     bucket_key="raw/{{ ds }}/*.parquet",
#     mode="reschedule",
#     poke_interval=300,
#     timeout=3600,
# )
#
# # Kafka Consumer Trigger
# # ใช้ Airflow REST API trigger DAG จาก Kafka Consumer
# # curl -X POST http://airflow:8080/api/v1/dags/my_dag/dagRuns \
# #   -H "Content-Type: application/json" \
# #   -d '{"conf": {"event": "new_data", "source": "kafka"}}'

# ตัวอย่าง
# --- Demo: register two handlers and replay a few sample events ---
orchestrator = EventDrivenOrchestrator()

def handle_file(event):
    # In production this would trigger the ETL DAG for the arrived file.
    file_name = event.payload.get("file", "unknown")
    print(f"    -> Triggering ETL DAG for {file_name}")

def handle_webhook(event):
    print(f"    -> Processing webhook from {event.source}")

orchestrator.register_handler(EventType.FILE_ARRIVAL, handle_file)
orchestrator.register_handler(EventType.API_WEBHOOK, handle_webhook)

events = [
    TriggerEvent(
        EventType.FILE_ARRIVAL,
        "s3://data-lake",
        {"file": "orders_2024.parquet", "size": "50MB"},
    ),
    TriggerEvent(
        EventType.API_WEBHOOK,
        "stripe",
        {"event": "payment.completed", "amount": 1500},
    ),
    TriggerEvent(
        EventType.FILE_ARRIVAL,
        "gcs://analytics",
        {"file": "users_export.csv", "size": "10MB"},
    ),
]

for incoming in events:
    orchestrator.process_event(incoming)

orchestrator.dashboard()

Production Configuration

# === Airflow Production Configuration ===

# 1. airflow.cfg — Key Settings
# [core]
# executor = CeleryExecutor          # Production executor
# parallelism = 32                   # Max concurrent tasks
# max_active_runs_per_dag = 3        # Max concurrent DAG runs
# dag_file_processor_timeout = 120   # Seconds to parse DAG file
#
# [celery]
# broker_url = redis://redis:6379/0
# result_backend = redis://redis:6379/1
# worker_concurrency = 16
#
# [scheduler]
# min_file_process_interval = 30     # Seconds between DAG file scans
# dag_dir_list_interval = 300        # Seconds between dir scans
#
# [webserver]
# web_server_port = 8080
# default_ui_timezone = Asia/Bangkok

# 2. Docker Compose — Production
# version: '3.8'
# services:
#   postgres:
#     image: postgres:16
#     environment:
#       POSTGRES_USER: airflow
#       POSTGRES_PASSWORD: airflow
#       POSTGRES_DB: airflow
#
#   redis:
#     image: redis:7-alpine
#
#   airflow-webserver:
#     image: apache/airflow:2.8.1
#     command: webserver
#     ports: ["8080:8080"]
#     depends_on: [postgres, redis]
#
#   airflow-scheduler:
#     image: apache/airflow:2.8.1
#     command: scheduler
#     depends_on: [postgres, redis]
#
#   airflow-worker:
#     image: apache/airflow:2.8.1
#     command: celery worker
#     depends_on: [postgres, redis]
#     deploy:
#       replicas: 3

# 3. Pool Configuration
# airflow pools set default_pool 32 "Default pool"
# airflow pools set heavy_tasks 4 "CPU/Memory intensive tasks"
# airflow pools set api_calls 8 "External API rate limited"
# airflow pools set db_queries 16 "Database query tasks"

# 4. Connection Configuration
# airflow connections add 'kafka_default' \
#   --conn-type 'kafka' \
#   --conn-host 'kafka:9092' \
#   --conn-extra '{"security.protocol": "SASL_SSL"}'

# 5. Monitoring — Prometheus + Grafana
# pip install apache-airflow[statsd]
# [metrics]
# statsd_on = True
# statsd_host = statsd-exporter
# statsd_port = 8125

# Recommended production stack, summarized for the printed report below.
production_config = {
    "Executor": "CeleryExecutor (Redis broker)",
    "Workers": "3 replicas, 16 concurrency each",
    "Pools": "default(32), heavy(4), api(8), db(16)",
    "Database": "PostgreSQL 16",
    "Cache": "Redis 7",
    "Monitoring": "Prometheus + Grafana + StatsD",
    "Logging": "S3/GCS remote logging",
}

print("Production Configuration:")
for setting, choice in production_config.items():
    print(f"  {setting}: {choice}")

Best Practices

Airflow DAG คืออะไร

Directed Acyclic Graph Workflow Apache Airflow ลำดับ Tasks Dependencies ชัดเจน ไม่ Circular Airflow Schedule Retry Monitoring อัตโนมัติ เขียน Python

Pub/Sub ช่วย Airflow อย่างไร

Event-driven แทน Schedule File ใหม่ S3 Trigger DAG อัตโนมัติ Kafka Google Pub/Sub ส่ง Events Airflow Sensor รับเริ่ม Pipeline ลดเวลารอ เร็วขึ้น

DAG Design Patterns มีอะไรบ้าง

Sequential ต่อกัน Parallel พร้อมกัน Branch แยกเงื่อนไข Dynamic สร้างตาม Data TaskGroup จัดกลุ่ม Sensor-triggered รอ Event XCom ส่งข้อมูลระหว่าง Tasks

วิธี Optimize Airflow DAG ทำอย่างไร

TaskGroup แทน SubDAG Pool จำกัด Concurrent ShortCircuitOperator ข้าม Tasks @task decorator Retry Policy SLA Sensors Reschedule Mode ประหยัด Worker Slot

สรุป

Airflow DAG Design ใช้ Patterns ที่เหมาะสม Sequential Parallel Branch TaskGroup Pub/Sub ให้ Event-driven Triggers Sensors รับ Events จาก Kafka S3 Production ใช้ CeleryExecutor Pools จำกัด Resources Monitoring ด้วย Prometheus Grafana

📖 บทความที่เกี่ยวข้อง

Airflow DAG Design Hexagonal Architecture — อ่านบทความ → | OSPF Area Design Pub Sub Architecture — อ่านบทความ → | GCP Pub Sub Architecture Design Pattern — อ่านบทความ → | Redis Pub Sub Architecture Design Pattern — อ่านบทความ → | Airflow DAG Design SaaS Architecture — อ่านบทความ →

📚 ดูบทความทั้งหมด →