
Soda Data Quality Disaster Recovery Plan
Soda Data Quality DR

This plan covers disaster recovery for data quality with Soda: SodaCL checks, automated testing, alerting, pipeline recovery strategy, backups, post-mortems, and production operations.
| Phase | Action | Tool | SLA | Owner |
|---|---|---|---|---|
| Detection | Soda check detects an anomaly | Soda + SodaCL | < 5 min | Data Pipeline |
| Alert | Notify Slack and PagerDuty | Soda Cloud | < 1 min | Automated |
| Containment | Stop the pipeline to prevent bad data from spreading | Airflow Circuit Breaker | < 10 min | Data Engineer |
| Assessment | Analyze the scope of the damage | SQL + Soda Report | < 30 min | Data Engineer |
| Recovery | Restore data from backup or re-process | Backup + Pipeline | < 2 hours | Data Team |
| Validation | Verify the data after recovery | Soda Full Scan | < 30 min | Data Engineer |
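
The SLA column can be read as a time budget for each phase. As a rough illustration (not part of the original plan), the sketch below adds up the upper bounds from the table to show the worst-case time from detection through validation, assuming the phases run strictly one after another. The names `PHASE_SLA_MINUTES` and `total_budget_minutes` exist only for this example.

```python
# Upper-bound SLA per phase, in minutes, copied from the table above.
PHASE_SLA_MINUTES = {
    "Detection": 5,
    "Alert": 1,
    "Containment": 10,
    "Assessment": 30,
    "Recovery": 120,
    "Validation": 30,
}

def total_budget_minutes(sla=PHASE_SLA_MINUTES):
    """Sum the per-phase upper bounds, assuming phases run sequentially."""
    return sum(sla.values())

total = total_budget_minutes()
hours, minutes = divmod(total, 60)
print(f"Worst-case end-to-end budget: {total} min ({hours}h {minutes}m)")
```

With these numbers the budget comes to 196 minutes, most of it spent in the Recovery phase.
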
SodaCL Checks
# === SodaCL Data Quality Checks ===
# checks/orders.yml
checks for orders:
  - row_count > 0:
      name: Orders table not empty
  - row_count > 100:
      name: Minimum daily orders
  - missing_count(customer_id) = 0:
      name: No null customer IDs
  - missing_count(total_amount) = 0:
      name: No null order amounts
  - duplicate_count(order_id) = 0:
      name: No duplicate order IDs
  - invalid_count(status) = 0:
      name: Valid order status
      valid values: [pending, processing, shipped, delivered, cancelled]
  - avg(total_amount) between 500 and 5000:
      name: Average order amount in range
  - freshness(created_at) < 2h:
      name: Orders data is fresh
  - schema:
      name: Schema unchanged
      fail:
        when required column missing: [order_id, customer_id, total_amount, status]
        when wrong column type:
          order_id: integer
          total_amount: decimal

# checks/customers.yml
checks for customers:
  - row_count > 0
  - missing_count(email) = 0
  - duplicate_count(email) = 0
  - invalid_count(email):
      name: Valid email format
      valid regex: '^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}$'
      fail: when > 10

# Run scan:
soda scan -d postgres -c configuration.yml checks/orders.yml
soda scan -d bigquery -c configuration.yml checks/
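
To get a feel for what the `invalid_count(email)` check above counts, the same pattern can be tried in plain Python. This is only a local approximation: Soda runs its checks as queries against the data source, so regex behavior there may differ from Python's `re` module. The sample addresses are made up, and skipping `None` values is an assumption of this sketch.

```python
import re

# Same pattern as the `valid regex` in checks/customers.yml
EMAIL_RE = re.compile(r'^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}$')
FAIL_THRESHOLD = 10  # mirrors `fail: when > 10`

def invalid_count(emails):
    """Count values that do not match the pattern (None is skipped here)."""
    return sum(1 for e in emails if e is not None and not EMAIL_RE.match(e))

# Made-up sample data for illustration
sample = ["a@example.com", "no-at-sign", "x@y", "first.last@mail.example.org"]
count = invalid_count(sample)
print(f"invalid emails: {count}, check fails: {count > FAIL_THRESHOLD}")
```

Because the check is written as `fail: when > 10`, a handful of malformed addresses is tolerated; only more than ten invalid values fail the scan.
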

from dataclasses import dataclass

@dataclass
class SodaCheck:
    table: str
    check: str
    severity: str
    dr_action: str

checks = [
    SodaCheck("orders", "row_count > 0", "CRITICAL", "Stop pipeline, check source system"),
    SodaCheck("orders", "freshness(created_at) < 2h", "CRITICAL", "Check ETL job, restart if needed"),
    SodaCheck("orders", "duplicate_count(order_id) = 0", "WARNING", "Investigate source, deduplicate"),
    SodaCheck("orders", "missing_count(total_amount) = 0", "WARNING", "Backfill from source"),
    SodaCheck("customers", "duplicate_count(email) = 0", "WARNING", "Merge duplicates"),
    SodaCheck("products", "schema check", "CRITICAL", "Rollback migration, fix schema"),
    SodaCheck("analytics", "anomaly_detection(revenue)", "WARNING", "Investigate data source"),
]

print("=== Data Quality Checks ===")
for c in checks:
    print(f"  [{c.table}] {c.check}")
    print(f"    Severity: {c.severity} | DR Action: {c.dr_action}")
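
One way to put this mapping to work is to look up the DR action for whichever checks failed and handle the critical ones first. The sketch below uses a hypothetical helper (`triage`, `CATALOG`, and `FAILED` are illustrative names, not part of Soda) and assumes the failed check expressions are already known, for example from a scan result.

```python
from dataclasses import dataclass

@dataclass
class SodaCheck:
    table: str
    check: str
    severity: str
    dr_action: str

# A subset of the catalog from the listing above
CATALOG = [
    SodaCheck("orders", "row_count > 0", "CRITICAL", "Stop pipeline, check source system"),
    SodaCheck("orders", "duplicate_count(order_id) = 0", "WARNING", "Investigate source, deduplicate"),
    SodaCheck("customers", "duplicate_count(email) = 0", "WARNING", "Merge duplicates"),
]

SEVERITY_ORDER = {"CRITICAL": 0, "WARNING": 1}

def triage(failed, catalog=CATALOG):
    """Return catalog entries for the failed (table, check) pairs, critical first."""
    hits = [c for c in catalog if (c.table, c.check) in failed]
    return sorted(hits, key=lambda c: SEVERITY_ORDER.get(c.severity, 99))

# Hypothetical set of failures, as (table, check expression) pairs
FAILED = {("customers", "duplicate_count(email) = 0"), ("orders", "row_count > 0")}
for c in triage(FAILED):
    print(f"{c.severity}: [{c.table}] {c.check} -> {c.dr_action}")
```

Sorting by severity keeps the on-call engineer focused on the action that stops further damage before moving on to cleanup work.
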
Recovery Procedures

# === Disaster Recovery Procedures ===
from dataclasses import dataclass

@dataclass
class DRProcedure:
    scenario: str
    detection: str
    containment: str
    recovery: str
    rto: str
    rpo: str

procedures = [
    DRProcedure("Data source down",
        "freshness check fails, row_count = 0",
        "Stop dependent pipelines, use cached data",
        "Wait for source recovery, backfill missing data",
        "4 hours", "2 hours"),
    DRProcedure("Schema change broke pipeline",
        "schema check fails, ETL errors",
        "Rollback migration, pause pipeline",
        "Fix schema mapping, re-run affected jobs",
        "2 hours", "0 (no data loss)"),
    DRProcedure("Duplicate data ingested",
        "duplicate_count > 0, anomaly in metrics",
        "Stop ingestion, mark affected data",
        "Deduplicate, re-process downstream",
        "4 hours", "1 hour"),
    DRProcedure("Data corruption",
        "anomaly_detection triggers, business reports wrong",
        "Quarantine corrupted partitions",
        "Restore from backup, re-run ETL",
        "6 hours", "24 hours (last backup)"),
    DRProcedure("Warehouse outage",
        "All checks fail, connection errors",
        "Switch to read replica if available",
        "Wait for provider recovery, validate data",
        "Provider SLA", "0 (data in source)"),
]

print("=== DR Procedures ===")
for p in procedures:
    print(f"  [{p.scenario}]")
    print(f"    Detect: {p.detection}")
    print(f"    Contain: {p.containment}")
    print(f"    Recover: {p.recovery}")
    print(f"    RTO: {p.rto} | RPO: {p.rpo}")

# Backup Strategy
backup = {
    "Daily Full Backup": "BigQuery snapshot every 24h, retain 30 days",
    "Hourly Incremental": "WAL-based incremental for PostgreSQL",
    "Point-in-time Recovery": "Enable PITR on PostgreSQL, 7 day retention",
    "Cross-region Replica": "BigQuery dataset replica in another region",
    "Source System Replay": "Keep source event log for 90 days re-processing",
}

print("\n\nBackup Strategy:")
for k, v in backup.items():
    print(f"  [{k}]: {v}")
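
The retention windows in the backup strategy above determine which recovery paths are still open for a given incident. As a sketch, the helper below lists the options whose retention still covers data of a given age. The function name and the simple day-based comparison are assumptions for illustration; real tooling would check actual snapshot and log timestamps.

```python
# Retention windows (days) taken from the backup strategy above
RETENTION_DAYS = {
    "Point-in-time Recovery": 7,
    "Daily Full Backup": 30,
    "Source System Replay": 90,
}

def recovery_options(data_age_days):
    """Return the backup types that still retain data of the given age."""
    return [name for name, days in RETENTION_DAYS.items() if data_age_days <= days]

for age in (3, 14, 60, 120):
    options = recovery_options(age) or ["none (outside all retention windows)"]
    print(f"{age:>3} days old: {', '.join(options)}")
```

Corruption that goes unnoticed for more than 90 days falls outside every window listed here, which is one reason to keep detection times short.
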
Pipeline Integration
# === Airflow + Soda Integration ===
from datetime import datetime

from airflow import DAG
from airflow.operators.python import PythonOperator
from soda.scan import Scan

# extract_data, transform_data, load_data and send_slack_warning are
# assumed to be defined elsewhere in the project.

def run_soda_check(table, checks_file):
    scan = Scan()
    scan.set_data_source_name("postgres")
    scan.add_configuration_yaml_file("configuration.yml")
    scan.add_sodacl_yaml_file(checks_file)
    scan.execute()

    # Soda checks report an outcome (pass, warn, fail). Here a "fail"
    # outcome is treated as critical and a "warn" outcome as a warning.
    if scan.has_check_fails():
        raise Exception(f"CRITICAL data quality failure: {scan.get_checks_fail_text()}")
    warned = scan.get_checks_warn_or_fail()
    if warned:
        send_slack_warning(warned)

# start_date is a placeholder value; set it to suit your schedule
with DAG("etl_with_quality", start_date=datetime(2024, 1, 1), catchup=False) as dag:
    extract = PythonOperator(task_id="extract", python_callable=extract_data)
    transform = PythonOperator(task_id="transform", python_callable=transform_data)
    quality = PythonOperator(task_id="quality_check",
                             python_callable=run_soda_check,
                             op_args=["orders", "checks/orders.yml"])
    load = PythonOperator(task_id="load", python_callable=load_data)

    extract >> transform >> quality >> load

# Circuit Breaker Pattern
# quality check fail → stop pipeline → alert → manual review → resume
from dataclasses import dataclass

@dataclass
class PipelineStep:
    step: str
    quality_gate: str
    on_fail: str

pipeline = [
    PipelineStep("Extract from source", "Source freshness + row count", "Retry 3x then alert"),
    PipelineStep("Transform data", "Schema validation + null checks", "Stop pipeline, alert"),
    PipelineStep("Quality gate (Soda)", "Full SodaCL scan", "Block load, quarantine data"),
    PipelineStep("Load to warehouse", "Post-load row count match", "Rollback load, restore backup"),
    PipelineStep("Update dashboards", "Dashboard metric range check", "Revert dashboard, investigate"),
]

print("Pipeline with Quality Gates:")
for p in pipeline:
    print(f"  [{p.step}]")
    print(f"    Gate: {p.quality_gate}")
    print(f"    On fail: {p.on_fail}")
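
The circuit breaker flow (quality check fails, pipeline stops, alert goes out, someone reviews, pipeline resumes) can be sketched in plain Python without Airflow. Everything here is hypothetical scaffolding: `CircuitBreaker`, its methods, and the step callables are illustrative names, and a real setup would persist the breaker state outside the process.

```python
class CircuitOpenError(RuntimeError):
    """Raised when the pipeline is asked to run while the breaker is open."""

class CircuitBreaker:
    def __init__(self, alert):
        self.open = False      # open means the pipeline is blocked
        self.alert = alert     # callable used to notify on failure

    def run(self, steps):
        """Run (name, gate) pairs in order; stop at the first failing gate."""
        if self.open:
            raise CircuitOpenError("manual review required before resuming")
        completed = []
        for name, gate in steps:
            if not gate():
                self.open = True
                self.alert(f"Quality gate failed at step: {name}")
                break
            completed.append(name)
        return completed

    def reset(self):
        """Called by a person after manual review to allow runs again."""
        self.open = False

alerts = []
breaker = CircuitBreaker(alert=alerts.append)
steps = [
    ("Extract from source", lambda: True),
    ("Quality gate (Soda)", lambda: False),  # simulated failing scan
    ("Load to warehouse", lambda: True),
]
print(breaker.run(steps))  # the load step is never reached
print(alerts)
```

Once the breaker is open, further calls to `run` raise until someone calls `reset()`, which models the manual review step in the flow.
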
Tips
- Freshness: Check freshness on every table to catch stale data.
- Circuit Breaker: Stop the pipeline immediately when a critical check fails.
- Backup: Test restoring from backup every month.
- Runbook: Write a DR runbook for every scenario.
- Drill: Run a DR drill every quarter and test an actual recovery.
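
For the freshness tip, writing one check per table by hand gets tedious, so a small generator can emit the SodaCL text instead. The sketch assumes each table has a single timestamp column and reuses the `freshness(column) < 2h` form from `checks/orders.yml`. The entries in `TABLES` are placeholders, and `freshness_checks` is a hypothetical helper, not a Soda function.

```python
# Placeholder mapping of table name to timestamp column (adjust to your schema)
TABLES = {
    "orders": "created_at",
    "customers": "updated_at",
}

def freshness_checks(tables, threshold="2h"):
    """Build SodaCL text with one freshness check per table."""
    blocks = []
    for table, column in tables.items():
        blocks.append(
            f"checks for {table}:\n"
            f"  - freshness({column}) < {threshold}\n"
        )
    return "\n".join(blocks)

print(freshness_checks(TABLES))
```

The generated text can be written to a file under `checks/` and scanned like any other checks file.
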
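What is Soda Data Quality?
Soda is an open source data quality tool. Checks are written in SodaCL (for example row_count, missing, duplicate, freshness, and schema checks), it works with PostgreSQL, BigQuery, and Snowflake, and scans can run automatically in CI/CD and ETL pipelines.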