ai

Airflow DAG Design High Availability HA Setup —

Airflow DAG Design High Availability HA Setup —

Apache Airflow คืออะไร

Airflow DAG Design High Availability HA Setup —

Apache Airflow เป็น open source workflow orchestration platform สำหรับ programmatically author, schedule และ monitor workflows ใช้ Python เขียน DAGs (Directed Acyclic Graphs) กำหนด task dependencies และ execution order พัฒนาโดย Airbnb และเป็น Apache top-level project ตั้งแต่ปี 2019

Components หลักของ Airflow ได้แก่ Web Server ให้ UI สำหรับ monitor DAGs, Scheduler ตัดสินใจว่า task ไหนต้อง run เมื่อไหร่, Executor ทำหน้าที่ run tasks จริง (Local, Celery, Kubernetes), Metadata Database เก็บ state ของทุก DAGs และ tasks (PostgreSQL/MySQL), Workers process ที่ run tasks จริง (สำหรับ CeleryExecutor/KubernetesExecutor)

อ่านเพิ่ม: Docker Compose ตัวอย่าง Config สำหรับ Self-hosted Apps · ดูรายละเอียด Docker Compose ตัวอย่าง Config สำหรับ Self-hosted Apps · อ่านเพิ่ม: AWS Iam คืออะไร — คู่มือ IT Infrastructure 2026 — คู่มือฉบับ

High Availability (HA) สำหรับ Airflow สำคัญมากสำหรับ production เพราะถ้า Airflow ล่ม workflows ทั้งหมดจะหยุดทำงาน ส่งผลกระทบต่อ data pipelines, ETL jobs, ML training และ business processes HA setup ต้อง redundancy ทุก component ไม่มี single point of failure

ติดตั้ง Airflow แบบ High Availability

Setup Airflow HA บน Kubernetes

# === Airflow HA Installation ===



# 1. Prerequisites

# ===================================

# - Kubernetes cluster (3+ nodes)

# - Helm 3

# - PostgreSQL (managed, e.g., Cloud SQL / RDS)

# - Redis (for CeleryExecutor)



# 2. Install Airflow via Helm

helm repo add apache-airflow https://airflow.apache.org

helm repo update



# 3. Create values.yaml for HA configuration

cat > airflow-values.yaml << 'EOF'

# Airflow HA Configuration

executor: CeleryExecutor



# Web Server — Multiple replicas with LoadBalancer

webserver:

  replicas: 2

  resources:

    requests:

      cpu: "500m"

      memory: "1Gi"

    limits:

      cpu: "2"

      memory: "4Gi"

  service:

    type: LoadBalancer

  livenessProbe:

    initialDelaySeconds: 15

    periodSeconds: 10

  readinessProbe:

    initialDelaySeconds: 15

    periodSeconds: 10



# Scheduler — HA Scheduler (Airflow 2.0+)

scheduler:

  replicas: 2  # HA scheduler

  resources:

    requests:

      cpu: "1"

      memory: "2Gi"

    limits:

      cpu: "4"

      memory: "8Gi"



# Workers — Auto-scaling

workers:

  replicas: 3

  resources:

    requests:

      cpu: "1"

      memory: "2Gi"

    limits:

      cpu: "4"

      memory: "8Gi"

  keda:

    enabled: true

    minReplicaCount: 2

    maxReplicaCount: 10



# Triggerer for deferrable operators

triggerer:

  replicas: 2

  resources:

    requests:

      cpu: "500m"

      memory: "512Mi"



# External PostgreSQL (HA)

postgresql:

  enabled: false



data:

  metadataConnection:

    user: airflow

    pass: secretpassword

    protocol: postgresql

    host: airflow-db.xxxx.rds.amazonaws.com

    port: 5432

    db: airflow

    sslmode: require



# External Redis (HA)

redis:

  enabled: false



data:

  brokerUrl: redis://:password@redis-cluster.xxxx.cache.amazonaws.com:6379/0



# DAGs sync via Git

dags:

  gitSync:

    enabled: true

    repo: https://github.com/my-org/airflow-dags.git

    branch: main

    rev: HEAD

    depth: 1

    maxFailures: 3

    subPath: "dags"

    period: 60s



# Logging to S3

config:

  AIRFLOW__LOGGING__REMOTE_LOGGING: "True"

  AIRFLOW__LOGGING__REMOTE_BASE_LOG_FOLDER: "s3://airflow-logs/logs"

  AIRFLOW__LOGGING__REMOTE_LOG_CONN_ID: "aws_default"

  AIRFLOW__CORE__PARALLELISM: "64"

  AIRFLOW__CORE__MAX_ACTIVE_TASKS_PER_DAG: "32"

  AIRFLOW__SCHEDULER__MIN_FILE_PROCESS_INTERVAL: "30"

