Soda Data Quality Post-mortem Analysis คืออะไร
Soda เป็น open source data quality platform สำหรับตรวจสอบคุณภาพข้อมูลในทุกขั้นตอนของ data pipeline รองรับ SQL-based checks, automated monitoring และ integration กับ dbt, Airflow และ cloud data warehouses ส่วน Post-mortem Analysis คือการวิเคราะห์หลังเกิดเหตุการณ์ data quality incident เพื่อหา root cause ป้องกันไม่ให้เกิดซ้ำ และปรับปรุงกระบวนการ การรวม Soda เข้ากับ post-mortem workflow ช่วยให้ทีม data มีระบบตรวจจับ วิเคราะห์ และแก้ไขปัญหาคุณภาพข้อมูลอย่างเป็นระบบ
Soda Core พื้นฐาน
# soda_basics.py — Soda data quality basics
import json
class SodaBasics:
    """Reference card for the Soda data quality stack.

    Holds a catalogue of the platform's components plus a verbatim
    SodaCL example file, and prints them for tutorial purposes.
    """

    # Platform components: display name, short description, install hint.
    COMPONENTS = {
        "soda_core": {
            "name": "Soda Core (Open Source)",
            "description": "CLI tool สำหรับรัน data quality checks",
            "install": "pip install soda-core-postgres soda-core-bigquery",
        },
        "soda_cloud": {
            "name": "Soda Cloud (SaaS)",
            "description": "Dashboard, alerting, collaboration, anomaly detection",
            "install": "สมัคร cloud.soda.io + connect Soda Core",
        },
        "soda_checks": {
            "name": "SodaCL (Soda Checks Language)",
            "description": "YAML-based DSL สำหรับเขียน data quality checks",
            "install": "เขียนใน checks.yml files",
        },
    }

    # Example checks.yml (SodaCL YAML DSL), shown verbatim by show_checks().
    CHECK_EXAMPLES = """
# checks.yml — Soda data quality checks
checks for orders:
# Row count
- row_count > 0
- row_count between 1000 and 100000
# Freshness
- freshness(created_at) < 1d
# Completeness
- missing_count(customer_id) = 0
- missing_percent(email) < 5%
# Validity
- invalid_count(status) = 0:
valid values: ['pending', 'confirmed', 'shipped', 'delivered', 'cancelled']
# Uniqueness
- duplicate_count(order_id) = 0
# Schema
- schema:
fail:
when required column missing: [order_id, customer_id, amount, status]
when wrong type:
order_id: integer
amount: decimal
# Custom SQL
- total_amount_check:
total_amount = SUM(amount):
fail: when > 1000000
checks for customers:
- row_count > 0
- missing_count(email) = 0
- duplicate_count(email) = 0
- invalid_percent(phone) < 10%:
valid regex: '^0[0-9]{8,9}$'
"""

    def show_components(self):
        """Print name, description and install hint for every component."""
        print("=== Soda Components ===\n")
        # Keys are not displayed, so iterate values directly.
        for component in self.COMPONENTS.values():
            print(f"[{component['name']}]")
            print(f" {component['description']}")
            print(f" Install: {component['install']}")
            print()

    def show_checks(self):
        """Print a 600-character preview of the SodaCL example file."""
        print("=== SodaCL Check Examples ===")
        print(self.CHECK_EXAMPLES[:600])
