Airflow DAG กับ Pub/Sub
Airflow DAG กำหนดลำดับการทำงานของ Tasks โดยแต่ละ Task มี Dependencies ที่ชัดเจน Airflow จัดการ Schedule, Retry และ Monitoring ให้ ส่วน Pub/Sub ช่วยให้ Trigger DAG แบบ Event-driven ได้อัตโนมัติ
DAG Design Patterns ที่ใช้บ่อย ได้แก่ Sequential, Parallel, Branch, Dynamic และ Sensor-triggered ควรเลือกออกแบบให้เหมาะกับ Data Pipeline ของคุณ
DAG Design Patterns
# === Airflow DAG Design Patterns ===
# pip install apache-airflow apache-airflow-providers-google
from datetime import datetime, timedelta
from airflow import DAG
from airflow.decorators import task, dag
from airflow.operators.python import PythonOperator, BranchPythonOperator
from airflow.operators.empty import EmptyOperator
from airflow.utils.task_group import TaskGroup
from airflow.models.baseoperator import chain
# Task-level defaults handed to each @dag(...) in this file via default_args.
default_args = dict(
    owner="data-team",
    depends_on_past=False,
    # Failure alerts are mailed to the address list below.
    email_on_failure=True,
    email=["alerts@company.com"],
    # Two retries, five minutes apart; each attempt is capped at two hours.
    retries=2,
    retry_delay=timedelta(minutes=5),
    execution_timeout=timedelta(hours=2),
)
# 1. Sequential Pattern
@dag(
    schedule="0 6 * * *",
    start_date=datetime(2024, 1, 1),
    default_args=default_args,
    catchup=False,
    tags=["etl", "sequential"],
)
def sequential_pipeline():
    """Sequential pattern: extract -> transform -> load.

    Scheduled with the cron expression ``0 6 * * *``. Each task receives the
    previous task's return value as its argument, which also defines the
    dependency order.
    """

    @task()
    def extract():
        """Return a hard-coded stand-in for an extraction result."""
        return {"rows": 10000, "source": "api"}

    @task()
    def transform(data):
        """Carry the row count forward and mark the data as cleaned."""
        return {"rows": data["rows"], "cleaned": True}

    @task()
    def load(data):
        """Report how many rows were loaded."""
        print(f"Loaded {data['rows']} rows")

    data = extract()
    cleaned = transform(data)
    load(cleaned)


# The @dag decorator only wraps the function; the DAG object is built and
# registered when the decorated function is actually called.
sequential_pipeline()
# 2. Parallel Pattern
@dag(
    schedule="0 7 * * *",
    start_date=datetime(2024, 1, 1),
    default_args=default_args,
    catchup=False,
    tags=["etl", "parallel"],
)
def parallel_pipeline():
    """Parallel pattern: three independent extracts -> merge -> load.

    Scheduled with the cron expression ``0 7 * * *``. The three extract tasks
    have no dependencies on each other; ``merge`` depends on all of them.
    """

    @task()
    def extract_orders():
        """Return a hard-coded stand-in for the orders extract."""
        return {"source": "orders", "rows": 5000}

    @task()
    def extract_customers():
        """Return a hard-coded stand-in for the customers extract."""
        return {"source": "customers", "rows": 2000}

    @task()
    def extract_products():
        """Return a hard-coded stand-in for the products extract."""
        return {"source": "products", "rows": 500}

    @task()
    def merge(orders, customers, products):
        """Sum the ``rows`` counts of the three extracts."""
        total = orders["rows"] + customers["rows"] + products["rows"]
        return {"total_rows": total}

    @task()
    def load(data):
        """Report the merged row count."""
        print(f"Loaded {data['total_rows']} total rows")

    # Parallel extraction -> Merge -> Load
    orders = extract_orders()
    customers = extract_customers()
    products = extract_products()
    merged = merge(orders, customers, products)
    load(merged)


# The @dag decorator only wraps the function; the DAG object is built and
# registered when the decorated function is actually called.
parallel_pipeline()
# 3. Branch Pattern
@dag(
    schedule="0 8 * * *",
    start_date=datetime(2024, 1, 1),
    default_args=default_args,
    catchup=False,
    tags=["etl", "branch"],
)
def branch_pipeline():
    """Branch pattern: check -> (process_large | process_small) -> notify.

    Scheduled with the cron expression ``0 8 * * *``.
    """

    @task.branch()
    def check_data_size():
        """Return the task_id of the branch to follow.

        ``row_count`` is hard-coded to 15000 in this example, so the
        ``process_large`` branch is always the one returned.
        """
        row_count = 15000
        if row_count > 10000:
            return "process_large"
        return "process_small"

    @task()
    def process_large():
        """Placeholder for the large-data path."""
        print("Processing with Spark")

    @task()
    def process_small():
        """Placeholder for the small-data path."""
        print("Processing with Pandas")

    # The branch that is not chosen ends up skipped; this trigger rule lets
    # notify run as long as nothing failed and at least one upstream succeeded.
    @task(trigger_rule="none_failed_min_one_success")
    def notify():
        """Report that the pipeline finished."""
        print("Pipeline completed")

    branch = check_data_size()
    large = process_large()
    small = process_small()
    done = notify()
    branch >> [large, small] >> done


# The @dag decorator only wraps the function; the DAG object is built and
# registered when the decorated function is actually called.
branch_pipeline()
# 4. TaskGroup Pattern
@dag(
    schedule="0 9 * * *",
    start_date=datetime(2024, 1, 1),
    default_args=default_args,
    catchup=False,
    tags=["etl", "taskgroup"],
)
def taskgroup_pipeline():
    """TaskGroup pattern: start -> extract group -> transform group -> end.

    Scheduled with the cron expression ``0 9 * * *``.
    """
    start = EmptyOperator(task_id="start")

    # Two extract tasks with no dependency between them.
    with TaskGroup("extract") as extract_group:

        @task()
        def extract_api():
            """Return a placeholder API payload."""
            return "api_data"

        @task()
        def extract_db():
            """Return a placeholder database payload."""
            return "db_data"

        extract_api()
        extract_db()

    # clean must finish before enrich starts.
    with TaskGroup("transform") as transform_group:

        @task()
        def clean():
            """Return a placeholder cleaned result."""
            return "cleaned"

        @task()
        def enrich():
            """Return a placeholder enriched result."""
            return "enriched"

        clean() >> enrich()

    end = EmptyOperator(task_id="end")
    start >> extract_group >> transform_group >> end


# The @dag decorator only wraps the function; the DAG object is built and
# registered when the decorated function is actually called.
taskgroup_pipeline()
# Console recap of the four patterns, emitted one per line.
print(
    "DAG Design Patterns:",
    " 1. Sequential: extract -> transform -> load",
    " 2. Parallel: extract_a | extract_b -> merge -> load",
    " 3. Branch: check -> large_path / small_path -> notify",
    " 4. TaskGroup: group related tasks together",
    sep="\n",
)
Pub/Sub Event-driven Triggers
# pubsub_triggers.py — Event-driven DAG Triggers
from dataclasses import dataclass, field
from typing import List, Dict, Callable
from datetime import datetime
from enum import Enum
class EventType(Enum):
    """Categories of events that can trigger a pipeline.

    Each member's value is the lowercase string form of its name.
    """

    FILE_ARRIVAL = "file_arrival"
    API_WEBHOOK = "api_webhook"
    DB_CHANGE = "db_change"
    SCHEDULE = "schedule"
    MANUAL = "manual"
@dataclass
class TriggerEvent:
    """One incoming event that may trigger a pipeline.

    Attributes:
        event_type: Category of the event.
        source: Free-form identifier of where the event came from.
        payload: Event-specific data.
        timestamp: ISO-8601 string. When left empty it is filled in at
            construction time from ``datetime.now()`` (naive local time).
    """

    event_type: "EventType"
    source: str
    payload: Dict
    timestamp: str = ""

    def __post_init__(self) -> None:
        """Stamp the event with the current time if no timestamp was given."""
        if not self.timestamp:
            self.timestamp = datetime.now().isoformat()
class EventDrivenOrchestrator:
    """Dispatch trigger events to the handlers registered for their type.

    Attributes:
        handlers: Maps an event type to its handlers, in registration order.
        events_processed: Events for which every handler returned, in the
            order they were processed.
    """

    def __init__(self) -> None:
        """Start with no handlers and an empty event history."""
        self.handlers: Dict["EventType", List[Callable]] = {}
        self.events_processed: List["TriggerEvent"] = []

    def register_handler(self, event_type: "EventType", handler: Callable) -> None:
        """Add ``handler`` to the list called for events of ``event_type``.

        Args:
            event_type: Event category the handler should receive.
            handler: Callable invoked with the event as its only argument.
        """
        self.handlers.setdefault(event_type, []).append(handler)

    def process_event(self, event: "TriggerEvent") -> None:
        """Print the event, run its handlers in order, then record it.

        An event type with no registered handlers is still printed and
        recorded. If a handler raises, the exception propagates and the event
        is not added to ``events_processed``.

        Args:
            event: The event to dispatch.
        """
        print(f"\n Event: {event.event_type.value} from {event.source}")
        print(f" Payload: {event.payload}")
        for handler in self.handlers.get(event.event_type, []):
            handler(event)
        self.events_processed.append(event)

    def dashboard(self) -> None:
        """Print handler and event totals plus a per-type event count."""
        from collections import Counter

        rule = "=" * 55
        print(f"\n{rule}")
        print("Event-driven Orchestrator Dashboard")
        print(rule)
        print(f" Registered Handlers: {sum(len(h) for h in self.handlers.values())}")
        print(f" Events Processed: {len(self.events_processed)}")

        # Counter keeps first-seen order, so types are listed in arrival order.
        by_type = Counter(e.event_type.value for e in self.events_processed)
        print("\n Events by Type:")
        for etype, count in by_type.items():
            print(f" {etype}: {count}")
# Airflow Sensors สำหรับ Event-driven
# from airflow.providers.google.cloud.sensors.pubsub import PubSubPullSensor
# from airflow.providers.amazon.aws.sensors.s3 import S3KeySensor
# from airflow.sensors.external_task import ExternalTaskSensor
#
# # Google Pub/Sub Sensor
# wait_for_event = PubSubPullSensor(
# task_id="wait_for_pubsub",
# project_id="my-project",
# subscription="my-subscription",
# max_messages=1,
# ack_messages=True,
# mode="reschedule", # ประหยัด Worker Slot
# )
#
# # S3 File Arrival Sensor
# wait_for_file = S3KeySensor(
# task_id="wait_for_s3_file",
# bucket_name="data-lake",
# bucket_key="raw/{{ ds }}/*.parquet",
# wildcard_match=True, # จำเป็นเมื่อ bucket_key มี wildcard (*) มิฉะนั้นจะมองเป็นชื่อ Key ตรงตัว
# mode="reschedule",
# poke_interval=300,
# timeout=3600,
# )
#
# # Kafka Consumer Trigger
# # ใช้ Airflow REST API trigger DAG จาก Kafka Consumer
# # curl -X POST http://airflow:8080/api/v1/dags/my_dag/dagRuns \
# # -H "Content-Type: application/json" \
# # -d '{"conf": {"event": "new_data", "source": "kafka"}}'
# Example: the orchestrator instance used by the demo statements that follow.
orchestrator = EventDrivenOrchestrator()
def handle_file(event: "TriggerEvent") -> None:
    """Print which file would trigger the ETL DAG.

    Reads the ``file`` key from ``event.payload`` and falls back to
    ``'unknown'`` when the key is absent.
    """
    print(f" -> Triggering ETL DAG for {event.payload.get('file', 'unknown')}")
def handle_webhook(event: "TriggerEvent") -> None:
    """Print the source of the webhook event being processed."""
    print(f" -> Processing webhook from {event.source}")
# Route file arrivals and webhooks to their handlers.
orchestrator.register_handler(EventType.FILE_ARRIVAL, handle_file)
orchestrator.register_handler(EventType.API_WEBHOOK, handle_webhook)

# Sample events: two file arrivals and one webhook.
events = [
    TriggerEvent(EventType.FILE_ARRIVAL, "s3://data-lake",
                 {"file": "orders_2024.parquet", "size": "50MB"}),
    TriggerEvent(EventType.API_WEBHOOK, "stripe",
                 {"event": "payment.completed", "amount": 1500}),
    TriggerEvent(EventType.FILE_ARRIVAL, "gcs://analytics",
                 {"file": "users_export.csv", "size": "10MB"}),
]

# Dispatch every event, then print the summary once at the end.
for event in events:
    orchestrator.process_event(event)
orchestrator.dashboard()
Production Configuration
# === Airflow Production Configuration ===
# 1. airflow.cfg — Key Settings
# [core]
# executor = CeleryExecutor # Production executor
# parallelism = 32 # Max concurrent tasks
# max_active_runs_per_dag = 3 # Max concurrent DAG runs
# dag_file_processor_timeout = 120 # Seconds to parse DAG file
#
# [celery]
# broker_url = redis://redis:6379/0
# result_backend = redis://redis:6379/1
# worker_concurrency = 16
#
# [scheduler]
# min_file_process_interval = 30 # Seconds between DAG file scans
# dag_dir_list_interval = 300 # Seconds between dir scans
#
# [webserver]
# web_server_port = 8080
# default_ui_timezone = Asia/Bangkok
# 2. Docker Compose — Production
# version: '3.8'
# services:
# postgres:
# image: postgres:16
# environment:
# POSTGRES_USER: airflow
# POSTGRES_PASSWORD: airflow
# POSTGRES_DB: airflow
#
# redis:
# image: redis:7-alpine
#
# airflow-webserver:
# image: apache/airflow:2.8.1
# command: webserver
# ports: ["8080:8080"]
# depends_on: [postgres, redis]
#
# airflow-scheduler:
# image: apache/airflow:2.8.1
# command: scheduler
# depends_on: [postgres, redis]
#
# airflow-worker:
# image: apache/airflow:2.8.1
# command: celery worker
# depends_on: [postgres, redis]
# deploy:
# replicas: 3
# 3. Pool Configuration
# airflow pools set default_pool 32 "Default pool"
# airflow pools set heavy_tasks 4 "CPU/Memory intensive tasks"
# airflow pools set api_calls 8 "External API rate limited"
# airflow pools set db_queries 16 "Database query tasks"
# 4. Connection Configuration
# airflow connections add 'kafka_default' \
# --conn-type 'kafka' \
# --conn-host 'kafka:9092' \
# --conn-extra '{"security.protocol": "SASL_SSL"}'
# 5. Monitoring — Prometheus + Grafana
# pip install apache-airflow[statsd]
# [metrics]
# statsd_on = True
# statsd_host = statsd-exporter
# statsd_port = 8125
# Human-readable summary of the production setup; values are display strings.
production_config = {
    "Executor": "CeleryExecutor (Redis broker)",
    "Workers": "3 replicas, 16 concurrency each",
    "Pools": "default(32), heavy(4), api(8), db(16)",
    "Database": "PostgreSQL 16",
    "Cache": "Redis 7",
    "Monitoring": "Prometheus + Grafana + StatsD",
    "Logging": "S3/GCS remote logging",
}

# Print the summary, one "key: value" line per entry in insertion order.
print("Production Configuration:")
for key, value in production_config.items():
    print(f" {key}: {value}")
Best Practices
- TaskGroup: ใช้ TaskGroup แทน SubDAG จัดกลุ่ม Tasks
- Sensor Mode: ใช้ mode="reschedule" ประหยัด Worker Slot
- @task Decorator: ใช้ @task แทน PythonOperator เขียนสั้นกว่า
- Pools: ตั้ง Pool จำกัด Concurrent Tasks ป้องกัน Overload
- Idempotent: ทำ Tasks ให้ Idempotent รันซ้ำได้ผลเหมือนเดิม
- SLA: ตั้ง SLA สำหรับ Critical DAGs แจ้งเตือนเมื่อช้ากว่ากำหนด
Airflow DAG คืออะไร
DAG ย่อมาจาก Directed Acyclic Graph คือ Workflow ใน Apache Airflow ที่กำหนดลำดับ Tasks และ Dependencies อย่างชัดเจนโดยไม่มีการวนกลับ (ไม่ Circular) Airflow จัดการ Schedule, Retry และ Monitoring ให้อัตโนมัติ และเขียน DAG ด้วย Python
Pub/Sub ช่วย Airflow อย่างไร
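Pub/Sub ทำให้ Pipeline ทำงานแบบ Event-driven แทนการรอ Schedule เช่น เมื่อมี File ใหม่ใน S3 ก็ Trigger DAG ได้อัตโนมัติ Kafka หรือ Google Pub/Sub ส่ง Events เข้ามา แล้ว Airflow Sensor รับ Event เพื่อเริ่ม Pipeline ช่วยลดเวลารอและทำให้ข้อมูลพร้อมใช้เร็วขึ้น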
DAG Design Patterns มีอะไรบ้าง
Sequential รัน Tasks ต่อกันตามลำดับ, Parallel รันพร้อมกัน, Branch แยกเส้นทางตามเงื่อนไข, Dynamic สร้าง Tasks ตาม Data, TaskGroup จัดกลุ่ม Tasks ที่เกี่ยวข้อง, Sensor-triggered รอ Event ก่อนเริ่มทำงาน และใช้ XCom ส่งข้อมูลระหว่าง Tasks
วิธี Optimize Airflow DAG ทำอย่างไร
ใช้ TaskGroup แทน SubDAG, ตั้ง Pool เพื่อจำกัดจำนวน Concurrent Tasks, ใช้ ShortCircuitOperator ข้าม Tasks ที่ไม่จำเป็น, เขียน Tasks ด้วย @task decorator, กำหนด Retry Policy และ SLA และตั้ง Sensors เป็น Reschedule Mode เพื่อประหยัด Worker Slot
สรุป
การออกแบบ Airflow DAG ควรเลือก Patterns ที่เหมาะสม เช่น Sequential, Parallel, Branch และ TaskGroup ส่วน Pub/Sub ช่วยให้มี Event-driven Triggers โดย Sensors รับ Events จาก Kafka หรือ S3 สำหรับ Production ให้ใช้ CeleryExecutor, ตั้ง Pools เพื่อจำกัด Resources และทำ Monitoring ด้วย Prometheus กับ Grafana
