Airbyte ETL High Availability (HA) Setup คืออะไร
Airbyte เป็น open-source data integration platform สำหรับ ETL/ELT ที่รองรับ connectors มากกว่า 350 ตัว ช่วยให้ sync data จาก sources ต่างๆ (databases, APIs, SaaS) ไปยัง destinations (data warehouses, data lakes) High Availability (HA) Setup สำหรับ Airbyte คือการตั้งค่าให้ระบบ data integration ทำงานต่อเนื่องได้แม้มี component failure ไม่พลาด sync jobs และกู้คืนได้อัตโนมัติ บทความนี้อธิบายวิธี setup Airbyte แบบ HA ครบ ตั้งแต่ architecture, Kubernetes deployment ไปจนถึง monitoring
Airbyte Architecture
# airbyte_arch.py — Airbyte architecture
import json
class AirbyteArchitecture:
    """Reference catalog of Airbyte's components and the HA strategy for each.

    Static data only — `show_components` pretty-prints the catalog to stdout.
    """

    # Per-component metadata: display name, responsibility, and HA approach.
    COMPONENTS = {
        "webapp": {
            "name": "Web App (UI)",
            "role": "Frontend สำหรับจัดการ connections, sync schedules, monitoring",
            "ha": "Stateless — run multiple replicas behind load balancer",
        },
        "server": {
            "name": "Server (API)",
            "role": "Core API — จัดการ configurations, connections, schedules",
            "ha": "Stateless — scale horizontally, state อยู่ใน database",
        },
        "scheduler": {
            "name": "Scheduler",
            "role": "จัดการ sync jobs — trigger, queue, track status",
            "ha": "Active-passive — ต้อง leader election (1 active scheduler)",
        },
        "worker": {
            "name": "Workers",
            "role": "รัน sync jobs จริง — source → destination data transfer",
            "ha": "Scale horizontally — เพิ่ม workers ตาม workload",
        },
        "temporal": {
            "name": "Temporal (Workflow Engine)",
            "role": "Orchestrate sync workflows — retry, timeout, state management",
            "ha": "Temporal cluster: multiple nodes + Cassandra/PostgreSQL backend",
        },
        "database": {
            "name": "PostgreSQL Database",
            "role": "เก็บ configurations, connection info, sync history, state",
            "ha": "PostgreSQL HA: streaming replication, auto-failover (Patroni/RDS)",
        },
    }

    def show_components(self):
        """Print each component's name, role, and HA strategy."""
        print("=== Airbyte Components ===\n")
        # Keys are never displayed, so iterate the values directly.
        for component in self.COMPONENTS.values():
            print(f"[{component['name']}]")
            print(f" Role: {component['role']}")
            print(f" HA: {component['ha']}")
            print()
# Demo: dump the component catalog.
architecture = AirbyteArchitecture()
architecture.show_components()
Kubernetes HA Deployment
# k8s_ha.py — Kubernetes HA deployment for Airbyte
import json
class K8sHADeployment:
    """Holds the Helm values and operator commands for an HA Airbyte deployment.

    Fix: the embedded `values-ha.yaml` had lost all indentation, so every key
    sat at the top level — YAML would parse it as a flat document instead of
    the intended nested Helm values. The nesting is restored here.
    """

    # Helm chart values for an HA install: external HA database, >=2 replicas
    # for stateless components, PDBs, worker autoscaling, and zone spreading.
    HELM_VALUES = """
# values-ha.yaml — Airbyte Helm chart HA configuration
global:
  database:
    type: external
    host: airbyte-postgres-ha.database.svc
    port: 5432
    database: airbyte
    secretName: airbyte-db-credentials
webapp:
  replicaCount: 2
  resources:
    requests:
      cpu: 500m
      memory: 512Mi
    limits:
      cpu: 1000m
      memory: 1Gi
  podDisruptionBudget:
    enabled: true
    minAvailable: 1
server:
  replicaCount: 2
  resources:
    requests:
      cpu: 500m
      memory: 1Gi
    limits:
      cpu: 2000m
      memory: 2Gi
  podDisruptionBudget:
    enabled: true
    minAvailable: 1
worker:
  replicaCount: 3
  resources:
    requests:
      cpu: 1000m
      memory: 2Gi
    limits:
      cpu: 4000m
      memory: 4Gi
  autoscaling:
    enabled: true
    minReplicas: 2
    maxReplicas: 10
    targetCPUUtilizationPercentage: 70
temporal:
  replicaCount: 2
  resources:
    requests:
      cpu: 500m
      memory: 1Gi
pod:
  tolerations: []
  nodeSelector: {}
  topologySpreadConstraints:
    - maxSkew: 1
      topologyKey: topology.kubernetes.io/zone
      whenUnsatisfiable: DoNotSchedule
"""

    # Day-to-day Helm/kubectl commands for this deployment.
    COMMANDS = {
        "install": "helm install airbyte airbyte/airbyte -f values-ha.yaml -n airbyte",
        "upgrade": "helm upgrade airbyte airbyte/airbyte -f values-ha.yaml -n airbyte",
        "status": "kubectl get pods -n airbyte -o wide",
    }

    def show_values(self):
        """Print a preview (first 500 chars) of the Helm values."""
        print("=== Helm HA Values ===")
        print(self.HELM_VALUES[:500])

    def show_commands(self):
        """Print each named operator command."""
        print("\n=== Commands ===")
        for name, cmd in self.COMMANDS.items():
            print(f" [{name}] {cmd}")
