Zero Downtime Deployment for a Data Lakehouse
A Data Lakehouse is an architecture that combines a Data Lake with a Data Warehouse on top of open table formats such as Delta Lake, Apache Iceberg, and Apache Hudi. Zero Downtime Deployment means shipping changes (schema, code, infrastructure) without interrupting data consumers.
The challenges specific to a Data Lakehouse: schema changes must stay backward compatible so that existing queries and ETL pipelines keep working throughout the deployment; query engines (Trino, Spark) must keep serving requests; metadata services (Hive Metastore, Unity Catalog) must stay available; and data quality must remain consistent across the whole deploy.
Components that have to be deployed without downtime: ETL/ELT pipelines (Spark jobs, dbt models), query engines (Trino workers, Spark Thrift Server), metadata services (Hive Metastore, Iceberg Catalog), the storage layer (table format upgrades), and infrastructure (Kubernetes, networking).
Zero Downtime Strategies for the Data Platform
Deployment strategies for the main data platform components
# === Zero Downtime Strategies ===
# 1. Blue-Green Deployment for Query Engine
cat > blue-green-trino.yaml << 'EOF'
apiVersion: v1
kind: Service
metadata:
  name: trino-gateway
  namespace: data-platform
spec:
  selector:
    app: trino
    version: blue  # Switch to 'green' during deployment
  ports:
    - port: 8080
      targetPort: 8080
---
# Blue cluster (current)
apiVersion: apps/v1
kind: Deployment
metadata:
  name: trino-blue
spec:
  replicas: 5
  selector:
    matchLabels:
      app: trino
      version: blue
  template:
    metadata:
      labels:
        app: trino
        version: blue
    spec:
      containers:
        - name: trino
          image: trinodb/trino:439
          resources:
            requests:
              cpu: "8"
              memory: "64Gi"
---
# Green cluster (new version)
apiVersion: apps/v1
kind: Deployment
metadata:
  name: trino-green
spec:
  replicas: 5
  selector:
    matchLabels:
      app: trino
      version: green
  template:
    metadata:
      labels:
        app: trino
        version: green
    spec:
      containers:
        - name: trino
          image: trinodb/trino:440
          resources:
            requests:
              cpu: "8"
              memory: "64Gi"
EOF
# 2. Rolling Update for Spark Workers
cat > spark-rolling.yaml << 'EOF'
apiVersion: apps/v1
kind: Deployment
metadata:
  name: spark-workers
  namespace: data-platform
spec:
  replicas: 10
  strategy:
    type: RollingUpdate
    rollingUpdate:
      maxSurge: 2
      maxUnavailable: 1
  selector:
    matchLabels:
      app: spark-worker
  template:
    metadata:
      labels:
        app: spark-worker   # must match spec.selector.matchLabels
    spec:
      terminationGracePeriodSeconds: 300
      containers:
        - name: spark
          image: spark:3.5.1
          lifecycle:
            preStop:
              exec:
                command:
                  - /bin/sh
                  - -c
                  - "sleep 30 && /opt/spark/sbin/stop-worker.sh"
EOF
# 3. Canary for dbt Models
cat > dbt_canary.sh << 'BASH'
#!/bin/bash
# Deploy dbt models with canary strategy
echo "=== dbt Canary Deployment ==="
# Step 1: Run new models to shadow schema
dbt run --target shadow --select new_models
# Step 2: Validate data quality
dbt test --target shadow --select new_models
if [ $? -ne 0 ]; then
  echo "FAIL: Data quality tests failed"
  exit 1
fi
# Step 3: Compare results
python3 compare_results.py --source production --target shadow
# Step 4: Swap if OK
dbt run --target production --select new_models --full-refresh
echo "Canary deployment complete"
BASH
echo "Zero downtime strategies configured"
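Step 3 of the canary script calls a compare_results.py that the text does not show. A hypothetical sketch of the comparison such a script might perform — checking key aggregates of the production vs. shadow schemas within a relative tolerance (the metric names and the 1% threshold are assumptions):

```python
def compare_metrics(prod: dict, shadow: dict, tolerance: float = 0.01) -> list:
    """Return (metric, reason) pairs whose relative difference exceeds the tolerance."""
    violations = []
    for name, prod_value in prod.items():
        shadow_value = shadow.get(name)
        if shadow_value is None:
            violations.append((name, "missing in shadow"))
            continue
        # Relative difference; guard against division by zero on zero-valued metrics.
        denom = abs(prod_value) if prod_value else 1.0
        diff = abs(prod_value - shadow_value) / denom
        if diff > tolerance:
            violations.append((name, f"diff {diff:.2%} > {tolerance:.0%}"))
    return violations

prod = {"row_count": 1_200_000, "revenue_sum": 54_320.0}
shadow = {"row_count": 1_200_300, "revenue_sum": 54_321.5}
print(compare_metrics(prod, shadow))  # within 1% → []
```

The canary then promotes only if the violation list is empty; anything else fails the pipeline the same way the dbt tests do.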
Schema Evolution and Migration
Make schema changes without breaking consumers
#!/usr/bin/env python3
# schema_evolution.py — Schema Evolution for Data Lakehouse
import json
import logging
from typing import Dict, List

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger("schema")


class SchemaEvolutionManager:
    """Manage schema changes with zero downtime"""

    def __init__(self):
        self.migrations = []

    def safe_operations(self):
        """Operations that are backward compatible"""
        return {
            "add_column": {
                "safe": True,
                "description": "Add a new column (nullable or with a default value)",
                "delta_lake": "ALTER TABLE ADD COLUMNS (new_col STRING)",
                "iceberg": "ALTER TABLE ADD COLUMN new_col string",
                "impact": "Existing queries keep working (they ignore the new column)",
            },
            "widen_type": {
                "safe": True,
                "description": "Widen a data type (INT → BIGINT, FLOAT → DOUBLE)",
                "delta_lake": "Schema evolution: mergeSchema option",
                "iceberg": "ALTER TABLE ALTER COLUMN col TYPE bigint",
                "impact": "Existing data remains readable",
            },
            "add_partition": {
                "safe": True,
                "description": "Add a partition field",
                "iceberg": "ALTER TABLE ADD PARTITION FIELD year(ts)",
                "impact": "Old partitions are untouched; only new data uses the new partitioning",
            },
        }

    def unsafe_operations(self):
        """Operations that need a migration strategy"""
        return {
            "drop_column": {
                "safe": False,
                "description": "Drop a column",
                "strategy": "Deprecate first → Stop using → Drop after grace period",
                "steps": [
                    "1. Mark column as deprecated (metadata/comment)",
                    "2. Update all consumers to not use the column",
                    "3. Wait grace period (7-30 days)",
                    "4. Drop column",
                ],
            },
            "rename_column": {
                "safe": False,
                "description": "Rename a column",
                "strategy": "Add new → Copy data → Deprecate old → Drop old",
                "steps": [
                    "1. Add new column with new name",
                    "2. Backfill data from old column",
                    "3. Update ETL to write both columns",
                    "4. Update consumers to use new column",
                    "5. Drop old column after grace period",
                ],
            },
            "change_type_narrowing": {
                "safe": False,
                "description": "Narrow a data type (BIGINT → INT, STRING → INT)",
                "strategy": "Create new column → Validate → Swap",
                "risk": "Data loss if values don't fit new type",
            },
        }

    def migration_plan(self, changes):
        """Generate migration plan"""
        safe = self.safe_operations()
        unsafe = self.unsafe_operations()
        plan = {"safe_changes": [], "unsafe_changes": [], "estimated_risk": "LOW"}
        for change in changes:
            op = change["operation"]
            if op in safe:
                plan["safe_changes"].append({**change, "strategy": "Direct apply"})
            elif op in unsafe:
                plan["unsafe_changes"].append({**change, "strategy": unsafe[op]["strategy"]})
                plan["estimated_risk"] = "HIGH"
        return plan


manager = SchemaEvolutionManager()
plan = manager.migration_plan([
    {"operation": "add_column", "table": "orders", "column": "discount_pct", "type": "DOUBLE"},
    {"operation": "rename_column", "table": "users", "old": "fname", "new": "first_name"},
    {"operation": "widen_type", "table": "products", "column": "price", "from": "FLOAT", "to": "DOUBLE"},
])
print(f"Migration Plan (Risk: {plan['estimated_risk']}):")
print(f" Safe changes: {len(plan['safe_changes'])}")
for c in plan["safe_changes"]:
    print(f"  - {c['operation']}: {c['table']}.{c.get('column', c.get('old', ''))}")
print(f" Unsafe changes: {len(plan['unsafe_changes'])}")
for c in plan["unsafe_changes"]:
    print(f"  - {c['operation']}: {c['table']} → {c['strategy']}")
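The rename_column steps above can be turned into concrete statements. A minimal generator, assuming Trino/Iceberg-style DDL and hypothetical table/column names; the final DROP must only run after the grace period, once all consumers have switched:

```python
def rename_migration_sql(table: str, old: str, new: str, col_type: str = "string") -> list:
    """Emit the add → backfill → deprecate → drop sequence for a column rename."""
    return [
        f"ALTER TABLE {table} ADD COLUMN {new} {col_type}",
        f"UPDATE {table} SET {new} = {old} WHERE {new} IS NULL",          # backfill
        f"COMMENT ON COLUMN {table}.{old} IS 'DEPRECATED: use {new}'",    # deprecate
        f"ALTER TABLE {table} DROP COLUMN {old}",                         # after grace period
    ]

for stmt in rename_migration_sql("users", "fname", "first_name"):
    print(stmt)
```

During the dual-write window (step 3 of the plan), the ETL writes both columns, so the UPDATE backfill only has to cover rows written before the new column existed.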
Rolling Deployment for Data Services
Deploy data services with rolling updates
# === Rolling Deployment Pipeline ===
cat > deploy_pipeline.sh << 'BASH'
#!/bin/bash
# Zero Downtime Deployment Pipeline for Data Lakehouse
set -e
SERVICE=$1
VERSION=$2
NAMESPACE="data-platform"
# Image registry must be provided via the environment
REGISTRY="${REGISTRY:?set REGISTRY to your container registry}"
echo "=== Deploying $SERVICE v$VERSION ==="

# Step 1: Pre-deployment checks
echo "[1/6] Pre-deployment validation..."
# Note: with `set -e`, checks must run inside `if !` or a failure exits the script early
if ! kubectl get pods -n "$NAMESPACE" -l app="$SERVICE" \
      --field-selector status.phase=Running | grep -q Running; then
  echo "FAIL: Current deployment unhealthy"
  exit 1
fi

# Step 2: Run database migrations (if any)
echo "[2/6] Running schema migrations..."
# dbt run --target production --select migrations
# python3 migrate.py --check-compatible

# Step 3: Deploy with rolling update
echo "[3/6] Rolling deployment..."
kubectl set image "deployment/$SERVICE" \
  "$SERVICE=$REGISTRY/$SERVICE:$VERSION" \
  -n "$NAMESPACE"

# Step 4: Wait for rollout
echo "[4/6] Waiting for rollout..."
if ! kubectl rollout status "deployment/$SERVICE" -n "$NAMESPACE" --timeout=300s; then
  echo "FAIL: Rollout timeout, initiating rollback"
  kubectl rollout undo "deployment/$SERVICE" -n "$NAMESPACE"
  exit 1
fi

# Step 5: Post-deployment validation
echo "[5/6] Post-deployment validation..."
sleep 30
# Check health endpoint
HEALTH=$(kubectl exec -n "$NAMESPACE" "deploy/$SERVICE" -- curl -s http://localhost:8080/health)
if echo "$HEALTH" | grep -q "healthy"; then
  echo "Health check: PASS"
else
  echo "FAIL: Health check failed, rolling back"
  kubectl rollout undo "deployment/$SERVICE" -n "$NAMESPACE"
  exit 1
fi
# Run data quality tests
# dbt test --target production --select critical_tests

# Step 6: Notify
echo "[6/6] Deployment complete"
curl -X POST "https://hooks.slack.com/services/xxx" \
  -d "{\"text\":\"Deployed $SERVICE v$VERSION successfully\"}"
BASH
chmod +x deploy_pipeline.sh
# GitHub Actions workflow
cat > .github/workflows/deploy-data-platform.yml << 'EOF'
name: Deploy Data Platform
on:
  push:
    branches: [main]
    paths:
      - 'data-platform/**'
jobs:
  test:
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v4
      - name: Run dbt tests
        run: |
          pip install dbt-core dbt-trino
          dbt test --target ci
  deploy:
    needs: test
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v4
      - name: Deploy to staging
        run: ./deploy_pipeline.sh trino-worker "${{ github.sha }}"
      - name: Smoke test
        run: python3 smoke_test.py --env staging
      - name: Deploy to production
        run: ./deploy_pipeline.sh trino-worker "${{ github.sha }}"
EOF
echo "Deployment pipeline configured"
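The workflow's smoke-test step calls a smoke_test.py that is not shown. A hypothetical sketch of what it could do: run a handful of cheap queries against the freshly deployed engine and check each answer. The query runner is stubbed here so the logic is self-contained; the queries, predicates, and runner are all assumptions:

```python
# Each smoke check: (SQL to run, predicate the scalar answer must satisfy)
SMOKE_QUERIES = {
    "select_one": ("SELECT 1", lambda v: v == 1),
    "orders_not_empty": ("SELECT count(*) FROM orders", lambda v: v > 0),
}

def run_smoke_tests(execute):
    """execute(sql) -> scalar result. Returns {check_name: passed}."""
    results = {}
    for name, (sql, ok) in SMOKE_QUERIES.items():
        try:
            results[name] = bool(ok(execute(sql)))
        except Exception:
            results[name] = False  # a failing query is a failing check
    return results

# Stub runner standing in for a real Trino/Spark connection
fake_answers = {
    "SELECT 1": 1,
    "SELECT count(*) FROM orders": 15230,
}
report = run_smoke_tests(fake_answers.__getitem__)
print(report)  # → {'select_one': True, 'orders_not_empty': True}
```

In the real script, `execute` would be a thin wrapper over a Trino or Spark client, and a non-empty set of failures would exit nonzero so the Actions job blocks the production deploy.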
Automated Testing and Validation
Verify correctness before and after each deployment
#!/usr/bin/env python3
# deploy_validator.py — Deployment Validation for Data Lakehouse
import json
import logging
from typing import Dict, List

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger("validator")


class DeploymentValidator:
    """Validate data platform deployments"""

    def __init__(self):
        self.results = []

    def pre_deploy_checks(self):
        """Checks before deployment"""
        checks = [
            {"name": "Schema compatibility", "status": "PASS", "detail": "No breaking changes detected"},
            {"name": "ETL pipeline status", "status": "PASS", "detail": "All 15 pipelines healthy"},
            {"name": "Query engine health", "status": "PASS", "detail": "5/5 Trino workers running"},
            {"name": "Pending queries", "status": "WARN", "detail": "3 long-running queries (> 10min)"},
            {"name": "Storage capacity", "status": "PASS", "detail": "S3: 2.1TB/5TB used (42%)"},
            {"name": "Backup status", "status": "PASS", "detail": "Last backup: 2h ago"},
        ]
        return checks

    def post_deploy_checks(self):
        """Checks after deployment"""
        checks = [
            {"name": "Service health", "status": "PASS", "detail": "All endpoints responding"},
            {"name": "Query latency", "status": "PASS", "detail": "P99: 2.1s (baseline: 2.3s)"},
            {"name": "Data freshness", "status": "PASS", "detail": "Latest data: 5 min ago"},
            {"name": "Row count validation", "status": "PASS", "detail": "Orders: 1.2M (expected: 1.2M +/- 1%)"},
            {"name": "Schema validation", "status": "PASS", "detail": "All tables match expected schema"},
            {"name": "dbt tests", "status": "PASS", "detail": "45/45 tests passed"},
        ]
        return checks

    def data_quality_tests(self):
        """Run data quality validation"""
        tests = [
            {"test": "Not null: orders.order_id", "result": "PASS", "null_count": 0},
            {"test": "Unique: orders.order_id", "result": "PASS", "duplicates": 0},
            {"test": "Range: orders.amount > 0", "result": "PASS", "violations": 0},
            {"test": "Referential: orders.user_id in users", "result": "PASS", "orphans": 0},
            {"test": "Freshness: orders.created_at < 1h", "result": "PASS", "max_age_min": 12},
            {"test": "Volume: orders today > 1000", "result": "PASS", "count": 15230},
        ]
        return tests

    def rollback_decision(self, post_checks):
        """Decide if rollback is needed"""
        failures = [c for c in post_checks if c["status"] == "FAIL"]
        warnings = [c for c in post_checks if c["status"] == "WARN"]
        if len(failures) > 0:
            return {"action": "ROLLBACK", "reason": f"{len(failures)} checks failed", "failures": failures}
        elif len(warnings) > 2:
            return {"action": "INVESTIGATE", "reason": f"{len(warnings)} warnings", "warnings": warnings}
        else:
            return {"action": "PROCEED", "reason": "All checks passed"}


validator = DeploymentValidator()
pre = validator.pre_deploy_checks()
print("Pre-Deploy Checks:")
for c in pre:
    print(f"  [{c['status']}] {c['name']}: {c['detail']}")
post = validator.post_deploy_checks()
print("\nPost-Deploy Checks:")
for c in post:
    print(f"  [{c['status']}] {c['name']}: {c['detail']}")
dq = validator.data_quality_tests()
print(f"\nData Quality: {sum(1 for t in dq if t['result'] == 'PASS')}/{len(dq)} passed")
decision = validator.rollback_decision(post)
print(f"\nDecision: {decision['action']} → {decision['reason']}")
Monitoring and Rollback
Track deployments and roll back quickly when needed
#!/usr/bin/env python3
# deploy_monitor.py — Deployment Monitoring
import json
import logging
from typing import Dict, List

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger("monitor")


class DeploymentMonitor:
    def __init__(self):
        pass

    def dashboard(self):
        return {
            "current_deployment": {
                "version": "v2.15.0",
                "deployed_at": "2024-06-15 14:30",
                "status": "healthy",
                "components": {
                    "trino": {"version": "440", "replicas": "5/5", "status": "OK"},
                    "spark": {"version": "3.5.1", "workers": "10/10", "status": "OK"},
                    "airflow": {"version": "2.8.1", "scheduler": "running", "status": "OK"},
                    "dbt": {"version": "1.7.0", "last_run": "success", "status": "OK"},
                    "hive_metastore": {"version": "3.1.3", "status": "OK"},
                },
            },
            "deployment_history": [
                {"version": "v2.15.0", "date": "2024-06-15", "status": "success", "duration": "12min"},
                {"version": "v2.14.2", "date": "2024-06-12", "status": "success", "duration": "8min"},
                {"version": "v2.14.1", "date": "2024-06-10", "status": "rolled_back", "duration": "5min", "reason": "Query latency spike"},
                {"version": "v2.14.0", "date": "2024-06-08", "status": "success", "duration": "15min"},
            ],
            "metrics_comparison": {
                "query_latency_p99": {"before": "2.3s", "after": "2.1s", "change": "-8.7%"},
                "etl_duration": {"before": "45min", "after": "42min", "change": "-6.7%"},
                "error_rate": {"before": "0.02%", "after": "0.02%", "change": "0%"},
                "data_freshness": {"before": "5min", "after": "5min", "change": "0%"},
            },
        }


monitor = DeploymentMonitor()
dash = monitor.dashboard()
deploy = dash["current_deployment"]
print("Deployment Dashboard:")
print(f" Version: {deploy['version']} ({deploy['deployed_at']})")
for comp, info in deploy["components"].items():
    print(f"  {comp}: {info['status']}")
print("\nMetrics (before → after):")
for metric, data in dash["metrics_comparison"].items():
    print(f"  {metric}: {data['before']} → {data['after']} ({data['change']})")
print("\nRecent Deployments:")
for d in dash["deployment_history"][:3]:
    print(f"  {d['version']} ({d['date']}): {d['status']} ({d['duration']})")
FAQ: Frequently Asked Questions
Q: Can schema changes on a Data Lakehouse really be done with zero downtime?
A: Mostly, yes, because open table formats (Delta Lake, Iceberg, Hudi) are built for schema evolution: adding a column does not affect existing queries, and widening a data type (INT → BIGINT) is safe. Iceberg additionally supports partition evolution, so you can change the partitioning strategy without rewriting data. What cannot be done with zero downtime: dropping a column that consumers still read, or renaming a heavily referenced column — these require a multi-step migration (add → copy → deprecate → drop) that typically takes 1-4 weeks. The golden rule: every schema change must be backward compatible before you deploy it.
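One practical guard for the backward-compatible-only rule is a whitelist of safe type promotions, checked in CI before a schema change merges. A sketch modeled on Iceberg's primitive promotion rules (int → long, float → double; decimal precision widening is omitted for brevity); the function name and the string type spellings are mine:

```python
# Safe (widening) type promotions, modeled on Iceberg's primitive promotion rules.
SAFE_PROMOTIONS = {
    ("int", "bigint"),
    ("float", "double"),
}

def is_safe_type_change(from_type: str, to_type: str) -> bool:
    """True if the change is a no-op or a whitelisted widening."""
    f, t = from_type.lower(), to_type.lower()
    return f == t or (f, t) in SAFE_PROMOTIONS

print(is_safe_type_change("INT", "BIGINT"))   # True — widening is safe
print(is_safe_type_change("BIGINT", "INT"))   # False — narrowing risks data loss
```

Anything that fails this check gets routed into the multi-step migration path instead of being applied directly.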
Q: How do you deploy an ETL pipeline without killing jobs that are mid-run?
A: Several techniques work together. Graceful shutdown: let the current job finish before the deploy proceeds (set terminationGracePeriodSeconds generously). Checkpoint-based recovery: Spark Structured Streaming resumes from checkpoints, so the newly deployed version simply picks up where the old one left off. Idempotent design: make every ETL run safe to repeat (upsert instead of insert), so a job that fails during a deploy can just be rerun. Blue-green pipelines: run the new pipeline in parallel, validate the results, then switch. For batch ETL (Airflow), deploy between batch windows — e.g. deploy in the afternoon if batches run at 2 AM.
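The idempotent-design point can be shown in miniature: if every run is an upsert keyed on a natural key, rerunning a batch after a failed deploy changes nothing. A table-format-agnostic sketch in plain Python (the key column name is an assumption; a real pipeline would do this with MERGE INTO):

```python
def upsert(table: dict, rows: list, key: str = "order_id") -> dict:
    """Merge rows into table keyed on `key`; reruns with the same rows change nothing."""
    for row in rows:
        table[row[key]] = row  # insert or overwrite — no duplicates on rerun
    return table

table = {}
batch = [{"order_id": 1, "amount": 10.0}, {"order_id": 2, "amount": 5.5}]
upsert(table, batch)
upsert(table, batch)  # simulated rerun after a failed deploy
print(len(table))  # → 2: still two rows, not four
```

With plain INSERTs the rerun would have doubled the rows, and the post-deploy row-count validation would then fail.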
Q: Delta Lake or Iceberg — which is better for zero downtime?
A: Both support schema evolution well. Delta Lake's strengths: the tightest Spark integration (Databricks), MERGE INTO for upserts, Time Travel, and Z-Ordering — but it is Spark-centric, and its multi-engine support trails Iceberg's. Apache Iceberg's strengths: the best multi-engine support (Spark, Trino, Flink, Dremio), partition evolution without rewriting data, hidden partitioning so users never need to know the partition structure, and schema evolution on par with Delta. For zero downtime specifically, Iceberg has a slight edge thanks to partition evolution with no data rewrites. Choose Iceberg if you run multiple engines, Delta Lake if you live in the Databricks ecosystem.
Q: How do you roll back a data platform deployment?
A: It depends on the component. Application code (Kubernetes): kubectl rollout undo — fast. Schema changes: prefer forward-fix over rollback — e.g. if you added a bad column, fix it by dropping that column once no consumers reference it. dbt models: dbt run --full-refresh with the previous version. Table data: use Time Travel (Delta Lake: RESTORE TABLE, Iceberg: rollback-to-snapshot) to restore the data to an earlier state. Most importantly: write the rollback procedure down before you deploy, test rollback as well as deployment, and keep deployment history as versioned artifacts.
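The Time Travel options in the answer map to one statement per table format. A small helper that emits the matching restore command — Delta's RESTORE TABLE and Iceberg's rollback_to_snapshot Spark procedure are real syntax, while the catalog name is a placeholder:

```python
def restore_command(fmt: str, table: str, target) -> str:
    """Return the SQL/procedure call that restores `table` to an earlier state."""
    if fmt == "delta":
        # Delta Lake: restore to a historical table version number
        return f"RESTORE TABLE {table} TO VERSION AS OF {target}"
    if fmt == "iceberg":
        # Iceberg (Spark procedure): roll back to a snapshot id; 'my_catalog' is a placeholder
        return f"CALL my_catalog.system.rollback_to_snapshot('{table}', {target})"
    raise ValueError(f"unknown table format: {fmt}")

print(restore_command("delta", "sales.orders", 42))
print(restore_command("iceberg", "sales.orders", 5781947118336215154))
```

Versions and snapshot ids come from DESCRIBE HISTORY (Delta) or the table's snapshots metadata (Iceberg), which is why keeping deployment history alongside those ids makes rollback a one-liner.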
