Technology

Airbyte ETL Internal Developer Platform

Airbyte ETL Internal Developer Platform | SiamCafe Blog
2026-05-04· อ. บอม — SiamCafe.net· 8,101 คำ

Airbyte ETL Platform

Airbyte เป็น Open Source ELT Platform สำหรับสร้าง Data Pipeline มี Connectors ให้เลือกมากกว่า 300+ ตัว รองรับปลายทางอย่าง BigQuery, Snowflake และ Redshift ซิงก์ข้อมูลได้ทั้งแบบ Full Refresh และ Incremental Sync ตั้งค่าผ่าน UI และติดตั้งได้ทั้งบน Docker และ Kubernetes

Internal Developer Platform (IDP) คือแพลตฟอร์มภายในองค์กรแบบ Self-service ที่ให้ Developer สร้าง, Deploy และจัดการแอปด้วยตัวเอง ครอบคลุมทั้ง CI/CD, Infrastructure, Monitoring และ Data Pipeline ตัวอย่างเครื่องมือที่นิยม ได้แก่ Backstage, Port และ Humanitec

Airbyte Setup

# === Airbyte Setup ===

# 1. Docker Compose
# git clone https://github.com/airbytehq/airbyte.git
# cd airbyte
# ./run-ab-platform.sh

# 2. Kubernetes (Helm)
# helm repo add airbyte https://airbytehq.github.io/helm-charts
# helm install airbyte airbyte/airbyte \
#   --set global.storage.type=s3 \
#   --set global.storage.s3.bucket=airbyte-data \
#   --set global.storage.s3.region=ap-southeast-1 \
#   --set webapp.replicaCount=2 \
#   --set server.replicaCount=2 \
#   --set worker.replicaCount=3

# 3. Airbyte API — สร้าง Connection
# curl -X POST http://localhost:8006/v1/connections \
#   -H "Content-Type: application/json" \
#   -d '{
#     "sourceId": "source-postgres-id",
#     "destinationId": "dest-bigquery-id",
#     "schedule": {"scheduleType": "cron", "cronExpression": "0 */6 * * *"},
#     "syncCatalog": {
#       "streams": [{
#         "stream": {"name": "users", "namespace": "public"},
#         "config": {
#           "syncMode": "incremental",
#           "destinationSyncMode": "append_dedup",
#           "cursorField": ["updated_at"],
#           "primaryKey": [["id"]]
#         }
#       }]
#     },
#     "status": "active"
#   }'

from dataclasses import dataclass, field
from typing import List, Dict

@dataclass
class AirbyteConnector:
    """A single Airbyte connector definition (either a source or a destination)."""
    name: str
    connector_type: str  # "source" or "destination"
    category: str  # catalog grouping, e.g. "Database", "Data Warehouse", "CRM"
    sync_modes: List[str]  # supported modes, e.g. "full_refresh", "incremental"

class AirbytePlatform:
    """In-memory registry of Airbyte connectors and the connections between them."""

    def __init__(self):
        # Registered connectors plus every connection created so far.
        self.sources: List[AirbyteConnector] = []
        self.destinations: List[AirbyteConnector] = []
        self.connections: List[dict] = []

    def add_source(self, connector: AirbyteConnector):
        """Register a source connector."""
        self.sources.append(connector)

    def add_destination(self, connector: AirbyteConnector):
        """Register a destination connector."""
        self.destinations.append(connector)

    def create_connection(self, source: str, destination: str,
                          schedule: str, sync_mode: str):
        """Record a new active source->destination connection and return it."""
        connection = dict(
            source=source,
            destination=destination,
            schedule=schedule,
            sync_mode=sync_mode,
            status="active",
        )
        self.connections.append(connection)
        return connection

    def show(self):
        """Print a human-readable summary of the registered inventory."""
        divider = "=" * 55
        print(f"\n{divider}")
        print("Airbyte Platform")
        print(divider)

        print(f"\n  Sources ({len(self.sources)}):")
        for src in self.sources:
            print(f"    [{src.category}] {src.name}: {', '.join(src.sync_modes)}")

        print(f"\n  Destinations ({len(self.destinations)}):")
        for dst in self.destinations:
            print(f"    [{dst.category}] {dst.name}")

        print(f"\n  Connections ({len(self.connections)}):")
        for conn in self.connections:
            print(f"    {conn['source']} -> {conn['destination']} ({conn['schedule']})")

# Demo: populate the platform with connectors, wire a few connections, print.
platform = AirbytePlatform()

# Source connectors as (name, category, supported sync modes) specs.
sources = [
    AirbyteConnector(name, "source", category, modes)
    for name, category, modes in [
        ("PostgreSQL", "Database", ["full_refresh", "incremental"]),
        ("MySQL", "Database", ["full_refresh", "incremental"]),
        ("MongoDB", "Database", ["full_refresh", "incremental"]),
        ("Stripe", "Payment", ["full_refresh", "incremental"]),
        ("Salesforce", "CRM", ["full_refresh", "incremental"]),
        ("Google Analytics", "Analytics", ["full_refresh"]),
        ("S3/CSV", "File", ["full_refresh"]),
        ("Kafka", "Streaming", ["incremental"]),
    ]
]