EOF



# 4. Deploy

helm install airflow apache-airflow/airflow \

    --namespace airflow \

    --create-namespace \

    -f airflow-values.yaml



# 5. Verify HA

kubectl get pods -n airflow

# Should see: 2 webservers, 2 schedulers, 3+ workers, 2 triggerers



echo "Airflow HA deployed"

DAG Design Best Practices

เขียน DAGs ที่ดีสำหรับ production

!/usr/bin/env python3

dags/etl_pipeline.py — Production DAG Example

"""

ETL Pipeline DAG with best practices:

  • Idempotent tasks
  • Proper error handling
  • SLA monitoring
  • Task groups for organization
  • Dynamic task generation

"""

from datetime import datetime, timedelta

from airflow import DAG

from airflow.decorators import dag, task, task_group

from airflow.operators.python import PythonOperator

from airflow.operators.empty import EmptyOperator

from airflow.providers.postgres.operators.postgres import PostgresOperator

from airflow.utils.trigger_rule import TriggerRule

from airflow.models import Variable

Best Practice 1: Default args with retry and SLA

default_args = {

"owner": "data-team",

"depends_on_past": False,

"email": ["alerts@example.com"],

"email_on_failure": True,

"email_on_retry": False,

"retries": 3,

"retry_delay": timedelta(minutes=5),

"retry_exponential_backoff": True,

"max_retry_delay": timedelta(minutes=60),

"execution_timeout": timedelta(hours=2),

"sla": timedelta(hours=4),

}

Best Practice 2: Use @dag decorator

เนื้อหาเกี่ยวข้อง — WordPress Headless Feature Flag Management

@dag(

dag_id="etl_pipeline_v2",

default_args=default_args,

description="Production ETL Pipeline",

schedule="0 2 * * *", # Daily at 2 AM

start_date=datetime(2025, 1, 1),

catchup=False,

max_active_runs=1,

tags=["etl", "production", "data-team"],

แนะนำเพิ่มเติม — ดูสัญญาณเทรดที่ XM Signal

)

def etl_pipeline():

# Best Practice 3: Task Groups

@task_group(group_id="extract")

def extract_data():

@task(task_id="extract_users")

def extract_users(**context):

ds = context["ds"]

# Extract users for date partition

return {"rows": 50000, "date": ds}

@task(task_id="extract_orders")

def extract_orders(**context):

ds = context["ds"]

return {"rows": 120000, "date": ds}

@task(task_id="extract_products")

def extract_products(**context):

return {"rows": 5000}

return extract_users(), extract_orders(), extract_products()

@task_group(group_id="transform")

def transform_data(extracted):

@task(task_id="transform_and_join")

def transform(data):

# Idempotent: overwrite partition, not append

return {"transformed_rows": 100000}

return transform(extracted)

@task_group(group_id="load")

def load_data(transformed):

Airflow DAG Design High Availability HA Setup —

@task(task_id="load_to_warehouse")

def load(data):

return {"loaded": True}

return load(transformed)

@task(task_id="quality_check")

เนื้อหาเกี่ยวข้อง — แนะนำให้อ่าน Go Wire DI Audit Trail Logging — คู่มือฉบับสมบูรณ์ 2026

def quality_check(**context):

# Data quality validation

checks = {

"row_count_valid": True,

"no_nulls_in_pk": True,

"referential_integrity": True,

}

failed = [k for k, v in checks.items() if not v]

if failed:

raise ValueError(f"Quality checks failed: {failed}")

return checks

# Best Practice 4: Proper task dependencies

start = EmptyOperator(task_id="start")

end = EmptyOperator(task_id="end", trigger_rule=TriggerRule.ALL_SUCCESS)

extracted = extract_data()

transformed = transform_data(extracted)

loaded = load_data(transformed)

qc = quality_check()

start >> extracted >> transformed >> loaded >> qc >> end

แนะนำเพิ่มเติม — คอร์สเทรด Forex ที่ iCafeForex

Best Practice 5: Instantiate DAG

etl_dag = etl_pipeline()

print("DAG design best practices defined")

HA Architecture Patterns

Architecture patterns สำหรับ Airflow HA

# === HA Architecture Patterns ===



# 1. Multi-AZ Deployment

# ===================================

# Region: ap-southeast-1

#

# AZ-a:

#   - Webserver Pod (1)

#   - Scheduler Pod (1)