# Demo: print the component catalogue and the SodaCL example preview.
basics = SodaBasics()
basics.show_components()
basics.show_checks()
Post-mortem Framework
# postmortem.py — Data quality post-mortem framework
import json
from datetime import datetime
class PostMortemFramework:
    """Blameless post-mortem template for data quality incidents.

    Provides a section-by-section template plus one filled-in example
    incident, and prints both for tutorial purposes.
    """

    # Template sections: section key -> title + checklist fields.
    TEMPLATE = {
        "header": {
            "title": "Data Quality Incident Post-mortem",
            "fields": ["Incident ID", "Date", "Severity (P1-P4)", "Duration", "Impact", "Owner"],
        },
        "timeline": {
            "title": "Timeline of Events",
            "fields": ["Detection time", "Acknowledgment", "Investigation start", "Root cause identified", "Fix deployed", "Verification", "Incident closed"],
        },
        "impact": {
            "title": "Impact Assessment",
            "fields": ["Affected datasets", "Affected downstream consumers", "Business impact (revenue, decisions)", "Data rows affected", "Duration of bad data"],
        },
        "root_cause": {
            "title": "Root Cause Analysis (5 Whys)",
            "fields": ["Why 1: What happened?", "Why 2: Why did it happen?", "Why 3: Why wasn't it caught earlier?", "Why 4: Why didn't monitoring detect it?", "Why 5: What systemic issue allowed this?"],
        },
        "action_items": {
            "title": "Action Items",
            "fields": ["Immediate fix", "Short-term prevention", "Long-term improvement", "New Soda checks to add", "Process changes"],
        },
    }

    # A completed post-mortem record for the duplicate-orders incident.
    EXAMPLE = {
        "id": "DQ-2025-042",
        "date": "2025-01-15",
        "severity": "P2",
        "title": "Duplicate orders in analytics database",
        "duration": "6 hours (02:00 - 08:00)",
        "impact": "Revenue dashboard overstated by 15%, 3 departments affected",
        "root_cause": "ETL job retry logic created duplicate inserts without deduplication",
        "fix": "Added MERGE/UPSERT instead of INSERT, added Soda duplicate check",
    }

    def show_template(self):
        """Print each template section title with up to four of its fields."""
        print("=== Post-mortem Template ===\n")
        # Section keys are not displayed, so iterate values directly.
        for section in self.TEMPLATE.values():
            print(f"[{section['title']}]")
            # Preview only: show at most the first four checklist items.
            for field in section["fields"][:4]:
                print(f" □ {field}")
            print()

    def show_example(self):
        """Print the example post-mortem as key: value lines."""
        print("=== Example Post-mortem ===")
        for field, value in self.EXAMPLE.items():
            print(f" {field}: {value}")
# Demo: print the template sections and the example incident record.
framework = PostMortemFramework()
framework.show_template()
framework.show_example()
Automated Incident Detection
# detection.py — Automated data quality incident detection
import json
import random
class IncidentDetection:
    """Code samples for wiring Soda scans into automated incident detection.

    Holds two verbatim code listings — a standalone scan script and an
    Airflow DAG — and prints truncated previews of each.
    """

    # Standalone pipeline script: run a scan, open an incident on failures.
    SODA_PIPELINE = """
# soda_scan.py — Automated Soda scan in pipeline
from soda.scan import Scan
import json
def run_soda_scan(checks_file, datasource_file):
scan = Scan()
scan.set_data_source_name("production_db")
scan.add_configuration_yaml_file(datasource_file)
scan.add_sodacl_yaml_file(checks_file)
scan.execute()
results = scan.get_scan_results()
# Check for failures
failures = [r for r in results.get('checks', []) if r['outcome'] == 'fail']
warnings = [r for r in results.get('checks', []) if r['outcome'] == 'warn']
if failures:
severity = 'P1' if any('row_count' in f['check'] for f in failures) else 'P2'
create_incident({
'severity': severity,
'failures': len(failures),
'details': [f['check'] for f in failures],
'timestamp': results.get('scan_time'),
})
return {
'passed': len([r for r in results.get('checks', []) if r['outcome'] == 'pass']),
'failed': len(failures),
'warnings': len(warnings),
}
def create_incident(data):
# Send to PagerDuty / Slack / Jira
print(f"INCIDENT [{data['severity']}]: {data['failures']} checks failed")
for detail in data['details'][:5]:
print(f" - {detail}")
# Run scan
result = run_soda_scan('checks.yml', 'configuration.yml')
print(f"Results: {result}")
"""

    # Airflow integration: hourly DAG that fails the task on check failures.
    AIRFLOW_DAG = """
# dags/soda_quality_check.py — Airflow DAG
from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime
def soda_check(**context):
from soda.scan import Scan
scan = Scan()
scan.set_data_source_name("warehouse")
scan.add_configuration_yaml_file("/opt/soda/configuration.yml")
scan.add_sodacl_yaml_file("/opt/soda/checks.yml")
scan.execute()
if scan.has_check_fails():
raise Exception(f"Soda checks failed: {scan.get_checks_fail_text()}")
with DAG('soda_quality_check', schedule_interval='@hourly',
start_date=datetime(2025, 1, 1)) as dag:
check = PythonOperator(
task_id='run_soda_checks',
python_callable=soda_check,
)
"""

    def show_pipeline(self):
        """Print a 600-character preview of the scan pipeline listing."""
        print("=== Soda Scan Pipeline ===")
        print(self.SODA_PIPELINE[:600])

    def show_airflow(self):
        """Print a 400-character preview of the Airflow DAG listing."""
        # Plain string — the original f-prefix had no placeholders (F541).
        print("\n=== Airflow Integration ===")
        print(self.AIRFLOW_DAG[:400])