# Destination connectors carry no sync modes of their own in this demo.
destinations = [
    AirbyteConnector(name, "destination", category, [])
    for name, category in [
        ("BigQuery", "Data Warehouse"),
        ("Snowflake", "Data Warehouse"),
        ("Redshift", "Data Warehouse"),
        ("PostgreSQL", "Database"),
        ("S3", "Data Lake"),
    ]
]

for connector in sources:
    platform.add_source(connector)
for connector in destinations:
    platform.add_destination(connector)

# Scheduled connections (schedule labels are Thai display strings).
for src, dst, schedule, mode in [
    ("PostgreSQL", "BigQuery", "ทุก 6 ชั่วโมง", "incremental"),
    ("Stripe", "Snowflake", "ทุกวัน 02:00", "incremental"),
    ("Google Analytics", "BigQuery", "ทุกวัน 06:00", "full_refresh"),
]:
    platform.create_connection(src, dst, schedule, mode)

platform.show()

Internal Developer Platform

# idp_platform.py — Internal Developer Platform with Airbyte
from dataclasses import dataclass, field
from typing import List, Dict

@dataclass
class ServiceTemplate:
    """A provisionable service blueprint in the IDP catalog."""
    name: str
    category: str  # catalog grouping, e.g. "Data", "App", "AI/ML", "Backend"
    components: List[str]  # components attached to a provisioned service
    self_service: bool  # shown as "Self-service" (True) or "Request" (False) in the catalog

class InternalDeveloperPlatform:
    """IDP service catalog with self-service provisioning of templates."""

    def __init__(self):
        # Available templates and the services provisioned from them,
        # keyed by "<team>-<template name>".
        self.templates: List[ServiceTemplate] = []
        self.services: Dict[str, dict] = {}

    def add_template(self, template: ServiceTemplate):
        """Register a template in the service catalog."""
        self.templates.append(template)

    def provision_service(self, template_name: str, team: str, config: dict) -> dict:
        """Self-service flow: instantiate a catalog template for a team.

        Returns the new service record, or an {"error": ...} dict when the
        template is not in the catalog.
        """
        matched = None
        for candidate in self.templates:
            if candidate.name == template_name:
                matched = candidate
                break
        if matched is None:
            return {"error": "Template not found"}

        record = {
            "template": template_name,
            "team": team,
            "config": config,
            "status": "provisioned",
            "components": matched.components,
        }
        service_key = f"{team}-{template_name}"
        self.services[service_key] = record
        print(f"  PROVISIONED: {service_key}")
        print(f"    Components: {', '.join(matched.components)}")
        return record

    def show_catalog(self):
        """Print every template with its access mode and components."""
        banner = "=" * 55
        print(f"\n{banner}")
        print("IDP Service Catalog")
        print(banner)
        for template in self.templates:
            access = "Self-service" if template.self_service else "Request"
            print(f"\n  [{template.category}] {template.name} ({access})")
            print(f"    Components: {', '.join(template.components)}")

# Seed the IDP catalog with self-service templates, then print it.
idp = InternalDeveloperPlatform()

templates = [
    ServiceTemplate(name, category, components, True)
    for name, category, components in [
        ("Data Pipeline (Airbyte)", "Data",
         ["Airbyte Connection", "dbt Transform", "Airflow Schedule", "Grafana Dashboard"]),
        ("Web Application", "App",
         ["Kubernetes Deployment", "Ingress", "PostgreSQL", "Redis", "CI/CD"]),
        ("ML Pipeline", "AI/ML",
         ["Kubeflow Pipeline", "Feature Store", "Model Registry", "Monitoring"]),
        ("API Service", "Backend",
         ["Kubernetes Deployment", "API Gateway", "Auth (OIDC)", "Monitoring"]),
    ]
]

for template in templates:
    idp.add_template(template)

idp.show_catalog()

# Self-service provisioning: the analytics team requests a data pipeline
# built from the Airbyte template registered in the IDP catalog.
pipeline_request = {
    "source": "PostgreSQL (production)",
    "destination": "BigQuery",
    "schedule": "ทุก 6 ชั่วโมง",
    "sync_mode": "incremental",
    "transforms": ["dbt_staging", "dbt_marts"],
}
idp.provision_service("Data Pipeline (Airbyte)", "analytics-team", pipeline_request)

