SiamCafe.net Blog
Technology

Monte Carlo Observability Site Reliability SRE

2025-11-04 · A. Bom · SiamCafe.net · 8,192 words

Monte Carlo Data Observability SRE

This post applies SRE practice to data with Monte Carlo: SLIs, SLOs, and error budgets across the five observability pillars (freshness, volume, schema, distribution, lineage), plus incident response and alerting for production pipelines.

| Pillar | What it checks | SLI Metric | SLO Target |
|---|---|---|---|
| Freshness | Data arrives on time | Hours since last update | < 1 hour (99.9%) |
| Volume | Row count is as expected | Row count vs expected | Within ±10% (99.5%) |
| Schema | No unexpected schema changes | Schema changes detected | Detection < 5 min |
| Distribution | Value distribution is normal | Distribution anomaly score | Score < threshold |
| Lineage | Downstream impact is known | Lineage coverage % | > 95% tables mapped |

SRE Data SLI/SLO

# === Data SLI/SLO Framework ===

from dataclasses import dataclass

@dataclass
class DataSLO:
    sli: str
    measurement: str
    slo_target: str
    error_budget: str
    alert_threshold: str

slos = [
    DataSLO("Data Freshness",
        "Time since last successful pipeline run",
        "< 1 hour for 99.9% of time",
        "~43 min/month allowed delay",
        "P2 > 1hr | P1 > 2hr"),
    DataSLO("Data Completeness",
        "Row count actual / expected * 100",
        "> 99.5% rows present",
        "0.5% missing rows allowed",
        "P3 < 99% | P2 < 98% | P1 < 95%"),
    DataSLO("Pipeline Success Rate",
        "Successful runs / total runs * 100",
        "> 99.9% success",
        "~4 failed runs/month allowed",
        "P2 3+ failures/day | P1 pipeline stuck"),
    DataSLO("Schema Stability",
        "Unplanned schema changes per month",
        "0 unplanned breaking changes",
        "0 tolerance for breaking changes",
        "P2 any unplanned change | P1 breaking change"),
    DataSLO("Data Accuracy",
        "Rows passing quality checks / total * 100",
        "> 99.9% accuracy",
        "0.1% bad data allowed",
        "P3 < 99.5% | P2 < 99% | P1 < 95%"),
]

print("=== Data SLI/SLO ===")
for s in slos:
    print(f"  [{s.sli}]")
    print(f"    Measure: {s.measurement}")
    print(f"    SLO: {s.slo_target}")
    print(f"    Budget: {s.error_budget}")
    print(f"    Alert: {s.alert_threshold}")
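The error-budget figures above follow from simple arithmetic. A quick sketch (assuming a 30-day month):

```python
def monthly_error_budget_minutes(slo_pct, days=30):
    """Minutes per month the SLI is allowed to be out of target."""
    total_minutes = days * 24 * 60
    return total_minutes * (1 - slo_pct / 100)

# 99.9% freshness SLO -> the "~43 min/month allowed delay" budget above
print(round(monthly_error_budget_minutes(99.9), 1))  # 43.2
# 99.5% target leaves a larger budget
print(round(monthly_error_budget_minutes(99.5), 1))  # 216.0
```

Once the budget is a number, burn-rate alerting follows naturally: page when the month's budget is being consumed faster than it can last the month.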

Incident Response

# === Data Incident Response ===

# Monte Carlo Alert → PagerDuty → On-call SRE
# SRE checks Monte Carlo Dashboard:
#   1. Which table is affected?
#   2. What pillar failed? (Freshness/Volume/Schema/Distribution)
#   3. Lineage: What downstream is impacted?
#   4. Root cause: Source issue? Pipeline bug? Schema change?

@dataclass
class IncidentType:
    incident: str
    severity: str
    detection: str
    response: str
    resolution: str

incidents = [
    IncidentType("Pipeline Down (No Data)",
        "P1 Critical",
        "Monte Carlo Freshness Alert > 2hr",
        "Page On-call → Check Airflow/dbt → Check Source",
        "Fix Pipeline → Backfill Missing Data → Verify"),
    IncidentType("Freshness SLA Breach",
        "P2 High",
        "Monte Carlo Freshness > SLO (1hr)",
        "Check Pipeline Duration → Check Source Latency",
        "Optimize Pipeline → Scale Resources → Tune Schedule"),
    IncidentType("Volume Anomaly (Drop/Spike)",
        "P3 Medium",
        "Monte Carlo Volume < 90% or > 150% expected",
        "Check Source System → Check Filter Logic → Check Date Range",
        "Fix Source/Filter → Rerun Pipeline → Verify Count"),
    IncidentType("Schema Breaking Change",
        "P2 High",
        "Monte Carlo Schema Alert (Column dropped/type changed)",
        "Check who changed → Impact Analysis via Lineage",
        "Rollback Schema → Update Pipeline → Coordinate with Team"),
    IncidentType("Distribution Drift",
        "P3 Medium",
        "Monte Carlo Distribution anomaly score > threshold",
        "Check Data Source → Compare with Historical → Verify Logic",
        "Fix Source Issue → Update Expected Distribution → Monitor"),
]

print("=== Data Incidents ===")
for i in incidents:
    print(f"\n  [{i.severity}] {i.incident}")
    print(f"    Detection: {i.detection}")
    print(f"    Response: {i.response}")
    print(f"    Resolution: {i.resolution}")
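The detection thresholds above can be encoded as a small triage helper, so alert routing is deterministic rather than judgment-per-page (a sketch; the cutoffs are the ones from the tables in this section):

```python
def triage_freshness(hours_since_update):
    """Map freshness delay to severity, per the thresholds above."""
    if hours_since_update > 2:
        return "P1"  # pipeline likely down: page on-call
    if hours_since_update > 1:
        return "P2"  # freshness SLO (1 hr) breached
    return "OK"

def triage_volume(actual_rows, expected_rows):
    """Volume anomaly: below 90% or above 150% of expected is a P3."""
    ratio = actual_rows / expected_rows
    return "P3" if ratio < 0.9 or ratio > 1.5 else "OK"

print(triage_freshness(3))       # P1
print(triage_volume(800, 1000))  # P3
```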

Automation

# === Self-healing Pipeline ===

# Sketch: Airflow task that gates on Monte Carlo status. The REST endpoint,
# MC_TOKEN, and PD_WEBHOOK below are illustrative placeholders (pull real
# credentials from a secrets backend), not Monte Carlo's actual API shape.
#
# from airflow import DAG
# from airflow.operators.python import PythonOperator
# import requests
#
# def check_monte_carlo_status(table):
#     resp = requests.get(
#         f"https://api.getmontecarlo.com/v1/tables/{table}/status",
#         headers={"Authorization": f"Bearer {MC_TOKEN}"},
#     )
#     resp.raise_for_status()
#     if resp.json()["has_active_incidents"]:
#         raise Exception(f"Monte Carlo incident on {table}")
#     return True
#
# def on_failure(context):
#     # Airflow's own `retries` / `retry_exponential_backoff` task arguments
#     # handle auto-retry; this callback escalates once retries are exhausted.
#     ti = context["task_instance"]
#     if ti.try_number >= ti.max_tries:
#         requests.post(PD_WEBHOOK, json={"event": "pipeline_failed"})

@dataclass
class AutoAction:
    trigger: str
    condition: str
    action: str
    fallback: str

automations = [
    AutoAction("Source Data Late",
        "Airflow Sensor timeout > 30min",
        "Wait (Sensor) → Retry 3x with backoff",
        "Alert P2 → Use cached data → Investigate"),
    AutoAction("Pipeline Transient Error",
        "Pipeline fail with network/timeout error",
        "Auto-retry 3x with exponential backoff",
        "Alert P2 → Manual investigation"),
    AutoAction("Schema Change Detected",
        "Monte Carlo Schema Alert",
        "Block downstream Pipeline → Notify Data Team",
        "Auto-adapt if non-breaking → Alert if breaking"),
    AutoAction("Volume Drop > 50%",
        "Monte Carlo Volume Alert",
        "Pause downstream → Check source → Alert Data Team",
        "Use previous data → Investigate source"),
    AutoAction("Quality Check Failed",
        "dbt test / Monte Carlo Distribution alert",
        "Quarantine bad data → Alert → Reprocess",
        "Rollback to last good data → Fix & rerun"),
]

print("=== Self-healing Automations ===")
for a in automations:
    print(f"  [{a.trigger}]")
    print(f"    Condition: {a.condition}")
    print(f"    Action: {a.action}")
    print(f"    Fallback: {a.fallback}")
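The "auto-retry 3x with exponential backoff" pattern used by several of these automations, as a self-contained sketch:

```python
import time

def retry_with_backoff(fn, retries=3, base_delay=1.0):
    """Call fn; on failure retry up to `retries` times, doubling the delay."""
    for attempt in range(retries + 1):
        try:
            return fn()
        except Exception:
            if attempt == retries:
                raise  # retry budget exhausted: let the caller alert (e.g. PagerDuty)
            time.sleep(base_delay * 2 ** attempt)  # 1s, 2s, 4s, ...

# Usage: a flaky task that only succeeds on the third attempt
attempts = {"n": 0}
def flaky_task():
    attempts["n"] += 1
    if attempts["n"] < 3:
        raise TimeoutError("transient network error")
    return "ok"

print(retry_with_backoff(flaky_task, base_delay=0.01))  # ok
```

In production this wraps only errors known to be transient (network, timeout); a deterministic bug should fail fast and page instead of burning retries.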

Tips

Professional database management

Good database management starts with appropriate schema design: use normalization to reduce data redundancy, create indexes on frequently queried columns, analyze query plans to optimize performance, and perform regular maintenance such as VACUUM for PostgreSQL or OPTIMIZE TABLE for MySQL.

For high availability, set up replication with at least one replica for read scaling and disaster recovery, use connection pooling such as PgBouncer or ProxySQL to reduce the cost of many concurrently open connections, and configure automated failover so the system switches to a replica automatically when the primary goes down.
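The idea behind PgBouncer-style pooling can be illustrated with a minimal, generic pool built on the standard library (a sketch only, not a database client; `make_conn` is a placeholder connection factory):

```python
import queue
from contextlib import contextmanager

class ConnectionPool:
    """Minimal fixed-size pool: connections are created once and reused."""
    def __init__(self, make_conn, size=5):
        self._pool = queue.Queue(maxsize=size)
        for _ in range(size):
            self._pool.put(make_conn())

    @contextmanager
    def connection(self, timeout=5):
        conn = self._pool.get(timeout=timeout)  # block instead of opening a new conn
        try:
            yield conn
        finally:
            self._pool.put(conn)  # return to the pool for reuse

# Usage with a dummy "connection" factory that counts how many it creates
counter = {"created": 0}
def make_conn():
    counter["created"] += 1
    return object()

pool = ConnectionPool(make_conn, size=2)
for _ in range(10):
    with pool.connection() as conn:
        pass  # run queries here
print(counter["created"])  # 2: ten checkouts, only two connections ever opened
```

The point is the bounded `Queue`: clients wait for a free connection instead of opening new ones, which is what keeps the database's connection count flat under load.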

Backups should include daily full backups plus incremental backups every 1-4 hours; retain binary logs or WAL for point-in-time recovery, test restores regularly, and always keep a copy off-site.

What is Monte Carlo?

Monte Carlo is a data observability platform that monitors the five pillars above (freshness, volume, schema, distribution, and lineage) with ML-based anomaly detection, auto-detects issues in warehouses such as Snowflake, BigQuery, and Databricks, and sends alerts.

How do you do data observability the SRE way?

Treat data like a production service: define SLIs and SLOs with error budgets for data freshness, completeness, accuracy, pipeline success rate, and schema stability, then reduce toil with self-healing automation.

How do you run incident management?

Classify by severity (P1 pipeline down, P2 SLA breach, P3 volume or schema anomalies) and work the loop: detect, triage, investigate, mitigate, resolve, postmortem, using lineage to scope downstream impact and find root cause.

How do automation and self-healing work?

Auto-retry with backoff, sensors with fallbacks (e.g. cached data), circuit breakers, and quarantine of bad data; wire Airflow, dbt, and the Monte Carlo API together via webhooks to PagerDuty, and block downstream pipelines when upstream data is bad.

Summary

Monte Carlo brings data observability into SRE practice: SLIs, SLOs, and error budgets over freshness, volume, schema, and lineage; incident response with postmortems; and self-healing automation for production data pipelines.