# Demo: show Helm values preview and operator commands.
deployment = K8sHADeployment()
deployment.show_values()
deployment.show_commands()
Database HA
# db_ha.py — PostgreSQL HA for Airbyte
import json
class DatabaseHA:
    """Compares PostgreSQL HA options for Airbyte and lists a backup strategy.

    Static data only — the `show_*` methods print summaries to stdout.
    """

    # HA options with RTO/RPO, rough cost, and setup effort.
    OPTIONS = {
        "rds_multi_az": {
            "name": "AWS RDS Multi-AZ",
            "description": "Managed PostgreSQL with automatic failover — standby ใน AZ อื่น",
            "rto": "< 2 minutes (automatic failover)",
            "rpo": "0 (synchronous replication)",
            "cost": "$100-500/month (db.r6g.large)",
            "setup": "Easy — AWS console/Terraform",
        },
        "cloud_sql_ha": {
            "name": "GCP Cloud SQL HA",
            "description": "Managed PostgreSQL with regional HA — automatic failover",
            "rto": "< 2 minutes",
            "rpo": "0",
            "cost": "$80-400/month",
            "setup": "Easy — GCP console/Terraform",
        },
        "patroni": {
            "name": "Patroni (Self-managed)",
            "description": "Open-source HA solution for PostgreSQL — leader election via etcd/ZooKeeper",
            "rto": "< 30 seconds (automatic failover)",
            "rpo": "0 (synchronous) or seconds (async)",
            "cost": "Infrastructure only — no license",
            "setup": "Complex — ต้อง manage etcd + Patroni + HAProxy",
        },
    }

    # Backup layers: automated snapshots, WAL archiving, logical dumps, retention.
    BACKUP = {
        "automated": "RDS/Cloud SQL: automated daily backups + point-in-time recovery",
        "wal_archiving": "WAL archiving to S3/GCS — continuous backup",
        "logical": "pg_dump scheduled backups — weekly full + daily incremental",
        "retention": "แนะนำ: 30 days point-in-time, 90 days logical backups",
    }

    def show_options(self):
        """Print each HA option with its RTO, RPO, and cost."""
        print("=== Database HA Options ===\n")
        # Keys are not shown; only the option records matter for display.
        for option in self.OPTIONS.values():
            print(f"[{option['name']}]")
            print(f" RTO: {option['rto']} | RPO: {option['rpo']}")
            print(f" Cost: {option['cost']}")
            print()

    def show_backup(self):
        """Print each backup-strategy entry."""
        print("=== Backup Strategy ===")
        for label, detail in self.BACKUP.items():
            print(f" [{label}] {detail}")
