Airbyte ELT Platform
Airbyte คือ Open Source ELT Platform สำหรับสร้าง Data Pipeline มี Connectors มากกว่า 300 ตัว ส่งข้อมูลเข้า BigQuery, Snowflake, Redshift รองรับทั้ง Full Refresh และ Incremental Sync ตั้งค่าผ่าน UI และ Deploy ได้ทั้ง Docker และ Kubernetes
Internal Developer Platform (IDP) คือแพลตฟอร์มภายในองค์กรแบบ Self-service ให้ Developer สร้าง, Deploy และจัดการแอปด้วยตัวเอง ครอบคลุม CI/CD, Infrastructure, Monitoring และ Data Pipeline (เครื่องมือยอดนิยม: Backstage, Port, Humanitec)
Airbyte Setup
# === Airbyte Setup ===
# 1. Docker Compose
# git clone https://github.com/airbytehq/airbyte.git
# cd airbyte
# ./run-ab-platform.sh
# 2. Kubernetes (Helm)
# helm repo add airbyte https://airbytehq.github.io/helm-charts
# helm install airbyte airbyte/airbyte \
# --set global.storage.type=s3 \
# --set global.storage.s3.bucket=airbyte-data \
# --set global.storage.s3.region=ap-southeast-1 \
# --set webapp.replicaCount=2 \
# --set server.replicaCount=2 \
# --set worker.replicaCount=3
# 3. Airbyte API — สร้าง Connection
# curl -X POST http://localhost:8006/v1/connections \
# -H "Content-Type: application/json" \
# -d '{
# "sourceId": "source-postgres-id",
# "destinationId": "dest-bigquery-id",
# "schedule": {"scheduleType": "cron", "cronExpression": "0 */6 * * *"},
# "syncCatalog": {
# "streams": [{
# "stream": {"name": "users", "namespace": "public"},
# "config": {
# "syncMode": "incremental",
# "destinationSyncMode": "append_dedup",
# "cursorField": ["updated_at"],
# "primaryKey": [["id"]]
# }
# }]
# },
# "status": "active"
# }'
from dataclasses import dataclass, field
from typing import List, Dict
@dataclass
class AirbyteConnector:
    """Metadata describing one Airbyte connector (a source or a destination)."""

    name: str  # display name, e.g. "PostgreSQL"
    connector_type: str  # either "source" or "destination"
    category: str  # grouping label, e.g. "Database", "Data Warehouse"
    sync_modes: List[str]  # supported sync modes, e.g. "full_refresh", "incremental"
class AirbytePlatform:
"""Airbyte Data Platform"""
def __init__(self):
self.sources: List[AirbyteConnector] = []
self.destinations: List[AirbyteConnector] = []
self.connections: List[dict] = []
def add_source(self, connector: AirbyteConnector):
self.sources.append(connector)
def add_destination(self, connector: AirbyteConnector):
self.destinations.append(connector)
def create_connection(self, source: str, destination: str,
schedule: str, sync_mode: str):
conn = {
"source": source,
"destination": destination,
"schedule": schedule,
"sync_mode": sync_mode,
"status": "active",
}
self.connections.append(conn)
return conn
def show(self):
print(f"\n{'='*55}")
print(f"Airbyte Platform")
print(f"{'='*55}")
print(f"\n Sources ({len(self.sources)}):")
for s in self.sources:
print(f" [{s.category}] {s.name}: {', '.join(s.sync_modes)}")
print(f"\n Destinations ({len(self.destinations)}):")
for d in self.destinations:
print(f" [{d.category}] {d.name}")
print(f"\n Connections ({len(self.connections)}):")
for c in self.connections:
print(f" {c['source']} -> {c['destination']} ({c['schedule']})")
# Demo: populate the platform with common connectors and a few connections.
platform = AirbytePlatform()

# (name, category, sync modes) specs keep the connector list compact.
sources = [
    AirbyteConnector(name, "source", category, modes)
    for name, category, modes in [
        ("PostgreSQL", "Database", ["full_refresh", "incremental"]),
        ("MySQL", "Database", ["full_refresh", "incremental"]),
        ("MongoDB", "Database", ["full_refresh", "incremental"]),
        ("Stripe", "Payment", ["full_refresh", "incremental"]),
        ("Salesforce", "CRM", ["full_refresh", "incremental"]),
        ("Google Analytics", "Analytics", ["full_refresh"]),
        ("S3/CSV", "File", ["full_refresh"]),
        ("Kafka", "Streaming", ["incremental"]),
    ]
]
destinations = [
    AirbyteConnector(name, "destination", category, [])
    for name, category in [
        ("BigQuery", "Data Warehouse"),
        ("Snowflake", "Data Warehouse"),
        ("Redshift", "Data Warehouse"),
        ("PostgreSQL", "Database"),
        ("S3", "Data Lake"),
    ]
]
for connector in sources:
    platform.add_source(connector)