#   - Worker Pods (2)

#   - Triggerer Pod (1)

#

# AZ-b:

#   - Webserver Pod (1)

#   - Scheduler Pod (1)

#   - Worker Pods (2)

#   - Triggerer Pod (1)

#

# AZ-c:

#   - Worker Pods (2) — overflow

#

# Shared Services (Managed):

#   - RDS PostgreSQL (Multi-AZ)

#   - ElastiCache Redis (Multi-AZ)

#   - S3 for logs and DAGs

#   - ALB for web traffic



# 2. Kubernetes Pod Anti-Affinity

# ===================================

cat > pod-anti-affinity.yaml << 'EOF'

# Ensure scheduler pods run on different nodes

scheduler:

  affinity:

    podAntiAffinity:

      requiredDuringSchedulingIgnoredDuringExecution:

        - labelSelector:

            matchExpressions:

              - key: component

                operator: In

                values:

                  - scheduler

          topologyKey: kubernetes.io/hostname



# Spread workers across AZs

workers:

  affinity:

    podAntiAffinity:

      preferredDuringSchedulingIgnoredDuringExecution:

        - weight: 100

          podAffinityTerm:

            labelSelector:

              matchExpressions:

                - key: component

                  operator: In

                  values:

                    - worker

            topologyKey: topology.kubernetes.io/zone

EOF



# 3. Database HA

# ===================================

# PostgreSQL HA options:

# - AWS RDS Multi-AZ (automatic failover)

# - Cloud SQL HA (automatic failover)

# - Self-managed: Patroni + PgBouncer

#

# Redis HA options:

# - AWS ElastiCache (cluster mode)

# - Redis Sentinel (self-managed)

# - Redis Cluster (self-managed)



# 4. Health Checks

# ===================================

cat > healthcheck.py << 'PYEOF'

#!/usr/bin/env python3

"""Airflow HA Health Check Script"""

import json

import urllib.request

import sys



def check_airflow_health(base_url="http://localhost:8080"):

    try:

        req = urllib.request.Request(f"{base_url}/health")

        with urllib.request.urlopen(req, timeout=10) as response:

            data = json.loads(response.read())

            

            scheduler_ok = data.get("scheduler", {}).get("status") == "healthy"

            metadb_ok = data.get("metadatabase", {}).get("status") == "healthy"

            triggerer_ok = data.get("triggerer", {}).get("status") == "healthy"

            

            result = {

                "scheduler": "OK" if scheduler_ok else "FAIL",

                "database": "OK" if metadb_ok else "FAIL",

                "triggerer": "OK" if triggerer_ok else "FAIL",

                "overall": "healthy" if all([scheduler_ok, metadb_ok]) else "unhealthy",

            }

            

            print(json.dumps(result, indent=2))

            return 0 if result["overall"] == "healthy" else 1

    

    except Exception as e:

        print(json.dumps({"error": str(e), "overall": "unreachable"}))

        return 1



sys.exit(check_airflow_health())

PYEOF



chmod +x healthcheck.py



echo "HA architecture configured"

Monitoring และ Alerting

Monitor Airflow HA cluster

#!/usr/bin/env python3

# airflow_monitor.py — Airflow Monitoring

import json

import logging

from datetime import datetime, timedelta

from typing import Dict, List



logging.basicConfig(level=logging.INFO)

logger = logging.getLogger("monitor")