# Demo: compare HA options and show the backup strategy.
database_ha = DatabaseHA()
database_ha.show_options()
database_ha.show_backup()
Python Monitoring
# monitoring.py — Python monitoring for Airbyte HA
import json
class AirbyteMonitoring:
    """Carries an example monitoring script for Airbyte HA as a code string.

    Fixes to the embedded example:
    - `get_connections` never returned `connectionId`, so `get_failed_jobs`
      always passed `None` to `_get_jobs`; the field is now included.
    - Indentation restored so the example is valid, runnable Python.
    """

    # Example monitor script (displayed by `show_code`, not executed here).
    CODE = """
# airbyte_monitor.py — Monitor Airbyte HA health
import requests
import json
from datetime import datetime, timedelta


class AirbyteMonitor:
    def __init__(self, api_url="http://localhost:8001/api/v1"):
        self.api_url = api_url

    def health_check(self):
        '''Check Airbyte server health'''
        try:
            resp = requests.get(f"{self.api_url}/health", timeout=5)
            return {
                'status': 'healthy' if resp.status_code == 200 else 'unhealthy',
                'response_time_ms': round(resp.elapsed.total_seconds() * 1000),
            }
        except Exception as e:
            return {'status': 'unreachable', 'error': str(e)}

    def get_connections(self):
        '''List all connections and their status'''
        resp = requests.post(
            f"{self.api_url}/connections/list",
            json={'workspaceId': self._get_workspace_id()},
            timeout=10,
        )
        connections = resp.json().get('connections', [])
        return [{
            'connectionId': c['connectionId'],
            'name': c['name'],
            'status': c['status'],
            'schedule': c.get('schedule', {}).get('timeUnit', 'manual'),
            'source': c['sourceName'],
            'destination': c['destinationName'],
        } for c in connections]

    def get_failed_jobs(self, hours=24):
        '''Get failed sync jobs in last N hours'''
        connections = self.get_connections()
        failed = []
        for conn in connections:
            jobs = self._get_jobs(conn.get('connectionId'))
            for job in jobs:
                if job['status'] == 'failed':
                    created = datetime.fromtimestamp(job['createdAt'])
                    if created > datetime.now() - timedelta(hours=hours):
                        failed.append({
                            'connection': conn['name'],
                            'job_id': job['id'],
                            'created_at': created.isoformat(),
                            'bytes_synced': job.get('bytesSynced', 0),
                        })
        return failed

    def sync_metrics(self):
        '''Get overall sync metrics'''
        connections = self.get_connections()
        active = sum(1 for c in connections if c['status'] == 'active')
        inactive = sum(1 for c in connections if c['status'] != 'active')
        return {
            'total_connections': len(connections),
            'active': active,
            'inactive': inactive,
            'timestamp': datetime.utcnow().isoformat(),
        }

    def _get_workspace_id(self):
        resp = requests.post(f"{self.api_url}/workspaces/list", json={}, timeout=10)
        workspaces = resp.json().get('workspaces', [])
        return workspaces[0]['workspaceId'] if workspaces else None

    def _get_jobs(self, connection_id):
        resp = requests.post(
            f"{self.api_url}/jobs/list",
            json={'configId': connection_id, 'configTypes': ['sync']},
            timeout=10,
        )
        return resp.json().get('jobs', [])


# monitor = AirbyteMonitor("http://airbyte-server:8001/api/v1")
# health = monitor.health_check()
# failed = monitor.get_failed_jobs(hours=24)
# metrics = monitor.sync_metrics()
"""

    def show_code(self):
        """Print a preview (first 600 chars) of the example monitor script."""
        print("=== Airbyte Monitor ===")
        print(self.CODE[:600])
