Soda Data Quality DR
A disaster recovery (DR) playbook for data quality in production: SodaCL checks for automated testing, an alert pipeline, a recovery strategy, backups, and post-mortems.
| Phase | Action | Tool | SLA | Owner |
|---|---|---|---|---|
| Detection | Soda check detects an anomaly | Soda + SodaCL | < 5 min | Data Pipeline |
| Alert | Notify Slack / PagerDuty | Soda Cloud | < 1 min | Automated |
| Containment | Stop the pipeline to prevent spread | Airflow Circuit Breaker | < 10 min | Data Engineer |
| Assessment | Analyze the scope of the damage | SQL + Soda Report | < 30 min | Data Engineer |
| Recovery | Restore data from backup / re-process | Backup + Pipeline | < 2 hours | Data Team |
| Validation | Verify data after recovery | Soda Full Scan | < 30 min | Data Engineer |
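The phases and SLAs in the table above can be turned into an automated post-incident report; a minimal sketch (phase names and SLA minutes come from the table, the incident timings are made up for illustration):

```python
from dataclasses import dataclass

@dataclass
class DRPhase:
    name: str
    sla_minutes: int

# SLA targets from the DR table above (Recovery: 2 hours = 120 min)
PHASES = [
    DRPhase("Detection", 5), DRPhase("Alert", 1), DRPhase("Containment", 10),
    DRPhase("Assessment", 30), DRPhase("Recovery", 120), DRPhase("Validation", 30),
]

def sla_report(actual_minutes: dict) -> list:
    """Compare actual phase durations (minutes) against the SLA targets."""
    report = []
    for p in PHASES:
        actual = actual_minutes.get(p.name)
        status = "OK" if actual is not None and actual <= p.sla_minutes else "BREACHED"
        report.append((p.name, p.sla_minutes, actual, status))
    return report

# Hypothetical incident: containment took 15 min against a 10 min SLA
incident = {"Detection": 3, "Alert": 1, "Containment": 15,
            "Assessment": 25, "Recovery": 90, "Validation": 20}
for name, sla, actual, status in sla_report(incident):
    print(f"{name}: SLA {sla} min, actual {actual} min -> {status}")
```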
SodaCL Checks
# === SodaCL Data Quality Checks ===
# checks/orders.yml
# checks for orders:
#   - row_count > 0:
#       name: Orders table not empty
#   - row_count > 100:
#       name: Minimum daily orders
#   - missing_count(customer_id) = 0:
#       name: No null customer IDs
#   - missing_count(total_amount) = 0:
#       name: No null order amounts
#   - duplicate_count(order_id) = 0:
#       name: No duplicate order IDs
#   - invalid_count(status) = 0:
#       name: Valid order status
#       valid values: [pending, processing, shipped, delivered, cancelled]
#   - avg(total_amount) between 500 and 5000:
#       name: Average order amount in range
#   - freshness(created_at) < 2h:
#       name: Orders data is fresh
#   - schema:
#       name: Schema unchanged
#       fail:
#         when required column missing: [order_id, customer_id, total_amount, status]
#         when wrong column type:
#           order_id: integer
#           total_amount: decimal
#
# checks/customers.yml
# checks for customers:
#   - row_count > 0
#   - missing_count(email) = 0
#   - duplicate_count(email) = 0
#   - invalid_count(email):
#       name: Valid email format
#       valid regex: '^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}$'
#       fail: when > 10
#
# Run a scan:
#   soda scan -d postgres -c configuration.yml checks/orders.yml
#   soda scan -d bigquery -c configuration.yml checks/
from dataclasses import dataclass
@dataclass
class SodaCheck:
table: str
check: str
severity: str
dr_action: str
checks = [
SodaCheck("orders", "row_count > 0", "CRITICAL", "Stop pipeline, check source system"),
SodaCheck("orders", "freshness(created_at) < 2h", "CRITICAL", "Check ETL job, restart if needed"),
SodaCheck("orders", "duplicate_count(order_id) = 0", "WARNING", "Investigate source, deduplicate"),
SodaCheck("orders", "missing_count(total_amount) = 0", "WARNING", "Backfill from source"),
SodaCheck("customers", "duplicate_count(email) = 0", "WARNING", "Merge duplicates"),
SodaCheck("products", "schema check", "CRITICAL", "Rollback migration, fix schema"),
SodaCheck("analytics", "anomaly_detection(revenue)", "WARNING", "Investigate data source"),
]
print("=== Data Quality Checks ===")
for c in checks:
print(f" [{c.table}] {c.check}")
print(f" Severity: {c.severity} | DR Action: {c.dr_action}")
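The severity-to-action mapping above can be wired straight into the pipeline; a hedged sketch of a dispatcher (the `handle_failure` policy is illustrative, not part of Soda's API, and `SodaCheck` is re-declared so the snippet runs standalone):

```python
from dataclasses import dataclass

@dataclass
class SodaCheck:
    table: str
    check: str
    severity: str
    dr_action: str

def handle_failure(c: SodaCheck) -> str:
    """Map a failed check's severity to a DR response (illustrative policy).

    CRITICAL failures should halt the pipeline; WARNING failures
    only log and notify the data team.
    """
    if c.severity == "CRITICAL":
        # In a real DAG this would raise to stop the run.
        return f"STOP PIPELINE: {c.dr_action}"
    return f"LOG + NOTIFY: {c.dr_action}"

print(handle_failure(SodaCheck("orders", "row_count > 0", "CRITICAL",
                               "Stop pipeline, check source system")))
print(handle_failure(SodaCheck("orders", "duplicate_count(order_id) = 0",
                               "WARNING", "Investigate source, deduplicate")))
```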
Recovery Procedures
# === Disaster Recovery Procedures ===
from dataclasses import dataclass

@dataclass
class DRProcedure:
scenario: str
detection: str
containment: str
recovery: str
rto: str
rpo: str
procedures = [
DRProcedure("Data source down",
"freshness check fails, row_count = 0",
"Stop dependent pipelines, use cached data",
"Wait for source recovery, backfill missing data",
"4 hours", "2 hours"),
DRProcedure("Schema change broke pipeline",
"schema check fails, ETL errors",
"Rollback migration, pause pipeline",
"Fix schema mapping, re-run affected jobs",
"2 hours", "0 (no data loss)"),
DRProcedure("Duplicate data ingested",
"duplicate_count > 0, anomaly in metrics",
"Stop ingestion, mark affected data",
"Deduplicate, re-process downstream",
"4 hours", "1 hour"),
DRProcedure("Data corruption",
"anomaly_detection triggers, business reports wrong",
"Quarantine corrupted partitions",
"Restore from backup, re-run ETL",
"6 hours", "24 hours (last backup)"),
DRProcedure("Warehouse outage",
"All checks fail, connection errors",
"Switch to read replica if available",
"Wait for provider recovery, validate data",
"Provider SLA", "0 (data in source)"),
]
print("=== DR Procedures ===")
for p in procedures:
print(f" [{p.scenario}]")
print(f" Detect: {p.detection}")
print(f" Contain: {p.containment}")
print(f" Recover: {p.recovery}")
print(f" RTO: {p.rto} | RPO: {p.rpo}")
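RTO targets like those above can be verified automatically after an incident; a minimal sketch (the parser handles only the `N hours` form used in this table, and a non-numeric target such as "Provider SLA" is treated as unknown):

```python
import re

def rto_hours(rto: str):
    """Parse an RTO string like '4 hours' into hours; None if non-numeric."""
    m = re.match(r"(\d+)\s*hours?", rto)
    return int(m.group(1)) if m else None

def met_rto(rto: str, actual_hours: float):
    """True/False when the target is numeric, None for e.g. 'Provider SLA'."""
    target = rto_hours(rto)
    return None if target is None else actual_hours <= target

print(met_rto("4 hours", 3.5))        # recovered in 3.5h against a 4h target
print(met_rto("2 hours", 3.0))        # missed the target
print(met_rto("Provider SLA", 5.0))   # cannot be judged numerically
```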
# Backup Strategy
backup = {
"Daily Full Backup": "BigQuery snapshot every 24h, retain 30 days",
"Hourly Incremental": "WAL-based incremental for PostgreSQL",
"Point-in-time Recovery": "Enable PITR on PostgreSQL, 7 day retention",
"Cross-region Replica": "BigQuery dataset replica in another region",
"Source System Replay": "Keep source event log for 90 days re-processing",
}
print("\nBackup Strategy:")
for k, v in backup.items():
print(f" [{k}]: {v}")
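A backup is only as good as its restore test. A hedged sketch of a restore smoke test that compares row counts between a source table and its restored copy (the `count_rows` helper and table names are hypothetical; an in-memory SQLite DB stands in for the warehouse here — swap in your own client):

```python
import sqlite3

def count_rows(conn, table: str) -> int:
    return conn.execute(f"SELECT COUNT(*) FROM {table}").fetchone()[0]

def verify_restore(conn, source: str, restored: str, tolerance: float = 0.0) -> bool:
    """Pass if the restored table's row count is within tolerance of the source."""
    src, dst = count_rows(conn, source), count_rows(conn, restored)
    return src > 0 and abs(src - dst) <= src * tolerance

# Demo: SQLite in place of the real warehouse
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE orders (id INTEGER)")
conn.execute("CREATE TABLE orders_restored (id INTEGER)")
conn.executemany("INSERT INTO orders VALUES (?)", [(i,) for i in range(100)])
conn.executemany("INSERT INTO orders_restored VALUES (?)", [(i,) for i in range(100)])
print(verify_restore(conn, "orders", "orders_restored"))
```

Running a check like this on a schedule is one way to honor the "test restoring backups every month" tip below.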
Pipeline Integration
# === Airflow + Soda Integration ===
# from airflow import DAG
# from airflow.operators.python import PythonOperator
# from soda.scan import Scan
#
# def run_soda_check(table, checks_file):
# scan = Scan()
# scan.set_data_source_name("postgres")
# scan.add_configuration_yaml_file("configuration.yml")
# scan.add_sodacl_yaml_file(checks_file)
# scan.execute()
#
# if scan.has_check_fails():
# critical = [c for c in scan.get_checks_fail() if c.severity == "critical"]
# if critical:
# raise Exception(f"CRITICAL data quality failure: {critical}")
# else:
# send_slack_warning(scan.get_checks_fail())
#
# with DAG("etl_with_quality") as dag:
# extract = PythonOperator(task_id="extract", python_callable=extract_data)
# transform = PythonOperator(task_id="transform", python_callable=transform_data)
# quality = PythonOperator(task_id="quality_check",
# python_callable=run_soda_check,
# op_args=["orders", "checks/orders.yml"])
# load = PythonOperator(task_id="load", python_callable=load_data)
#
# extract >> transform >> quality >> load
# Circuit Breaker Pattern
# quality check fail → stop pipeline → alert → manual review → resume
from dataclasses import dataclass

@dataclass
class PipelineStep:
step: str
quality_gate: str
on_fail: str
pipeline = [
PipelineStep("Extract from source", "Source freshness + row count", "Retry 3x then alert"),
PipelineStep("Transform data", "Schema validation + null checks", "Stop pipeline, alert"),
PipelineStep("Quality gate (Soda)", "Full SodaCL scan", "Block load, quarantine data"),
PipelineStep("Load to warehouse", "Post-load row count match", "Rollback load, restore backup"),
PipelineStep("Update dashboards", "Dashboard metric range check", "Revert dashboard, investigate"),
]
print("Pipeline with Quality Gates:")
for p in pipeline:
print(f" [{p.step}]")
print(f" Gate: {p.quality_gate}")
print(f" On fail: {p.on_fail}")
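The circuit-breaker flow above (fail → stop → alert → manual review → resume) can be sketched as a small state machine; illustrative only, not tied to any Airflow or Soda API:

```python
class QualityCircuitBreaker:
    """Opens on a critical quality failure; requires a manual reset to resume."""

    def __init__(self):
        self.open = False              # open = pipeline blocked

    def record(self, check_passed: bool, severity: str = "CRITICAL"):
        if not check_passed and severity == "CRITICAL":
            self.open = True           # trip: block further loads, page on-call

    def allow_load(self) -> bool:
        return not self.open

    def manual_reset(self):
        """Called by an engineer after review confirms the data is clean."""
        self.open = False

cb = QualityCircuitBreaker()
cb.record(check_passed=False)          # critical check failed
print(cb.allow_load())                 # load is blocked
cb.manual_reset()                      # engineer signs off after review
print(cb.allow_load())                 # pipeline resumes
```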
Tips
- Freshness: check freshness on every table to catch stale data early
- Circuit Breaker: stop the pipeline immediately when a critical check fails
- Backup: test restoring backups every month
- Runbook: write a DR runbook for every scenario
- Drill: run a DR drill every quarter to test real recovery
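A minimal sketch of building the Slack alert body for a failed check (the message layout is an assumption, and the webhook URL plus the actual HTTP send via `urllib.request` are left out):

```python
import json

SEVERITY_EMOJI = {"CRITICAL": ":red_circle:", "WARNING": ":warning:",
                  "INFO": ":information_source:"}

def slack_payload(table: str, check: str, severity: str, dr_action: str) -> str:
    """Build a Slack incoming-webhook JSON body for a failed quality check."""
    text = (f"{SEVERITY_EMOJI.get(severity, '')} *{severity}* data quality failure\n"
            f"Table: `{table}`\nCheck: `{check}`\nDR action: {dr_action}")
    return json.dumps({"text": text})

body = slack_payload("orders", "freshness(created_at) < 2h",
                     "CRITICAL", "Check ETL job, restart if needed")
print(body)
# Send by POSTing `body` to your Slack webhook URL with
# Content-Type: application/json (e.g. via urllib.request).
```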
What is Soda Data Quality?
Soda is an open-source data quality tool. SodaCL checks cover row_count, missing values, duplicates, freshness, and schema; they run against PostgreSQL, BigQuery, and Snowflake and automate validation for ETL and CI/CD pipelines.
What does a DR plan for data quality include?
Detection via Soda checks; alerting to Slack and PagerDuty; containment by stopping the pipeline; recovery from backup or by re-processing; and a post-mortem to find the root cause and prevent recurrence, all measured against RTO and RPO targets.
How do you write a SodaCL check?
Define metrics such as row_count, missing_count, duplicate_count, valid values, freshness, schema, and anomaly detection in a checks.yml file, then run `soda scan`; results and severities appear in the Soda Cloud dashboard.
How do you set up an alert pipeline?
Send a Slack alert when a check fails, routed by severity (CRITICAL, WARNING, INFO); stop the pipeline on CRITICAL; forward via webhook to PagerDuty with an escalation policy (for example, escalate after 15 minutes or on consecutive failures); track status on a dashboard.
Summary
Soda brings disaster recovery to data quality in production: SodaCL checks for freshness and schema, an alert pipeline with a circuit breaker, backup and recovery procedures, Airflow integration, and post-mortems.
