
Soda Data Quality Post-mortem Analysis

2026-04-22 · อ. บอม — SiamCafe.net · 1,405 words

What is Soda Data Quality Post-mortem Analysis?

Soda is an open source data quality platform for validating data at every stage of a data pipeline. It supports SQL-based checks, automated monitoring, and integrations with dbt, Airflow, and cloud data warehouses. Post-mortem analysis is the practice of analyzing a data quality incident after it occurs in order to find the root cause, prevent recurrence, and improve the process. Combining Soda with a post-mortem workflow gives data teams a systematic way to detect, analyze, and fix data quality problems.
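The detect-analyze-fix loop can be sketched in a few lines of Python. This is a minimal sketch with hard-coded check results; in a real pipeline they would come from a Soda scan, and `triage` is a hypothetical helper, not a Soda API:

```python
# postmortem_loop.py - minimal sketch of a detect -> post-mortem loop
# (check results are hard-coded here; a real pipeline would get them
# from a Soda scan)
from datetime import datetime, timezone

def triage(check_results):
    """Collect failed checks and build a post-mortem stub, or None if clean."""
    failures = [c for c in check_results if c["outcome"] == "fail"]
    if not failures:
        return None
    return {
        "title": "Data Quality Incident Post-mortem",
        "detected_at": datetime.now(timezone.utc).isoformat(),
        "failed_checks": [c["check"] for c in failures],
        # simple illustrative rule: many simultaneous failures -> higher severity
        "severity": "P1" if len(failures) > 3 else "P2",
        "root_cause": "TBD (5 Whys)",
    }

results = [
    {"check": "row_count > 0", "outcome": "pass"},
    {"check": "duplicate_count(order_id) = 0", "outcome": "fail"},
]
stub = triage(results)
print(stub["severity"], stub["failed_checks"])
```

The stub carries only the fields needed to open an incident; the full post-mortem template is filled in during the analysis that follows.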

Soda Core Basics

# soda_basics.py — Soda data quality basics

class SodaBasics:
    COMPONENTS = {
        "soda_core": {
            "name": "Soda Core (Open Source)",
            "description": "CLI tool for running data quality checks",
            "install": "pip install soda-core-postgres soda-core-bigquery",
        },
        "soda_cloud": {
            "name": "Soda Cloud (SaaS)",
            "description": "Dashboard, alerting, collaboration, anomaly detection",
            "install": "Sign up at cloud.soda.io + connect Soda Core",
        },
        "soda_checks": {
            "name": "SodaCL (Soda Checks Language)",
            "description": "YAML-based DSL for writing data quality checks",
            "install": "Written in checks.yml files",
        },
    }

    CHECK_EXAMPLES = """
# checks.yml — Soda data quality checks
checks for orders:
  # Row count
  - row_count > 0
  - row_count between 1000 and 100000

  # Freshness
  - freshness(created_at) < 1d

  # Completeness
  - missing_count(customer_id) = 0
  - missing_percent(email) < 5%

  # Validity
  - invalid_count(status) = 0:
      valid values: ['pending', 'confirmed', 'shipped', 'delivered', 'cancelled']

  # Uniqueness
  - duplicate_count(order_id) = 0

  # Schema
  - schema:
      fail:
        when required column missing: [order_id, customer_id, amount, status]
        when wrong column type:
          order_id: integer
          amount: decimal

  # Custom SQL
  - total_amount < 1000000:
      total_amount expression: SUM(amount)

checks for customers:
  - row_count > 0
  - missing_count(email) = 0
  - duplicate_count(email) = 0
  - invalid_percent(phone) < 10%:
      valid regex: '^0[0-9]{8,9}$'
"""

    def show_components(self):
        print("=== Soda Components ===\n")
        for key, comp in self.COMPONENTS.items():
            print(f"[{comp['name']}]")
            print(f"  {comp['description']}")
            print(f"  Install: {comp['install']}")
            print()

    def show_checks(self):
        print("=== SodaCL Check Examples ===")
        print(self.CHECK_EXAMPLES[:600])

soda = SodaBasics()
soda.show_components()
soda.show_checks()
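As a quick sanity check on a checks file like the one above, the `checks for <dataset>` sections can be counted with a simple line-based parse. This is illustrative only; real tooling should load the YAML with a proper parser:

```python
# count_checks.py - count top-level SodaCL checks per dataset
# (line-based parse for illustration; nested config lines are ignored)

CHECKS_YML = """\
checks for orders:
  - row_count > 0
  - duplicate_count(order_id) = 0
checks for customers:
  - row_count > 0
  - missing_count(email) = 0
  - duplicate_count(email) = 0
"""

def count_checks(text):
    """Map each dataset in a SodaCL file to its number of top-level checks."""
    counts, current = {}, None
    for line in text.splitlines():
        if line.startswith("checks for "):
            current = line[len("checks for "):].rstrip(":")
            counts[current] = 0
        elif line.lstrip().startswith("- ") and current:
            counts[current] += 1
    return counts

print(count_checks(CHECKS_YML))  # {'orders': 2, 'customers': 3}
```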

Post-mortem Framework

# postmortem.py — Data quality post-mortem framework

class PostMortemFramework:
    TEMPLATE = {
        "header": {
            "title": "Data Quality Incident Post-mortem",
            "fields": ["Incident ID", "Date", "Severity (P1-P4)", "Duration", "Impact", "Owner"],
        },
        "timeline": {
            "title": "Timeline of Events",
            "fields": ["Detection time", "Acknowledgment", "Investigation start", "Root cause identified", "Fix deployed", "Verification", "Incident closed"],
        },
        "impact": {
            "title": "Impact Assessment",
            "fields": ["Affected datasets", "Affected downstream consumers", "Business impact (revenue, decisions)", "Data rows affected", "Duration of bad data"],
        },
        "root_cause": {
            "title": "Root Cause Analysis (5 Whys)",
            "fields": ["Why 1: What happened?", "Why 2: Why did it happen?", "Why 3: Why wasn't it caught earlier?", "Why 4: Why didn't monitoring detect it?", "Why 5: What systemic issue allowed this?"],
        },
        "action_items": {
            "title": "Action Items",
            "fields": ["Immediate fix", "Short-term prevention", "Long-term improvement", "New Soda checks to add", "Process changes"],
        },
    }

    EXAMPLE = {
        "id": "DQ-2025-042",
        "date": "2025-01-15",
        "severity": "P2",
        "title": "Duplicate orders in analytics database",
        "duration": "6 hours (02:00 - 08:00)",
        "impact": "Revenue dashboard overstated by 15%, 3 departments affected",
        "root_cause": "ETL job retry logic created duplicate inserts without deduplication",
        "fix": "Added MERGE/UPSERT instead of INSERT, added Soda duplicate check",
    }

    def show_template(self):
        print("=== Post-mortem Template ===\n")
        for key, section in self.TEMPLATE.items():
            print(f"[{section['title']}]")
            for field in section["fields"][:4]:
                print(f"  □ {field}")
            print()

    def show_example(self):
        print("=== Example Post-mortem ===")
        for key, value in self.EXAMPLE.items():
            print(f"  {key}: {value}")

pm = PostMortemFramework()
pm.show_template()
pm.show_example()
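The template and example above can be combined into a small renderer that turns an incident record into a shareable text document. This is a sketch; `render` and the field ordering are assumptions mirroring the EXAMPLE dict, not part of Soda:

```python
# render_postmortem.py - render a post-mortem record as plain text
# (field names mirror the EXAMPLE dict above; adapt to your own template)

def render(incident):
    """Format an incident dict as a titled bullet list."""
    lines = [f"# Post-mortem {incident['id']}: {incident['title']}", ""]
    for key in ("date", "severity", "duration", "impact", "root_cause", "fix"):
        lines.append(f"- {key}: {incident[key]}")
    return "\n".join(lines)

example = {
    "id": "DQ-2025-042",
    "date": "2025-01-15",
    "severity": "P2",
    "title": "Duplicate orders in analytics database",
    "duration": "6 hours (02:00 - 08:00)",
    "impact": "Revenue dashboard overstated by 15%",
    "root_cause": "ETL retry logic created duplicate inserts",
    "fix": "MERGE/UPSERT instead of INSERT + Soda duplicate check",
}
print(render(example))
```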

Automated Incident Detection

# detection.py — Automated data quality incident detection

class IncidentDetection:
    SODA_PIPELINE = """
# soda_scan.py — Automated Soda scan in pipeline
from soda.scan import Scan
import json

def run_soda_scan(checks_file, datasource_file):
    scan = Scan()
    scan.set_data_source_name("production_db")
    scan.add_configuration_yaml_file(datasource_file)
    scan.add_sodacl_yaml_file(checks_file)
    scan.execute()
    
    results = scan.get_scan_results()
    
    # Check for failures
    failures = [r for r in results.get('checks', []) if r['outcome'] == 'fail']
    warnings = [r for r in results.get('checks', []) if r['outcome'] == 'warn']
    
    if failures:
        severity = 'P1' if any('row_count' in f['check'] for f in failures) else 'P2'
        create_incident({
            'severity': severity,
            'failures': len(failures),
            'details': [f['check'] for f in failures],
            'timestamp': results.get('scan_time'),
        })
    
    return {
        'passed': len([r for r in results.get('checks', []) if r['outcome'] == 'pass']),
        'failed': len(failures),
        'warnings': len(warnings),
    }

def create_incident(data):
    # Send to PagerDuty / Slack / Jira
    print(f"INCIDENT [{data['severity']}]: {data['failures']} checks failed")
    for detail in data['details'][:5]:
        print(f"  - {detail}")

# Run scan
result = run_soda_scan('checks.yml', 'configuration.yml')
print(f"Results: {result}")
"""

    AIRFLOW_DAG = """
# dags/soda_quality_check.py — Airflow DAG
from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime

def soda_check(**context):
    from soda.scan import Scan
    scan = Scan()
    scan.set_data_source_name("warehouse")
    scan.add_configuration_yaml_file("/opt/soda/configuration.yml")
    scan.add_sodacl_yaml_file("/opt/soda/checks.yml")
    scan.execute()
    
    if scan.has_check_fails():
        raise Exception(f"Soda checks failed: {scan.get_checks_fail_text()}")

with DAG('soda_quality_check', schedule_interval='@hourly',
         start_date=datetime(2025, 1, 1)) as dag:
    
    check = PythonOperator(
        task_id='run_soda_checks',
        python_callable=soda_check,
    )
"""

    def show_pipeline(self):
        print("=== Soda Scan Pipeline ===")
        print(self.SODA_PIPELINE[:600])

    def show_airflow(self):
        print("\n=== Airflow Integration ===")
        print(self.AIRFLOW_DAG[:400])

detect = IncidentDetection()
detect.show_pipeline()
detect.show_airflow()
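Once incidents carry timestamps (occurred, detected, resolved), MTTD and MTTR fall out of simple averages. A minimal sketch with illustrative timestamps; in practice they come from your incident tracker:

```python
# dq_metrics.py - compute MTTD / MTTR from incident timestamps
# (timestamps are illustrative sample data)
from datetime import datetime

def mean_minutes(pairs):
    """Average gap in minutes between (start, end) timestamp pairs."""
    gaps = [(end - start).total_seconds() / 60 for start, end in pairs]
    return sum(gaps) / len(gaps)

fmt = "%Y-%m-%d %H:%M"
incidents = [
    # (occurred, detected, resolved)
    (datetime.strptime("2025-01-15 02:00", fmt),
     datetime.strptime("2025-01-15 02:20", fmt),
     datetime.strptime("2025-01-15 08:00", fmt)),
    (datetime.strptime("2025-02-03 10:00", fmt),
     datetime.strptime("2025-02-03 10:10", fmt),
     datetime.strptime("2025-02-03 11:00", fmt)),
]
mttd = mean_minutes([(o, d) for o, d, _ in incidents])  # occurred -> detected
mttr = mean_minutes([(d, r) for _, d, r in incidents])  # detected -> resolved
print(f"MTTD: {mttd:.0f} min, MTTR: {mttr:.0f} min")  # MTTD: 15 min, MTTR: 195 min
```

Tracking these two numbers over time shows whether new Soda checks actually shorten detection, independently of how fast the team fixes things.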

Root Cause Analysis

# rca.py — Root cause analysis tools

class RootCauseAnalysis:
    COMMON_CAUSES = {
        "schema_change": {
            "cause": "Unannounced schema change",
            "detection": "Soda schema check fails",
            "prevention": "Schema registry + breaking change alerts",
        },
        "duplicate_data": {
            "cause": "ETL retries create duplicates",
            "detection": "Soda duplicate_count check",
            "prevention": "Idempotent ETL + MERGE/UPSERT",
        },
        "missing_data": {
            "cause": "Source system downtime / API failure",
            "detection": "Soda row_count + freshness check",
            "prevention": "Retry logic + source monitoring",
        },
        "wrong_values": {
            "cause": "Business logic error in transformation",
            "detection": "Soda custom SQL checks + anomaly detection",
            "prevention": "Unit tests for transformations + dbt tests",
        },
        "late_data": {
            "cause": "Pipeline delay / dependency failure",
            "detection": "Soda freshness check",
            "prevention": "Pipeline SLAs + dependency monitoring",
        },
    }

    def show_causes(self):
        print("=== Common Root Causes ===\n")
        for key, cause in self.COMMON_CAUSES.items():
            print(f"[{cause['cause']}]")
            print(f"  Detection: {cause['detection']}")
            print(f"  Prevention: {cause['prevention']}")
            print()

    def five_whys(self):
        print("=== 5 Whys Example ===")
        whys = [
            "Why 1: Revenue dashboard shows wrong totals → duplicate orders in the analytics table",
            "Why 2: Why are there duplicates → ETL job retries repeated the INSERTs",
            "Why 3: Why did retries create duplicates → no deduplication logic",
            "Why 4: Why no deduplication → ETL retry behavior was never reviewed",
            "Why 5: Why no review → no data quality testing in CI/CD",
        ]
        for why in whys:
            print(f"  {why}")
        print("\n  Root Cause: no data quality testing in the development workflow")

rca = RootCauseAnalysis()
rca.show_causes()
rca.five_whys()
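The COMMON_CAUSES table above can drive a first-pass triage: match the failed check's metric name against likely cause categories. This is a keyword heuristic for illustration, not a Soda feature:

```python
# suggest_cause.py - map a failed Soda check to likely root-cause categories
# (keyword heuristics based on the COMMON_CAUSES table; illustrative only)

CHECK_HINTS = {
    "schema": "schema_change",
    "duplicate_count": "duplicate_data",
    "row_count": "missing_data",
    "freshness": "late_data",
}

def suggest_causes(failed_check):
    """Return root-cause categories whose keyword appears in the check text."""
    return [cause for kw, cause in CHECK_HINTS.items() if kw in failed_check]

print(suggest_causes("duplicate_count(order_id) = 0"))  # ['duplicate_data']
print(suggest_causes("freshness(created_at) < 1d"))     # ['late_data']
```

A suggestion like this only seeds the 5 Whys; it never replaces the actual analysis.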

Prevention & Monitoring

# prevention.py — Preventive measures
import random

class Prevention:
    CHECKLIST = {
        "proactive": [
            "Write Soda checks for every critical table",
            "Run checks after every ETL job",
            "Set freshness checks for time-sensitive data",
            "Monitor schema changes automatically",
            "Create data contracts with upstream teams",
        ],
        "reactive": [
            "Create an incident response playbook",
            "Set up PagerDuty/Slack alerts for P1/P2",
            "Write a post-mortem for every incident",
            "Review action items in retrospectives",
            "Track MTTD (Mean Time to Detect) and MTTR",
        ],
    }

    def show_checklist(self):
        print("=== Prevention Checklist ===\n")
        for category, items in self.CHECKLIST.items():
            print(f"[{category.upper()}]")
            for item in items[:4]:
                print(f"  □ {item}")
            print()

    def metrics(self):
        print("=== Data Quality Metrics ===")
        metrics = {
            "Data Quality Score": f"{random.randint(85, 98)}%",
            "Checks passed": f"{random.randint(95, 99)}%",
            "Incidents this month": random.randint(0, 5),
            "MTTD (detect)": f"{random.randint(5, 30)} minutes",
            "MTTR (resolve)": f"{random.randint(30, 240)} minutes",
            "Post-mortems completed": f"{random.randint(2, 8)}/{random.randint(2, 8)}",
        }
        for m, v in metrics.items():
            print(f"  {m}: {v}")

prev = Prevention()
prev.show_checklist()
prev.metrics()
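The metrics above are printed with random placeholders; in practice the Data Quality Score is a pass rate over real check outcomes. A minimal sketch (the half-credit for warns is an assumption, not a standard):

```python
# dq_score.py - compute a data quality score from actual check outcomes
# (replaces the random placeholders with a pass-rate calculation)

def quality_score(outcomes):
    """Percentage of checks passed; warns count as half a pass (assumption)."""
    if not outcomes:
        return 100.0  # no checks ran, nothing failed
    points = {"pass": 1.0, "warn": 0.5, "fail": 0.0}
    return 100 * sum(points.get(o, 0.0) for o in outcomes) / len(outcomes)

outcomes = ["pass"] * 18 + ["warn"] * 1 + ["fail"] * 1
print(f"Data Quality Score: {quality_score(outcomes):.1f}%")  # 92.5%
```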

FAQ - Frequently Asked Questions

Q: Soda vs Great Expectations - which is better?

A: Soda: YAML-based (SodaCL), easy to learn, with the Soda Cloud dashboard. Great Expectations: Python-based, more flexible, with a larger community. Use Soda when the team is not made up of Python developers and you want a quick setup; use GX when you need complex custom checks and have a Python team.

Q: Should you write a post-mortem for every incident?

A: P1-P2 (Critical/High): always, as a blameless post-mortem. P3 (Medium): when a pattern emerges or multiple consumers are affected. P4 (Low): log it and review in the retrospective. You do not need one for every incident, but you do need a system to track them all.

Q: How often should Soda checks run?

A: After every ETL run: critical checks (row count, freshness, duplicates). Hourly: anomaly detection, business metric checks. Daily: comprehensive full scan. Weekly: schema validation, cross-table consistency. The more critical the data, the more often the checks should run.
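These cadences can be encoded as a small schedule map. The suite names below are illustrative; point them at your own checks.yml files:

```python
# check_schedule.py - pick which Soda check suites to run at a given cadence
# (suite names are illustrative placeholders)

SCHEDULE = {
    "after_etl": ["row_count", "freshness", "duplicates"],
    "hourly": ["anomaly_detection", "business_metrics"],
    "daily": ["full_scan"],
    "weekly": ["schema_validation", "cross_table_consistency"],
}

def suites_for(cadence):
    """Return the check suites to run at a cadence, or an empty list."""
    return SCHEDULE.get(cadence, [])

print(suites_for("after_etl"))  # ['row_count', 'freshness', 'duplicates']
```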

Q: What is a blameless post-mortem?

A: A post-mortem that blames no one. It focuses on systems and processes rather than people: ask "why did the system allow this to happen", not "who made the mistake". The goal is to improve processes, add checks, and automate. It also makes the team comfortable reporting issues, so problems surface sooner.