class AirflowMonitor:

    def __init__(self):

        self.metrics = {}

    

    def collect_metrics(self):

        return {

            "timestamp": datetime.utcnow().isoformat(),

            "scheduler": {

                "heartbeat_age_sec": 5,

                "dag_bag_size": 45,

                "dag_processing_time_sec": 12.5,

                "orphaned_tasks": 0,

                "running_tasks": 15,

                "queued_tasks": 3,

            },

            "workers": {

                "active": 4,

                "total": 6,

                "task_execution_rate": 45.2,

                "avg_task_duration_sec": 180,

            },

            "database": {

                "connection_count": 35,

                "max_connections": 100,

                "query_latency_ms": 8,

                "pool_utilization_pct": 35,

            },

            "dag_stats": {

                "total_dags": 45,

                "active_dags": 38,

                "paused_dags": 7,

                "failed_dag_runs_24h": 2,

                "success_rate_24h_pct": 96.5,

            },

        }

    

    def check_alerts(self, metrics):

        alerts = []

        

        sched = metrics["scheduler"]

        if sched["heartbeat_age_sec"] > 30:

            alerts.append({"severity": "critical", "msg": "Scheduler heartbeat stale"})

        if sched["orphaned_tasks"] > 0:

            alerts.append({"severity": "high", "msg": f"{sched['orphaned_tasks']} orphaned tasks"})

        if sched["queued_tasks"] > 50:

            alerts.append({"severity": "medium", "msg": "Task queue backlog"})

        

        db = metrics["database"]

        if db["pool_utilization_pct"] > 80:

            alerts.append({"severity": "high", "msg": "DB pool near capacity"})

        

        dags = metrics["dag_stats"]

        if dags["success_rate_24h_pct"] < 90:

            alerts.append({"severity": "high", "msg": f"DAG success rate low: {dags['success_rate_24h_pct']}%"})

        

        return {

            "alert_count": len(alerts),

            "status": "critical" if any(a["severity"] == "critical" for a in alerts)

                      else "warning" if alerts else "healthy",

            "alerts": alerts,

        }

    

    def capacity_report(self, metrics):

        workers = metrics["workers"]

        sched = metrics["scheduler"]

        

        tasks_per_worker = sched["running_tasks"] / max(workers["active"], 1)

        headroom_pct = (1 - workers["active"] / max(workers["total"], 1)) * 100

        

        return {

            "current_load": {

                "running_tasks": sched["running_tasks"],

                "queued_tasks": sched["queued_tasks"],

                "active_workers": workers["active"],

                "tasks_per_worker": round(tasks_per_worker, 1),

            },

            "capacity": {

                "total_workers": workers["total"],

                "headroom_pct": round(headroom_pct, 1),

                "estimated_max_concurrent_tasks": workers["total"] * 8,

            },

            "recommendation": "scale_up" if headroom_pct < 20 else "optimal" if headroom_pct < 60 else "scale_down",

        }



monitor = AirflowMonitor()

metrics = monitor.collect_metrics()

alerts = monitor.check_alerts(metrics)

capacity = monitor.capacity_report(metrics)



print("Metrics:", json.dumps(metrics["scheduler"], indent=2))

print("Alerts:", json.dumps(alerts, indent=2))

print("Capacity:", json.dumps(capacity, indent=2))

Troubleshooting และ Disaster Recovery

แก้ปัญหาและ DR สำหรับ Airflow

=== Troubleshooting & DR ===

1. Common Issues and Fixes

Issue: Scheduler not picking up new DAGs

Fix:

kubectl rollout restart deployment/airflow-scheduler -n airflow

Check: AIRFLOW__SCHEDULER__MIN_FILE_PROCESS_INTERVAL setting

Issue: Tasks stuck in "queued" state

Fix:

airflow tasks clear <dag_id> -t <task_id> -s <start_date> -e <end_date>

Check Redis connection and worker status

Issue: Database connection pool exhausted

เนื้อหาเกี่ยวข้อง — ดูเพิ่มเติมเรื่อง mTLS Service Mesh Citizen Developer

Fix:

Increase AIRFLOW__DATABASE__SQL_ALCHEMY_POOL_SIZE

Add PgBouncer as connection pooler

Check for long-running transactions

Issue: Worker OOMKilled

Fix:

Increase worker memory limits

Optimize task memory usage

Use KubernetesPodOperator for heavy tasks

2. Database Backup and Restore

Backup (daily cron)

pg_dump -h airflow-db.rds.amazonaws.com \

-U airflow -d airflow \

--format=custom \

-f /backups/airflow_$(date +%Y%m%d).dump

Restore

pg_restore -h airflow-db.rds.amazonaws.com \

-U airflow -d airflow \

--clean --if-exists \

/backups/airflow_20250115.dump

3. DR Runbook

cat > dr_runbook.md << 'EOF'

Airflow DR Runbook

# Scenario 1: Single Worker Failure

  • Impact: Minimal, tasks reassigned to other workers
  • Action: Worker auto-recovers via Kubernetes
  • RTO: < 5 minutes (automatic)

# Scenario 2: Scheduler Failure

  • Impact: No new tasks scheduled
  • Action: HA scheduler takes over automatically
  • RTO: < 30 seconds (automatic failover)

# Scenario 3: Database Failure

  • Impact: All Airflow operations stop
  • Action: RDS Multi-AZ automatic failover
  • RTO: < 5 minutes
  • Verify: Check /health endpoint after failover

# Scenario 4: Full Cluster Failure

  • Impact: Complete outage
  • Action:

1. Switch DNS to DR region

2. Restore DB from latest backup

3. Deploy Airflow in DR cluster

4. Sync DAGs from Git

5. Resume paused DAGs

  • RTO: 30-60 minutes
  • RPO: Last DB backup (hourly)

