
Soda Data Quality Post-mortem Analysis

2026-04-22 · อ. บอม — SiamCafe.net · 1,405 words

What Is Soda Data Quality Post-mortem Analysis?

Soda is an open source data quality platform for validating data at every stage of a data pipeline. It supports SQL-based checks, automated monitoring, and integrations with dbt, Airflow, and cloud data warehouses. A post-mortem analysis is the review performed after a data quality incident to find the root cause, prevent recurrence, and improve the process. Combining Soda with a post-mortem workflow gives data teams a systematic way to detect, analyze, and fix data quality problems.
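Soda Core connects to a warehouse through a configuration.yml file, which the scan examples later in this post reference. A minimal sketch for Postgres — note that key names can vary between Soda Core versions, and the host and credentials here are placeholders:

```yaml
# configuration.yml — data source connection for Soda Core (illustrative)
data_source production_db:
  type: postgres
  host: localhost
  port: 5432
  username: ${POSTGRES_USER}      # read from environment variables
  password: ${POSTGRES_PASSWORD}
  database: analytics
  schema: public
```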

Soda Core Basics

# soda_basics.py — Soda data quality basics

class SodaBasics:
    COMPONENTS = {
        "soda_core": {
            "name": "Soda Core (Open Source)",
            "description": "CLI tool for running data quality checks",
            "install": "pip install soda-core-postgres soda-core-bigquery",
        },
        "soda_cloud": {
            "name": "Soda Cloud (SaaS)",
            "description": "Dashboard, alerting, collaboration, anomaly detection",
            "install": "Sign up at cloud.soda.io + connect Soda Core",
        },
        "soda_checks": {
            "name": "SodaCL (Soda Checks Language)",
            "description": "YAML-based DSL for writing data quality checks",
            "install": "Written in checks.yml files",
        },
    }

    CHECK_EXAMPLES = """
# checks.yml — Soda data quality checks
checks for orders:
  # Row count
  - row_count > 0
  - row_count between 1000 and 100000

  # Freshness
  - freshness(created_at) < 1d

  # Completeness
  - missing_count(customer_id) = 0
  - missing_percent(email) < 5%

  # Validity
  - invalid_count(status) = 0:
      valid values: ['pending', 'confirmed', 'shipped', 'delivered', 'cancelled']

  # Uniqueness
  - duplicate_count(order_id) = 0

  # Schema
  - schema:
      fail:
        when required column missing: [order_id, customer_id, amount, status]
        when wrong column type:
          order_id: integer
          amount: decimal

  # Custom SQL metric (user-defined expression)
  - total_amount < 1000000:
      total_amount expression: SUM(amount)

checks for customers:
  - row_count > 0
  - missing_count(email) = 0
  - duplicate_count(email) = 0
  - invalid_percent(phone) < 10%:
      valid regex: '^0[0-9]{8,9}$'
"""

    def show_components(self):
        print("=== Soda Components ===\n")
        for comp in self.COMPONENTS.values():
            print(f"[{comp['name']}]")
            print(f"  {comp['description']}")
            print(f"  Install: {comp['install']}")
            print()

    def show_checks(self):
        print("=== SodaCL Check Examples ===")
        print(self.CHECK_EXAMPLES[:600])

soda = SodaBasics()
soda.show_components()
soda.show_checks()

Post-mortem Framework

# postmortem.py — Data quality post-mortem framework

class PostMortemFramework:
    TEMPLATE = {
        "header": {
            "title": "Data Quality Incident Post-mortem",
            "fields": ["Incident ID", "Date", "Severity (P1-P4)", "Duration",
                       "Impact", "Owner"],
        },
        "timeline": {
            "title": "Timeline of Events",
            "fields": ["Detection time", "Acknowledgment", "Investigation start",
                       "Root cause identified", "Fix deployed", "Verification",
                       "Incident closed"],
        },
        "impact": {
            "title": "Impact Assessment",
            "fields": ["Affected datasets", "Affected downstream consumers",
                       "Business impact (revenue, decisions)", "Data rows affected",
                       "Duration of bad data"],
        },
        "root_cause": {
            "title": "Root Cause Analysis (5 Whys)",
            "fields": ["Why 1: What happened?", "Why 2: Why did it happen?",
                       "Why 3: Why wasn't it caught earlier?",
                       "Why 4: Why didn't monitoring detect it?",
                       "Why 5: What systemic issue allowed this?"],
        },
        "action_items": {
            "title": "Action Items",
            "fields": ["Immediate fix", "Short-term prevention", "Long-term improvement",
                       "New Soda checks to add", "Process changes"],
        },
    }

    EXAMPLE = {
        "id": "DQ-2025-042",
        "date": "2025-01-15",
        "severity": "P2",
        "title": "Duplicate orders in analytics database",
        "duration": "6 hours (02:00 - 08:00)",
        "impact": "Revenue dashboard overstated by 15%, 3 departments affected",
        "root_cause": "ETL job retry logic created duplicate inserts without deduplication",
        "fix": "Added MERGE/UPSERT instead of INSERT, added Soda duplicate check",
    }

    def show_template(self):
        print("=== Post-mortem Template ===\n")
        for section in self.TEMPLATE.values():
            print(f"[{section['title']}]")
            for field in section["fields"][:4]:
                print(f"  □ {field}")
            print()

    def show_example(self):
        print("=== Example Post-mortem ===")
        for key, value in self.EXAMPLE.items():
            print(f"  {key}: {value}")

pm = PostMortemFramework()
pm.show_template()
pm.show_example()
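The template above can also be turned into a fill-in-the-blanks document automatically, so every incident starts from the same structure. A minimal sketch — the `render_postmortem` helper and the trimmed `TEMPLATE` here are illustrative, not part of Soda:

```python
# postmortem_render.py — render the post-mortem template into a Markdown draft
# (illustrative sketch; section and field names mirror the framework above)

TEMPLATE = {
    "Timeline of Events": ["Detection time", "Root cause identified", "Fix deployed"],
    "Root Cause Analysis (5 Whys)": ["Why 1: What happened?", "Why 2: Why did it happen?"],
    "Action Items": ["Immediate fix", "New Soda checks to add"],
}

def render_postmortem(incident_id: str, title: str, template: dict) -> str:
    """Build a fill-in-the-blanks Markdown post-mortem draft."""
    lines = [f"# Post-mortem {incident_id}: {title}", ""]
    for section, fields in template.items():
        lines.append(f"## {section}")
        for field in fields:
            lines.append(f"- **{field}**: _TBD_")  # placeholder to fill in during review
        lines.append("")
    return "\n".join(lines)

doc = render_postmortem("DQ-2025-042", "Duplicate orders in analytics database", TEMPLATE)
print(doc)
```

Generating the skeleton removes one excuse for skipping the write-up: the owner only fills in the blanks.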

Automated Incident Detection

# detection.py — Automated data quality incident detection

class IncidentDetection:
    SODA_PIPELINE = """
# soda_scan.py — Automated Soda scan in pipeline
from soda.scan import Scan

def run_soda_scan(checks_file, datasource_file):
    scan = Scan()
    scan.set_data_source_name("production_db")
    scan.add_configuration_yaml_file(datasource_file)
    scan.add_sodacl_yaml_file(checks_file)
    scan.execute()

    results = scan.get_scan_results()
    checks = results.get('checks', [])

    # Partition results by outcome
    failures = [r for r in checks if r['outcome'] == 'fail']
    warnings = [r for r in checks if r['outcome'] == 'warn']

    if failures:
        # A failed row_count check means data is missing entirely -> escalate to P1
        severity = 'P1' if any('row_count' in f['check'] for f in failures) else 'P2'
        create_incident({
            'severity': severity,
            'failures': len(failures),
            'details': [f['check'] for f in failures],
            'timestamp': results.get('scan_time'),
        })

    return {
        'passed': len([r for r in checks if r['outcome'] == 'pass']),
        'failed': len(failures),
        'warnings': len(warnings),
    }

def create_incident(data):
    # Send to PagerDuty / Slack / Jira
    print(f"INCIDENT [{data['severity']}]: {data['failures']} checks failed")
    for detail in data['details'][:5]:
        print(f"  - {detail}")

# Run scan
result = run_soda_scan('checks.yml', 'configuration.yml')
print(f"Results: {result}")
"""

    AIRFLOW_DAG = """
# dags/soda_quality_check.py — Airflow DAG
from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime

def soda_check(**context):
    from soda.scan import Scan
    scan = Scan()
    scan.set_data_source_name("warehouse")
    scan.add_configuration_yaml_file("/opt/soda/configuration.yml")
    scan.add_sodacl_yaml_file("/opt/soda/checks.yml")
    scan.execute()

    # Fail the task (and trigger Airflow alerting) if any check fails
    if scan.has_check_fails():
        raise Exception(f"Soda checks failed: {scan.get_checks_fail_text()}")

with DAG('soda_quality_check', schedule_interval='@hourly',
         start_date=datetime(2025, 1, 1), catchup=False) as dag:

    check = PythonOperator(
        task_id='run_soda_checks',
        python_callable=soda_check,
    )
"""

    def show_pipeline(self):
        print("=== Soda Scan Pipeline ===")
        print(self.SODA_PIPELINE[:600])

    def show_airflow(self):
        print("\n=== Airflow Integration ===")
        print(self.AIRFLOW_DAG[:400])

detect = IncidentDetection()
detect.show_pipeline()
detect.show_airflow()
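The pipeline above hardcodes its severity rule inline. Factoring it into a small helper makes the policy easy to unit-test and extend. A sketch with hypothetical keyword rules — `classify_severity`, the keyword tuples, and the P1-P4 mapping are this post's assumptions, not Soda API:

```python
# severity.py — sketch of a severity-mapping helper for failed Soda checks
# (hypothetical rules; adapt the keywords to your own check names)

CRITICAL_KEYWORDS = ("row_count", "freshness")   # data missing or stale -> page someone
HIGH_KEYWORDS = ("duplicate_count", "schema")    # wrong data -> fix within the day

def classify_severity(failed_checks: list) -> str:
    """Map a list of failed check names to a P1-P4 severity."""
    names = " ".join(failed_checks)
    if any(k in names for k in CRITICAL_KEYWORDS):
        return "P1"
    if any(k in names for k in HIGH_KEYWORDS):
        return "P2"
    # Anything else that failed is medium; no failures at all is low
    return "P3" if failed_checks else "P4"

print(classify_severity(["row_count > 0"]))                  # → P1
print(classify_severity(["duplicate_count(order_id) = 0"]))  # → P2
print(classify_severity([]))                                 # → P4
```

Keeping the rules in data (the keyword tuples) means on-call engineers can tune escalation without touching the scan code.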

Root Cause Analysis

# rca.py — Root cause analysis tools

class RootCauseAnalysis:
    COMMON_CAUSES = {
        "schema_change": {
            "cause": "Unannounced schema change",
            "detection": "Soda schema check fails",
            "prevention": "Schema registry + breaking change alerts",
        },
        "duplicate_data": {
            "cause": "ETL retries create duplicates",
            "detection": "Soda duplicate_count check",
            "prevention": "Idempotent ETL + MERGE/UPSERT",
        },
        "missing_data": {
            "cause": "Source system downtime / API failure",
            "detection": "Soda row_count + freshness checks",
            "prevention": "Retry logic + source monitoring",
        },
        "wrong_values": {
            "cause": "Business logic error in a transformation",
            "detection": "Soda custom SQL checks + anomaly detection",
            "prevention": "Unit tests for transformations + dbt tests",
        },
        "late_data": {
            "cause": "Pipeline delay / dependency failure",
            "detection": "Soda freshness check",
            "prevention": "Pipeline SLAs + dependency monitoring",
        },
    }

    def show_causes(self):
        print("=== Common Root Causes ===\n")
        for cause in self.COMMON_CAUSES.values():
            print(f"[{cause['cause']}]")
            print(f"  Detection: {cause['detection']}")
            print(f"  Prevention: {cause['prevention']}")
            print()

    def five_whys(self):
        print("=== 5 Whys Example ===")
        whys = [
            "Why 1: The revenue dashboard shows wrong totals → duplicate orders in the analytics table",
            "Why 2: Why are there duplicates → the ETL job retry re-ran the INSERTs",
            "Why 3: Why did retries create duplicates → there is no deduplication logic",
            "Why 4: Why is there no deduplication → ETL retry behavior was never reviewed",
            "Why 5: Why was it never reviewed → no data quality testing in CI/CD",
        ]
        for why in whys:
            print(f"  {why}")
        print("\n  Root Cause: no data quality testing in the development workflow")

rca = RootCauseAnalysis()
rca.show_causes()
rca.five_whys()
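The `duplicate_data` row above prescribes idempotent ETL with MERGE/UPSERT. A pure-Python sketch of why keyed upserts make retries safe — in a real warehouse this would be a `MERGE` (or `INSERT ... ON CONFLICT`) statement, and the `upsert` helper here is only an illustration:

```python
# idempotent_load.py — why keyed upserts make ETL retries safe
# (pure-Python illustration of the MERGE/UPSERT fix; not warehouse code)

def upsert(target: dict, rows: list, key: str = "order_id") -> dict:
    """Insert or replace rows by primary key, so a retry cannot create duplicates."""
    for row in rows:
        target[row[key]] = row  # same key -> overwrite in place, never a second copy
    return target

table = {}
batch = [{"order_id": 1, "amount": 100}, {"order_id": 2, "amount": 50}]
upsert(table, batch)
upsert(table, batch)  # a retried job re-sends the exact same batch
print(len(table))     # → 2 rows, not 4
```

With plain INSERT semantics the second call would have doubled the row count, which is exactly the DQ-2025-042 incident from the example post-mortem.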

Prevention & Monitoring

# prevention.py — Preventive measures
import random

class Prevention:
    CHECKLIST = {
        "proactive": [
            "Write Soda checks for every critical table",
            "Run checks after every ETL job",
            "Set freshness checks for time-sensitive data",
            "Monitor schema changes automatically",
            "Create data contracts with upstream teams",
        ],
        "reactive": [
            "Create an incident response playbook",
            "Set PagerDuty/Slack alerts for P1/P2",
            "Hold a post-mortem for every qualifying incident",
            "Review action items in retrospectives",
            "Track MTTD (Mean Time to Detect) and MTTR (Mean Time to Resolve)",
        ],
    }

    def show_checklist(self):
        print("=== Prevention Checklist ===\n")
        for category, items in self.CHECKLIST.items():
            print(f"[{category.upper()}]")
            for item in items[:4]:
                print(f"  □ {item}")
            print()

    def metrics(self):
        print("=== Data Quality Metrics ===")
        total_pm = random.randint(2, 8)  # simulated numbers for the demo
        metrics = {
            "Data Quality Score": f"{random.randint(85, 98)}%",
            "Checks passed": f"{random.randint(95, 99)}%",
            "Incidents this month": random.randint(0, 5),
            "MTTD (detect)": f"{random.randint(5, 30)} minutes",
            "MTTR (resolve)": f"{random.randint(30, 240)} minutes",
            "Post-mortems completed": f"{random.randint(0, total_pm)}/{total_pm}",
        }
        for m, v in metrics.items():
            print(f"  {m}: {v}")

prev = Prevention()
prev.show_checklist()
prev.metrics()
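In production, MTTD and MTTR should be computed from real incident timestamps rather than simulated. A sketch assuming each incident records occurred/detected/resolved times — the field names and the sample data are illustrative:

```python
# dq_metrics.py — computing MTTD and MTTR from incident timestamps
# (sketch; assumes each incident logs occurred/detected/resolved ISO timestamps)
from datetime import datetime
from statistics import mean

incidents = [
    {"occurred": "2025-01-15T02:00", "detected": "2025-01-15T02:20", "resolved": "2025-01-15T08:00"},
    {"occurred": "2025-01-20T10:00", "detected": "2025-01-20T10:10", "resolved": "2025-01-20T11:40"},
]

def minutes_between(start: str, end: str) -> float:
    """Elapsed minutes between two ISO-format timestamps."""
    return (datetime.fromisoformat(end) - datetime.fromisoformat(start)).total_seconds() / 60

# MTTD: occurrence -> detection; MTTR here: detection -> resolution
mttd = mean(minutes_between(i["occurred"], i["detected"]) for i in incidents)
mttr = mean(minutes_between(i["detected"], i["resolved"]) for i in incidents)
print(f"MTTD: {mttd:.0f} min, MTTR: {mttr:.0f} min")  # → MTTD: 15 min, MTTR: 215 min
```

Whether MTTR is measured from detection or from occurrence is a team convention; pick one and state it in the post-mortem template.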

FAQ - Frequently Asked Questions

Q: Which is better, Soda or Great Expectations?

A: Soda uses a YAML-based DSL (SodaCL), is easy to learn, and ships the Soda Cloud dashboard. Great Expectations is Python-based, more flexible, and has a larger community. Choose Soda when the team is not made up of Python developers and you want a quick setup; choose GX when you need complex custom checks and have a Python team.

Q: Does every incident need a post-mortem?

A: P1-P2 (Critical/High): a blameless post-mortem every time. P3 (Medium): only when there is a pattern or multiple downstream consumers are affected. P4 (Low): log it and review in the retrospective. You do not need a post-mortem for every incident, but you do need a system that tracks them all.
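That triage policy can be encoded as a small helper so the decision is consistent across the team. A sketch — the `needs_postmortem` name and the `recurring` flag are this post's illustrations:

```python
# triage.py — sketch of the post-mortem triage policy described above
# (illustrative; severity labels follow the P1-P4 scheme used in this post)

def needs_postmortem(severity: str, recurring: bool = False) -> bool:
    """P1/P2 always get a blameless post-mortem; P3 only when it is a recurring pattern."""
    if severity in ("P1", "P2"):
        return True
    if severity == "P3":
        return recurring
    return False  # P4: log it and review in the retrospective

print(needs_postmortem("P1"))                  # → True
print(needs_postmortem("P3"))                  # → False
print(needs_postmortem("P3", recurring=True))  # → True
```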

Q: How often should Soda checks run?

A: After every ETL run: critical checks (row count, freshness, duplicates). Hourly: anomaly detection and business metric checks. Daily: a comprehensive full scan. Weekly: schema validation and cross-table consistency. The more critical the data, the more often the checks should run.

Q: What is a blameless post-mortem?

A: A post-mortem that blames no one. It focuses on the system and the process, not on people: ask "why did the system allow this to happen", not "who made the mistake". The goal is to improve the process and add checks and automation. It also makes the team comfortable reporting issues, so problems get found sooner.