# Demo: preview the example monitor script.
monitoring_demo = AirbyteMonitoring()
monitoring_demo.show_code()
Disaster Recovery
# dr.py — Disaster recovery for Airbyte
import json
class DisasterRecovery:
    """Catalog of Airbyte failure scenarios (impact/recovery/RTO) and an HA checklist.

    Static data only — the `show_*` methods print summaries to stdout.
    """

    # Failure scenarios: what breaks, how it recovers, and expected RTO.
    SCENARIOS = {
        "worker_failure": {
            "name": "Worker Pod Failure",
            "impact": "Running sync job fails",
            "recovery": "Kubernetes restarts pod → Temporal retries failed job automatically",
            "rto": "1-5 minutes",
        },
        "scheduler_failure": {
            "name": "Scheduler Failure",
            "impact": "New sync jobs not triggered",
            "recovery": "Kubernetes restarts scheduler → catches up on missed schedules",
            "rto": "2-5 minutes",
        },
        "database_failure": {
            "name": "Database Failure",
            "impact": "All services degraded — no config reads/writes",
            "recovery": "RDS Multi-AZ automatic failover → services reconnect",
            "rto": "2-5 minutes (managed DB), 30s (Patroni)",
        },
        "cluster_failure": {
            "name": "Kubernetes Cluster Failure",
            "impact": "Complete outage",
            "recovery": "Deploy to DR cluster → restore DB from backup → redeploy Airbyte",
            "rto": "30-60 minutes",
        },
    }

    # Pre-incident readiness checklist for an HA Airbyte deployment.
    CHECKLIST = [
        "Database HA enabled (Multi-AZ / Patroni)",
        "Database backups: daily + WAL archiving",
        "Airbyte config export: weekly automated backup",
        "Worker autoscaling configured",
        "PodDisruptionBudgets set for all components",
        "Monitoring + alerting for failed jobs",
        "DR runbook documented and tested quarterly",
    ]

    def show_scenarios(self):
        """Print each scenario with its impact and RTO."""
        print("=== DR Scenarios ===\n")
        # Scenario keys are internal IDs; only the records are displayed.
        for scenario in self.SCENARIOS.values():
            print(f"[{scenario['name']}]")
            print(f" Impact: {scenario['impact']}")
            print(f" RTO: {scenario['rto']}")
            print()

    def show_checklist(self):
        """Print the HA readiness checklist."""
        print("=== HA Checklist ===")
        for entry in self.CHECKLIST:
            print(f" ☐ {entry}")
# Demo: print DR scenarios and the HA checklist.
recovery = DisasterRecovery()
recovery.show_scenarios()
recovery.show_checklist()
FAQ - คำถามที่พบบ่อย
Q: Airbyte กับ Fivetran อันไหนดีกว่า?
A: Airbyte: open-source, self-hosted (ควบคุมได้เต็ม), 350+ connectors, ฟรี (self-hosted) Fivetran: managed SaaS, ง่ายกว่า setup, reliable, แต่แพง ($1-5+ per MAR) เลือก Airbyte: ถ้าต้องการ control + cost savings + custom connectors เลือก Fivetran: ถ้าต้องการ zero-maintenance + reliability + ไม่มี DevOps team Airbyte Cloud: managed version — ง่ายเหมือน Fivetran แต่ถูกกว่า
Q: Airbyte ต้องใช้ Kubernetes ไหม?
A: ไม่จำเป็น — มี deployment options หลายแบบ: Docker Compose: ง่ายสุด สำหรับ dev/small production Kubernetes: แนะนำสำหรับ production HA — auto-scaling, self-healing Airbyte Cloud: managed — ไม่ต้อง deploy เอง แนะนำ: เริ่ม Docker Compose → ย้าย K8s เมื่อต้อง HA + scale
Q: Sync job fail แล้วจะเกิดอะไร?
A: Temporal (workflow engine) จัดการ retry อัตโนมัติ: Default: retry 3 ครั้ง with exponential backoff ถ้า retry หมด: job marked as failed → alert ส่ง notification Next scheduled sync: run ปกติ — sync incremental data ที่พลาดไป Data consistency: Airbyte ใช้ state management — resume จาก last successful state ไม่มี data loss: incremental sync จะ catch up data ที่ fail ในรอบก่อน
Q: HA setup เพิ่ม cost เท่าไหร่?
A: Database HA: +50-100% (standby replica) — RDS Multi-AZ ~2x single instance Multiple workers: +$50-200/month ต่อ worker (ขึ้นกับ instance size) Kubernetes overhead: control plane + node pool — $100-300/month เพิ่มเติม รวม: HA setup อาจแพงกว่า single-node 2-3x — แต่คุ้มถ้า data freshness สำคัญ ทางเลือก: Airbyte Cloud ($) — managed HA ไม่ต้อง manage เอง