EOF

4. Automated Recovery Script

เนื้อหาเกี่ยวข้อง — ทำความเข้าใจ Prometheus Alertmanager Remote Work Setup — คู่มือฉบับสมบูรณ์ 2026

cat > recover.sh << 'SHEOF'

!/bin/bash

set -e

echo "Checking Airflow health..."

HEALTH=$(curl -s http://localhost:8080/health)

SCHEDULER=$(echo $HEALTH | python3 -c "import sys,json; print(json.load(sys.stdin)['scheduler']['status'])")

if [ "$SCHEDULER" != "healthy" ]; then

echo "Scheduler unhealthy, restarting..."

kubectl rollout restart deployment/airflow-scheduler -n airflow

sleep 30

Re-check

HEALTH=$(curl -s http://localhost:8080/health)

SCHEDULER=$(echo $HEALTH | python3 -c "import sys,json; print(json.load(sys.stdin)['scheduler']['status'])")

if [ "$SCHEDULER" != "healthy" ]; then

echo "CRITICAL: Scheduler still unhealthy after restart"

Send alert

exit 1

fi

fi

echo "Airflow health: OK"

SHEOF

chmod +x recover.sh

echo "Troubleshooting and DR configured"

FAQ คำถามที่พบบ่อย

Q: CeleryExecutor กับ KubernetesExecutor เลือกใช้อย่างไร?

A: CeleryExecutor เหมาะเมื่อต้องการ task startup เร็ว (< 1 วินาที), workload สม่ำเสมอ, ต้องการ worker pool พร้อมใช้ตลอด ข้อเสีย resources ถูกจองแม้ไม่มี tasks KubernetesExecutor เหมาะเมื่อ tasks ต้องการ resources แตกต่างกันมาก, ต้องการ isolation ระหว่าง tasks, ต้องการ scale to zero เมื่อไม่มีงาน ข้อเสีย task startup ช้ากว่า (10-30 วินาที) สำหรับ HA แนะนำ CeleryKubernetesExecutor ที่รวมข้อดีทั้งสอง tasks ทั่วไปใช้ Celery tasks หนักใช้ KubernetesPodOperator

Q: Airflow 2.x HA Scheduler ทำงานอย่างไร?

A: ตั้งแต่ Airflow 2.0 สามารถรัน multiple schedulers ได้ โดยใช้ database row-level locks ป้องกัน race conditions schedulers ทำงาน active-active ไม่ใช่ active-standby ทุก scheduler parse DAGs และ schedule tasks พร้อมกัน database lock ป้องกันไม่ให้ task ถูก schedule ซ้ำ ถ้า scheduler ตัวใดตัวหนึ่งล่ม ตัวที่เหลือทำงานต่อได้ทันที ไม่มี downtime แนะนำรัน 2 schedulers สำหรับ HA

Q: DAG ที่ดีควรออกแบบอย่างไร?

A: หลักการสำคัญ Idempotent ทุก task run ซ้ำได้ให้ผลเหมือนเดิม (ใช้ UPSERT ไม่ใช่ INSERT), Atomic แต่ละ task ทำงานเดียว succeed หรือ fail ทั้งหมด, Small tasks แยก tasks ให้เล็ก ง่ายต่อ retry และ debug, No hardcoded values ใช้ Variables และ Connections, Proper dependencies กำหนด dependencies ให้ถูกต้อง ใช้ task groups จัดกลุ่ม, Testing เขียน unit tests สำหรับ DAGs, Documentation เพิ่ม doc strings และ tags

Q: Airflow กับ Prefect ต่างกันอย่างไร?

A: Airflow เป็น schedule-driven (DAGs run ตาม schedule), มี web UI ครบ, community ใหญ่ที่สุด, HA support ดี, ซับซ้อนในการ setup Prefect เป็น event-driven ได้ด้วย, Pythonic API ง่ายกว่า, cloud-native, dynamic workflows ง่ายกว่า, setup ง่ายกว่า สำหรับ production ที่ต้องการ stability และ HA แนะนำ Airflow สำหรับ team เล็กที่ต้องการ simplicity แนะนำ Prefect

XM Legend · เทรดเดอร์ & ผู้สอน Forex 13 ปี

ผู้ก่อตั้ง SiamCafe ตั้งแต่ปี 1997 · เทรดเดอร์สาย Forex มากกว่า 13 ปี ได้รับการยกย่องเป็น XM Legend · แบ่งปันความรู้ Forex, ไอที, AI และการเทรด จากประสบการณ์จริงในตลาดจริง