Airbyte ETL for Machine Learning Data Pipelines
Airbyte is an open source ELT (Extract, Load, Transform) platform that moves data from many sources (databases, APIs, SaaS tools) into destinations (data warehouses, data lakes) through a catalog of 300+ prebuilt connectors, and runs on Docker.
In a Machine Learning pipeline, Airbyte serves as the data ingestion layer: it pulls data from multiple sources into a central store so that feature engineering and model training operate on a reproducible, automated data pipeline.
Why Airbyte fits ML workloads: open source and self-hosted with a permissive license; connector-based architecture that cleanly separates sources and destinations; CDC (Change Data Capture) for efficient replication; incremental sync instead of repeated full loads; schema management that copes with schema changes; and an API plus Terraform support for managing pipelines as code.
Using Airbyte in an ML Pipeline
Set up Airbyte alongside your ML tools
# === Airbyte Installation ===
# Three installation paths: quick local clone for development, a Docker
# Compose stack, and a Helm-based Kubernetes deployment.
# NOTE: the heredoc YAML below was re-indented — the original had lost all
# indentation and would not parse.

# 1. Docker Compose (recommended for development)
git clone https://github.com/airbytehq/airbyte.git
cd airbyte
./run-ab-platform.sh
# Airbyte UI: http://localhost:8000
# Default: admin / password

# 2. Production Setup with Docker Compose
# The 'EOF' delimiter is quoted, so the file is written verbatim (no shell
# expansion). Passwords here are placeholders — replace before real use.
cat > docker-compose.yml << 'EOF'
version: '3.8'
services:
  airbyte-server:
    image: airbyte/server:latest
    ports:
      - "8001:8001"
    environment:
      DATABASE_URL: postgresql://airbyte:password@postgres:5432/airbyte
      TRACKING_STRATEGY: logging
      WORKER_ENVIRONMENT: docker
  airbyte-webapp:
    image: airbyte/webapp:latest
    ports:
      - "8000:80"
    environment:
      INTERNAL_API_HOST: airbyte-server:8001
  airbyte-worker:
    image: airbyte/worker:latest
    environment:
      DATABASE_URL: postgresql://airbyte:password@postgres:5432/airbyte
      WORKSPACE_ROOT: /tmp/workspace
    volumes:
      # Worker launches connector containers via the host Docker socket.
      - /var/run/docker.sock:/var/run/docker.sock
      - workspace:/tmp/workspace
  postgres:
    image: postgres:16-alpine
    environment:
      POSTGRES_DB: airbyte
      POSTGRES_USER: airbyte
      POSTGRES_PASSWORD: password
    volumes:
      - postgres_data:/var/lib/postgresql/data
volumes:
  workspace:
  postgres_data:
EOF
# Start the stack in the background.
docker compose up -d

# 3. Kubernetes Deployment (production)
helm repo add airbyte https://airbytehq.github.io/helm-charts
helm repo update
# NOTE(review): assumes an external Postgres reachable at postgres.example.com
# — confirm host/credentials before installing.
cat > values.yaml << 'EOF'
global:
  database:
    type: external
    host: postgres.example.com
    port: 5432
    database: airbyte
    user: airbyte
    password: password
webapp:
  ingress:
    enabled: true
    hosts:
      - host: airbyte.example.com
        paths:
          - path: /
            pathType: Prefix
worker:
  resources:
    limits:
      cpu: "4"
      memory: "8Gi"
    requests:
      cpu: "2"
      memory: "4Gi"
EOF
helm install airbyte airbyte/airbyte -f values.yaml -n airbyte --create-namespace
echo "Airbyte installed"
Building a Data Pipeline for ML
Configure data pipelines that produce ML training data
#!/usr/bin/env python3
# ml_data_pipeline.py ??? ML Data Pipeline with Airbyte
import json
import logging
import urllib.request
from typing import Dict, List
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger("pipeline")

# Human-friendly schedule names mapped to cron expressions. Any value not in
# this table is passed through unchanged and treated as a raw cron expression.
CRON_PRESETS: Dict[str, str] = {
    "every hour": "0 * * * *",
    "every 6 hours": "0 */6 * * *",
    "every 12 hours": "0 */12 * * *",
    "daily": "0 0 * * *",
}