# Demo: print previews of the pipeline script and the Airflow DAG.
detector = IncidentDetection()
detector.show_pipeline()
detector.show_airflow()
Root Cause Analysis
# rca.py — Root cause analysis tools
import json
import random
class RootCauseAnalysis:
    """Catalogue of common data quality root causes plus a 5-whys walkthrough.

    `show_causes` prints each cause with how Soda detects it and how to
    prevent recurrence; `five_whys` prints a worked example for the
    duplicate-orders incident.
    """

    # cause key -> human-readable cause, Soda detection mechanism, prevention.
    COMMON_CAUSES = {
        "schema_change": {
            "cause": "Schema change ไม่มีการแจ้ง",
            "detection": "Soda schema check fails",
            "prevention": "Schema registry + breaking change alerts",
        },
        "duplicate_data": {
            "cause": "ETL retry สร้าง duplicates",
            "detection": "Soda duplicate_count check",
            "prevention": "Idempotent ETL + MERGE/UPSERT",
        },
        "missing_data": {
            "cause": "Source system downtime / API failure",
            "detection": "Soda row_count + freshness check",
            "prevention": "Retry logic + source monitoring",
        },
        "wrong_values": {
            "cause": "Business logic error ใน transformation",
            "detection": "Soda custom SQL checks + anomaly detection",
            "prevention": "Unit tests สำหรับ transformations + dbt tests",
        },
        "late_data": {
            "cause": "Pipeline delay / dependency failure",
            "detection": "Soda freshness check",
            "prevention": "Pipeline SLAs + dependency monitoring",
        },
    }

    def show_causes(self):
        """Print each common root cause with its detection and prevention notes."""
        print("=== Common Root Causes ===\n")
        for key, cause in self.COMMON_CAUSES.items():
            print(f"[{cause['cause']}]")
            print(f" Detection: {cause['detection']}")
            print(f" Prevention: {cause['prevention']}")
            print()

    def five_whys(self):
        """Print a worked 5-whys analysis of the duplicate-orders incident."""
        print("=== 5 Whys Example ===")
        whys = [
            "Why 1: Revenue dashboard แสดงยอดผิด → duplicate orders ใน analytics table",
            # Fixed Thai typo: "ทำไม่มี" -> "ทำไมมี" ("why are there duplicates"),
            # matching the "ทำไม ..." phrasing of Why 3-5.
            "Why 2: ทำไมมี duplicates → ETL job retry ทำ INSERT ซ้ำ",
            "Why 3: ทำไม retry สร้าง duplicates → ไม่มี deduplication logic",
            "Why 4: ทำไมไม่มี deduplication → ไม่ได้ review ETL retry behavior",
            "Why 5: ทำไมไม่ review → ไม่มี data quality testing ใน CI/CD",
        ]
        for why in whys:
            print(f" {why}")
        # Plain string: no placeholders, so the f-prefix was unnecessary (F541).
        print("\n Root Cause: ไม่มี data quality testing ใน development workflow")
