
Monte Carlo Observability Machine Learning Pipeline — Data Quality for ML

2025-06-17 · Ajarn Bom — SiamCafe.net · 1,773 words

What Is Monte Carlo Data Observability?

Monte Carlo is a data observability platform that detects data quality issues automatically, without requiring rules to be written up front. It uses machine learning to learn the patterns in your data and raises alerts when it finds anomalies, and it is a leader in the data observability category.

The 5 pillars of data observability are: Freshness (is the data updated on time?), Volume (does the row/record count match expectations?), Schema (has the data structure changed?), Distribution (are values within their normal range?), and Lineage (where does the data flow from and to?).
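To make the pillars concrete, here is a minimal sketch that maps each one to a simple check over a batch of records. The column names (`amount`, `timestamp`) and thresholds are illustrative assumptions, not part of any platform's API:

```python
from datetime import datetime, timedelta

def pillar_checks(rows, columns, now):
    """rows: list of dicts; columns: expected ordered column names."""
    latest = max(r["timestamp"] for r in rows)
    mean_amount = sum(r["amount"] for r in rows) / len(rows)
    return {
        "freshness": (now - latest) <= timedelta(hours=6),   # updated on time?
        "volume": 1 <= len(rows) <= 10_000,                  # expected row count?
        "schema": list(rows[0].keys()) == columns,           # structure unchanged?
        "distribution": 50 <= mean_amount <= 200,            # values in normal range?
        "lineage": None,  # lineage is pipeline metadata, not a per-batch test
    }

rows = [
    {"amount": 80.0, "timestamp": datetime(2025, 6, 17, 8)},
    {"amount": 120.0, "timestamp": datetime(2025, 6, 17, 9)},
    {"amount": 110.0, "timestamp": datetime(2025, 6, 17, 10)},
]
checks = pillar_checks(rows, ["amount", "timestamp"], datetime(2025, 6, 17, 12))
print(checks)
```

Lineage is deliberately left as metadata here: it describes where data comes from and goes to, which a platform tracks at the pipeline level rather than per batch.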

For an ML pipeline, data quality is critical: garbage in, garbage out. If the training data has problems, the model will produce wrong results. Monte Carlo helps catch these problems before they affect model training and production inference.
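The "catch problems before training" idea boils down to a fail-fast gate in front of the trainer. A minimal sketch, where `run_quality_checks` and the check results are hypothetical stand-ins for a real validation suite:

```python
# Fail-fast gate: abort training when any data quality check fails,
# so bad data never reaches the model.
def run_quality_checks(dataset):
    # Stand-in for real freshness/volume/null/distribution checks
    return [("freshness", True), ("volume", True), ("null_rate", False)]

class DataQualityError(Exception):
    pass

def gated_training(dataset):
    failed = [name for name, ok in run_quality_checks(dataset) if not ok]
    if failed:
        raise DataQualityError(f"aborting training, failed checks: {failed}")
    return "model trained"

try:
    gated_training("training_data_v3")
except DataQualityError as e:
    print(e)
```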

Installing and Configuring Monte Carlo

How to get started with data observability

# === Data Observability Setup ===

# 1. Install Monte Carlo CLI
pip install montecarlodata

# 2. Configure Monte Carlo
# montecarlo configure
# API Key: your-api-key
# API Secret: your-api-secret

# 3. Open Source Alternative: Great Expectations + Custom Monitoring
pip install great-expectations evidently whylogs

# 4. Great Expectations Setup
python3 << 'PYEOF'
import great_expectations as gx

# Initialize context
context = gx.get_context()

# Add data source
# datasource = context.sources.add_pandas("my_datasource")

# Create expectations
# suite = context.add_expectation_suite("ml_training_data_suite")
# 
# # Freshness check
# suite.add_expectation(
#     gx.expectations.ExpectColumnValuesToNotBeNull(column="timestamp")
# )
# 
# # Volume check
# suite.add_expectation(
#     gx.expectations.ExpectTableRowCountToBeBetween(min_value=1000, max_value=100000)
# )
# 
# # Distribution check
# suite.add_expectation(
#     gx.expectations.ExpectColumnMeanToBeBetween(
#         column="amount", min_value=50, max_value=200
#     )
# )
# 
# # Schema check
# suite.add_expectation(
#     gx.expectations.ExpectTableColumnsToMatchOrderedList(
#         column_list=["id", "amount", "category", "timestamp"]
#     )
# )

print("Great Expectations initialized")
PYEOF

# 5. WhyLogs for Data Profiling
python3 << 'PYEOF'
import whylogs as why
import pandas as pd
import numpy as np

# Create sample data
np.random.seed(42)
df = pd.DataFrame({
    "amount": np.random.normal(100, 30, 1000),
    "category": np.random.choice(["A", "B", "C"], 1000),
    "is_fraud": np.random.choice([0, 1], 1000, p=[0.95, 0.05]),
    "timestamp": pd.date_range("2025-01-01", periods=1000, freq="h"),
})

# Profile data
profile = why.log(df)
view = profile.view()

# Get summary statistics
summary = view.to_pandas()
print("Data Profile Summary:")
print(summary.head())

# Save profile for comparison
# profile.writer("local").write(dest="/tmp/profiles/baseline")
print("WhyLogs profile created")
PYEOF

# 6. Evidently for Data Drift
python3 << 'PYEOF'
# from evidently.report import Report
# from evidently.metric_preset import DataDriftPreset, DataQualityPreset
# 
# report = Report(metrics=[DataDriftPreset(), DataQualityPreset()])
# report.run(reference_data=df_train, current_data=df_current)
# report.save_html("drift_report.html")
print("Evidently configured for drift detection")
PYEOF

echo "Data observability tools installed"

Data Quality Monitoring for the ML Pipeline

Monitor data quality across the whole pipeline

#!/usr/bin/env python3
# data_quality_monitor.py — ML Pipeline Data Quality
import json
import logging
import math
from datetime import datetime, timedelta
from typing import Dict, List, Optional
from dataclasses import dataclass, field

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger("dq_monitor")

@dataclass
class DataQualityCheck:
    name: str
    check_type: str
    status: str = "pending"
    value: Optional[float] = None
    threshold: Optional[float] = None
    message: str = ""

class MLDataQualityMonitor:
    def __init__(self):
        self.checks: List[DataQualityCheck] = []
        self.history = []
    
    def check_freshness(self, latest_timestamp, max_delay_hours=6):
        now = datetime.utcnow()
        delay = (now - latest_timestamp).total_seconds() / 3600
        
        check = DataQualityCheck(
            name="data_freshness",
            check_type="freshness",
            value=round(delay, 1),
            threshold=max_delay_hours,
            status="pass" if delay <= max_delay_hours else "fail",
            message=f"Data delay: {delay:.1f}h (max: {max_delay_hours}h)",
        )
        self.checks.append(check)
        return check
    
    def check_volume(self, row_count, expected_min, expected_max):
        check = DataQualityCheck(
            name="data_volume",
            check_type="volume",
            value=row_count,
            threshold=expected_min,
            status="pass" if expected_min <= row_count <= expected_max else "fail",
            message=f"Rows: {row_count} (expected: {expected_min}-{expected_max})",
        )
        self.checks.append(check)
        return check
    
    def check_null_rate(self, column_name, null_count, total_count, max_null_pct=5.0):
        null_pct = (null_count / max(total_count, 1)) * 100
        
        check = DataQualityCheck(
            name=f"null_rate_{column_name}",
            check_type="completeness",
            value=round(null_pct, 2),
            threshold=max_null_pct,
            status="pass" if null_pct <= max_null_pct else "fail",
            message=f"{column_name}: {null_pct:.2f}% nulls (max: {max_null_pct}%)",
        )
        self.checks.append(check)
        return check
    
    def check_distribution(self, column_name, current_mean, current_std,
                            baseline_mean, baseline_std, z_threshold=3.0):
        if baseline_std > 0:
            z_score = abs(current_mean - baseline_mean) / baseline_std
        else:
            z_score = 0
        
        check = DataQualityCheck(
            name=f"distribution_{column_name}",
            check_type="distribution",
            value=round(z_score, 2),
            threshold=z_threshold,
            status="pass" if z_score <= z_threshold else "fail",
            message=f"{column_name}: z-score={z_score:.2f} (mean: {current_mean:.2f} vs baseline: {baseline_mean:.2f})",
        )
        self.checks.append(check)
        return check
    
    def check_schema(self, current_columns, expected_columns):
        missing = set(expected_columns) - set(current_columns)
        extra = set(current_columns) - set(expected_columns)
        
        status = "pass" if not missing and not extra else "fail"
        msg_parts = []
        if missing:
            msg_parts.append(f"Missing: {missing}")
        if extra:
            msg_parts.append(f"Extra: {extra}")
        
        check = DataQualityCheck(
            name="schema_check",
            check_type="schema",
            status=status,
            message=" | ".join(msg_parts) if msg_parts else "Schema matches",
        )
        self.checks.append(check)
        return check
    
    def check_class_balance(self, label_counts, min_ratio=0.01):
        total = sum(label_counts.values())
        
        imbalanced = []
        for label, count in label_counts.items():
            ratio = count / max(total, 1)
            if ratio < min_ratio:
                imbalanced.append(label)
                self.checks.append(DataQualityCheck(
                    name=f"class_balance_{label}",
                    check_type="balance",
                    value=round(ratio, 4),
                    threshold=min_ratio,
                    status="fail",
                    message=f"Class '{label}': {ratio:.4f} (min: {min_ratio})",
                ))
        
        if not imbalanced:
            self.checks.append(DataQualityCheck(
                name="class_balance",
                check_type="balance",
                status="pass",
                message="Class balance OK",
            ))
    
    def get_report(self):
        passed = sum(1 for c in self.checks if c.status == "pass")
        failed = sum(1 for c in self.checks if c.status == "fail")
        
        return {
            "timestamp": datetime.utcnow().isoformat(),
            "total_checks": len(self.checks),
            "passed": passed,
            "failed": failed,
            "score": round(passed / max(len(self.checks), 1) * 100, 1),
            "overall_status": "healthy" if failed == 0 else "degraded" if failed <= 2 else "critical",
            "checks": [
                {"name": c.name, "type": c.check_type, "status": c.status,
                 "value": c.value, "message": c.message}
                for c in self.checks
            ],
        }

# Run quality checks
monitor = MLDataQualityMonitor()

monitor.check_freshness(datetime.utcnow() - timedelta(hours=2))
monitor.check_volume(45000, 40000, 60000)
monitor.check_null_rate("amount", 150, 45000)
monitor.check_null_rate("category", 5200, 45000, max_null_pct=5.0)
monitor.check_distribution("amount", 105.2, 32.1, 100.0, 30.0)
monitor.check_schema(
    ["id", "amount", "category", "timestamp"],
    ["id", "amount", "category", "timestamp"]
)
monitor.check_class_balance({"legit": 42750, "fraud": 2250})

print(json.dumps(monitor.get_report(), indent=2))

Automated Anomaly Detection

Detect anomalies automatically

#!/usr/bin/env python3
# anomaly_detector.py — Data Anomaly Detection
import json
import math
import logging
from datetime import datetime
from typing import Dict, List

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger("anomaly")

class DataAnomalyDetector:
    def __init__(self, window_size=30):
        self.window_size = window_size
        self.baselines: Dict[str, Dict] = {}
        self.anomalies: List[Dict] = []
    
    def set_baseline(self, metric_name, values):
        if not values:
            return
        
        mean = sum(values) / len(values)
        variance = sum((x - mean) ** 2 for x in values) / len(values)
        std = math.sqrt(variance)
        
        sorted_vals = sorted(values)
        p5 = sorted_vals[int(len(sorted_vals) * 0.05)]
        p95 = sorted_vals[int(len(sorted_vals) * 0.95)]
        
        self.baselines[metric_name] = {
            "mean": mean,
            "std": std,
            "min": min(values),
            "max": max(values),
            "p5": p5,
            "p95": p95,
            "sample_count": len(values),
        }
    
    def detect(self, metric_name, current_value, sensitivity=2.0):
        baseline = self.baselines.get(metric_name)
        if not baseline:
            return {"anomaly": False, "reason": "No baseline"}
        
        mean = baseline["mean"]
        std = baseline["std"]
        
        if std == 0:
            is_anomaly = current_value != mean
            z_score = float('inf') if is_anomaly else 0
        else:
            z_score = abs(current_value - mean) / std
            is_anomaly = z_score > sensitivity
        
        result = {
            "metric": metric_name,
            "current_value": round(current_value, 4),
            "baseline_mean": round(mean, 4),
            "baseline_std": round(std, 4),
            "z_score": round(z_score, 2),
            "anomaly": is_anomaly,
            "severity": "critical" if z_score > 4 else "high" if z_score > 3 else "medium" if z_score > 2 else "low",
            "direction": "above" if current_value > mean else "below",
        }
        
        if is_anomaly:
            result["timestamp"] = datetime.utcnow().isoformat()
            self.anomalies.append(result)
            logger.warning(f"Anomaly: {metric_name}={current_value} (z={z_score:.1f})")
        
        return result
    
    def detect_trend(self, metric_name, recent_values, trend_threshold=3):
        if len(recent_values) < trend_threshold:
            return {"trend": "insufficient_data"}
        
        increasing = all(recent_values[i] > recent_values[i-1] 
                        for i in range(1, len(recent_values)))
        decreasing = all(recent_values[i] < recent_values[i-1] 
                        for i in range(1, len(recent_values)))
        
        if increasing:
            trend = "increasing"
        elif decreasing:
            trend = "decreasing"
        else:
            trend = "stable"
        
        change_pct = ((recent_values[-1] - recent_values[0]) / max(abs(recent_values[0]), 0.001)) * 100
        
        return {
            "metric": metric_name,
            "trend": trend,
            "change_pct": round(change_pct, 1),
            "values": [round(v, 2) for v in recent_values],
            "alert": abs(change_pct) > 20,
        }
    
    def get_anomaly_summary(self):
        return {
            "total_anomalies": len(self.anomalies),
            "by_severity": {
                "critical": sum(1 for a in self.anomalies if a["severity"] == "critical"),
                "high": sum(1 for a in self.anomalies if a["severity"] == "high"),
                "medium": sum(1 for a in self.anomalies if a["severity"] == "medium"),
            },
            "anomalies": self.anomalies,
        }

import random
random.seed(42)

detector = DataAnomalyDetector()

# Set baselines from historical data
detector.set_baseline("row_count", [random.gauss(50000, 5000) for _ in range(30)])
detector.set_baseline("null_rate", [random.gauss(2.0, 0.5) for _ in range(30)])
detector.set_baseline("mean_amount", [random.gauss(100, 5) for _ in range(30)])

# Check current values
detector.detect("row_count", 52000)     # Normal
detector.detect("row_count", 15000)     # Anomaly!
detector.detect("null_rate", 12.5)      # Anomaly!
detector.detect("mean_amount", 98.5)    # Normal

# Trend detection
detector.detect_trend("row_count", [50000, 48000, 45000, 42000, 38000])

print(json.dumps(detector.get_anomaly_summary(), indent=2))

Integration with the ML Workflow

Wire data observability into the ML pipeline

# === ML Pipeline Integration ===

# 1. Pre-Training Data Validation
# ===================================
# Airflow DAG example:
#
# with DAG("ml_training_pipeline", schedule="@daily") as dag:
#     
#     validate_data = PythonOperator(
#         task_id="validate_training_data",
#         python_callable=run_data_quality_checks,
#         op_kwargs={"dataset": "training_data_v3"},
#     )
#     
#     check_drift = PythonOperator(
#         task_id="check_data_drift",
#         python_callable=run_drift_detection,
#     )
#     
#     train_model = PythonOperator(
#         task_id="train_model",
#         python_callable=train_and_evaluate,
#     )
#     
#     validate_model = PythonOperator(
#         task_id="validate_model_quality",
#         python_callable=validate_model_output,
#     )
#     
#     validate_data >> check_drift >> train_model >> validate_model

# 2. Feature Store Monitoring
# ===================================
#!/usr/bin/env python3
# feature_monitor.py

import json
import logging

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger("feature")

class FeatureStoreMonitor:
    def __init__(self):
        self.feature_stats = {}
    
    def register_feature(self, name, expected_type, bounds=None):
        self.feature_stats[name] = {
            "type": expected_type,
            "bounds": bounds,
            "observations": [],
        }
    
    def observe(self, feature_name, value):
        if feature_name in self.feature_stats:
            self.feature_stats[feature_name]["observations"].append(value)
    
    def validate_features(self, feature_values):
        issues = []
        
        for name, value in feature_values.items():
            stats = self.feature_stats.get(name)
            if not stats:
                issues.append({"feature": name, "issue": "unregistered feature"})
                continue
            
            # Type check
            if stats["type"] == "numeric" and not isinstance(value, (int, float)):
                issues.append({"feature": name, "issue": f"expected numeric, got {type(value).__name__}"})
            
            # Bounds check
            if stats["bounds"]:
                min_val, max_val = stats["bounds"]
                if isinstance(value, (int, float)):
                    if value < min_val or value > max_val:
                        issues.append({
                            "feature": name,
                            "issue": f"out of bounds: {value} not in [{min_val}, {max_val}]"
                        })
        
        return {
            "valid": len(issues) == 0,
            "features_checked": len(feature_values),
            "issues": issues,
        }

fm = FeatureStoreMonitor()
fm.register_feature("amount", "numeric", (0, 10000))
fm.register_feature("frequency", "numeric", (0, 100))
fm.register_feature("category", "categorical")

result = fm.validate_features({"amount": 150.5, "frequency": 5, "category": "A"})
print("Valid features:", json.dumps(result, indent=2))

result = fm.validate_features({"amount": -50, "frequency": 200, "category": "B"})
print("Invalid features:", json.dumps(result, indent=2))

# 3. Post-Prediction Monitoring
# ===================================
# Monitor prediction quality over time:
# - Prediction distribution shift
# - Feature importance drift
# - Model confidence scores
# - Ground truth comparison (when available)
#
# Prometheus metrics:
# ml_prediction_distribution{model="fraud", bucket="0.0-0.2"} 850
# ml_prediction_distribution{model="fraud", bucket="0.8-1.0"} 50
# ml_feature_importance{model="fraud", feature="amount"} 0.35
# ml_model_confidence_avg{model="fraud"} 0.87

echo "ML pipeline integration configured"
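The sample metric lines above follow the Prometheus text exposition format. A small stdlib-only sketch of rendering metrics in that format (no client library assumed; a real setup would typically use prometheus_client instead):

```python
# Render gauge-style metrics in Prometheus text exposition format.
def prom_metric(name, labels, value):
    label_str = ",".join(f'{k}="{v}"' for k, v in labels.items())
    return f"{name}{{{label_str}}} {value}"

lines = [
    prom_metric("ml_prediction_distribution", {"model": "fraud", "bucket": "0.0-0.2"}, 850),
    prom_metric("ml_prediction_distribution", {"model": "fraud", "bucket": "0.8-1.0"}, 50),
    prom_metric("ml_feature_importance", {"model": "fraud", "feature": "amount"}, 0.35),
    prom_metric("ml_model_confidence_avg", {"model": "fraud"}, 0.87),
]
print("\n".join(lines))
```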

Alerting and Incident Response

Set up alerts and incident response

#!/usr/bin/env python3
# data_incident_response.py — Data Incident Management
import json
import logging
from datetime import datetime
from typing import Dict, List

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger("incident")

class DataIncidentManager:
    def __init__(self):
        self.incidents = []
        self.alert_rules = []
        self.runbooks = {}
    
    def add_alert_rule(self, name, condition, severity, channels):
        self.alert_rules.append({
            "name": name,
            "condition": condition,
            "severity": severity,
            "channels": channels,
            "enabled": True,
        })
    
    def add_runbook(self, incident_type, steps):
        self.runbooks[incident_type] = steps
    
    def create_incident(self, title, severity, source, details):
        incident = {
            "id": f"INC-{len(self.incidents)+1:04d}",
            "title": title,
            "severity": severity,
            "source": source,
            "details": details,
            "status": "open",
            "created_at": datetime.utcnow().isoformat(),
            "runbook": self.runbooks.get(source, []),
        }
        
        self.incidents.append(incident)
        logger.warning(f"Incident created: {incident['id']} — {title}")
        return incident
    
    def resolve_incident(self, incident_id, resolution):
        for inc in self.incidents:
            if inc["id"] == incident_id:
                inc["status"] = "resolved"
                inc["resolution"] = resolution
                inc["resolved_at"] = datetime.utcnow().isoformat()
                return inc
        return None
    
    def get_dashboard(self):
        open_incidents = [i for i in self.incidents if i["status"] == "open"]
        
        return {
            "total_incidents": len(self.incidents),
            "open": len(open_incidents),
            "resolved": len(self.incidents) - len(open_incidents),
            "by_severity": {
                "critical": sum(1 for i in open_incidents if i["severity"] == "critical"),
                "high": sum(1 for i in open_incidents if i["severity"] == "high"),
                "medium": sum(1 for i in open_incidents if i["severity"] == "medium"),
            },
            "open_incidents": open_incidents,
        }

mgr = DataIncidentManager()

# Add runbooks
mgr.add_runbook("data_freshness", [
    "Check source system status",
    "Verify ETL pipeline logs",
    "Check network connectivity to source",
    "Manually trigger pipeline if needed",
    "Notify downstream consumers",
])

mgr.add_runbook("data_drift", [
    "Analyze drift report (which features drifted)",
    "Check source data for anomalies",
    "Compare with historical patterns",
    "If legitimate change: update baseline",
    "If anomaly: investigate root cause",
    "Consider model retraining if significant",
])

mgr.add_runbook("schema_change", [
    "Identify changed columns",
    "Check if upstream schema migration occurred",
    "Update pipeline transformations if needed",
    "Test pipeline with new schema",
    "Update data contracts",
])

# Alert rules
mgr.add_alert_rule("Stale Data", "freshness_hours > 6", "high", ["slack", "pagerduty"])
mgr.add_alert_rule("Volume Drop", "row_count < expected * 0.5", "critical", ["slack", "pagerduty"])
mgr.add_alert_rule("Data Drift", "drift_score > 2.0", "medium", ["slack"])
mgr.add_alert_rule("Null Spike", "null_rate > 10%", "high", ["slack"])

# Create incidents
mgr.create_incident(
    "Training data 8 hours stale",
    "high", "data_freshness",
    {"table": "training_features", "delay_hours": 8})

mgr.create_incident(
    "Feature drift detected on amount column",
    "medium", "data_drift",
    {"feature": "amount", "drift_score": 3.2})

print(json.dumps(mgr.get_dashboard(), indent=2, ensure_ascii=False))
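The manager above stores alert rules but never evaluates them. One way to close that loop is to keep rules as structured (metric, operator, threshold) triples instead of condition strings; the rules and metric names below are illustrative assumptions:

```python
# Evaluate structured alert rules against a snapshot of current metrics.
import operator

OPS = {">": operator.gt, "<": operator.lt, ">=": operator.ge, "<=": operator.le}

def evaluate_rules(rules, metrics):
    """Return (name, severity) for every rule whose condition is met."""
    fired = []
    for name, metric, op, threshold, severity in rules:
        value = metrics.get(metric)
        if value is not None and OPS[op](value, threshold):
            fired.append((name, severity))
    return fired

rules = [
    ("Stale Data", "freshness_hours", ">", 6, "high"),
    ("Volume Drop", "row_count", "<", 25_000, "critical"),
    ("Null Spike", "null_rate_pct", ">", 10, "high"),
]
fired = evaluate_rules(rules, {"freshness_hours": 8, "row_count": 45_000, "null_rate_pct": 2.1})
print(fired)  # only "Stale Data" fires
```

Each fired rule would then feed `create_incident`, which attaches the matching runbook automatically.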

FAQ: Frequently Asked Questions

Q: How does Monte Carlo differ from Great Expectations?

A: Monte Carlo is a SaaS platform that uses ML to detect anomalies automatically, with no rules to write up front, and it includes data lineage, schema tracking, and freshness monitoring out of the box; the cost is high (enterprise pricing). Great Expectations is open source: you write the expectations (rules) yourself and there is no ML-based anomaly detection, but it is free and highly customizable. Small teams should start with Great Expectations; enterprises that need fully automated coverage should use Monte Carlo.

Q: Is data observability necessary for an ML pipeline?

A: Very much so for production ML systems. Data quality problems are the leading cause of model failures in production (roughly 80% of ML issues come from the data, not the model). Data observability catches stale data before you train on outdated records, data drift before model accuracy degrades, schema changes that would break the pipeline, and volume anomalies that signal a failing source system. Invest in data observability before model observability.

Q: What open source alternatives to Monte Carlo exist?

A: Great Expectations for data validation rules (the most popular), Evidently AI for ML model monitoring and data drift, WhyLogs for data profiling and monitoring, Soda for SQL-based data quality checks, dbt tests for data transformation testing, and Elementary Data for dbt observability. Combine several of them to cover the 5 pillars of data observability (freshness, volume, schema, distribution, lineage).

Q: How do you implement data observability for an ML pipeline?

A: Start with three stages. Pre-training validation: check data quality before training (freshness, volume, nulls, distribution) and stop the pipeline instead of training if it fails. Feature monitoring: verify that feature values stay in their normal ranges and detect drift by comparing training data against production data. Post-prediction monitoring: track prediction distribution, latency, and error rate against a baseline. Use alerting to notify the team when problems appear, and automate remediation for clear-cut cases.
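For the drift comparison in the feature-monitoring stage, one common metric is the Population Stability Index (PSI), which compares the value distribution of the training (reference) data against production (current) data. A minimal sketch; the bin edges and the conventional 0.2 alert threshold are assumptions:

```python
# Population Stability Index (PSI) between a reference and a current sample.
import math

def psi(reference, current, edges):
    def shares(values):
        counts = [0] * (len(edges) + 1)
        for v in values:
            i = sum(v > e for e in edges)  # index of the bin v falls into
            counts[i] += 1
        total = len(values)
        return [max(c / total, 1e-6) for c in counts]  # floor avoids log(0)
    ref, cur = shares(reference), shares(current)
    return sum((c - r) * math.log(c / r) for r, c in zip(ref, cur))

reference = [90, 95, 100, 105, 110, 100, 98, 102]   # e.g. training "amount" values
shifted   = [120, 125, 130, 135, 140, 128, 132, 138]  # production values after a shift
edges = [95, 105, 115]
score = psi(reference, shifted, edges)
print("drift" if score > 0.2 else "stable")
```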