class AirbyteMLPipeline:
    """Minimal client for the Airbyte HTTP API, used to assemble ML data pipelines.

    All requests go through :meth:`_api_call`, which reports failures as an
    ``{"error": ...}`` dict instead of raising (deliberate best-effort design).
    """

    def __init__(self, api_url: str = "http://localhost:8001/api/v1"):
        # Base URL of the Airbyte server API (no trailing slash).
        self.api_url = api_url

    def create_source(self, name: str, source_type: str, config: Dict) -> Dict:
        """Create a data source.

        name: display name shown in the Airbyte UI.
        source_type: Airbyte sourceDefinitionId identifying the connector.
        config: connector-specific connection configuration.
        """
        payload = {
            "name": name,
            "sourceDefinitionId": source_type,
            "connectionConfiguration": config,
        }
        return self._api_call("sources/create", payload)

    def create_destination(self, name: str, dest_type: str, config: Dict) -> Dict:
        """Create a destination (mirror of :meth:`create_source`)."""
        payload = {
            "name": name,
            "destinationDefinitionId": dest_type,
            "connectionConfiguration": config,
        }
        return self._api_call("destinations/create", payload)

    def create_connection(self, source_id, dest_id, streams, schedule="every 6 hours"):
        """Create a sync connection between an existing source and destination.

        schedule: a preset name (see CRON_PRESETS) or a raw cron expression.
            The original implementation ignored this parameter and always used
            the 6-hour cron; the default preserves that behavior.
        """
        cron = CRON_PRESETS.get(schedule, schedule)
        payload = {
            "sourceId": source_id,
            "destinationId": dest_id,
            "syncCatalog": {"streams": streams},
            "scheduleType": "cron",
            "scheduleData": {"cron": {"cronExpression": cron}},
            "status": "active",
            "namespaceDefinition": "destination",
            "prefix": "ml_",  # synced tables land with an ml_ prefix
        }
        return self._api_call("connections/create", payload)

    def _api_call(self, endpoint: str, payload: Dict) -> Dict:
        """POST ``payload`` as JSON to ``{api_url}/{endpoint}`` and parse the reply.

        Returns the decoded JSON response, or ``{"error": msg}`` on any failure.
        """
        try:
            data = json.dumps(payload).encode()
            req = urllib.request.Request(
                f"{self.api_url}/{endpoint}",
                data=data,
                headers={"Content-Type": "application/json"},
            )
            # Context manager closes the HTTP response (the original leaked it).
            with urllib.request.urlopen(req) as response:
                return json.loads(response.read())
        except Exception as e:  # deliberate best-effort: report, don't raise
            return {"error": str(e)}

    def ml_pipeline_config(self) -> Dict:
        """Return an illustrative end-to-end ML pipeline configuration."""
        return {
            "sources": {
                "production_db": {
                    "type": "PostgreSQL",
                    "tables": ["users", "orders", "products", "events"],
                    "sync_mode": "incremental",
                    "cursor_field": "updated_at",
                },
                "analytics_api": {
                    "type": "Google Analytics",
                    "metrics": ["sessions", "pageviews", "conversions"],
                    "sync_mode": "incremental",
                },
                "crm": {
                    "type": "Salesforce",
                    "objects": ["contacts", "opportunities", "activities"],
                    "sync_mode": "incremental",
                },
            },
            "destination": {
                "type": "BigQuery / Snowflake / S3",
                "schema": "ml_training_data",
                "format": "Parquet (for ML efficiency)",
            },
            "schedule": "Every 6 hours (or event-triggered)",
        }
# Demo: build the example pipeline configuration and print a short summary.
pipeline = AirbyteMLPipeline()
config = pipeline.ml_pipeline_config()
print("ML Pipeline Sources:")
summary = [
    f" {name}: {src['type']} ({src['sync_mode']})"
    for name, src in config["sources"].items()
]
print("\n".join(summary))
print(f"\nDestination: {config['destination']['type']}")
print(f"Schedule: {config['schedule']}")
Transforming Data for Training
Turn raw synced data into ML features
#!/usr/bin/env python3
# data_transform.py ??? Data Transformation for ML
import json
import logging
from typing import Dict, List
from datetime import datetime
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger("transform")