# Demo: print the root cause catalogue and the 5-whys walkthrough.
analysis = RootCauseAnalysis()
analysis.show_causes()
analysis.five_whys()
Prevention & Monitoring
# prevention.py — Preventive measures
import json
import random
class Prevention:
    """Preventive checklist and illustrative metrics for data quality ops.

    `show_checklist` prints proactive/reactive action items; `metrics`
    prints randomly generated demo values for common quality KPIs.
    """

    # Action items grouped into proactive (before incidents) and
    # reactive (incident response) categories.
    CHECKLIST = {
        "proactive": [
            "เขียน Soda checks สำหรับทุก critical tables",
            "รัน checks ทุกครั้งหลัง ETL job",
            "ตั้ง freshness checks สำหรับ time-sensitive data",
            "Monitor schema changes อัตโนมัติ",
            "สร้าง data contracts กับ upstream teams",
        ],
        "reactive": [
            "สร้าง incident response playbook",
            "ตั้ง PagerDuty/Slack alerts สำหรับ P1/P2",
            "เก็บ post-mortem ทุกครั้ง",
            "Review action items ใน retrospective",
            "Track MTTD (Mean Time to Detect) และ MTTR",
        ],
    }

    def show_checklist(self):
        """Print each checklist category with up to four of its items."""
        print("=== Prevention Checklist ===\n")
        for category, items in self.CHECKLIST.items():
            print(f"[{category.upper()}]")
            # Preview only: show at most the first four items per category.
            for item in items[:4]:
                print(f" □ {item}")
            print()

    def metrics(self):
        """Print demo values for common data quality metrics.

        Values are random placeholders for illustration, not real data.
        """
        print("=== Data Quality Metrics ===")
        demo_values = {
            "Data Quality Score": f"{random.randint(85, 98)}%",
            "Checks passed": f"{random.randint(95, 99)}%",
            "Incidents this month": random.randint(0, 5),
            "MTTD (detect)": f"{random.randint(5, 30)} minutes",
            "MTTR (resolve)": f"{random.randint(30, 240)} minutes",
            "Post-mortems completed": f"{random.randint(2, 8)}/{random.randint(2, 8)}",
        }
        for name, value in demo_values.items():
            print(f" {name}: {value}")
# Demo: print the prevention checklist and the sample metrics.
prevention = Prevention()
prevention.show_checklist()
prevention.metrics()
FAQ - คำถามที่พบบ่อย
Q: Soda กับ Great Expectations อันไหนดีกว่า?
A: Soda: YAML-based (SodaCL), เรียนรู้ง่าย, มี Soda Cloud dashboard ส่วน Great Expectations: Python-based, flexible กว่า, community ใหญ่กว่า — เลือกใช้ Soda เมื่อทีมไม่ใช่ Python developers และต้องการ quick setup; เลือกใช้ GX เมื่อต้องการ custom checks ที่ซับซ้อนและทีมถนัด Python
Q: ทำ post-mortem ทุก incident ไหม?
A: P1-P2 (Critical/High): ทุกครั้ง blameless post-mortem P3 (Medium): ถ้ามี pattern หรือกระทบหลาย consumers P4 (Low): log ไว้ review ใน retrospective ไม่ต้องทำทุกอัน แต่ต้องมีระบบ track
Q: Soda checks ควรรันบ่อยแค่ไหน?
A: After every ETL run: critical checks (row count, freshness, duplicates) Hourly: anomaly detection, business metric checks Daily: comprehensive full scan Weekly: schema validation, cross-table consistency ยิ่ง data สำคัญ ยิ่งรันบ่อย
Q: Blameless post-mortem คืออะไร?
A: Post-mortem ที่ไม่โทษใคร focus ที่ระบบและ process ไม่ใช่คน ถาม "ทำไมระบบ allow สิ่งนี้เกิดขึ้น" ไม่ใช่ "ใครทำผิด" เป้าหมาย: ปรับปรุง process, เพิ่ม checks, automation ทำให้ทีมกล้า report issues → พบปัญหาเร็วขึ้น