for connector in destinations:
    platform.add_destination(connector)

for src, dst, schedule, mode in (
    ("PostgreSQL", "BigQuery", "ทุก 6 ชั่วโมง", "incremental"),
    ("Stripe", "Snowflake", "ทุกวัน 02:00", "incremental"),
    ("Google Analytics", "BigQuery", "ทุกวัน 06:00", "full_refresh"),
):
    platform.create_connection(src, dst, schedule, mode)

platform.show()
Internal Developer Platform
# idp_platform.py — Internal Developer Platform with Airbyte
from dataclasses import dataclass, field
from typing import List, Dict
@dataclass
class ServiceTemplate:
    """A provisionable service blueprint listed in the IDP catalog."""

    name: str  # template display name, e.g. "Web Application"
    category: str  # catalog grouping, e.g. "Data", "App", "AI/ML"
    components: List[str]  # infrastructure pieces provisioned with the service
    self_service: bool  # True: developers provision it themselves; False: request flow
class InternalDeveloperPlatform:
"""IDP with Data Pipeline Self-service"""
def __init__(self):
self.templates: List[ServiceTemplate] = []
self.services: Dict[str, dict] = {}
def add_template(self, template: ServiceTemplate):
self.templates.append(template)
def provision_service(self, template_name: str, team: str, config: dict) -> dict:
"""Self-service Provision"""
template = next((t for t in self.templates if t.name == template_name), None)
if not template:
return {"error": "Template not found"}
service = {
"template": template_name,
"team": team,
"config": config,
"status": "provisioned",
"components": template.components,
}
key = f"{team}-{template_name}"
self.services[key] = service
print(f" PROVISIONED: {key}")
print(f" Components: {', '.join(template.components)}")
return service
def show_catalog(self):
print(f"\n{'='*55}")
print(f"IDP Service Catalog")
print(f"{'='*55}")
for t in self.templates:
ss = "Self-service" if t.self_service else "Request"
print(f"\n [{t.category}] {t.name} ({ss})")
print(f" Components: {', '.join(t.components)}")
# Demo: build the IDP catalog, show it, then self-service a data pipeline.
idp = InternalDeveloperPlatform()

# All demo templates are self-service, so the flag is applied uniformly.
templates = [
    ServiceTemplate(name, category, components, True)
    for name, category, components in [
        ("Data Pipeline (Airbyte)", "Data",
         ["Airbyte Connection", "dbt Transform", "Airflow Schedule", "Grafana Dashboard"]),
        ("Web Application", "App",
         ["Kubernetes Deployment", "Ingress", "PostgreSQL", "Redis", "CI/CD"]),
        ("ML Pipeline", "AI/ML",
         ["Kubeflow Pipeline", "Feature Store", "Model Registry", "Monitoring"]),
        ("API Service", "Backend",
         ["Kubernetes Deployment", "API Gateway", "Auth (OIDC)", "Monitoring"]),
    ]
]
for template in templates:
    idp.add_template(template)

idp.show_catalog()

# Provision Data Pipeline
idp.provision_service("Data Pipeline (Airbyte)", "analytics-team", {
    "source": "PostgreSQL (production)",
    "destination": "BigQuery",
    "schedule": "ทุก 6 ชั่วโมง",
    "sync_mode": "incremental",
    "transforms": ["dbt_staging", "dbt_marts"],
})
# IDP Tools — popular platform products and who builds them.
idp_tools = {
    "Backstage": {"by": "Spotify", "type": "Developer Portal", "plugins": "200+"},
    "Port": {"by": "Port.io", "type": "IDP Platform", "plugins": "Built-in"},
    "Humanitec": {"by": "Humanitec", "type": "Platform Orchestrator", "plugins": "Drivers"},
    "Kratix": {"by": "Syntasso", "type": "Framework", "plugins": "Custom Promises"},
}
print("\n\nIDP Tools:")
for tool_name, meta in idp_tools.items():
    print(f" {tool_name} (by {meta['by']}): {meta['type']} | Plugins: {meta['plugins']}")