class MLDataTransformer:
    """Describes dbt transformations and feature-engineering steps for ML data."""

    def __init__(self):
        # Transformations registered so far (populated by callers).
        self.transformations = []

    def dbt_transformation_config(self):
        """Return a dbt project layout plus two example model definitions."""
        stg_users_sql = """
SELECT
user_id,
email,
COALESCE(country, 'unknown') as country,
created_at,
DATE_DIFF(CURRENT_DATE(), created_at, DAY) as account_age_days
FROM {{ source('airbyte', 'ml_users') }}
WHERE _airbyte_emitted_at = (
SELECT MAX(_airbyte_emitted_at) FROM {{ source('airbyte', 'ml_users') }}
)
"""
        feat_activity_sql = """
SELECT
u.user_id,
COUNT(e.event_id) as total_events_30d,
COUNT(DISTINCT e.session_id) as sessions_30d,
AVG(e.duration_sec) as avg_session_duration,
MAX(e.event_date) as last_active_date,
DATE_DIFF(CURRENT_DATE(), MAX(e.event_date), DAY) as days_since_active
FROM {{ ref('stg_users') }} u
LEFT JOIN {{ ref('stg_events') }} e
ON u.user_id = e.user_id
AND e.event_date >= DATE_SUB(CURRENT_DATE(), INTERVAL 30 DAY)
GROUP BY u.user_id
"""
        project_structure = {
            "models/": {
                "staging/": "Clean raw data from Airbyte",
                "intermediate/": "Join and combine data",
                "features/": "ML feature engineering",
                "training/": "Final training datasets",
            },
        }
        example_models = {
            "stg_users": {
                "source": "ml_users",
                "transforms": ["deduplicate", "fill_nulls", "type_cast"],
                "sql": stg_users_sql,
            },
            "feat_user_activity": {
                "source": "stg_users + stg_events",
                "transforms": ["aggregate", "window_functions"],
                "sql": feat_activity_sql,
            },
        }
        return {
            "project_structure": project_structure,
            "example_models": example_models,
        }

    def feature_engineering_pipeline(self):
        """Summarize feature-engineering techniques per feature family."""
        numerical = {
            "scaling": "StandardScaler or MinMaxScaler",
            "missing_values": "Mean/Median imputation",
            "outliers": "Clip at 1st/99th percentile",
        }
        categorical = {
            "encoding": "One-hot or Target encoding",
            "missing_values": "Mode imputation or 'unknown' category",
            "high_cardinality": "Frequency encoding or embedding",
        }
        temporal = {
            "extraction": ["day_of_week", "hour", "month", "is_weekend"],
            "lag_features": ["value_1d_ago", "value_7d_ago", "value_30d_ago"],
            "rolling": ["rolling_mean_7d", "rolling_std_7d"],
        }
        text = {
            "basic": ["word_count", "char_count", "avg_word_length"],
            "advanced": ["TF-IDF", "sentence embeddings (BERT)"],
        }
        return {
            "numerical_features": numerical,
            "categorical_features": categorical,
            "temporal_features": temporal,
            "text_features": text,
        }
# Demo: show the dbt project layout and the feature-engineering summary.
transformer = MLDataTransformer()
dbt = transformer.dbt_transformation_config()
print("dbt Project Structure:")
model_dirs = dbt["project_structure"]["models/"]
for folder in model_dirs:
    print(f" {folder}: {model_dirs[folder]}")
features = transformer.feature_engineering_pipeline()
print("\nFeature Engineering:")
for feat_type, methods in features.items():
    method_names = list(methods.keys()) if isinstance(methods, dict) else methods
    print(f" {feat_type}: {method_names}")
Orchestration with Airflow
Use Airflow to orchestrate the Airbyte + ML pipeline
# === Airflow + Airbyte Orchestration ===
# 1. Install Airbyte Provider for Airflow
pip install apache-airflow-providers-airbyte

# 2. DAG: Complete ML Pipeline
# The 'PYEOF' delimiter is quoted, so the DAG source is written verbatim with
# no shell expansion. NOTE: the Python below was re-indented — the original
# heredoc had lost all indentation and was not valid Python.
cat > dags/ml_pipeline_dag.py << 'PYEOF'
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.providers.airbyte.operators.airbyte import AirbyteTriggerSyncOperator
from airflow.providers.airbyte.sensors.airbyte import AirbyteJobSensor
from datetime import datetime, timedelta