# IDP tooling landscape: vendor, product type, and plugin/extension model.
idp_tools = {
    "Backstage": {"by": "Spotify", "type": "Developer Portal", "plugins": "200+"},
    "Port": {"by": "Port.io", "type": "IDP Platform", "plugins": "Built-in"},
    "Humanitec": {"by": "Humanitec", "type": "Platform Orchestrator", "plugins": "Drivers"},
    "Kratix": {"by": "Syntasso", "type": "Framework", "plugins": "Custom Promises"},
}

print("\n\nIDP Tools:")
for tool_name, details in idp_tools.items():
    print(f"  {tool_name} (by {details['by']}): {details['type']} | Plugins: {details['plugins']}")

Monitoring Pipeline

# pipeline_monitoring.py — Data Pipeline Monitoring
# Each signal maps to its Prometheus-style metric, alert rule, and dashboard.
monitoring = {
    "Sync Status": {"metric": "airbyte_sync_status", "alert": "Sync Failed -> Slack + PagerDuty", "dashboard": "Grafana Airbyte Dashboard"},
    "Sync Duration": {"metric": "airbyte_sync_duration_seconds", "alert": "Duration > 2x Average -> Warning", "dashboard": "Duration Trend Chart"},
    "Records Synced": {"metric": "airbyte_records_emitted_total", "alert": "0 Records -> Check Source", "dashboard": "Records per Sync Chart"},
    "Data Freshness": {"metric": "time_since_last_successful_sync", "alert": "> SLA Threshold -> Critical", "dashboard": "Freshness Heatmap"},
    "Error Rate": {"metric": "airbyte_sync_errors_total / airbyte_sync_total", "alert": "> 5% -> Investigate", "dashboard": "Error Rate Trend"},
}

print("Data Pipeline Monitoring:")
for signal, details in monitoring.items():
    print(f"\n  [{signal}]")
    for label, text in details.items():
        print(f"    {label}: {text}")

# Airbyte vs Others — side-by-side comparison of ELT/ETL platforms.
comparison = {
    "Airbyte": {
        "type": "Open Source ELT",
        "connectors": "300+",
        "pricing": "Free / Cloud Paid",
        "hosting": "Self-hosted / Cloud",
    },
    "Fivetran": {
        "type": "Managed ELT",
        "connectors": "500+",
        "pricing": "Volume-based (แพง)",
        "hosting": "Cloud Only",
    },
    "Stitch": {
        "type": "Managed ETL",
        "connectors": "200+",
        "pricing": "Row-based",
        "hosting": "Cloud Only",
    },
    "Meltano": {
        "type": "Open Source ELT",
        "connectors": "600+ (Singer)",
        "pricing": "Free",
        "hosting": "Self-hosted",
    },
}

print("\n\nELT Platform Comparison:")
for vendor, details in comparison.items():
    print(f"  {vendor}: {details['type']} | Connectors: {details['connectors']} | {details['pricing']}")

Best Practices

Airbyte คืออะไร

Open Source ELT Platform Data Pipeline 300+ Connectors BigQuery Snowflake Redshift Full Refresh Incremental Sync UI Config Docker Kubernetes

Internal Developer Platform คืออะไร

IDP แพลตฟอร์มภายในองค์กร Developer สร้าง Deploy จัดการแอปด้วยตัวเอง Self-service CI/CD Infrastructure Monitoring Backstage Port Humanitec

Airbyte ใช้กับ IDP อย่างไร

Airbyte ใช้กับ IDP ได้โดยเปิดให้ Developer ทำ Self-service Data Pipeline ด้วยตัวเอง: เลือก Source และ Destination ผ่าน IDP Portal จากนั้นระบบเรียก Airbyte API เพื่อสร้าง Connections ให้อัตโนมัติตาม Template ที่กำหนดไว้ และติดตามสถานะ Pipeline ได้ผ่าน Backstage Plugin

Airbyte ต่างจาก Fivetran อย่างไร

Airbyte เป็น Open Source ใช้งานฟรีแบบ Self-hosted จึงควบคุมข้อมูลเองได้ มี 300+ Connectors ส่วน Fivetran เป็น Managed SaaS ที่ใช้งานง่ายกว่า มี 500+ Connectors แต่ราคาแพงตาม Volume ดังนั้น Airbyte จึงเหมาะกับทีมที่ต้องการ Self-hosted และควบคุมค่าใช้จ่ายเอง

สรุป

Airbyte Open Source ELT 300+ Connectors Incremental Sync Internal Developer Platform IDP Self-service Backstage dbt Transform Monitoring Sync Status Duration Records Freshness Schema Changes

📖 บทความที่เกี่ยวข้อง

AWS Glue ETL Internal Developer Platformอ่านบทความ → Container Security Trivy Internal Developer Platformอ่านบทความ → Immutable OS Fedora CoreOS Internal Developer Platformอ่านบทความ → Burp Suite Pro Internal Developer Platformอ่านบทความ → TensorFlow Serving Internal Developer Platformอ่านบทความ →

📚 ดูบทความทั้งหมด →