SiamCafe.net Blog
Technology

Soda Data Quality Disaster Recovery Plan

2025-06-01 · Bom — SiamCafe.net · 8,510 words


| Phase       | Action                                  | Tool                    | SLA       | Owner         |
|-------------|-----------------------------------------|-------------------------|-----------|---------------|
| Detection   | Soda check detects an anomaly           | Soda + SodaCL           | < 5 min   | Data Pipeline |
| Alert       | Notify Slack / PagerDuty                | Soda Cloud              | < 1 min   | Automated     |
| Containment | Stop the pipeline to prevent spread     | Airflow Circuit Breaker | < 10 min  | Data Engineer |
| Assessment  | Analyze the scope of the damage         | SQL + Soda Report       | < 30 min  | Data Engineer |
| Recovery    | Restore from backup / re-process        | Backup + Pipeline       | < 2 hours | Data Team     |
| Validation  | Validate the data after recovery        | Soda Full Scan          | < 30 min  | Data Engineer |
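The SLA column above can be checked mechanically during an incident. A minimal sketch, with phase budgets taken from the table (the `sla_breached` helper is hypothetical):

```python
from datetime import timedelta

# SLA budgets per DR phase, mirroring the table above.
SLA = {
    "Detection": timedelta(minutes=5),
    "Alert": timedelta(minutes=1),
    "Containment": timedelta(minutes=10),
    "Assessment": timedelta(minutes=30),
    "Recovery": timedelta(hours=2),
    "Validation": timedelta(minutes=30),
}

def sla_breached(phase: str, elapsed: timedelta) -> bool:
    """True if the phase has run longer than its SLA budget."""
    return elapsed > SLA[phase]

print(sla_breached("Detection", timedelta(minutes=7)))   # True: over the 5-min budget
print(sla_breached("Recovery", timedelta(minutes=90)))   # False: within 2 hours
```

An incident bot can poll this every minute and escalate the moment a phase goes over budget.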

SodaCL Checks

# === SodaCL Data Quality Checks ===

# checks/orders.yml
checks for orders:
  - row_count > 0:
      name: Orders table not empty
  - row_count > 100:
      name: Minimum daily orders
  - missing_count(customer_id) = 0:
      name: No null customer IDs
  - missing_count(total_amount) = 0:
      name: No null order amounts
  - duplicate_count(order_id) = 0:
      name: No duplicate order IDs
  - invalid_count(status) = 0:
      name: Valid order status
      valid values: [pending, processing, shipped, delivered, cancelled]
  - avg(total_amount) between 500 and 5000:
      name: Average order amount in range
  - freshness(created_at) < 2h:
      name: Orders data is fresh
  - schema:
      name: Schema unchanged
      fail:
        when required column missing: [order_id, customer_id, total_amount, status]
        when wrong column type:
          order_id: integer
          total_amount: decimal

# checks/customers.yml
checks for customers:
  - row_count > 0
  - missing_count(email) = 0
  - duplicate_count(email) = 0
  - invalid_count(email):
      name: Valid email format
      valid regex: '^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}$'
      fail: when > 10

# Run a scan:
#   soda scan -d postgres -c configuration.yml checks/orders.yml
#   soda scan -d bigquery -c configuration.yml checks/
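In a scheduled job, the CLI scan above can be wrapped in a small Python helper. A sketch that treats any nonzero exit code as a failed quality gate (Soda's exact exit-code semantics vary by version, so we stay coarse; `start_containment` is a hypothetical hook):

```python
import subprocess

def scan_command(data_source: str, checks_path: str) -> list:
    """Build the `soda scan` CLI invocation shown above."""
    return ["soda", "scan", "-d", data_source, "-c", "configuration.yml", checks_path]

def run_scan(data_source: str, checks_path: str) -> bool:
    """Run the scan; treat any nonzero exit code as a failed quality gate.
    Assumes the soda CLI is installed and configuration.yml exists."""
    result = subprocess.run(scan_command(data_source, checks_path),
                            capture_output=True, text=True)
    return result.returncode == 0

# Example (requires the soda CLI):
# if not run_scan("postgres", "checks/orders.yml"):
#     start_containment()  # hypothetical hook into the DR phases above
```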

from dataclasses import dataclass

@dataclass
class SodaCheck:
    table: str
    check: str
    severity: str
    dr_action: str

checks = [
    SodaCheck("orders", "row_count > 0", "CRITICAL", "Stop pipeline, check source system"),
    SodaCheck("orders", "freshness(created_at) < 2h", "CRITICAL", "Check ETL job, restart if needed"),
    SodaCheck("orders", "duplicate_count(order_id) = 0", "WARNING", "Investigate source, deduplicate"),
    SodaCheck("orders", "missing_count(total_amount) = 0", "WARNING", "Backfill from source"),
    SodaCheck("customers", "duplicate_count(email) = 0", "WARNING", "Merge duplicates"),
    SodaCheck("products", "schema check", "CRITICAL", "Rollback migration, fix schema"),
    SodaCheck("analytics", "anomaly_detection(revenue)", "WARNING", "Investigate data source"),
]

print("=== Data Quality Checks ===")
for c in checks:
    print(f"  [{c.table}] {c.check}")
    print(f"    Severity: {c.severity} | DR Action: {c.dr_action}")
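An alert handler can map a failed check straight back to its DR action. A sketch that indexes the runbook by `(table, check)` for O(1) lookup (entries here mirror two rows of the list above; unknown checks escalate):

```python
# Runbook index: (table, check) -> (severity, DR action).
RUNBOOK = {
    ("orders", "row_count > 0"): ("CRITICAL", "Stop pipeline, check source system"),
    ("orders", "duplicate_count(order_id) = 0"): ("WARNING", "Investigate source, deduplicate"),
}

def dr_action_for(table: str, check: str) -> tuple:
    """Resolve severity and DR action; checks not in the runbook escalate."""
    return RUNBOOK.get((table, check), ("UNKNOWN", "Escalate to data team"))

print(dr_action_for("orders", "row_count > 0"))
# → ('CRITICAL', 'Stop pipeline, check source system')
```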

Recovery Procedures

# === Disaster Recovery Procedures ===

@dataclass
class DRProcedure:
    scenario: str
    detection: str
    containment: str
    recovery: str
    rto: str
    rpo: str

procedures = [
    DRProcedure("Data source down",
        "freshness check fails, row_count = 0",
        "Stop dependent pipelines, use cached data",
        "Wait for source recovery, backfill missing data",
        "4 hours", "2 hours"),
    DRProcedure("Schema change broke pipeline",
        "schema check fails, ETL errors",
        "Rollback migration, pause pipeline",
        "Fix schema mapping, re-run affected jobs",
        "2 hours", "0 (no data loss)"),
    DRProcedure("Duplicate data ingested",
        "duplicate_count > 0, anomaly in metrics",
        "Stop ingestion, mark affected data",
        "Deduplicate, re-process downstream",
        "4 hours", "1 hour"),
    DRProcedure("Data corruption",
        "anomaly_detection triggers, business reports wrong",
        "Quarantine corrupted partitions",
        "Restore from backup, re-run ETL",
        "6 hours", "24 hours (last backup)"),
    DRProcedure("Warehouse outage",
        "All checks fail, connection errors",
        "Switch to read replica if available",
        "Wait for provider recovery, validate data",
        "Provider SLA", "0 (data in source)"),
]

print("=== DR Procedures ===")
for p in procedures:
    print(f"  [{p.scenario}]")
    print(f"    Detect: {p.detection}")
    print(f"    Contain: {p.containment}")
    print(f"    Recover: {p.recovery}")
    print(f"    RTO: {p.rto} | RPO: {p.rpo}")

# Backup Strategy
backup = {
    "Daily Full Backup": "BigQuery snapshot every 24h, retain 30 days",
    "Hourly Incremental": "WAL-based incremental for PostgreSQL",
    "Point-in-time Recovery": "Enable PITR on PostgreSQL, 7 day retention",
    "Cross-region Replica": "BigQuery dataset replica in another region",
    "Source System Replay": "Keep source event log for 90 days re-processing",
}

print("\nBackup Strategy:")
for k, v in backup.items():
    print(f"  [{k}]: {v}")
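Which tier to restore from depends on how long the corruption went undetected: the restore point must predate the corruption and still be within the tier's retention. A hedged sketch using the retention windows from the strategy above (the hourly tier is omitted since its retention is unstated):

```python
from datetime import timedelta

# Tiers ordered cheapest-first, with retention windows from the strategy above.
RETENTION = [
    ("Point-in-time Recovery", timedelta(days=7)),
    ("Daily Full Backup", timedelta(days=30)),
    ("Source System Replay", timedelta(days=90)),
]

def pick_restore_tier(corruption_age: timedelta) -> str:
    """Choose the first tier whose retention reaches back before the corruption began."""
    for name, keep in RETENTION:
        if corruption_age <= keep:
            return name
    return "No backup reaches back far enough; accept data loss and escalate"

print(pick_restore_tier(timedelta(days=14)))  # → Daily Full Backup (PITR only keeps 7 days)
```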

Pipeline Integration

# === Airflow + Soda Integration ===

from airflow import DAG
from airflow.operators.python import PythonOperator
from soda.scan import Scan

def run_soda_check(table, checks_file):
    """Run a programmatic Soda scan; fail the task on critical check failures."""
    scan = Scan()
    scan.set_data_source_name("postgres")
    scan.add_configuration_yaml_file("configuration.yml")
    scan.add_sodacl_yaml_file(checks_file)
    scan.execute()

    if scan.has_check_fails():
        critical = [c for c in scan.get_checks_fail() if c.severity == "critical"]
        if critical:
            # Fail the task so Airflow stops the pipeline at the quality gate
            raise Exception(f"CRITICAL data quality failure: {critical}")
        else:
            send_slack_warning(scan.get_checks_fail())

# extract_data, transform_data, load_data and send_slack_warning are
# assumed to be defined elsewhere in the project.
with DAG("etl_with_quality") as dag:
    extract = PythonOperator(task_id="extract", python_callable=extract_data)
    transform = PythonOperator(task_id="transform", python_callable=transform_data)
    quality = PythonOperator(task_id="quality_check",
        python_callable=run_soda_check,
        op_args=["orders", "checks/orders.yml"])
    load = PythonOperator(task_id="load", python_callable=load_data)

    extract >> transform >> quality >> load
# Circuit Breaker Pattern
# quality check fail → stop pipeline → alert → manual review → resume
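The fail → stop → alert → manual review → resume flow above can be sketched as a small breaker object (a hypothetical sketch; in production the open/closed state would live in a shared store such as an Airflow Variable):

```python
class QualityCircuitBreaker:
    """Opens after `threshold` consecutive quality-check failures and
    stays open (pipeline stopped) until an operator resets it."""

    def __init__(self, threshold: int = 1):
        self.threshold = threshold
        self.failures = 0
        self.open = False  # open = pipeline stopped

    def record(self, check_passed: bool) -> None:
        if check_passed:
            self.failures = 0
        else:
            self.failures += 1
            if self.failures >= self.threshold:
                self.open = True  # stop the pipeline and page an engineer

    def allow_run(self) -> bool:
        return not self.open

    def reset_after_review(self) -> None:
        """Called by an operator after manual review."""
        self.failures = 0
        self.open = False

cb = QualityCircuitBreaker(threshold=2)
cb.record(False); cb.record(False)
print(cb.allow_run())  # → False: two consecutive failures opened the breaker
```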

@dataclass
class PipelineStep:
    step: str
    quality_gate: str
    on_fail: str

pipeline = [
    PipelineStep("Extract from source", "Source freshness + row count", "Retry 3x then alert"),
    PipelineStep("Transform data", "Schema validation + null checks", "Stop pipeline, alert"),
    PipelineStep("Quality gate (Soda)", "Full SodaCL scan", "Block load, quarantine data"),
    PipelineStep("Load to warehouse", "Post-load row count match", "Rollback load, restore backup"),
    PipelineStep("Update dashboards", "Dashboard metric range check", "Revert dashboard, investigate"),
]

print("Pipeline with Quality Gates:")
for p in pipeline:
    print(f"  [{p.step}]")
    print(f"    Gate: {p.quality_gate}")
    print(f"    On fail: {p.on_fail}")
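The "Retry 3x then alert" policy on the extract step can be sketched as a small wrapper (all names here are hypothetical; `flaky_extract` simulates a source that recovers on the third attempt):

```python
import time

def with_retry(fn, attempts: int = 3, delay_s: float = 0.0):
    """Run fn up to `attempts` times; re-raise the last error so the
    caller can fire the alert. delay_s=0 keeps the sketch fast."""
    last_err = None
    for _ in range(attempts):
        try:
            return fn()
        except Exception as err:
            last_err = err
            time.sleep(delay_s)
    raise last_err  # caller catches this and alerts

calls = {"n": 0}
def flaky_extract():
    calls["n"] += 1
    if calls["n"] < 3:
        raise ConnectionError("source unavailable")
    return "ok"

print(with_retry(flaky_extract))  # → ok (succeeds on the third attempt)
```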

Tips

What is Soda Data Quality?

Soda is an open-source data quality framework: you declare checks in SodaCL (row_count, missing values, duplicates, freshness, schema and more) and run them against sources such as PostgreSQL, BigQuery and Snowflake, either ad hoc or automated inside a CI/CD or ETL pipeline.

What does a DR plan for data quality look like?

Detection with Soda checks; alerting to Slack and PagerDuty; containment by stopping the pipeline; recovery from backup or by re-processing; and a post-mortem to find the root cause and prevent recurrence, all measured against defined RTO and RPO targets.

How do you write a SodaCL check?

Declare checks in a checks.yml file: row_count, missing_count, duplicate_count, valid values, freshness, schema and anomaly detection. Run them with `soda scan`, and review results and severities in the Soda Cloud dashboard.

How do you set up an alert pipeline?

Send a Slack alert whenever a check fails, routed by severity (CRITICAL, WARNING, INFO); stop the pipeline on critical failures; forward to PagerDuty by webhook with an escalation policy (for example, escalate after 15 minutes unacknowledged); alert on consecutive failures; and track everything on a dashboard.
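The severity-routed alerting above can be sketched with the standard library. The webhook URL, the emoji mapping, and `send_alert` are illustrative assumptions, not a Soda Cloud API:

```python
import json
from urllib import request

def build_alert(check: str, severity: str) -> dict:
    """Build a Slack-style payload for a failed check, keyed by severity."""
    icon = {"CRITICAL": ":rotating_light:", "WARNING": ":warning:"}.get(
        severity, ":information_source:")
    return {"text": f"{icon} [{severity}] Soda check failed: {check}"}

def send_alert(check: str, severity: str, webhook_url: str) -> None:
    """POST the payload to a Slack incoming webhook (URL is a placeholder)."""
    payload = json.dumps(build_alert(check, severity)).encode()
    req = request.Request(webhook_url, data=payload,
                          headers={"Content-Type": "application/json"})
    request.urlopen(req)
    # A CRITICAL severity would additionally trigger the PagerDuty escalation policy.

print(build_alert("row_count > 0", "CRITICAL")["text"])
# → :rotating_light: [CRITICAL] Soda check failed: row_count > 0
```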

Summary

A Soda data quality DR plan combines SodaCL checks (including freshness and schema validation), an alert pipeline with a circuit breaker, backup and recovery procedures, Airflow integration, and post-mortems to keep production data trustworthy.