default_args = {
    "owner": "ml-team",
    "depends_on_past": False,
    "retries": 2,
    "retry_delay": timedelta(minutes=5),
}

with DAG(
    "ml_training_pipeline",
    default_args=default_args,
    description="End-to-end ML training pipeline",
    schedule_interval="0 2 * * *",  # Daily at 2 AM
    start_date=datetime(2024, 1, 1),
    catchup=False,
    tags=["ml", "training"],
) as dag:
    # Step 1: Sync data from sources via Airbyte.
    # asynchronous=True returns immediately; the sensor below waits on the
    # job id that the trigger task pushes to XCom.
    sync_production_db = AirbyteTriggerSyncOperator(
        task_id="sync_production_db",
        airbyte_conn_id="airbyte_default",
        connection_id="production-db-connection-id",
        asynchronous=True,
    )
    wait_for_sync = AirbyteJobSensor(
        task_id="wait_for_sync",
        airbyte_conn_id="airbyte_default",
        airbyte_job_id="{{ task_instance.xcom_pull(task_ids='sync_production_db') }}",
        timeout=3600,
    )

    # Step 2: Transform data with dbt
    def run_dbt_transform():
        import subprocess
        result = subprocess.run(
            ["dbt", "run", "--select", "tag:ml_features"],
            cwd="/opt/dbt_project",
            capture_output=True, text=True,
        )
        if result.returncode != 0:
            raise Exception(f"dbt failed: {result.stderr}")
        print(result.stdout)

    transform = PythonOperator(
        task_id="dbt_transform",
        python_callable=run_dbt_transform,
    )

    # Step 3: Train model (placeholder body)
    def train_model():
        print("Training model...")
        # Import and run training script
        # from src.train import train
        # train(config="configs/production.yaml")

    train = PythonOperator(
        task_id="train_model",
        python_callable=train_model,
    )

    # Step 4: Evaluate (placeholder body)
    def evaluate_model():
        print("Evaluating model...")
        # metrics = evaluate(model_path, test_data)
        # if metrics["accuracy"] < 0.85:
        #     raise Exception("Model below threshold")

    evaluate = PythonOperator(
        task_id="evaluate_model",
        python_callable=evaluate_model,
    )

    # Pipeline flow
    sync_production_db >> wait_for_sync >> transform >> train >> evaluate
PYEOF
echo "Airflow DAG created"
Monitoring and Data Quality
Monitor the data pipeline and validate data quality
#!/usr/bin/env python3
# data_quality.py ??? Data Quality Monitoring
import json
import logging
from typing import Dict, List
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger("quality")


class DataQualityMonitor:
    """Defines data-quality checks and a status dashboard for the ML pipeline.

    The ``description`` strings were mojibake (encoding-corrupted text) in the
    original source; they have been replaced with English equivalents.
    """

    def __init__(self):
        # Results of executed checks (populated by callers).
        self.checks = []

    def quality_checks(self) -> Dict:
        """Return the quality-check catalog, grouped by check category."""
        return {
            "completeness": {
                "description": "Check that required columns are not null",
                "checks": [
                    # threshold = minimum percentage of non-null values
                    {"column": "user_id", "rule": "not_null", "threshold": 100},
                    {"column": "email", "rule": "not_null", "threshold": 95},
                    {"column": "country", "rule": "not_null", "threshold": 90},
                ],
            },
            "freshness": {
                "description": "Check that data is up to date",
                "checks": [
                    {"table": "ml_users", "max_age_hours": 24},
                    {"table": "ml_events", "max_age_hours": 6},
                    {"table": "ml_orders", "max_age_hours": 12},
                ],
            },
            "volume": {
                "description": "Check record counts are within expected bounds",
                "checks": [
                    {"table": "ml_users", "min_rows": 10000, "max_change_pct": 20},
                    {"table": "ml_events", "min_rows": 100000, "max_change_pct": 50},
                ],
            },
            "uniqueness": {
                "description": "Check for duplicate records",
                "checks": [
                    {"table": "ml_users", "unique_columns": ["user_id"]},
                    {"table": "ml_orders", "unique_columns": ["order_id"]},
                ],
            },
            "distribution": {
                "description": "Check that data distributions have not drifted",
                "checks": [
                    {"column": "age", "expected_mean_range": [25, 45]},
                    {"column": "order_value", "expected_median_range": [100, 500]},
                ],
            },
        }

    def pipeline_dashboard(self) -> Dict:
        """Return a snapshot of pipeline component statuses (example data)."""
        return {
            "airbyte_sync": {
                "last_sync": "2024-06-15 02:15:00",
                "status": "success",
                "records_synced": 15420,
                "duration_minutes": 12,
                "next_sync": "2024-06-15 08:00:00",
            },
            "dbt_transform": {
                "last_run": "2024-06-15 02:28:00",
                "status": "success",
                "models_run": 15,
                "tests_passed": 42,
                "tests_failed": 0,
            },
            "data_quality": {
                "total_checks": 25,
                "passed": 24,
                "failed": 1,
                "warning": "ml_events freshness: last record 8 hours ago (threshold: 6)",
            },
            "ml_training": {
                "last_training": "2024-06-15 03:00:00",
                "model_version": "v2.5.1",
                "accuracy": 0.923,
                "status": "deployed",
            },
        }