Monitoring Pipeline
# pipeline_monitoring.py — Data Pipeline Monitoring
# Each entry: the metric to watch, when to alert, and where it is visualized.
monitoring = {
    "Sync Status": {
        "metric": "airbyte_sync_status",
        "alert": "Sync Failed -> Slack + PagerDuty",
        "dashboard": "Grafana Airbyte Dashboard",
    },
    "Sync Duration": {
        "metric": "airbyte_sync_duration_seconds",
        "alert": "Duration > 2x Average -> Warning",
        "dashboard": "Duration Trend Chart",
    },
    "Records Synced": {
        "metric": "airbyte_records_emitted_total",
        "alert": "0 Records -> Check Source",
        "dashboard": "Records per Sync Chart",
    },
    "Data Freshness": {
        "metric": "time_since_last_successful_sync",
        "alert": "> SLA Threshold -> Critical",
        "dashboard": "Freshness Heatmap",
    },
    "Error Rate": {
        "metric": "airbyte_sync_errors_total / airbyte_sync_total",
        "alert": "> 5% -> Investigate",
        "dashboard": "Error Rate Trend",
    },
}

print("Data Pipeline Monitoring:")
for section, details in monitoring.items():
    print(f"\n [{section}]")
    for field_name, field_value in details.items():
        print(f" {field_name}: {field_value}")
# Airbyte vs Others — feature/pricing comparison of ELT platforms.
comparison = {
    "Airbyte": {"type": "Open Source ELT", "connectors": "300+", "pricing": "Free / Cloud Paid", "hosting": "Self-hosted / Cloud"},
    "Fivetran": {"type": "Managed ELT", "connectors": "500+", "pricing": "Volume-based (แพง)", "hosting": "Cloud Only"},
    "Stitch": {"type": "Managed ETL", "connectors": "200+", "pricing": "Row-based", "hosting": "Cloud Only"},
    "Meltano": {"type": "Open Source ELT", "connectors": "600+ (Singer)", "pricing": "Free", "hosting": "Self-hosted"},
}

print("\n\nELT Platform Comparison:")
for platform_name, details in comparison.items():
    print(f" {platform_name}: {details['type']} | Connectors: {details['connectors']} | {details['pricing']}")
Best Practices
- Incremental Sync: ใช้ Incremental แทน Full Refresh ประหยัด Resources
- Self-service: สร้าง Templates ให้ Developer สร้าง Pipeline เอง
- dbt: ใช้ dbt Transform ข้อมูลหลัง Load ใน Data Warehouse
- Monitoring: ติดตาม Sync Status, Duration, Records, Freshness
- Backstage: ใช้ Backstage Plugin แสดง Pipeline Status ใน IDP Portal
- Schema Changes: ตั้ง Alert เมื่อ Source Schema เปลี่ยน
Airbyte คืออะไร
Airbyte คือ Open Source ELT Platform สำหรับสร้าง Data Pipeline มี Connectors มากกว่า 300 ตัว (เช่น BigQuery, Snowflake, Redshift) รองรับ Full Refresh และ Incremental Sync ตั้งค่าผ่าน UI และ Deploy ได้บน Docker หรือ Kubernetes
Internal Developer Platform คืออะไร
IDP แพลตฟอร์มภายในองค์กร Developer สร้าง Deploy จัดการแอปด้วยตัวเอง Self-service CI/CD Infrastructure Monitoring Backstage Port Humanitec
Airbyte ใช้กับ IDP อย่างไร
Self-service Data Pipeline Developer เลือก Source Destination IDP Portal Airbyte API สร้าง Connections อัตโนมัติ Template Backstage Plugin Pipeline Status
Airbyte ต่างจาก Fivetran อย่างไร
Airbyte Open Source ฟรี Self-hosted ควบคุมข้อมูล 300+ Connectors Fivetran Managed SaaS ง่ายกว่า 500+ แพงตาม Volume Airbyte เหมาะ Self-hosted ควบคุมค่าใช้จ่าย
สรุป
Airbyte Open Source ELT 300+ Connectors Incremental Sync Internal Developer Platform IDP Self-service Backstage dbt Transform Monitoring Sync Status Duration Records Freshness Schema Changes