Technology

Data Lakehouse Zero Downtime Deployment: Deploy a Data Platform Without Service Interruption

2026-05-09 · Ajarn Bom — SiamCafe.net · 1,271 words

What Is Zero Downtime Deployment for a Data Lakehouse?

A Data Lakehouse is an architecture that combines the strengths of a Data Lake and a Data Warehouse on top of open table formats such as Delta Lake, Apache Iceberg, and Apache Hudi. Zero Downtime Deployment means shipping changes (schema, code, infrastructure) without ever interrupting the data consumers that depend on the platform.

The challenge is that a Data Lakehouse has many moving parts. Schema changes must be backward compatible so that existing queries and ETL pipelines keep working throughout the deployment; query engines (Trino, Spark) must keep serving requests; metadata services (Hive Metastore, Unity Catalog) must stay available; and data quality must remain consistent for the entire duration of the deploy.

Components that have to be deployable without downtime include ETL/ELT pipelines (Spark jobs, dbt models), query engines (Trino workers, Spark Thrift Server), metadata services (Hive Metastore, Iceberg Catalog), the storage layer (table format upgrades), and the infrastructure itself (Kubernetes, networking).

Setting Up Zero Downtime for a Data Platform

Deployment strategies for the main data platform components:

# === Zero Downtime Strategies ===

# 1. Blue-Green Deployment for Query Engine
cat > blue-green-trino.yaml << 'EOF'
apiVersion: v1
kind: Service
metadata:
  name: trino-gateway
  namespace: data-platform
spec:
  selector:
    app: trino
    version: blue  # Switch to 'green' during deployment
  ports:
    - port: 8080
      targetPort: 8080
---
# Blue cluster (current)
apiVersion: apps/v1
kind: Deployment
metadata:
  name: trino-blue
spec:
  replicas: 5
  selector:
    matchLabels:
      app: trino
      version: blue
  template:
    metadata:
      labels:
        app: trino
        version: blue
    spec:
      containers:
        - name: trino
          image: trinodb/trino:439
          resources:
            requests:
              cpu: "8"
              memory: "64Gi"
---
# Green cluster (new version)
apiVersion: apps/v1
kind: Deployment
metadata:
  name: trino-green
spec:
  replicas: 5
  selector:
    matchLabels:
      app: trino
      version: green
  template:
    metadata:
      labels:
        app: trino
        version: green
    spec:
      containers:
        - name: trino
          image: trinodb/trino:440
          resources:
            requests:
              cpu: "8"
              memory: "64Gi"
EOF

# 2. Rolling Update for Spark Workers
cat > spark-rolling.yaml << 'EOF'
apiVersion: apps/v1
kind: Deployment
metadata:
  name: spark-workers
  namespace: data-platform
spec:
  replicas: 10
  strategy:
    type: RollingUpdate
    rollingUpdate:
      maxSurge: 2
      maxUnavailable: 1
  selector:
    matchLabels:
      app: spark-worker
  template:
    spec:
      terminationGracePeriodSeconds: 300
      containers:
        - name: spark
          image: spark:3.5.1
          lifecycle:
            preStop:
              exec:
                command:
                  - /bin/sh
                  - -c
                  - "sleep 30 && /opt/spark/sbin/stop-worker.sh"
EOF

# 3. Canary for dbt Models
cat > dbt_canary.sh << 'BASH'
#!/bin/bash
# Deploy dbt models with canary strategy
echo "=== dbt Canary Deployment ==="

# Step 1: Run new models to shadow schema
dbt run --target shadow --select new_models

# Step 2: Validate data quality
dbt test --target shadow --select new_models
if [ $? -ne 0 ]; then
    echo "FAIL: Data quality tests failed"
    exit 1
fi

# Step 3: Compare results
python3 compare_results.py --source production --target shadow

# Step 4: Swap if OK
dbt run --target production --select new_models --full-refresh
echo "Canary deployment complete"
BASH

echo "Zero downtime strategies configured"
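Step 3 of the canary script above calls compare_results.py, which is not shown anywhere in this post. A minimal sketch of what such a comparison might check, with in-memory dicts standing in for real warehouse query results (the metric names, numbers, and the 1% tolerance are all assumptions):

```python
# Sketch of compare_results.py (hypothetical): production-vs-shadow check.
# Real code would run the same aggregate queries against both targets;
# here plain dicts stand in for those query results.

def compare_results(source: dict, target: dict, tolerance: float = 0.01) -> dict:
    """Compare metrics; any relative difference above `tolerance` is a mismatch."""
    mismatches = []
    for metric, src in source.items():
        tgt = target.get(metric)
        if tgt is None:
            mismatches.append(f"{metric}: missing in target")
        elif src == 0 and tgt != 0:
            mismatches.append(f"{metric}: {src} vs {tgt}")
        elif src != 0 and abs(src - tgt) / abs(src) > tolerance:
            mismatches.append(f"{metric}: {src} vs {tgt}")
    return {"match": not mismatches, "mismatches": mismatches}

# Hypothetical numbers for illustration
production = {"orders_row_count": 1_200_000, "orders_amount_sum": 54_300_000.0}
shadow = {"orders_row_count": 1_200_450, "orders_amount_sum": 54_310_000.0}

result = compare_results(production, shadow)
print("MATCH" if result["match"] else f"MISMATCH: {result['mismatches']}")
```

If the shadow run drifts outside the tolerance, the canary script should exit nonzero before the production swap in Step 4.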

Schema Evolution and Migration

Managing schema changes without breaking consumers:

#!/usr/bin/env python3
# schema_evolution.py - Schema Evolution for a Data Lakehouse
import json
import logging
from typing import Dict, List

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger("schema")

class SchemaEvolutionManager:
    """Manage schema changes with zero downtime"""
    
    def __init__(self):
        self.migrations = []
    
    def safe_operations(self):
        """Operations that are backward compatible"""
        return {
            "add_column": {
                "safe": True,
                "description": "Add a new column (nullable or with a default value)",
                "delta_lake": "ALTER TABLE ADD COLUMNS (new_col STRING)",
                "iceberg": "ALTER TABLE ADD COLUMN new_col string",
                "impact": "Existing queries keep working (they ignore the new column)",
            },
            "widen_type": {
                "safe": True,
                "description": "Widen a data type (INT -> BIGINT, FLOAT -> DOUBLE)",
                "delta_lake": "Schema evolution: mergeSchema option",
                "iceberg": "ALTER TABLE ALTER COLUMN col TYPE bigint",
                "impact": "Existing data remains readable",
            },
            "add_partition": {
                "safe": True,
                "description": "Add a partition field",
                "iceberg": "ALTER TABLE ADD PARTITION FIELD year(ts)",
                "impact": "Old partitions stay as-is; new data uses the new partition spec",
            },
        }
    
    def unsafe_operations(self):
        """Operations that need migration strategy"""
        return {
            "drop_column": {
                "safe": False,
                "description": "Drop a column",
                "strategy": "Deprecate first -> Stop using -> Drop after grace period",
                "steps": [
                    "1. Mark column as deprecated (metadata/comment)",
                    "2. Update all consumers to not use the column",
                    "3. Wait grace period (7-30 days)",
                    "4. Drop column",
                ],
            },
            "rename_column": {
                "safe": False,
                "description": "Rename a column",
                "strategy": "Add new -> Copy data -> Deprecate old -> Drop old",
                "steps": [
                    "1. Add new column with new name",
                    "2. Backfill data from old column",
                    "3. Update ETL to write both columns",
                    "4. Update consumers to use new column",
                    "5. Drop old column after grace period",
                ],
            },
            "change_type_narrowing": {
                "safe": False,
                "description": "Narrow a data type (BIGINT -> INT, STRING -> INT)",
                "strategy": "Create new column -> Validate -> Swap",
                "risk": "Data loss if values don't fit new type",
            },
        }
    
    def migration_plan(self, changes):
        """Generate migration plan"""
        safe = self.safe_operations()
        unsafe = self.unsafe_operations()
        
        plan = {"safe_changes": [], "unsafe_changes": [], "estimated_risk": "LOW"}
        
        for change in changes:
            op = change["operation"]
            if op in safe:
                plan["safe_changes"].append({**change, "strategy": "Direct apply"})
            elif op in unsafe:
                plan["unsafe_changes"].append({**change, "strategy": unsafe[op]["strategy"]})
                plan["estimated_risk"] = "HIGH"
        
        return plan

manager = SchemaEvolutionManager()
plan = manager.migration_plan([
    {"operation": "add_column", "table": "orders", "column": "discount_pct", "type": "DOUBLE"},
    {"operation": "rename_column", "table": "users", "old": "fname", "new": "first_name"},
    {"operation": "widen_type", "table": "products", "column": "price", "from": "FLOAT", "to": "DOUBLE"},
])

print(f"Migration Plan (Risk: {plan['estimated_risk']}):")
print(f"  Safe changes: {len(plan['safe_changes'])}")
for c in plan["safe_changes"]:
    print(f"    - {c['operation']}: {c['table']}.{c.get('column', c.get('old', ''))}")
print(f"  Unsafe changes: {len(plan['unsafe_changes'])}")
for c in plan["unsafe_changes"]:
    print(f"    - {c['operation']}: {c['table']} -> {c['strategy']}")
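The rename_column steps in the plan above can be made concrete by expanding them into the SQL a migration would run. A sketch assuming Spark SQL on an Iceberg table, reusing the users.fname -> first_name example from the plan; the backfill predicate is deliberately simplified:

```python
# Sketch: expand the multi-step rename into the SQL a migration would run.
# Assumes Spark SQL on an Iceberg table; steps 3-4 (updating ETL and
# consumers) happen outside SQL, so they appear only as a comment.

def rename_column_sql(table: str, old: str, new: str, col_type: str = "string") -> list:
    return [
        # Step 1: add the new column (backward compatible)
        f"ALTER TABLE {table} ADD COLUMN {new} {col_type}",
        # Step 2: backfill from the old column
        f"UPDATE {table} SET {new} = {old} WHERE {new} IS NULL",
        # Steps 3-4: update ETL to write both columns, move consumers over.
        # Step 5: drop the old column after the grace period
        f"ALTER TABLE {table} DROP COLUMN {old}",
    ]

for stmt in rename_column_sql("users", "fname", "first_name"):
    print(stmt)
```

The point of generating the statements rather than running them ad hoc is that each step can be applied, validated, and rolled forward independently during the grace period.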

Rolling Deployment for Data Services

Deploying data services with rolling updates:

# === Rolling Deployment Pipeline ===

cat > deploy_pipeline.sh << 'BASH'
#!/bin/bash
# Zero Downtime Deployment Pipeline for Data Lakehouse
set -e

SERVICE=$1
VERSION=$2
NAMESPACE="data-platform"

echo "=== Deploying $SERVICE v$VERSION ==="

# Step 1: Pre-deployment checks
echo "[1/6] Pre-deployment validation..."
if ! kubectl get pods -n $NAMESPACE -l app=$SERVICE --field-selector status.phase=Running | grep -q Running; then
    echo "FAIL: Current deployment unhealthy"
    exit 1
fi

# Step 2: Run database migrations (if any)
echo "[2/6] Running schema migrations..."
# dbt run --target production --select migrations
# python3 migrate.py --check-compatible

# Step 3: Deploy with rolling update
echo "[3/6] Rolling deployment..."
# Image assumed to be named after the service; prefix with your registry if needed
kubectl set image deployment/$SERVICE \
  $SERVICE="$SERVICE:$VERSION" \
  -n $NAMESPACE

# Step 4: Wait for rollout
echo "[4/6] Waiting for rollout..."
if ! kubectl rollout status deployment/$SERVICE -n $NAMESPACE --timeout=300s; then
    echo "FAIL: Rollout timeout, initiating rollback"
    kubectl rollout undo deployment/$SERVICE -n $NAMESPACE
    exit 1
fi

# Step 5: Post-deployment validation
echo "[5/6] Post-deployment validation..."
sleep 30

# Check health endpoint
HEALTH=$(kubectl exec -n $NAMESPACE deploy/$SERVICE -- curl -s http://localhost:8080/health)
if echo "$HEALTH" | grep -q "healthy"; then
    echo "Health check: PASS"
else
    echo "FAIL: Health check failed, rolling back"
    kubectl rollout undo deployment/$SERVICE -n $NAMESPACE
    exit 1
fi

# Run data quality tests
# dbt test --target production --select critical_tests

# Step 6: Notify
echo "[6/6] Deployment complete"
curl -X POST "https://hooks.slack.com/services/xxx" \
  -d "{\"text\":\"Deployed $SERVICE v$VERSION successfully\"}"
BASH

chmod +x deploy_pipeline.sh

# GitHub Actions workflow
cat > .github/workflows/deploy-data-platform.yml << 'EOF'
name: Deploy Data Platform

on:
  push:
    branches: [main]
    paths:
      - 'data-platform/**'

jobs:
  test:
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v4
      - name: Run dbt tests
        run: |
          pip install dbt-core dbt-trino
          dbt test --target ci
          
  deploy:
    needs: test
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v4
      - name: Deploy to staging
        run: ./deploy_pipeline.sh trino-worker "${{ github.sha }}"
      - name: Smoke test
        run: python3 smoke_test.py --env staging
      - name: Deploy to production
        run: ./deploy_pipeline.sh trino-worker "${{ github.sha }}"
EOF

echo "Deployment pipeline configured"
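The pipeline above uses a fixed `sleep 30` before the post-deployment health check; polling with a deadline is sturdier. A minimal sketch of that loop in Python (the probe callable stands in for the real curl against /health):

```python
import time

def wait_until_healthy(probe, timeout_s: float = 300.0, interval_s: float = 5.0) -> bool:
    """Poll probe() until it returns True or the deadline passes."""
    deadline = time.monotonic() + timeout_s
    while time.monotonic() < deadline:
        if probe():
            return True
        time.sleep(interval_s)
    return False

# Stand-in probe: the real pipeline would curl http://localhost:8080/health.
# This one reports healthy on the third poll.
attempts = iter([False, False, True])
ok = wait_until_healthy(lambda: next(attempts), timeout_s=60, interval_s=0.01)
print("Health check:", "PASS" if ok else "FAIL")
```

A deadline-based poll both reacts faster when the service comes up early and fails cleanly (triggering the rollback branch) when it never does.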

Automated Testing and Validation

Validating every stage before and after a deployment:

#!/usr/bin/env python3
# deploy_validator.py - Deployment Validation for a Data Lakehouse
import json
import logging
from typing import Dict, List

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger("validator")

class DeploymentValidator:
    """Validate data platform deployments"""
    
    def __init__(self):
        self.results = []
    
    def pre_deploy_checks(self):
        """Checks before deployment"""
        checks = [
            {"name": "Schema compatibility", "status": "PASS", "detail": "No breaking changes detected"},
            {"name": "ETL pipeline status", "status": "PASS", "detail": "All 15 pipelines healthy"},
            {"name": "Query engine health", "status": "PASS", "detail": "5/5 Trino workers running"},
            {"name": "Pending queries", "status": "WARN", "detail": "3 long-running queries (> 10min)"},
            {"name": "Storage capacity", "status": "PASS", "detail": "S3: 2.1TB/5TB used (42%)"},
            {"name": "Backup status", "status": "PASS", "detail": "Last backup: 2h ago"},
        ]
        return checks
    
    def post_deploy_checks(self):
        """Checks after deployment"""
        checks = [
            {"name": "Service health", "status": "PASS", "detail": "All endpoints responding"},
            {"name": "Query latency", "status": "PASS", "detail": "P99: 2.1s (baseline: 2.3s)"},
            {"name": "Data freshness", "status": "PASS", "detail": "Latest data: 5 min ago"},
            {"name": "Row count validation", "status": "PASS", "detail": "Orders: 1.2M (expected: 1.2M +/- 1%)"},
            {"name": "Schema validation", "status": "PASS", "detail": "All tables match expected schema"},
            {"name": "dbt tests", "status": "PASS", "detail": "45/45 tests passed"},
        ]
        return checks
    
    def data_quality_tests(self):
        """Run data quality validation"""
        tests = [
            {"test": "Not null: orders.order_id", "result": "PASS", "null_count": 0},
            {"test": "Unique: orders.order_id", "result": "PASS", "duplicates": 0},
            {"test": "Range: orders.amount > 0", "result": "PASS", "violations": 0},
            {"test": "Referential: orders.user_id in users", "result": "PASS", "orphans": 0},
            {"test": "Freshness: orders.created_at < 1h", "result": "PASS", "max_age_min": 12},
            {"test": "Volume: orders today > 1000", "result": "PASS", "count": 15230},
        ]
        return tests
    
    def rollback_decision(self, post_checks):
        """Decide if rollback is needed"""
        failures = [c for c in post_checks if c["status"] == "FAIL"]
        warnings = [c for c in post_checks if c["status"] == "WARN"]
        
        if len(failures) > 0:
            return {"action": "ROLLBACK", "reason": f"{len(failures)} checks failed", "failures": failures}
        elif len(warnings) > 2:
            return {"action": "INVESTIGATE", "reason": f"{len(warnings)} warnings", "warnings": warnings}
        else:
            return {"action": "PROCEED", "reason": "All checks passed"}

validator = DeploymentValidator()

pre = validator.pre_deploy_checks()
print("Pre-Deploy Checks:")
for c in pre:
    print(f"  [{c['status']}] {c['name']}: {c['detail']}")

post = validator.post_deploy_checks()
print(f"\nPost-Deploy Checks:")
for c in post:
    print(f"  [{c['status']}] {c['name']}: {c['detail']}")

dq = validator.data_quality_tests()
print(f"\nData Quality: {sum(1 for t in dq if t['result'] == 'PASS')}/{len(dq)} passed")

decision = validator.rollback_decision(post)
print(f"\nDecision: {decision['action']} - {decision['reason']}")

Monitoring and Rollback

Detecting problems and rolling back quickly:

#!/usr/bin/env python3
# deploy_monitor.py - Deployment Monitoring
import json
import logging
from typing import Dict, List

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger("monitor")

class DeploymentMonitor:
    def __init__(self):
        pass
    
    def dashboard(self):
        return {
            "current_deployment": {
                "version": "v2.15.0",
                "deployed_at": "2024-06-15 14:30",
                "status": "healthy",
                "components": {
                    "trino": {"version": "440", "replicas": "5/5", "status": "OK"},
                    "spark": {"version": "3.5.1", "workers": "10/10", "status": "OK"},
                    "airflow": {"version": "2.8.1", "scheduler": "running", "status": "OK"},
                    "dbt": {"version": "1.7.0", "last_run": "success", "status": "OK"},
                    "hive_metastore": {"version": "3.1.3", "status": "OK"},
                },
            },
            "deployment_history": [
                {"version": "v2.15.0", "date": "2024-06-15", "status": "success", "duration": "12min"},
                {"version": "v2.14.2", "date": "2024-06-12", "status": "success", "duration": "8min"},
                {"version": "v2.14.1", "date": "2024-06-10", "status": "rolled_back", "duration": "5min", "reason": "Query latency spike"},
                {"version": "v2.14.0", "date": "2024-06-08", "status": "success", "duration": "15min"},
            ],
            "metrics_comparison": {
                "query_latency_p99": {"before": "2.3s", "after": "2.1s", "change": "-8.7%"},
                "etl_duration": {"before": "45min", "after": "42min", "change": "-6.7%"},
                "error_rate": {"before": "0.02%", "after": "0.02%", "change": "0%"},
                "data_freshness": {"before": "5min", "after": "5min", "change": "0%"},
            },
        }

monitor = DeploymentMonitor()
dash = monitor.dashboard()
deploy = dash["current_deployment"]
print(f"Deployment Dashboard:")
print(f"  Version: {deploy['version']} ({deploy['deployed_at']})")
for comp, info in deploy["components"].items():
    print(f"  {comp}: {info['status']}")

print(f"\nMetrics (before -> after):")
for metric, data in dash["metrics_comparison"].items():
    print(f"  {metric}: {data['before']} -> {data['after']} ({data['change']})")

print(f"\nRecent Deployments:")
for d in dash["deployment_history"][:3]:
    print(f"  {d['version']} ({d['date']}): {d['status']} ({d['duration']})")
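The metrics_comparison section of the dashboard can drive an automatic rollback decision, like the v2.14.1 rollback in the history above. A sketch that parses those before/after entries and flags any worsening beyond a threshold (the 10% threshold and the sign convention are assumptions):

```python
def detect_regressions(comparison: dict, threshold_pct: float = 10.0) -> list:
    """Flag metrics whose 'change' rose more than threshold_pct.

    Assumes the dashboard convention where an increase is bad
    (latency, ETL duration, error rate).
    """
    regressions = []
    for metric, data in comparison.items():
        change = float(data["change"].rstrip("%"))
        if change > threshold_pct:
            regressions.append((metric, data["change"]))
    return regressions

# Hypothetical before/after metrics in the dashboard's format
metrics = {
    "query_latency_p99": {"before": "2.3s", "after": "2.1s", "change": "-8.7%"},
    "error_rate": {"before": "0.02%", "after": "0.05%", "change": "+150%"},
}
bad = detect_regressions(metrics)
print("ROLLBACK" if bad else "OK", bad)
```

Wired into the deploy pipeline, a non-empty result would trigger `kubectl rollout undo` instead of waiting for a human to read the dashboard.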

FAQ

Q: Can schema changes on a Data Lakehouse really be zero downtime?

A: Mostly, yes. Open table formats (Delta Lake, Iceberg, Hudi) are built for schema evolution: adding a column does not break existing queries, and widening a data type (INT -> BIGINT) is safe. Iceberg goes further with partition evolution, which lets you change the partitioning strategy without rewriting data. What is not zero downtime: dropping a column that consumers still use, or renaming a column that still has references. Those require a multi-step migration (add -> copy -> deprecate -> drop) that typically takes 1-4 weeks. The bottom line: make every schema change backward compatible before you deploy it.

Q: How do you deploy an ETL pipeline while jobs are still running?

A: Several techniques help. Graceful shutdown: let the current job finish before deploying (set terminationGracePeriodSeconds high enough). Checkpoint-based recovery: Spark Structured Streaming keeps checkpoints, so after deploying a new version the job resumes from where it left off. Idempotent design: write ETL so that reruns produce the same result (upsert instead of insert); if a job fails mid-deploy, you simply rerun it. Blue-green pipelines: run the new pipeline in parallel, validate its results, then switch over. For batch ETL (Airflow), deploy between batch windows, e.g. right after the 2 AM batch finishes.
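The idempotent-design point above usually boils down to MERGE instead of blind INSERT, so a rerun after a failed deploy cannot duplicate rows. A sketch that builds such a statement (Delta Lake MERGE INTO syntax; the table, staging, and key names are illustrative):

```python
# Sketch: an idempotent upsert statement, so rerunning the job after a
# failed deploy cannot duplicate rows. Delta Lake MERGE INTO syntax;
# the table, staging, and key names are illustrative.

def upsert_sql(target: str, staging: str, key: str) -> str:
    return (
        f"MERGE INTO {target} t USING {staging} s ON t.{key} = s.{key} "
        "WHEN MATCHED THEN UPDATE SET * "
        "WHEN NOT MATCHED THEN INSERT *"
    )

print(upsert_sql("orders", "orders_staging", "order_id"))
```

Running this twice with the same staging data leaves the target table in the same state, which is exactly the property a mid-deploy failure needs.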

Q: Which handles zero downtime better, Delta Lake or Iceberg?

A: Both support schema evolution well. Delta Lake's strengths are deep Spark integration (Databricks), MERGE INTO for upserts, Time Travel, and Z-Ordering, but it is Spark-centric and its multi-engine support trails Iceberg. Apache Iceberg's strengths are broad multi-engine support (Spark, Trino, Flink, Dremio), partition evolution (change partitioning without rewriting data), hidden partitioning (users never see the partition structure), and schema evolution comparable to Delta. For zero downtime specifically, Iceberg has a slight edge because partition evolution avoids data rewrites. Choose Iceberg for multi-engine platforms, Delta Lake if you live in the Databricks ecosystem.

Q: How do you roll back a data platform deployment?

A: It depends on the component. Application code (Kubernetes): kubectl rollout undo reverts immediately. Schema changes: prefer forward-fix over rollback; for example, if a new column causes problems, fix it in place rather than dropping a column consumers may already depend on. dbt models: rerun the previous version with dbt run --full-refresh. Table data: use Time Travel (Delta Lake: RESTORE TABLE, Iceberg: rollback-to-snapshot) to return the data to an earlier state. Most importantly, document the rollback procedure in advance, test rollback every time you test a deploy, and keep your deployment history as versioned artifacts.
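The Time Travel options mentioned above map to concrete statements per table format. A sketch that picks the right restore command (RESTORE TABLE is Delta Lake syntax and rollback_to_snapshot is Iceberg's Spark procedure; the catalog name and version/snapshot values are illustrative):

```python
# Sketch: pick the right time-travel restore statement per table format.
# RESTORE TABLE is Delta Lake syntax; rollback_to_snapshot is Iceberg's
# Spark procedure. Catalog name and version/snapshot ids are illustrative.

def restore_command(fmt: str, table: str, version) -> str:
    if fmt == "delta":
        return f"RESTORE TABLE {table} TO VERSION AS OF {version}"
    if fmt == "iceberg":
        return f"CALL catalog.system.rollback_to_snapshot('{table}', {version})"
    raise ValueError(f"unknown table format: {fmt}")

print(restore_command("delta", "orders", 42))
print(restore_command("iceberg", "db.orders", 5781947118336))
```

Having these statements scripted per table format is what makes the "test rollback every time you test a deploy" advice practical.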
