What is Apache Airflow?
Apache Airflow is an open-source workflow orchestration platform for programmatically authoring, scheduling, and monitoring workflows. Workflows are written in Python as DAGs (Directed Acyclic Graphs) that define task dependencies and execution order. Airflow was originally developed at Airbnb and has been an Apache top-level project since 2019.
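To make "task dependencies and execution order" concrete, here is a minimal sketch that uses only the Python standard library (`graphlib`, Python 3.9+), not Airflow itself. The task names and the `execution_order` helper are hypothetical and only illustrate how a valid run order falls out of a dependency graph.

```python
from graphlib import TopologicalSorter

# Hypothetical task graph: each key maps a task to the set of tasks it depends on.
dependencies = {
    "extract": set(),
    "transform": {"extract"},
    "load": {"transform"},
    "quality_check": {"load"},
}


def execution_order(deps):
    """Return one valid run order; raises graphlib.CycleError if deps contain a cycle."""
    return list(TopologicalSorter(deps).static_order())


order = execution_order(dependencies)
print(order)
```

A graph with a cycle (for example `a` depending on `b` and `b` on `a`) raises `CycleError`, which is why the "acyclic" part of DAG matters: without it there is no valid order to run the tasks in.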
The main Airflow components are: the Web Server, which provides the UI for monitoring DAGs; the Scheduler, which decides which tasks should run and when; the Executor, which determines how and where tasks run (Local, Celery, Kubernetes); the Metadata Database, which stores the state of every DAG and task (PostgreSQL/MySQL); and Workers, the processes that actually execute tasks (with CeleryExecutor/KubernetesExecutor).
High Availability (HA) matters for production Airflow because when Airflow goes down, every workflow stops, which affects data pipelines, ETL jobs, ML training, and business processes. An HA setup needs redundancy in every component so that there is no single point of failure.
Installing Airflow for High Availability
Set up Airflow HA on Kubernetes
# === Airflow HA Installation ===
# 1. Prerequisites
# ===================================
# - Kubernetes cluster (3+ nodes)
# - Helm 3
# - PostgreSQL (managed, e.g., Cloud SQL / RDS)
# - Redis (for CeleryExecutor)
# 2. Install Airflow via Helm
helm repo add apache-airflow https://airflow.apache.org
helm repo update
# 3. Create values.yaml for HA configuration
cat > airflow-values.yaml << 'EOF'
# Airflow HA Configuration
executor: CeleryExecutor
# Web Server: multiple replicas behind a LoadBalancer
webserver:
  replicas: 2
  resources:
    requests:
      cpu: "500m"
      memory: "1Gi"
    limits:
      cpu: "2"
      memory: "4Gi"
  service:
    type: LoadBalancer
  livenessProbe:
    initialDelaySeconds: 15
    periodSeconds: 10
  readinessProbe:
    initialDelaySeconds: 15
    periodSeconds: 10
# Scheduler: HA scheduler (Airflow 2.0+)
scheduler:
  replicas: 2  # HA scheduler
  resources:
    requests:
      cpu: "1"
      memory: "2Gi"
    limits:
      cpu: "4"
      memory: "8Gi"
# Workers: auto-scaling
workers:
  replicas: 3
  resources:
    requests:
      cpu: "1"
      memory: "2Gi"
    limits:
      cpu: "4"
      memory: "8Gi"
  keda:
    enabled: true
    minReplicaCount: 2
    maxReplicaCount: 10
# Triggerer for deferrable operators
triggerer:
  replicas: 2
  resources:
    requests:
      cpu: "500m"
      memory: "512Mi"
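# External PostgreSQL (HA) and external Redis (HA):
# disable the bundled charts and point Airflow at the managed services
postgresql:
  enabled: false
redis:
  enabled: false
# A YAML mapping cannot repeat a key, so both connections share one `data` block
data:
  metadataConnection:
    user: airflow
    pass: secretpassword  # placeholder; prefer a Kubernetes Secret (data.metadataSecretName) in production
    protocol: postgresql
    host: airflow-db.xxxx.rds.amazonaws.com
    port: 5432
    db: airflow
    sslmode: require
  brokerUrl: redis://:password@redis-cluster.xxxx.cache.amazonaws.com:6379/0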
# DAGs sync via Git
dags:
  gitSync:
    enabled: true
    repo: https://github.com/my-org/airflow-dags.git
    branch: main
    rev: HEAD
    depth: 1
    maxFailures: 3
    subPath: "dags"
    period: 60s
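# Logging to S3 and scheduler tuning
# The chart's `config` block maps airflow.cfg sections to options, so options
# are nested by section rather than written as AIRFLOW__SECTION__KEY names
# (those names are for environment variables, set through the `env` list).
config:
  logging:
    remote_logging: "True"
    remote_base_log_folder: "s3://airflow-logs/logs"
    remote_log_conn_id: "aws_default"
  core:
    parallelism: 64
    max_active_tasks_per_dag: 32
  scheduler:
    min_file_process_interval: 30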
EOF
# 4. Deploy
helm install airflow apache-airflow/airflow \
  --namespace airflow \
  --create-namespace \
  -f airflow-values.yaml
# 5. Verify HA
kubectl get pods -n airflow
# Should see: 2 webservers, 2 schedulers, 2+ workers (KEDA scales between 2 and 10), 2 triggerers
echo "Airflow HA deployed"
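The manual "Verify HA" step can also be scripted. The sketch below is a minimal, hedged example: it assumes pod names follow the pattern `<release>-<component>-...` (for example `airflow-scheduler-7d9-ccc`), and the helper names `component_of` and `missing_redundancy` are hypothetical. It works on a plain list of pod names, such as one collected from `kubectl get pods -n airflow -o name`, so it makes no cluster calls itself.

```python
from collections import Counter

# Minimum pod counts per component, taken from the values file above
# (workers use the KEDA minReplicaCount).
MIN_REPLICAS = {"webserver": 2, "scheduler": 2, "worker": 2, "triggerer": 2}


def component_of(pod_name, release="airflow"):
    """Guess the component from a pod name (assumed naming: '<release>-<component>-...')."""
    prefix = release + "-"
    if not pod_name.startswith(prefix):
        return None
    return pod_name[len(prefix):].split("-")[0]


def missing_redundancy(pod_names, minimums=MIN_REPLICAS):
    """Return {component: (found, required)} for components below their minimum."""
    counts = Counter(component_of(name) for name in pod_names)
    return {
        component: (counts.get(component, 0), required)
        for component, required in minimums.items()
        if counts.get(component, 0) < required
    }


# Illustrative pod list with only one scheduler running.
pods = [
    "airflow-webserver-6c5-aaa",
    "airflow-webserver-6c5-bbb",
    "airflow-scheduler-7d9-ccc",
    "airflow-worker-0",
    "airflow-worker-1",
    "airflow-triggerer-0",
    "airflow-triggerer-1",
]
gaps = missing_redundancy(pods)
print(gaps)
```

An empty result means every component meets its minimum; anything else names the component that is currently a single point of failure.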
DAG Design Best Practices
Writing good DAGs for production
#!/usr/bin/env python3
# dags/etl_pipeline.py: Production DAG example
# Assumes Airflow 2.x (2.4+ for the `schedule` argument); `sla` was removed in Airflow 3.
"""
ETL Pipeline DAG with best practices:
- Idempotent tasks
- Proper error handling
- SLA monitoring
- Task groups for organization
"""
from datetime import datetime, timedelta

from airflow.decorators import dag, task, task_group
from airflow.operators.empty import EmptyOperator
from airflow.utils.trigger_rule import TriggerRule

# Best Practice 1: Default args with retry and SLA
default_args = {
    "owner": "data-team",
    "depends_on_past": False,
    "email": ["alerts@example.com"],
    "email_on_failure": True,
    "email_on_retry": False,
    "retries": 3,
    "retry_delay": timedelta(minutes=5),
    "retry_exponential_backoff": True,
    "max_retry_delay": timedelta(minutes=60),
    "execution_timeout": timedelta(hours=2),
    "sla": timedelta(hours=4),
}


# Best Practice 2: Use @dag decorator
@dag(
    dag_id="etl_pipeline_v2",
    default_args=default_args,
    description="Production ETL Pipeline",
    schedule="0 2 * * *",  # Daily at 2 AM
    start_date=datetime(2025, 1, 1),
    catchup=False,
    max_active_runs=1,
    tags=["etl", "production", "data-team"],
)
def etl_pipeline():

    # Best Practice 3: Task Groups
    @task_group(group_id="extract")
    def extract_data():
        @task(task_id="extract_users")
        def extract_users(**context):
            ds = context["ds"]
            # Extract users for date partition (row counts are placeholders)
            return {"rows": 50000, "date": ds}

        @task(task_id="extract_orders")
        def extract_orders(**context):
            ds = context["ds"]
            return {"rows": 120000, "date": ds}

        @task(task_id="extract_products")
        def extract_products(**context):
            return {"rows": 5000}

        return extract_users(), extract_orders(), extract_products()

    @task_group(group_id="transform")
    def transform_data(extracted):
        @task(task_id="transform_and_join")
        def transform(data):
            # Idempotent: overwrite partition, not append
            return {"transformed_rows": 100000}

        return transform(extracted)

    @task_group(group_id="load")
    def load_data(transformed):
        @task(task_id="load_to_warehouse")
        def load(data):
            return {"loaded": True}

        return load(transformed)

    @task(task_id="quality_check")
    def quality_check(**context):
        # Data quality validation (results are hard-coded placeholders)
        checks = {
            "row_count_valid": True,
            "no_nulls_in_pk": True,
            "referential_integrity": True,
        }
        failed = [k for k, v in checks.items() if not v]
        if failed:
            raise ValueError(f"Quality checks failed: {failed}")
        return checks

    # Best Practice 4: Proper task dependencies
    start = EmptyOperator(task_id="start")
    end = EmptyOperator(task_id="end", trigger_rule=TriggerRule.ALL_SUCCESS)

    extracted = extract_data()
    transformed = transform_data(extracted)
    loaded = load_data(transformed)
    qc = quality_check()

    start >> extracted >> transformed >> loaded >> qc >> end


# Best Practice 5: Instantiate DAG
etl_dag = etl_pipeline()
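The "overwrite partition, not append" comment in the DAG is the core of idempotency, so a small runnable sketch may help. It uses the standard-library `sqlite3` module as a stand-in for the warehouse; the `orders` table, its columns, and the `load_partition` helper are assumptions made for illustration only.

```python
import sqlite3


def load_partition(conn, ds, rows):
    """Replace all rows for date partition `ds` inside a single transaction."""
    with conn:  # commits on success, rolls back if any statement fails
        conn.execute("DELETE FROM orders WHERE ds = ?", (ds,))
        conn.executemany(
            "INSERT INTO orders (ds, order_id, amount) VALUES (?, ?, ?)",
            [(ds, order_id, amount) for order_id, amount in rows],
        )


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE orders (ds TEXT, order_id INTEGER, amount REAL)")

rows = [(1, 9.5), (2, 20.0)]
load_partition(conn, "2025-01-15", rows)
load_partition(conn, "2025-01-15", rows)  # simulated retry of the same run

count = conn.execute(
    "SELECT COUNT(*) FROM orders WHERE ds = ?", ("2025-01-15",)
).fetchone()[0]
print(count)
```

Because the delete and insert share one transaction, a retry after a failure leaves the partition with exactly one copy of the data, whereas a plain append would double the rows on every retry.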
HA Architecture Patterns
Architecture patterns for Airflow HA
# === HA Architecture Patterns ===
# 1. Multi-AZ Deployment
# ===================================
# Region: ap-southeast-1
#
# AZ-a:
# - Webserver Pod (1)
# - Scheduler Pod (1)
# - Worker Pods (2)
# - Triggerer Pod (1)
#
# AZ-b:
# - Webserver Pod (1)
# - Scheduler Pod (1)
# - Worker Pods (2)
# - Triggerer Pod (1)
#
# AZ-c:
# - Worker Pods (2) — overflow
#
# Shared Services (Managed):
# - RDS PostgreSQL (Multi-AZ)
# - ElastiCache Redis (Multi-AZ)
# - S3 for logs and DAGs
# - ALB for web traffic
# 2. Kubernetes Pod Anti-Affinity
# ===================================
cat > pod-anti-affinity.yaml << 'EOF'
# Ensure scheduler pods run on different nodes
scheduler:
  affinity:
    podAntiAffinity:
      requiredDuringSchedulingIgnoredDuringExecution:
        - labelSelector:
            matchExpressions:
              - key: component
                operator: In
                values:
                  - scheduler
          topologyKey: kubernetes.io/hostname
# Spread workers across AZs
workers:
  affinity:
    podAntiAffinity:
      preferredDuringSchedulingIgnoredDuringExecution:
        - weight: 100
          podAffinityTerm:
            labelSelector:
              matchExpressions:
                - key: component
                  operator: In
                  values:
                    - worker
            topologyKey: topology.kubernetes.io/zone
EOF
# 3. Database HA
# ===================================
# PostgreSQL HA options:
# - AWS RDS Multi-AZ (automatic failover)
# - Cloud SQL HA (automatic failover)
# - Self-managed: Patroni + PgBouncer
#
# Redis HA options:
# - AWS ElastiCache (cluster mode)
# - Redis Sentinel (self-managed)
# - Redis Cluster (self-managed)
# 4. Health Checks
# ===================================
cat > healthcheck.py << 'PYEOF'
#!/usr/bin/env python3
"""Airflow HA Health Check Script"""
import json
import sys
import urllib.request


def check_airflow_health(base_url="http://localhost:8080"):
    try:
        req = urllib.request.Request(f"{base_url}/health")
        with urllib.request.urlopen(req, timeout=10) as response:
            data = json.loads(response.read())
        scheduler_ok = data.get("scheduler", {}).get("status") == "healthy"
        metadb_ok = data.get("metadatabase", {}).get("status") == "healthy"
        triggerer_ok = data.get("triggerer", {}).get("status") == "healthy"
        result = {
            "scheduler": "OK" if scheduler_ok else "FAIL",
            "database": "OK" if metadb_ok else "FAIL",
            # Reported for visibility only; not counted toward "overall"
            "triggerer": "OK" if triggerer_ok else "FAIL",
            "overall": "healthy" if all([scheduler_ok, metadb_ok]) else "unhealthy",
        }
        print(json.dumps(result, indent=2))
        return 0 if result["overall"] == "healthy" else 1
    except Exception as e:
        print(json.dumps({"error": str(e), "overall": "unreachable"}))
        return 1


sys.exit(check_airflow_health())
PYEOF
chmod +x healthcheck.py
echo "HA architecture configured"
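The health check above trusts the reported `status` field. As a complementary sketch, the function below also looks at how old the scheduler heartbeat is, and it takes an already-parsed payload so it can be exercised without a running webserver. It assumes the `/health` response carries `scheduler.latest_scheduler_heartbeat` as an ISO 8601 timestamp with a UTC offset; the 30-second threshold and the `evaluate_health` name are illustrative choices.

```python
from datetime import datetime, timedelta, timezone


def evaluate_health(payload, now, max_heartbeat_age=timedelta(seconds=30)):
    """Classify a parsed /health payload without making any network call."""
    problems = []
    for component in ("metadatabase", "scheduler"):
        if payload.get(component, {}).get("status") != "healthy":
            problems.append(f"{component} not healthy")
    heartbeat = payload.get("scheduler", {}).get("latest_scheduler_heartbeat")
    if heartbeat:
        age = now - datetime.fromisoformat(heartbeat)
        if age > max_heartbeat_age:
            problems.append(f"scheduler heartbeat is {int(age.total_seconds())}s old")
    return {"overall": "unhealthy" if problems else "healthy", "problems": problems}


# Sample payloads (made up for illustration).
now = datetime(2025, 1, 15, 2, 1, 0, tzinfo=timezone.utc)
fresh = {
    "metadatabase": {"status": "healthy"},
    "scheduler": {
        "status": "healthy",
        "latest_scheduler_heartbeat": "2025-01-15T02:00:50+00:00",
    },
}
stale = {
    "metadatabase": {"status": "healthy"},
    "scheduler": {
        "status": "healthy",
        "latest_scheduler_heartbeat": "2025-01-15T02:00:00+00:00",
    },
}
print(evaluate_health(fresh, now))
print(evaluate_health(stale, now))
```

Keeping the evaluation separate from the HTTP call makes the thresholds easy to unit test and reuse from a probe, a cron job, or an alerting hook.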
Monitoring and Alerting
Monitor Airflow HA cluster
#!/usr/bin/env python3
# airflow_monitor.py: Airflow monitoring sketch
import json
import logging
from datetime import datetime, timezone

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger("monitor")


class AirflowMonitor:
    def __init__(self):
        self.metrics = {}

    def collect_metrics(self):
        # Static sample values; replace with real readings from your metrics source
        return {
            "timestamp": datetime.now(timezone.utc).isoformat(),
            "scheduler": {
                "heartbeat_age_sec": 5,
                "dag_bag_size": 45,
                "dag_processing_time_sec": 12.5,
                "orphaned_tasks": 0,
                "running_tasks": 15,
                "queued_tasks": 3,
            },
            "workers": {
                "active": 4,
                "total": 6,
                "task_execution_rate": 45.2,
                "avg_task_duration_sec": 180,
            },
            "database": {
                "connection_count": 35,
                "max_connections": 100,
                "query_latency_ms": 8,
                "pool_utilization_pct": 35,
            },
            "dag_stats": {
                "total_dags": 45,
                "active_dags": 38,
                "paused_dags": 7,
                "failed_dag_runs_24h": 2,
                "success_rate_24h_pct": 96.5,
            },
        }

    def check_alerts(self, metrics):
        alerts = []
        sched = metrics["scheduler"]
        if sched["heartbeat_age_sec"] > 30:
            alerts.append({"severity": "critical", "msg": "Scheduler heartbeat stale"})
        if sched["orphaned_tasks"] > 0:
            alerts.append({"severity": "high", "msg": f"{sched['orphaned_tasks']} orphaned tasks"})
        if sched["queued_tasks"] > 50:
            alerts.append({"severity": "medium", "msg": "Task queue backlog"})
        db = metrics["database"]
        if db["pool_utilization_pct"] > 80:
            alerts.append({"severity": "high", "msg": "DB pool near capacity"})
        dags = metrics["dag_stats"]
        if dags["success_rate_24h_pct"] < 90:
            alerts.append({"severity": "high", "msg": f"DAG success rate low: {dags['success_rate_24h_pct']}%"})
        if any(a["severity"] == "critical" for a in alerts):
            status = "critical"
        else:
            status = "warning" if alerts else "healthy"
        return {"alert_count": len(alerts), "status": status, "alerts": alerts}

    def capacity_report(self, metrics):
        workers = metrics["workers"]
        sched = metrics["scheduler"]
        tasks_per_worker = sched["running_tasks"] / max(workers["active"], 1)
        headroom_pct = (1 - workers["active"] / max(workers["total"], 1)) * 100
        return {
            "current_load": {
                "running_tasks": sched["running_tasks"],
                "queued_tasks": sched["queued_tasks"],
                "active_workers": workers["active"],
                "tasks_per_worker": round(tasks_per_worker, 1),
            },
            "capacity": {
                "total_workers": workers["total"],
                "headroom_pct": round(headroom_pct, 1),
                # Assumes 8 task slots per worker
                "estimated_max_concurrent_tasks": workers["total"] * 8,
            },
            "recommendation": "scale_up" if headroom_pct < 20 else "optimal" if headroom_pct < 60 else "scale_down",
        }


monitor = AirflowMonitor()
metrics = monitor.collect_metrics()
alerts = monitor.check_alerts(metrics)
capacity = monitor.capacity_report(metrics)
print("Metrics:", json.dumps(metrics["scheduler"], indent=2))
print("Alerts:", json.dumps(alerts, indent=2))
print("Capacity:", json.dumps(capacity, indent=2))
Troubleshooting and Disaster Recovery
Troubleshooting and DR for Airflow
# === Troubleshooting & DR ===
# 1. Common Issues and Fixes
# ===================================
# Issue: Scheduler not picking up new DAGs
# Fix:
# kubectl rollout restart deployment/airflow-scheduler -n airflow
# Check: AIRFLOW__SCHEDULER__MIN_FILE_PROCESS_INTERVAL setting
# Issue: Tasks stuck in "queued" state
# Fix:
# airflow tasks clear <dag_id> -t <task_regex> -s <start_date> -e <end_date>
# Check Redis connection and worker status
# Issue: Database connection pool exhausted
# Fix:
# Increase AIRFLOW__DATABASE__SQL_ALCHEMY_POOL_SIZE
# Add PgBouncer as connection pooler
# Check for long-running transactions
# Issue: Worker OOMKilled
# Fix:
# Increase worker memory limits
# Optimize task memory usage
# Use KubernetesPodOperator for heavy tasks
# 2. Database Backup and Restore
# ===================================
# Backup (daily cron)
# pg_dump -h airflow-db.rds.amazonaws.com \
# -U airflow -d airflow \
# --format=custom \
# -f /backups/airflow_$(date +%Y%m%d).dump
# Restore
# pg_restore -h airflow-db.rds.amazonaws.com \
# -U airflow -d airflow \
# --clean --if-exists \
# /backups/airflow_20250115.dump
# 3. DR Runbook
# ===================================
cat > dr_runbook.md << 'EOF'
# Airflow DR Runbook
## Scenario 1: Single Worker Failure
- Impact: Minimal, tasks reassigned to other workers
- Action: Worker auto-recovers via Kubernetes
- RTO: < 5 minutes (automatic)
## Scenario 2: Scheduler Failure
- Impact: No new tasks scheduled
- Action: HA scheduler takes over automatically
- RTO: < 30 seconds (automatic failover)
## Scenario 3: Database Failure
- Impact: All Airflow operations stop
- Action: RDS Multi-AZ automatic failover
- RTO: < 5 minutes
- Verify: Check /health endpoint after failover
## Scenario 4: Full Cluster Failure
- Impact: Complete outage
- Action:
  1. Switch DNS to DR region
  2. Restore DB from latest backup
  3. Deploy Airflow in DR cluster
  4. Sync DAGs from Git
  5. Resume paused DAGs
- RTO: 30-60 minutes
- RPO: Time since the last DB backup (up to 24 hours with a daily pg_dump; back up hourly if a 1-hour RPO is required)
EOF
# 4. Automated Recovery Script
cat > recover.sh << 'SHEOF'
#!/bin/bash
set -e
echo "Checking Airflow health..."
HEALTH=$(curl -s http://localhost:8080/health)
SCHEDULER=$(echo "$HEALTH" | python3 -c "import sys,json; print(json.load(sys.stdin)['scheduler']['status'])")
if [ "$SCHEDULER" != "healthy" ]; then
  echo "Scheduler unhealthy, restarting..."
  kubectl rollout restart deployment/airflow-scheduler -n airflow
  sleep 30
  # Re-check
  HEALTH=$(curl -s http://localhost:8080/health)
  SCHEDULER=$(echo "$HEALTH" | python3 -c "import sys,json; print(json.load(sys.stdin)['scheduler']['status'])")
  if [ "$SCHEDULER" != "healthy" ]; then
    echo "CRITICAL: Scheduler still unhealthy after restart"
    # Send alert
    exit 1
  fi
fi
echo "Airflow health: OK"
SHEOF
chmod +x recover.sh
echo "Troubleshooting and DR configured"
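The recovery script waits a fixed 30 seconds before re-checking. An alternative, sketched below under stated assumptions, is to poll with a bounded number of attempts so recovery is detected as soon as it happens and failure is reported after a known deadline. `wait_until_healthy` is a hypothetical helper; `check` is any callable returning True when the component is healthy (for example a wrapper around the `/health` request), and `sleep` is injectable so the logic can be tested without real delays.

```python
import time


def wait_until_healthy(check, attempts=6, delay_sec=5.0, sleep=time.sleep):
    """Call `check()` until it returns True or the attempts run out.

    Returns the number of attempts used on success, or None if the
    component never became healthy.
    """
    for attempt in range(1, attempts + 1):
        if check():
            return attempt
        if attempt < attempts:
            sleep(delay_sec)  # no sleep after the final failed attempt
    return None


# Stand-in probe for illustration: unhealthy twice, then healthy.
responses = iter([False, False, True])
waits = []
used = wait_until_healthy(
    lambda: next(responses), attempts=5, delay_sec=5.0, sleep=waits.append
)
print(used, waits)
```

With the defaults the worst-case wait is about 25 seconds, comparable to the fixed sleep, but a scheduler that comes back quickly is confirmed on the first successful probe.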
FAQ: Frequently Asked Questions
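Q: How do I choose between CeleryExecutor and KubernetesExecutor?
A: CeleryExecutor fits when you need fast task startup (under about 1 second), the workload is steady, and you want a worker pool that is always ready. Its drawback is that resources stay reserved even when there are no tasks. KubernetesExecutor fits when tasks need very different resources, when you need isolation between tasks, or when you want to scale to zero when idle. Its drawback is slower task startup (roughly 10-30 seconds, since each task starts a pod). For HA, consider CeleryKubernetesExecutor, which combines the strengths of both: ordinary tasks run on Celery workers, while heavy tasks are routed to the Kubernetes executor (by assigning them to the `kubernetes` queue) or run through KubernetesPodOperator.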
Q: How does the Airflow 2.x HA Scheduler work?
A: Since Airflow 2.0 you can run multiple schedulers. They rely on database row-level locks (`SELECT ... FOR UPDATE SKIP LOCKED`, which is why PostgreSQL or MySQL 8+ is needed) to prevent race conditions. The schedulers run active-active, not active-standby: every scheduler parses DAGs and schedules tasks at the same time, and the database locks keep a task from being scheduled twice. If one scheduler goes down, the remaining ones simply keep working, so there is no failover step and no scheduling downtime. Running 2 schedulers is the recommended starting point for HA.
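The active-active idea can be illustrated with a small simulation. This is an analogy, not Airflow's implementation: two threads stand in for two schedulers, an in-memory dict stands in for the task table, and a single `threading.Lock` stands in for the database's row locks (real row-level locking is finer grained). All class and function names are hypothetical.

```python
import threading


class TaskTable:
    """In-memory stand-in for the task table; the lock plays the role of a row lock."""

    def __init__(self, task_ids):
        self._owner = {task_id: None for task_id in task_ids}  # task_id -> scheduler
        self._lock = threading.Lock()

    def claim_next(self, scheduler_name):
        """Atomically claim one unclaimed task, or return None when none are left."""
        with self._lock:
            for task_id, owner in self._owner.items():
                if owner is None:
                    self._owner[task_id] = scheduler_name
                    return task_id
        return None


def run_scheduler(name, table, claimed):
    """Keep claiming tasks until the table is exhausted."""
    while (task_id := table.claim_next(name)) is not None:
        claimed.append((name, task_id))


table = TaskTable([f"task_{i}" for i in range(100)])
claimed = []
threads = [
    threading.Thread(target=run_scheduler, args=(name, table, claimed))
    for name in ("scheduler-a", "scheduler-b")
]
for t in threads:
    t.start()
for t in threads:
    t.join()

task_ids = [task_id for _, task_id in claimed]
print(len(task_ids), len(set(task_ids)))
```

However the work ends up split between the two threads, every task is claimed exactly once, which is the property the row locks give multiple real schedulers. Stopping one thread early would leave the other to drain the remaining tasks, mirroring the "remaining scheduler keeps working" behavior.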
Q: How should a good DAG be designed?
A: The key principles are: Idempotent, so every task can be re-run with the same result (use UPSERT rather than INSERT); Atomic, so each task does one thing and either succeeds or fails as a whole; Small tasks, so work is split into small units that are easy to retry and debug; No hardcoded values, so use Variables and Connections; Proper dependencies, so declare dependencies correctly and use task groups to organize them; Testing, so write unit tests for DAGs; and Documentation, so add docstrings and tags.
Q: How do Airflow and Prefect differ?
A: Airflow is schedule-driven (DAGs run on a schedule), has a full-featured web UI, the largest community, and good HA support, but is complex to set up. Prefect can also be event-driven, has a simpler Pythonic API, is cloud-native, makes dynamic workflows easier, and is easier to set up. For production environments that need stability and HA, Airflow is the recommended choice; for small teams that want simplicity, Prefect is the recommended choice.
