Technology

Airbyte ETL Machine Learning Pipeline: Building a Data Pipeline for ML

2025-09-14 · Ajarn Bom · SiamCafe.net · 1,257 words

What is Airbyte ETL?

Airbyte is an open source ELT (Extract, Load, Transform) platform for moving data from a wide range of sources (databases, APIs, SaaS tools) into destinations (data warehouses, data lakes). It ships with more than 300 connectors and can be self-hosted with Docker.

In a Machine Learning pipeline, Airbyte acts as the data ingestion layer: it pulls data from multiple sources into a central store on a schedule, feeding feature engineering and model training downstream and keeping the data pipeline reproducible and automated.

Why Airbyte fits ML work: it is open source and self-hosted with no license fees; its connector-based architecture makes sources and destinations pluggable; CDC (Change Data Capture) syncs only the data that changed; incremental sync avoids repeated full loads; schema management handles schema changes automatically; and the API plus Terraform provider let you manage pipelines as code.
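
The incremental sync mentioned above boils down to tracking a cursor field (e.g. `updated_at`) in sync state and fetching only rows past it. A minimal pure-Python sketch of the concept, not Airbyte's actual connector code:

```python
# incremental_sync.py - minimal sketch of cursor-based incremental sync
# (illustration only; Airbyte implements this inside its connectors)
from typing import Dict, List, Tuple

def incremental_sync(rows: List[Dict], state: Dict,
                     cursor_field: str = "updated_at") -> Tuple[List[Dict], Dict]:
    """Return only rows newer than the saved cursor, plus the updated state."""
    last_cursor = state.get("cursor")
    new_rows = [r for r in rows
                if last_cursor is None or r[cursor_field] > last_cursor]
    if new_rows:
        state = {"cursor": max(r[cursor_field] for r in new_rows)}
    return new_rows, state

rows = [
    {"id": 1, "updated_at": "2024-06-01"},
    {"id": 2, "updated_at": "2024-06-10"},
]
batch1, state = incremental_sync(rows, {})      # first sync: everything
rows.append({"id": 3, "updated_at": "2024-06-15"})
batch2, state = incremental_sync(rows, state)   # second sync: only the new row
print(len(batch1), len(batch2))  # 2 1
```

The second call skips rows 1 and 2 because their cursor values are not past the saved `2024-06-10` watermark.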

How to Use Airbyte in an ML Pipeline

Set up Airbyte alongside your ML tools

# === Airbyte Installation ===

# 1. Docker Compose (recommended for development)
git clone https://github.com/airbytehq/airbyte.git
cd airbyte
./run-ab-platform.sh

# Airbyte UI: http://localhost:8000
# Default login: airbyte / password (change these in production)

# 2. Production Setup with Docker Compose
cat > docker-compose.yml << 'EOF'
version: '3.8'
services:
  airbyte-server:
    image: airbyte/server:latest
    ports:
      - "8001:8001"
    environment:
      DATABASE_URL: postgresql://airbyte:password@postgres:5432/airbyte
      TRACKING_STRATEGY: logging
      WORKER_ENVIRONMENT: docker

  airbyte-webapp:
    image: airbyte/webapp:latest
    ports:
      - "8000:80"
    environment:
      INTERNAL_API_HOST: airbyte-server:8001

  airbyte-worker:
    image: airbyte/worker:latest
    environment:
      DATABASE_URL: postgresql://airbyte:password@postgres:5432/airbyte
      WORKSPACE_ROOT: /tmp/workspace
    volumes:
      - /var/run/docker.sock:/var/run/docker.sock
      - workspace:/tmp/workspace

  postgres:
    image: postgres:16-alpine
    environment:
      POSTGRES_DB: airbyte
      POSTGRES_USER: airbyte
      POSTGRES_PASSWORD: password
    volumes:
      - postgres_data:/var/lib/postgresql/data

volumes:
  workspace:
  postgres_data:
EOF

docker compose up -d

# 3. Kubernetes Deployment (production)
helm repo add airbyte https://airbytehq.github.io/helm-charts
helm repo update

cat > values.yaml << 'EOF'
global:
  database:
    type: external
    host: postgres.example.com
    port: 5432
    database: airbyte
    user: airbyte
    password: password

webapp:
  ingress:
    enabled: true
    hosts:
      - host: airbyte.example.com
        paths:
          - path: /
            pathType: Prefix

worker:
  resources:
    limits:
      cpu: "4"
      memory: "8Gi"
    requests:
      cpu: "2"
      memory: "4Gi"
EOF

helm install airbyte airbyte/airbyte -f values.yaml -n airbyte --create-namespace

echo "Airbyte installed"

Creating a Data Pipeline for ML

Configure data pipelines for ML training data

#!/usr/bin/env python3
# ml_data_pipeline.py - ML Data Pipeline with Airbyte
import json
import logging
import urllib.request
from typing import Dict, List

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger("pipeline")

class AirbyteMLPipeline:
    def __init__(self, api_url="http://localhost:8001/api/v1"):
        self.api_url = api_url
    
    def create_source(self, name, source_type, config):
        """Create a data source"""
        payload = {
            "name": name,
            "sourceDefinitionId": source_type,
            "connectionConfiguration": config,
        }
        return self._api_call("sources/create", payload)
    
    def create_destination(self, name, dest_type, config):
        """Create a destination"""
        payload = {
            "name": name,
            "destinationDefinitionId": dest_type,
            "connectionConfiguration": config,
        }
        return self._api_call("destinations/create", payload)
    
    def create_connection(self, source_id, dest_id, streams,
                          cron_expression="0 */6 * * *"):
        """Create a sync connection on a cron schedule"""
        payload = {
            "sourceId": source_id,
            "destinationId": dest_id,
            "syncCatalog": {"streams": streams},
            "scheduleType": "cron",
            "scheduleData": {"cron": {"cronExpression": cron_expression}},
            "status": "active",
            "namespaceDefinition": "destination",
            "prefix": "ml_",
        }
        return self._api_call("connections/create", payload)
    
    def _api_call(self, endpoint, payload):
        """Make an API call to Airbyte"""
        try:
            data = json.dumps(payload).encode()
            req = urllib.request.Request(
                f"{self.api_url}/{endpoint}",
                data=data,
                headers={"Content-Type": "application/json"},
            )
            with urllib.request.urlopen(req) as response:
                return json.loads(response.read())
        except Exception as e:
            logger.error("API call to %s failed: %s", endpoint, e)
            return {"error": str(e)}
    
    def ml_pipeline_config(self):
        """Example ML pipeline configuration"""
        return {
            "sources": {
                "production_db": {
                    "type": "PostgreSQL",
                    "tables": ["users", "orders", "products", "events"],
                    "sync_mode": "incremental",
                    "cursor_field": "updated_at",
                },
                "analytics_api": {
                    "type": "Google Analytics",
                    "metrics": ["sessions", "pageviews", "conversions"],
                    "sync_mode": "incremental",
                },
                "crm": {
                    "type": "Salesforce",
                    "objects": ["contacts", "opportunities", "activities"],
                    "sync_mode": "incremental",
                },
            },
            "destination": {
                "type": "BigQuery / Snowflake / S3",
                "schema": "ml_training_data",
                "format": "Parquet (for ML efficiency)",
            },
            "schedule": "Every 6 hours (or event-triggered)",
        }

pipeline = AirbyteMLPipeline()
config = pipeline.ml_pipeline_config()
print("ML Pipeline Sources:")
for name, src in config["sources"].items():
    print(f"  {name}: {src['type']} ({src['sync_mode']})")
print(f"\nDestination: {config['destination']['type']}")
print(f"Schedule: {config['schedule']}")
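
Once a connection exists, a sync can also be triggered on demand through the same API. The sketch below assumes the same local endpoint as above; the connection id is a placeholder, and `connections/sync` is the Config API route for manual syncs (verify the exact route against your Airbyte version):

```python
# trigger_sync.py - kick off a manual Airbyte sync (sketch; ids are placeholders)
import json
import urllib.request

API_URL = "http://localhost:8001/api/v1"

def build_sync_payload(connection_id: str) -> dict:
    """Request body for POST /connections/sync."""
    return {"connectionId": connection_id}

def trigger_sync(connection_id: str) -> dict:
    """POST the sync request; the response should contain a job id to poll."""
    req = urllib.request.Request(
        f"{API_URL}/connections/sync",
        data=json.dumps(build_sync_payload(connection_id)).encode(),
        headers={"Content-Type": "application/json"},
    )
    try:
        with urllib.request.urlopen(req, timeout=30) as resp:
            return json.loads(resp.read())
    except Exception as e:
        return {"error": str(e)}

payload = build_sync_payload("production-db-connection-id")
print(payload)  # {'connectionId': 'production-db-connection-id'}
```

In a real pipeline you would then poll the returned job id until the sync finishes before starting transformation.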

Transforming Data for Training

Transform raw data into ML features

#!/usr/bin/env python3
# data_transform.py - Data Transformation for ML
import json
import logging
from typing import Dict, List
from datetime import datetime

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger("transform")

class MLDataTransformer:
    def __init__(self):
        self.transformations = []
    
    def dbt_transformation_config(self):
        """dbt transformation for ML features"""
        return {
            "project_structure": {
                "models/": {
                    "staging/": "Clean raw data from Airbyte",
                    "intermediate/": "Join and combine data",
                    "features/": "ML feature engineering",
                    "training/": "Final training datasets",
                },
            },
            "example_models": {
                "stg_users": {
                    "source": "ml_users",
                    "transforms": ["deduplicate", "fill_nulls", "type_cast"],
                    "sql": """
SELECT
  user_id,
  email,
  COALESCE(country, 'unknown') as country,
  created_at,
  DATE_DIFF(CURRENT_DATE(), created_at, DAY) as account_age_days
FROM {{ source('airbyte', 'ml_users') }}
WHERE _airbyte_emitted_at = (
  SELECT MAX(_airbyte_emitted_at) FROM {{ source('airbyte', 'ml_users') }}
)
                    """,
                },
                "feat_user_activity": {
                    "source": "stg_users + stg_events",
                    "transforms": ["aggregate", "window_functions"],
                    "sql": """
SELECT
  u.user_id,
  COUNT(e.event_id) as total_events_30d,
  COUNT(DISTINCT e.session_id) as sessions_30d,
  AVG(e.duration_sec) as avg_session_duration,
  MAX(e.event_date) as last_active_date,
  DATE_DIFF(CURRENT_DATE(), MAX(e.event_date), DAY) as days_since_active
FROM {{ ref('stg_users') }} u
LEFT JOIN {{ ref('stg_events') }} e
  ON u.user_id = e.user_id
  AND e.event_date >= DATE_SUB(CURRENT_DATE(), INTERVAL 30 DAY)
GROUP BY u.user_id
                    """,
                },
            },
        }
    
    def feature_engineering_pipeline(self):
        """Feature engineering steps"""
        return {
            "numerical_features": {
                "scaling": "StandardScaler or MinMaxScaler",
                "missing_values": "Mean/Median imputation",
                "outliers": "Clip at 1st/99th percentile",
            },
            "categorical_features": {
                "encoding": "One-hot or Target encoding",
                "missing_values": "Mode imputation or 'unknown' category",
                "high_cardinality": "Frequency encoding or embedding",
            },
            "temporal_features": {
                "extraction": ["day_of_week", "hour", "month", "is_weekend"],
                "lag_features": ["value_1d_ago", "value_7d_ago", "value_30d_ago"],
                "rolling": ["rolling_mean_7d", "rolling_std_7d"],
            },
            "text_features": {
                "basic": ["word_count", "char_count", "avg_word_length"],
                "advanced": ["TF-IDF", "sentence embeddings (BERT)"],
            },
        }

transformer = MLDataTransformer()
dbt = transformer.dbt_transformation_config()
print("dbt Project Structure:")
for folder, desc in dbt["project_structure"]["models/"].items():
    print(f"  {folder}: {desc}")

features = transformer.feature_engineering_pipeline()
print("\nFeature Engineering:")
for feat_type, methods in features.items():
    print(f"  {feat_type}: {list(methods.keys()) if isinstance(methods, dict) else methods}")
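
A few of the steps listed in the catalog above, written out concretely. This is a pure-Python sketch; in practice pandas or scikit-learn would handle these:

```python
# feature_engineering.py - concrete versions of a few steps from the catalog
from datetime import datetime
from typing import Dict, List

def min_max_scale(values: List[float]) -> List[float]:
    """Scale a numerical feature to [0, 1] (MinMaxScaler-style)."""
    lo, hi = min(values), max(values)
    span = (hi - lo) or 1.0  # avoid division by zero on constant columns
    return [(v - lo) / span for v in values]

def one_hot(values: List[str]) -> List[Dict[str, int]]:
    """One-hot encode a categorical column."""
    categories = sorted(set(values))
    return [{c: int(v == c) for c in categories} for v in values]

def temporal_features(ts: datetime) -> Dict[str, int]:
    """Extract calendar features from a timestamp."""
    return {
        "day_of_week": ts.weekday(),
        "hour": ts.hour,
        "month": ts.month,
        "is_weekend": int(ts.weekday() >= 5),
    }

print(min_max_scale([10, 20, 30]))     # [0.0, 0.5, 1.0]
print(one_hot(["th", "us", "th"])[0])  # {'th': 1, 'us': 0}
print(temporal_features(datetime(2024, 6, 15, 14, 0)))
```

The same logic maps directly onto warehouse SQL (as in the dbt models above) once datasets outgrow in-memory processing.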

Orchestration with Airflow

Use Airflow to orchestrate the Airbyte + ML pipeline

# === Airflow + Airbyte Orchestration ===

# 1. Install Airbyte Provider for Airflow
pip install apache-airflow-providers-airbyte

# 2. DAG: Complete ML Pipeline
cat > dags/ml_pipeline_dag.py << 'PYEOF'
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.providers.airbyte.operators.airbyte import AirbyteTriggerSyncOperator
from airflow.providers.airbyte.sensors.airbyte import AirbyteJobSensor
from datetime import datetime, timedelta

default_args = {
    "owner": "ml-team",
    "depends_on_past": False,
    "retries": 2,
    "retry_delay": timedelta(minutes=5),
}

with DAG(
    "ml_training_pipeline",
    default_args=default_args,
    description="End-to-end ML training pipeline",
    schedule_interval="0 2 * * *",  # Daily at 2 AM
    start_date=datetime(2024, 1, 1),
    catchup=False,
    tags=["ml", "training"],
) as dag:

    # Step 1: Sync data from sources via Airbyte
    sync_production_db = AirbyteTriggerSyncOperator(
        task_id="sync_production_db",
        airbyte_conn_id="airbyte_default",
        connection_id="production-db-connection-id",
        asynchronous=True,
    )

    wait_for_sync = AirbyteJobSensor(
        task_id="wait_for_sync",
        airbyte_conn_id="airbyte_default",
        airbyte_job_id="{{ task_instance.xcom_pull(task_ids='sync_production_db') }}",
        timeout=3600,
    )

    # Step 2: Transform data with dbt
    def run_dbt_transform():
        import subprocess
        result = subprocess.run(
            ["dbt", "run", "--select", "tag:ml_features"],
            cwd="/opt/dbt_project",
            capture_output=True, text=True,
        )
        if result.returncode != 0:
            raise Exception(f"dbt failed: {result.stderr}")
        print(result.stdout)

    transform = PythonOperator(
        task_id="dbt_transform",
        python_callable=run_dbt_transform,
    )

    # Step 3: Train model
    def train_model():
        print("Training model...")
        # Import and run training script
        # from src.train import train
        # train(config="configs/production.yaml")

    train = PythonOperator(
        task_id="train_model",
        python_callable=train_model,
    )

    # Step 4: Evaluate
    def evaluate_model():
        print("Evaluating model...")
        # metrics = evaluate(model_path, test_data)
        # if metrics["accuracy"] < 0.85:
        #     raise Exception("Model below threshold")

    evaluate = PythonOperator(
        task_id="evaluate_model",
        python_callable=evaluate_model,
    )

    # Pipeline flow
    sync_production_db >> wait_for_sync >> transform >> train >> evaluate
PYEOF

echo "Airflow DAG created"

Monitoring and Data Quality

Monitor the data pipeline and the quality of the data it delivers

#!/usr/bin/env python3
# data_quality.py - Data Quality Monitoring
import json
import logging
from typing import Dict, List

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger("quality")

class DataQualityMonitor:
    def __init__(self):
        self.checks = []
    
    def quality_checks(self):
        return {
            "completeness": {
                "description": "Check that required fields are not null",
                "checks": [
                    {"column": "user_id", "rule": "not_null", "threshold": 100},
                    {"column": "email", "rule": "not_null", "threshold": 95},
                    {"column": "country", "rule": "not_null", "threshold": 90},
                ],
            },
            "freshness": {
                "description": "Check that data has been synced recently",
                "checks": [
                    {"table": "ml_users", "max_age_hours": 24},
                    {"table": "ml_events", "max_age_hours": 6},
                    {"table": "ml_orders", "max_age_hours": 12},
                ],
            },
            "volume": {
                "description": "Check that record counts are in the expected range",
                "checks": [
                    {"table": "ml_users", "min_rows": 10000, "max_change_pct": 20},
                    {"table": "ml_events", "min_rows": 100000, "max_change_pct": 50},
                ],
            },
            "uniqueness": {
                "description": "Check that there are no duplicate records",
                "checks": [
                    {"table": "ml_users", "unique_columns": ["user_id"]},
                    {"table": "ml_orders", "unique_columns": ["order_id"]},
                ],
            },
            "distribution": {
                "description": "Check that the data distribution has not drifted",
                "checks": [
                    {"column": "age", "expected_mean_range": [25, 45]},
                    {"column": "order_value", "expected_median_range": [100, 500]},
                ],
            },
        }
    
    def pipeline_dashboard(self):
        return {
            "airbyte_sync": {
                "last_sync": "2024-06-15 02:15:00",
                "status": "success",
                "records_synced": 15420,
                "duration_minutes": 12,
                "next_sync": "2024-06-15 08:00:00",
            },
            "dbt_transform": {
                "last_run": "2024-06-15 02:28:00",
                "status": "success",
                "models_run": 15,
                "tests_passed": 42,
                "tests_failed": 0,
            },
            "data_quality": {
                "total_checks": 25,
                "passed": 24,
                "failed": 1,
                "warning": "ml_events freshness: last record 8 hours ago (threshold: 6)",
            },
            "ml_training": {
                "last_training": "2024-06-15 03:00:00",
                "model_version": "v2.5.1",
                "accuracy": 0.923,
                "status": "deployed",
            },
        }

monitor = DataQualityMonitor()
checks = monitor.quality_checks()
print("Data Quality Checks:")
for category, info in checks.items():
    print(f"  {category}: {info['description']} ({len(info['checks'])} checks)")

dash = monitor.pipeline_dashboard()
print(f"\nPipeline Status:")
print(f"  Sync: {dash['airbyte_sync']['status']} ({dash['airbyte_sync']['records_synced']} records)")
print(f"  Transform: {dash['dbt_transform']['models_run']} models, {dash['dbt_transform']['tests_passed']} tests passed")
print(f"  Quality: {dash['data_quality']['passed']}/{dash['data_quality']['total_checks']} passed")
print(f"  ML: {dash['ml_training']['model_version']} accuracy {dash['ml_training']['accuracy']}")
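
Two of the checks from the catalog above, written as runnable functions (a minimal sketch of what a real framework like dbt tests or Great Expectations provides):

```python
# run_checks.py - executable completeness and uniqueness checks
from typing import Dict, List

def completeness(rows: List[Dict], column: str, threshold_pct: float) -> bool:
    """Pass when at least threshold_pct percent of rows have a non-null value."""
    if not rows:
        return False
    non_null = sum(1 for r in rows if r.get(column) is not None)
    return non_null / len(rows) * 100 >= threshold_pct

def uniqueness(rows: List[Dict], columns: List[str]) -> bool:
    """Pass when no two rows share the same values in the key columns."""
    keys = [tuple(r.get(c) for c in columns) for r in rows]
    return len(keys) == len(set(keys))

rows = [
    {"user_id": 1, "email": "a@x.com"},
    {"user_id": 2, "email": None},
    {"user_id": 3, "email": "c@x.com"},
]
print(completeness(rows, "user_id", 100))  # True
print(completeness(rows, "email", 95))     # False (only ~66.7% non-null)
print(uniqueness(rows, ["user_id"]))       # True
```

Wiring checks like these into the Airflow DAG between the dbt and training steps stops a bad sync from silently degrading the model.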

FAQ: Frequently Asked Questions

Q: How do Airbyte and Fivetran differ?

A: Airbyte is open source and self-hosted (a cloud version also exists), with 300+ connectors that you can customize, but you manage the infrastructure yourself and connector quality is community-driven. Fivetran is a fully managed SaaS: no infrastructure to run and more polished connectors (maintained by the Fivetran team), but it is priced per usage ($/credit) and aimed at enterprises that don't want to manage infrastructure. Choose Airbyte if you want to control costs, need self-hosting, or need custom connectors; choose Fivetran if you want a managed service with enterprise support and have no DevOps team.

Q: What is the difference between ELT and ETL?

A: ETL (Extract, Transform, Load) transforms data before loading it into the destination; it suits destinations with limited compute (traditional databases) and cases where you want only clean data stored. ELT (Extract, Load, Transform) loads raw data into the destination first and transforms it there (e.g., in BigQuery or Snowflake); it suits destinations with cheap, scalable compute and keeps raw data available for ad-hoc analysis. Airbyte is an ELT tool: it lands raw data in the warehouse and leaves transformation to dbt. For an ML pipeline, ELT is a good fit because keeping the raw data lets you build new features later without re-extracting.
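
The ordering difference between ETL and ELT can be shown in a toy example (the "T" here is just a type cast standing in for a real transformation):

```python
# elt_vs_etl.py - toy illustration of where the transform step runs
raw = [{"amount": "100"}, {"amount": "250"}]

def transform(row):
    return {"amount": int(row["amount"])}  # type cast = the "T"

# ETL: transform happens before the destination ever sees the data
etl_dest = [transform(r) for r in raw]

# ELT: raw data is loaded as-is; transforms run inside the destination later
elt_dest = list(raw)                         # load raw rows
elt_view = [transform(r) for r in elt_dest]  # transform in-warehouse (e.g. dbt)

print(etl_dest == elt_view)  # True - same result either way
print(elt_dest[0])           # {'amount': '100'} - the raw copy survives in ELT
```

Both paths produce the same clean table, but only ELT keeps the raw rows around for new features or backfills.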

Q: What is CDC (Change Data Capture), and when should you use it?

A: CDC captures row-level changes (INSERT, UPDATE, DELETE) from the database's log instead of scanning the whole table on every sync. Airbyte implements CDC via Debezium for PostgreSQL, MySQL, and SQL Server. For ML it matters because syncs are faster, the load on the production database is lower, the model gets near-real-time data, and you don't miss DELETE events (a plain full load never sees deleted rows). Setup requires enabling WAL (Write-Ahead Log) on the database and creating a replication slot; Airbyte then reads changes from the WAL.
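
The CDC idea can be made concrete by replaying a change stream against a replica. The event shape below is a simplified stand-in, not the actual Debezium format:

```python
# cdc_replay.py - toy sketch: applying a CDC change stream to a replica table
replica = {}

events = [
    {"op": "INSERT", "id": 1, "row": {"name": "Alice"}},
    {"op": "INSERT", "id": 2, "row": {"name": "Bob"}},
    {"op": "UPDATE", "id": 1, "row": {"name": "Alicia"}},
    {"op": "DELETE", "id": 2},
]

for ev in events:
    if ev["op"] == "DELETE":
        replica.pop(ev["id"], None)  # a full-load sync would never see this
    else:
        replica[ev["id"]] = ev["row"]

print(replica)  # {1: {'name': 'Alicia'}}
```

Note that the DELETE is the interesting case: a snapshot-based full load can only observe rows that still exist, while the change stream records the removal itself.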

Q: Is Airbyte + dbt + Airflow a good stack?

A: This is the classic Modern Data Stack: Airbyte handles data ingestion (EL), dbt handles data transformation (T), and Airflow handles orchestration (scheduling plus dependency management). Pros: all open source, each tool is best-in-class at its own job, large communities, and plenty of flexibility. Cons: you have three tools to manage, the learning curve is steep, and DevOps skills are required. Alternatives: Airbyte Cloud + dbt Cloud + Dagster (managed, easier to run), Meltano (all-in-one open source ELT), or Prefect instead of Airflow (Python-native, lighter). For an ML pipeline, this stack is a solid, well-proven choice.
