SiamCafe · Blog
Soda Data Quality Disaster Recovery Plan

Published 28 May 2569 BE (2026 CE)

Soda Data Quality DR

This guide covers disaster recovery for data quality with Soda: SodaCL checks, automated testing, alerting, pipeline recovery strategy, backups, and post-mortems for production operations.

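Phase       | Action                                  | Tool                    | SLA       | Owner
------------|-----------------------------------------|-------------------------|-----------|--------------
Detection   | Soda check detects an anomaly           | Soda + SodaCL           | < 5 min   | Data Pipeline
Alert       | Notify via Slack and PagerDuty          | Soda Cloud              | < 1 min   | Automated
Containment | Stop the pipeline to prevent spread     | Airflow Circuit Breaker | < 10 min  | Data Engineer
Assessment  | Analyze the scope of the damage         | SQL + Soda Report       | < 30 min  | Data Engineer
Recovery    | Restore data from backup or re-process  | Backup + Pipeline       | < 2 hours | Data Team
Validation  | Verify the data after recovery          | Soda Full Scan          | < 30 min  | Data Engineer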
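
The SLA column can be checked mechanically after a drill or a real incident. The following is a minimal sketch, assuming each phase duration is recorded in minutes. The budgets mirror the table; the function name `sla_breaches` and the sample timeline are hypothetical.

```python
# SLA budget per DR phase, in minutes, taken from the table above.
PHASE_SLA_MINUTES = {
    "Detection": 5,
    "Alert": 1,
    "Containment": 10,
    "Assessment": 30,
    "Recovery": 120,
    "Validation": 30,
}


def sla_breaches(actual_minutes):
    """Return {phase: overrun_minutes} for phases that reached or exceeded their SLA."""
    breaches = {}
    for phase, budget in PHASE_SLA_MINUTES.items():
        spent = actual_minutes.get(phase)
        # Each SLA is a strict upper bound ("< 5 min"), so reaching the budget is a breach.
        if spent is not None and spent >= budget:
            breaches[phase] = spent - budget
    return breaches


# Hypothetical timeline from a DR drill (minutes spent in each phase).
drill = {"Detection": 3, "Alert": 0.5, "Containment": 14,
         "Assessment": 25, "Recovery": 95, "Validation": 20}

for phase, overrun in sla_breaches(drill).items():
    print(f"{phase}: over SLA by {overrun} min")
```

Feeding the result into the post-mortem shows which phase to improve first.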

SodaCL Checks

# === SodaCL Data Quality Checks ===


# checks/orders.yml
checks for orders:
  - row_count > 0:
      name: Orders table not empty
  - row_count > 100:
      name: Minimum daily orders
  - missing_count(customer_id) = 0:
      name: No null customer IDs
  - missing_count(total_amount) = 0:
      name: No null order amounts
  - duplicate_count(order_id) = 0:
      name: No duplicate order IDs
  - invalid_count(status) = 0:
      name: Valid order status
      valid values: [pending, processing, shipped, delivered, cancelled]
  - avg(total_amount) between 500 and 5000:
      name: Average order amount in range
  - freshness(created_at) < 2h:
      name: Orders data is fresh
  - schema:
      name: Schema unchanged
      fail:
        when required column missing: [order_id, customer_id, total_amount, status]
        when wrong column type:
          order_id: integer
          total_amount: decimal

# checks/customers.yml
checks for customers:
  - row_count > 0
  - missing_count(email) = 0
  - duplicate_count(email) = 0
  - invalid_count(email):
      name: Valid email format
      valid regex: '^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}$'
      fail: when > 10
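
To see what the email check tolerates, the same pattern and threshold can be tried in plain Python. This is only a sketch of the idea, not how Soda evaluates the check internally. Skipping null values is an assumption here (they are covered by `missing_count`), and the function names are hypothetical.

```python
import re

# Same pattern as the `valid regex` entry in checks/customers.yml.
EMAIL_RE = re.compile(r"^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}$")


def invalid_email_count(emails):
    """Count non-null values that do not match the pattern (nulls are skipped by assumption)."""
    return sum(1 for e in emails if e is not None and not EMAIL_RE.match(e))


def email_check_outcome(emails, fail_above=10):
    """Mirror `fail: when > 10`: fail only when more than `fail_above` values are invalid."""
    return "fail" if invalid_email_count(emails) > fail_above else "pass"


sample = ["a@example.com", "bad-address", "x@y", None, "user.name+tag@mail.co.th"]
print(invalid_email_count(sample), email_check_outcome(sample))
```

Note that the threshold means up to 10 malformed addresses still pass. That suits a warning-level check, but it is too lenient if email is used as a join key.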

# Run scan:
soda scan -d postgres -c configuration.yml checks/orders.yml
soda scan -d bigquery -c configuration.yml checks/
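
When the scan runs from a script or a CI job, the process exit code is the simplest signal to act on. The sketch below assumes the commonly documented meanings (0 = all passed, 1 = warning, 2 = failure, 3 = scan error); confirm them against your Soda version. The action names and both functions are hypothetical.

```python
import subprocess

# Assumed exit-code meanings for `soda scan`; verify against your Soda version.
EXIT_ACTIONS = {
    0: "continue",        # all checks passed
    1: "warn",            # at least one check warned
    2: "stop_pipeline",   # at least one check failed
    3: "stop_pipeline",   # the scan itself errored
}


def action_for_exit_code(code):
    """Map a scan exit code to a pipeline action; unknown codes stop the pipeline."""
    return EXIT_ACTIONS.get(code, "stop_pipeline")


def run_scan(data_source, checks_path, config="configuration.yml"):
    """Run the CLI command shown above and return the action to take."""
    result = subprocess.run(
        ["soda", "scan", "-d", data_source, "-c", config, checks_path],
        capture_output=True, text=True,
    )
    return action_for_exit_code(result.returncode)


if __name__ == "__main__":
    for code in (0, 1, 2, 3):
        print(code, action_for_exit_code(code))
```

Defaulting unknown codes to `stop_pipeline` keeps the pipeline fail-safe: an unexpected result never lets data through.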

from dataclasses import dataclass

@dataclass
class SodaCheck:
    table: str
    check: str
    severity: str
    dr_action: str

checks = [
    SodaCheck("orders", "row_count > 0", "CRITICAL", "Stop pipeline, check source system"),
    SodaCheck("orders", "freshness(created_at) < 2h", "CRITICAL", "Check ETL job, restart if needed"),
    SodaCheck("orders", "duplicate_count(order_id) = 0", "WARNING", "Investigate source, deduplicate"),
    SodaCheck("orders", "missing_count(total_amount) = 0", "WARNING", "Backfill from source"),
    SodaCheck("customers", "duplicate_count(email) = 0", "WARNING", "Merge duplicates"),
    SodaCheck("products", "schema check", "CRITICAL", "Rollback migration, fix schema"),
    SodaCheck("analytics", "anomaly_detection(revenue)", "WARNING", "Investigate data source"),
]

print("=== Data Quality Checks ===")
for c in checks:
    print(f"  [{c.table}] {c.check}")
    print(f"    Severity: {c.severity} | DR Action: {c.dr_action}")
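
A catalog like this becomes useful once failed checks are looked up in it. The following is a minimal sketch, assuming failures arrive as (table, check) pairs. `plan_response` and `CATALOG` are hypothetical names, and the catalog repeats a few entries from above so the example runs on its own.

```python
from dataclasses import dataclass


@dataclass
class SodaCheck:
    table: str
    check: str
    severity: str
    dr_action: str


# A subset of the catalog above, repeated so the sketch is self-contained.
CATALOG = [
    SodaCheck("orders", "row_count > 0", "CRITICAL", "Stop pipeline, check source system"),
    SodaCheck("orders", "duplicate_count(order_id) = 0", "WARNING", "Investigate source, deduplicate"),
    SodaCheck("customers", "duplicate_count(email) = 0", "WARNING", "Merge duplicates"),
]


def plan_response(failed):
    """Given failed (table, check) pairs, return (stop_pipeline, actions).

    Critical actions are listed first. Checks missing from the catalog are ignored here.
    """
    index = {(c.table, c.check): c for c in CATALOG}
    hits = [index[key] for key in failed if key in index]
    # Stable sort: CRITICAL entries (key False) come before WARNING entries (key True).
    hits.sort(key=lambda c: c.severity != "CRITICAL")
    stop = any(c.severity == "CRITICAL" for c in hits)
    return stop, [f"[{c.severity}] {c.table}: {c.dr_action}" for c in hits]


stop, actions = plan_response([
    ("customers", "duplicate_count(email) = 0"),
    ("orders", "row_count > 0"),
])
print("Stop pipeline:", stop)
for line in actions:
    print(" ", line)
```

In a real plan, a check that is missing from the catalog should probably be escalated rather than ignored.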

Recovery Procedures

# === Disaster Recovery Procedures ===

from dataclasses import dataclass

@dataclass
class DRProcedure:
    scenario: str
    detection: str
    containment: str
    recovery: str
    rto: str
    rpo: str

procedures = [
    DRProcedure("Data source down",
        "freshness check fails, row_count = 0",
        "Stop dependent pipelines, use cached data",
        "Wait for source recovery, backfill missing data",
        "4 hours", "2 hours"),
    DRProcedure("Schema change broke pipeline",
        "schema check fails, ETL errors",
        "Rollback migration, pause pipeline",
        "Fix schema mapping, re-run affected jobs",
        "2 hours", "0 (no data loss)"),
    DRProcedure("Duplicate data ingested",
        "duplicate_count > 0, anomaly in metrics",
        "Stop ingestion, mark affected data",
        "Deduplicate, re-process downstream",
        "4 hours", "1 hour"),
    DRProcedure("Data corruption",
        "anomaly_detection triggers, business reports wrong",
        "Quarantine corrupted partitions",
        "Restore from backup, re-run ETL",
        "6 hours", "24 hours (last backup)"),
    DRProcedure("Warehouse outage",
        "All checks fail, connection errors",
        "Switch to read replica if available",
        "Wait for provider recovery, validate data",
        "Provider SLA", "0 (data in source)"),
]

print("=== DR Procedures ===")
for p in procedures:
    print(f"  [{p.scenario}]")
    print(f"    Detect: {p.detection}")
    print(f"    Contain: {p.containment}")
    print(f"    Recover: {p.recovery}")
    print(f"    RTO: {p.rto} | RPO: {p.rpo}")

# Backup Strategy
backup = {
    "Daily Full Backup": "BigQuery snapshot every 24h, retain 30 days",
    "Hourly Incremental": "WAL-based incremental for PostgreSQL",
    "Point-in-time Recovery": "Enable PITR on PostgreSQL, 7 day retention",
    "Cross-region Replica": "BigQuery dataset replica in another region",
    "Source System Replay": "Keep source event log for 90 days re-processing",
}

print("\n\nBackup Strategy:")
for k, v in backup.items():
    print(f"  [{k}]: {v}")
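
The RPO values above translate into a simple question during recovery: which backup do we restore, and how much data sits between it and the incident? Here is a minimal sketch, assuming backup timestamps are known. The function names and the sample schedule are hypothetical.

```python
from datetime import datetime, timedelta


def latest_usable_backup(backups, corrupted_at):
    """Return the newest backup taken strictly before the corruption started, or None."""
    candidates = [b for b in backups if b < corrupted_at]
    return max(candidates) if candidates else None


def meets_rpo(backup_time, corrupted_at, rpo):
    """True when the gap between the backup and the incident fits within the RPO."""
    return corrupted_at - backup_time <= rpo


# Hypothetical daily snapshots at midnight and an incident detected at 09:30.
backups = [datetime(2026, 5, d) for d in (25, 26, 27, 28)]
corrupted_at = datetime(2026, 5, 28, 9, 30)

restore_point = latest_usable_backup(backups, corrupted_at)
print("Restore from:", restore_point)
print("Within 24h RPO:", meets_rpo(restore_point, corrupted_at, timedelta(hours=24)))
print("Within 1h RPO:", meets_rpo(restore_point, corrupted_at, timedelta(hours=1)))
```

With daily snapshots alone, only the 24-hour RPO of the data corruption scenario is reachable. The 1-hour and 2-hour targets depend on the hourly incremental backups or on replaying the source event log.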

Pipeline Integration

# === Airflow + Soda Integration ===

from datetime import datetime

from airflow import DAG
from airflow.operators.python import PythonOperator
from soda.scan import Scan

# extract_data, transform_data, load_data, and send_slack_warning are
# assumed to be defined elsewhere in the project.

def run_soda_check(table, checks_file):
    scan = Scan()
    scan.set_data_source_name("postgres")
    scan.add_configuration_yaml_file("configuration.yml")
    scan.add_sodacl_yaml_file(checks_file)
    scan.execute()

    # Soda reports each check as pass, warn, or fail. Check objects are not
    # assumed to carry a severity attribute, so a fail is treated as critical.
    if scan.has_check_fails():
        # Raising fails this task, which blocks the downstream load
        raise Exception(f"CRITICAL data quality failure in {table}: {scan.get_checks_fail_text()}")
    if scan.has_checks_warn_or_fail():
        send_slack_warning(scan.get_checks_warn_or_fail_text())

# start_date is a placeholder; set it and the schedule for your deployment
with DAG("etl_with_quality", start_date=datetime(2024, 1, 1), catchup=False) as dag:
    extract = PythonOperator(task_id="extract", python_callable=extract_data)
    transform = PythonOperator(task_id="transform", python_callable=transform_data)
    quality = PythonOperator(
        task_id="quality_check",
        python_callable=run_soda_check,
        op_args=["orders", "checks/orders.yml"],
    )
    load = PythonOperator(task_id="load", python_callable=load_data)

    extract >> transform >> quality >> load

# Circuit Breaker Pattern
# quality check fail → stop pipeline → alert → manual review → resume

from dataclasses import dataclass

@dataclass
class PipelineStep:
    step: str
    quality_gate: str
    on_fail: str

pipeline = [
    PipelineStep("Extract from source", "Source freshness + row count", "Retry 3x then alert"),
    PipelineStep("Transform data", "Schema validation + null checks", "Stop pipeline, alert"),
    PipelineStep("Quality gate (Soda)", "Full SodaCL scan", "Block load, quarantine data"),
    PipelineStep("Load to warehouse", "Post-load row count match", "Rollback load, restore backup"),
    PipelineStep("Update dashboards", "Dashboard metric range check", "Revert dashboard, investigate"),
]

print("Pipeline with Quality Gates:")
for p in pipeline:
    print(f"  [{p.step}]")
    print(f"    Gate: {p.quality_gate}")
    print(f"    On fail: {p.on_fail}")
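
The circuit breaker flow (fail, stop, alert, manual review, resume) can be sketched as a small state holder. This is an illustration in plain Python and not an Airflow or Soda feature. The class `QualityCircuitBreaker` and its methods are hypothetical, and a real deployment would persist the state outside the process.

```python
class QualityCircuitBreaker:
    """Closed = pipeline may run; open = pipeline is blocked until someone resumes it."""

    def __init__(self, alert):
        self.alert = alert      # callable that receives an alert message
        self.is_open = False
        self.reason = None

    def record(self, check_name, passed):
        """Record a quality check result; the first failure opens the breaker and alerts."""
        if not passed and not self.is_open:
            self.is_open = True
            self.reason = check_name
            self.alert(f"Pipeline stopped: {check_name} failed")

    def allow_run(self):
        """Pipeline steps call this before doing any work."""
        return not self.is_open

    def resume(self, reviewer):
        """Manual review step: a named reviewer closes the breaker again."""
        if not reviewer:
            raise ValueError("a reviewer is required to resume the pipeline")
        self.is_open = False
        self.reason = None


alerts = []
breaker = QualityCircuitBreaker(alert=alerts.append)

breaker.record("row_count > 0", passed=True)
print(breaker.allow_run())
breaker.record("freshness(created_at) < 2h", passed=False)
print(breaker.allow_run(), alerts)
breaker.resume(reviewer="on-call data engineer")
print(breaker.allow_run())
```

Requiring a reviewer on `resume` keeps the manual review step explicit, so the pipeline never restarts on its own after a critical failure.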

Tips

  • Freshness: Check freshness on every table to catch stale data.
  • Circuit Breaker: Stop the pipeline immediately when a critical check fails.
  • Backup: Test restoring from backup every month.
  • Runbook: Write a DR runbook for every scenario.
  • Drill: Run a DR drill every quarter and test an actual recovery.
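
For the freshness tip, the logic behind a check such as `freshness(created_at) < 2h` is small enough to sketch for many tables at once. The example assumes you already have the newest timestamp per table (for instance from a `MAX(created_at)` query). The function `is_fresh` and the sample values are hypothetical.

```python
from datetime import datetime, timedelta, timezone


def is_fresh(latest_row_time, max_age, now=None):
    """True when the newest row is younger than max_age (strict, like `< 2h`)."""
    now = now or datetime.now(timezone.utc)
    return now - latest_row_time < max_age


# Hypothetical newest `created_at` value per table.
now = datetime(2026, 5, 28, 12, 0, tzinfo=timezone.utc)
latest = {
    "orders": datetime(2026, 5, 28, 11, 15, tzinfo=timezone.utc),
    "customers": datetime(2026, 5, 28, 8, 0, tzinfo=timezone.utc),
}

stale = [t for t, ts in latest.items() if not is_fresh(ts, timedelta(hours=2), now)]
print("Stale tables:", stale)
```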

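What is Soda Data Quality?

Soda is an open source data quality tool. Checks are written in SodaCL (row_count, missing, duplicate, freshness, schema), run against data sources such as PostgreSQL, BigQuery, and Snowflake, and can be automated in CI/CD and ETL pipelines.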