# Demo: summarize the configured quality checks and overall pipeline status.
monitor = DataQualityMonitor()
checks = monitor.quality_checks()
print("Data Quality Checks:")
for category, info in checks.items():
    n_checks = len(info["checks"])
    print(f" {category}: {info['description']} ({n_checks} checks)")
dash = monitor.pipeline_dashboard()
sync = dash["airbyte_sync"]
dbt = dash["dbt_transform"]
quality = dash["data_quality"]
ml = dash["ml_training"]
print("\nPipeline Status:")
print(f" Sync: {sync['status']} ({sync['records_synced']} records)")
print(f" Transform: {dbt['models_run']} models, {dbt['tests_passed']} tests passed")
print(f" Quality: {quality['passed']}/{quality['total_checks']} passed")
print(f" ML: {ml['model_version']} accuracy {ml['accuracy']}")
FAQ — Frequently Asked Questions
Q: How do Airbyte and Fivetran differ?
A: Airbyte is open source and self-hosted (a cloud version also exists), with 300+ connectors you can customize — but you manage the infrastructure yourself and connectors are community-driven. Fivetran is a fully managed SaaS: no infrastructure to run and more polished connectors (maintained by the Fivetran team), but with credit-based pricing aimed at enterprises. Choose Airbyte for low cost, self-hosting, and custom connectors; choose Fivetran for a managed service, enterprise support, and no dedicated DevOps team.
Q: What is the difference between ELT and ETL?
A: ETL (Extract, Transform, Load) transforms data before it reaches the destination — useful when the destination has limited compute (e.g. a traditional database) and only clean data should be stored. ELT (Extract, Load, Transform) loads raw data into the destination first and transforms it there (e.g. in BigQuery or Snowflake), leveraging the destination's compute and keeping raw data available for ad-hoc analysis. Airbyte is an ELT tool: it lands raw data in the warehouse and dbt transforms it afterwards. For ML pipelines, ELT is a good fit because the raw data is preserved, so features can be rebuilt without re-extracting.
Q: What is CDC (Change Data Capture) and when should it be used?
A: CDC captures row-level changes (INSERT, UPDATE, DELETE) from the database's transaction log instead of scanning the whole table. Airbyte supports CDC via Debezium for PostgreSQL, MySQL, and SQL Server. For ML it matters because syncs are much faster, load on the production database stays low, near-real-time data keeps models fresh, and DELETE events are not missed (a plain full load cannot see deleted rows). Setup requires enabling the WAL (Write-Ahead Log) on the database and creating a replication slot; Airbyte then reads changes from the WAL.
Q: Is Airbyte + dbt + Airflow a good stack?
A: Yes — this is the classic Modern Data Stack: Airbyte handles data ingestion (EL), dbt handles data transformation (T), and Airflow handles orchestration (scheduling + dependency management). Pros: fully open source, best-in-class tool for each layer, large communities, very flexible. Cons: three tools to operate, a steeper learning curve, and DevOps skills required. Alternatives: Airbyte Cloud + dbt Cloud + Dagster (managed, easier to run), Meltano (all-in-one open source ELT), or Prefect instead of Airflow (Python-native, lighter). For ML pipelines this stack is a solid, widely adopted choice.
