Monte Carlo Data Observability SRE
Monte Carlo brings SRE practice to data observability: SLIs, SLOs, and error budgets defined over freshness, volume, schema, distribution, and lineage, with incident alerting for production data.
| Pillar | What | SLI Metric | SLO Target |
|---|---|---|---|
| Freshness | Data arrives on time | Hours since last update | < 1 hour (99.9%) |
| Volume | Row counts in the normal range | Row count vs expected | Within ±10% (99.5%) |
| Schema | No unexpected schema changes | Schema changes detected | Detection < 5 min |
| Distribution | Value distributions look normal | Distribution anomaly score | Score < threshold |
| Lineage | Known impact when issues occur | Lineage coverage % | > 95% tables mapped |
SRE Data SLI/SLO
```python
# === Data SLI/SLO Framework ===
from dataclasses import dataclass

@dataclass
class DataSLO:
    sli: str
    measurement: str
    slo_target: str
    error_budget: str
    alert_threshold: str

slos = [
    DataSLO("Data Freshness",
            "Time since last successful pipeline run",
            "< 1 hour for 99.9% of time",
            "~43 min/month allowed delay",
            "P2 > 1hr | P1 > 2hr"),
    DataSLO("Data Completeness",
            "Row count actual / expected * 100",
            "> 99.5% rows present",
            "0.5% missing rows allowed",
            "P3 < 99% | P2 < 98% | P1 < 95%"),
    DataSLO("Pipeline Success Rate",
            "Successful runs / total runs * 100",
            "> 99.9% success",
            "~4 failed runs/month allowed",
            "P2 3+ failures/day | P1 pipeline stuck"),
    DataSLO("Schema Stability",
            "Unplanned schema changes per month",
            "0 unplanned breaking changes",
            "0 tolerance for breaking changes",
            "P2 any unplanned change | P1 breaking change"),
    DataSLO("Data Accuracy",
            "Rows passing quality checks / total * 100",
            "> 99.9% accuracy",
            "0.1% bad data allowed",
            "P3 < 99.5% | P2 < 99% | P1 < 95%"),
]

print("=== Data SLI/SLO ===")
for s in slos:
    print(f"  [{s.sli}]")
    print(f"    Measure: {s.measurement}")
    print(f"    SLO: {s.slo_target}")
    print(f"    Budget: {s.error_budget}")
    print(f"    Alert: {s.alert_threshold}")
```
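The Data Freshness SLO above translates into numbers directly: a 99.9% time-based target over a 30-day month allows roughly 43 minutes out of SLO. A minimal sketch of the SLI and budget math (function names are illustrative):

```python
from datetime import datetime, timedelta

def freshness_breach_minutes(last_update: datetime, now: datetime,
                             slo_hours: float = 1.0) -> float:
    """Minutes by which a table currently exceeds the freshness SLO (0 if within it)."""
    breach = (now - last_update) - timedelta(hours=slo_hours)
    return max(breach.total_seconds() / 60.0, 0.0)

def monthly_error_budget_minutes(target: float = 0.999, days: int = 30) -> float:
    """Allowed out-of-SLO minutes per month for a time-based SLO."""
    return days * 24 * 60 * (1 - target)

now = datetime(2024, 1, 1, 12, 0)
breach = freshness_breach_minutes(datetime(2024, 1, 1, 10, 30), now)
budget = monthly_error_budget_minutes()
print(f"breach: {breach:.0f} min, monthly budget: {budget:.1f} min")
# → breach: 30 min, monthly budget: 43.2 min
```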
Incident Response
```python
# === Data Incident Response ===
# Monte Carlo Alert → PagerDuty → On-call SRE
# The SRE checks the Monte Carlo dashboard:
#   1. Which table is affected?
#   2. Which pillar failed? (Freshness/Volume/Schema/Distribution)
#   3. Lineage: what downstream is impacted?
#   4. Root cause: source issue? Pipeline bug? Schema change?
from dataclasses import dataclass

@dataclass
class IncidentType:
    incident: str
    severity: str
    detection: str
    response: str
    resolution: str

incidents = [
    IncidentType("Pipeline Down (No Data)",
                 "P1 Critical",
                 "Monte Carlo Freshness Alert > 2hr",
                 "Page On-call → Check Airflow/dbt → Check Source",
                 "Fix Pipeline → Backfill Missing Data → Verify"),
    IncidentType("Freshness SLA Breach",
                 "P2 High",
                 "Monte Carlo Freshness > SLO (1hr)",
                 "Check Pipeline Duration → Check Source Latency",
                 "Optimize Pipeline → Scale Resources → Tune Schedule"),
    IncidentType("Volume Anomaly (Drop/Spike)",
                 "P3 Medium",
                 "Monte Carlo Volume < 90% or > 150% expected",
                 "Check Source System → Check Filter Logic → Check Date Range",
                 "Fix Source/Filter → Rerun Pipeline → Verify Count"),
    IncidentType("Schema Breaking Change",
                 "P2 High",
                 "Monte Carlo Schema Alert (column dropped/type changed)",
                 "Check who changed → Impact Analysis via Lineage",
                 "Rollback Schema → Update Pipeline → Coordinate with Team"),
    IncidentType("Distribution Drift",
                 "P3 Medium",
                 "Monte Carlo Distribution anomaly score > threshold",
                 "Check Data Source → Compare with Historical → Verify Logic",
                 "Fix Source Issue → Update Expected Distribution → Monitor"),
]

print("=== Data Incidents ===")
for i in incidents:
    print(f"\n  [{i.severity}] {i.incident}")
    print(f"    Detection: {i.detection}")
    print(f"    Response: {i.response}")
    print(f"    Resolution: {i.resolution}")
```
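The triage table above can be folded into a small routing function. A sketch using the thresholds from this section's tables (the severity for a non-breaking schema change is an assumption, and the actual PagerDuty call is omitted):

```python
def classify_data_incident(pillar: str, detail: dict) -> str:
    """Map a Monte Carlo-style alert to a severity, per the incident table."""
    if pillar == "freshness":
        hours = detail["hours_stale"]
        if hours > 2:
            return "P1"  # pipeline likely down, no data arriving
        if hours > 1:
            return "P2"  # freshness SLO breached
    elif pillar == "volume":
        ratio = detail["actual"] / detail["expected"]
        if ratio < 0.9 or ratio > 1.5:
            return "P3"  # volume anomaly: drop or spike
    elif pillar == "schema":
        # assumption: non-breaking changes route at a lower severity
        return "P2" if detail.get("breaking") else "P3"
    elif pillar == "distribution":
        return "P3"
    return "OK"

print(classify_data_incident("freshness", {"hours_stale": 2.5}))            # → P1
print(classify_data_incident("volume", {"actual": 800, "expected": 1000}))  # → P3
```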
Automation
```python
# === Self-healing Pipeline ===
# Airflow DAG with Monte Carlo integration (sketch)
# from airflow import DAG
# from airflow.operators.python import PythonOperator
# import requests
#
# def check_monte_carlo_status(table):
#     resp = requests.get(
#         f"https://api.getmontecarlo.com/v1/tables/{table}/status",
#         headers={"Authorization": f"Bearer {MC_TOKEN}"})
#     status = resp.json()
#     if status["has_active_incidents"]:
#         raise Exception(f"Monte Carlo incident on {table}")
#     return True
#
# def on_final_failure(context):
#     # Retries are configured on the task itself (e.g. retries=3 with backoff);
#     # this callback fires only after the last retry is exhausted.
#     requests.post(PD_WEBHOOK, json={"event": "pipeline_failed"})
from dataclasses import dataclass

@dataclass
class AutoAction:
    trigger: str
    condition: str
    action: str
    fallback: str

automations = [
    AutoAction("Source Data Late",
               "Airflow Sensor timeout > 30min",
               "Wait (Sensor) → Retry 3x with backoff",
               "Alert P2 → Use cached data → Investigate"),
    AutoAction("Pipeline Transient Error",
               "Pipeline fail with network/timeout error",
               "Auto-retry 3x with exponential backoff",
               "Alert P2 → Manual investigation"),
    AutoAction("Schema Change Detected",
               "Monte Carlo Schema Alert",
               "Block downstream Pipeline → Notify Data Team",
               "Auto-adapt if non-breaking → Alert if breaking"),
    AutoAction("Volume Drop > 50%",
               "Monte Carlo Volume Alert",
               "Pause downstream → Check source → Alert Data Team",
               "Use previous data → Investigate source"),
    AutoAction("Quality Check Failed",
               "dbt test / Monte Carlo Distribution alert",
               "Quarantine bad data → Alert → Reprocess",
               "Rollback to last good data → Fix & rerun"),
]

print("=== Self-healing Automations ===")
for a in automations:
    print(f"  [{a.trigger}]")
    print(f"    Condition: {a.condition}")
    print(f"    Action: {a.action}")
    print(f"    Fallback: {a.fallback}")
```
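The auto-retry pattern used by the first two automations can be implemented as a small decorator. A generic sketch (delays back off 1s, 2s, 4s by default; the demo injects a no-op sleep so it runs instantly):

```python
import time
from functools import wraps

def auto_retry(max_attempts=3, base_delay=1.0, sleep=time.sleep):
    """Retry transient failures with exponential backoff, then re-raise."""
    def decorator(fn):
        @wraps(fn)
        def wrapper(*args, **kwargs):
            for attempt in range(max_attempts):
                try:
                    return fn(*args, **kwargs)
                except Exception:
                    if attempt == max_attempts - 1:
                        raise  # retry budget exhausted → escalate (e.g. alert P2)
                    sleep(base_delay * 2 ** attempt)
        return wrapper
    return decorator

calls = {"n": 0}

@auto_retry(max_attempts=3, sleep=lambda s: None)  # skip real sleeping in the demo
def flaky_extract():
    calls["n"] += 1
    if calls["n"] < 3:
        raise TimeoutError("transient network error")
    return "rows loaded"

result = flaky_extract()
print(result, "after", calls["n"], "attempts")  # → rows loaded after 3 attempts
```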
Tips
- SLO: Define clear data SLOs for freshness, volume, and accuracy
- Lineage: Use lineage to find root cause and downstream impact immediately
- Error Budget: Adopt an error budget policy and stop deploys when the budget is exhausted
- Postmortem: Write a postmortem for every P1 incident to prevent recurrence
- Self-healing: Auto-retry + fallback reduce manual toil
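The error-budget policy in the tips reduces to a single gate check. A sketch using the Pipeline Success Rate SLO from earlier (the event counts are illustrative):

```python
def deploy_allowed(slo_target: float, good_events: int, total_events: int) -> bool:
    """Error budget policy: block deploys once the budget is spent."""
    allowed_failures = total_events * (1 - slo_target)  # the error budget
    actual_failures = total_events - good_events
    return actual_failures < allowed_failures

# 99.9% pipeline-success SLO over 10,000 runs → a budget of 10 failed runs
print(deploy_allowed(0.999, good_events=9996, total_events=10000))  # → True  (4 failures)
print(deploy_allowed(0.999, good_events=9985, total_events=10000))  # → False (15 failures)
```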
Professional Database Management
Good database management starts with appropriate schema design: use normalization to reduce data redundancy, create indexes on frequently queried columns, analyze query plans to optimize performance, and perform regular maintenance such as VACUUM for PostgreSQL or OPTIMIZE TABLE for MySQL.
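The effect of an index on a hot column shows up directly in the query plan. A self-contained SQLite demo (the same principle applies to PostgreSQL's and MySQL's EXPLAIN):

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE users (id INTEGER PRIMARY KEY, email TEXT)")
conn.executemany("INSERT INTO users (email) VALUES (?)",
                 [(f"user{i}@example.com",) for i in range(1000)])

def plan(sql):
    """Return SQLite's human-readable plan for the query."""
    rows = conn.execute("EXPLAIN QUERY PLAN " + sql,
                        ("user42@example.com",)).fetchall()
    return " | ".join(r[-1] for r in rows)

query = "SELECT * FROM users WHERE email = ?"
before = plan(query)
conn.execute("CREATE INDEX idx_users_email ON users (email)")
after = plan(query)
print("before:", before)  # full table scan
print("after: ", after)   # index search via idx_users_email
```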
For high availability, set up replication with at least one replica for read scaling and disaster recovery, use connection pooling such as PgBouncer or ProxySQL to reduce the load of concurrently open connections, and configure automated failover so the system switches to a replica when the primary goes down.
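In production a dedicated pooler like PgBouncer does this, but the mechanism itself is simple: keep a fixed set of open connections and hand them out on demand. A conceptual sketch over SQLite so it stays self-contained:

```python
import sqlite3
from contextlib import contextmanager
from queue import Queue

class ConnectionPool:
    """Toy pool: reuse a fixed set of open connections instead of reconnecting."""
    def __init__(self, factory, size=5):
        self._pool = Queue(maxsize=size)
        for _ in range(size):
            self._pool.put(factory())

    @contextmanager
    def connection(self, timeout=5.0):
        conn = self._pool.get(timeout=timeout)  # blocks if the pool is exhausted
        try:
            yield conn
        finally:
            self._pool.put(conn)  # return to the pool instead of closing

pool = ConnectionPool(lambda: sqlite3.connect(":memory:"), size=2)
with pool.connection() as conn:
    result = conn.execute("SELECT 1 + 1").fetchone()[0]
print(result)  # → 2
```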
Backups should combine daily full backups with incremental backups every 1-4 hours. Retain binary logs or WAL for point-in-time recovery, test restores regularly, and always keep an off-site copy.
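Point-in-time recovery restores the latest full backup before the target time, applies the incrementals taken after it, then replays WAL/binlog up to the target. Choosing that chain can be sketched as (timestamps are illustrative):

```python
from datetime import datetime

def restore_chain(fulls, incrementals, target):
    """Pick the backups needed to restore to `target`; WAL replay closes the gap."""
    base = max((t for t in fulls if t <= target), default=None)
    if base is None:
        raise ValueError("no full backup before target time")
    incs = sorted(t for t in incrementals if base < t <= target)
    return base, incs

fulls = [datetime(2024, 1, 1, 2), datetime(2024, 1, 2, 2)]
incs = [datetime(2024, 1, 2, 6), datetime(2024, 1, 2, 10), datetime(2024, 1, 2, 14)]
target = datetime(2024, 1, 2, 11, 30)

base, chain = restore_chain(fulls, incs, target)
print("restore:", base, "+", len(chain), "incrementals, then replay WAL to", target)
```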
What is Monte Carlo?
A data observability platform that monitors the five pillars (freshness, volume, schema, distribution, and lineage) with ML-based anomaly detection, auto-detects issues in warehouses such as Snowflake, BigQuery, and Databricks, and raises alerts.
How does data observability fit SRE practice?
Define SLIs, SLOs, and error budgets over data freshness, completeness, accuracy, pipeline success rate, and schema stability, then reduce toil through self-healing automation.
How is incident management done?
Classify severity (P1 pipeline down, P2 SLA breach, P3 volume/schema anomalies) and follow the lifecycle Detect → Triage → Investigate → Mitigate → Resolve → Postmortem, using lineage for root cause and impact analysis.
How do automation and self-healing work?
Auto-retry with backoff, sensors with fallback to cached data, circuit breakers, and quarantine of bad data; wire Airflow, dbt, and the Monte Carlo API together via webhooks to PagerDuty and block downstream pipelines when needed.
Summary
Monte Carlo brings data observability into SRE practice: SLIs, SLOs, and error budgets for freshness, volume, and schema; lineage for impact analysis; incident response with postmortems; and self-healing automation in production.
