SiamCafe · Blog
SigNoz Observability กับ MLOps Workflow —
บทความ

SigNoz Observability กับ MLOps Workflow —

เผยแพร่ 28 พฤษภาคม 2569

SigNoz Observability Platform

SigNoz รวม Traces, Metrics, Logs ไว้ในที่เดียว ใช้ OpenTelemetry Standard รองรับ ClickHouse Backend ทดแทน Datadog New Relic ได้ Self-hosted ฟรี

MLOps Workflow ต้องการ Observability สำหรับทุกขั้นตอน Data Pipeline, Training, Serving, Monitoring ใช้ SigNoz ติดตาม Performance และหา Root Cause

SigNoz Setup และ OpenTelemetry

# === SigNoz Installation ===

# 1. Docker Compose Installation
git clone -b main https://github.com/SigNoz/signoz.git
cd signoz/deploy
docker compose -f docker/clickhouse-setup/docker-compose.yaml up -d

# เข้า SigNoz UI: http://localhost:3301

# 2. OpenTelemetry Python SDK
# pip install opentelemetry-api opentelemetry-sdk
# pip install opentelemetry-exporter-otlp
# pip install opentelemetry-instrumentation-fastapi
# pip install opentelemetry-instrumentation-requests

# 3. Python Instrumentation
# otel_config.py
from opentelemetry import trace, metrics
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import BatchSpanProcessor
from opentelemetry.exporter.otlp.proto.grpc.trace_exporter import OTLPSpanExporter
from opentelemetry.sdk.metrics import MeterProvider
from opentelemetry.sdk.metrics.export import PeriodicExportingMetricReader
from opentelemetry.exporter.otlp.proto.grpc.metric_exporter import OTLPMetricExporter
from opentelemetry.sdk.resources import Resource

def setup_otel(service_name):
    """Setup OpenTelemetry"""
    resource = Resource.create({"service.name": service_name})

    # Traces
    trace_provider = TracerProvider(resource=resource)
    trace_provider.add_span_processor(
        BatchSpanProcessor(OTLPSpanExporter(endpoint="http://localhost:4317"))
    )
    trace.set_tracer_provider(trace_provider)

    # Metrics
    metric_reader = PeriodicExportingMetricReader(
        OTLPMetricExporter(endpoint="http://localhost:4317"),
        export_interval_millis=10000,
    )
    meter_provider = MeterProvider(resource=resource, metric_readers=[metric_reader])
    metrics.set_meter_provider(meter_provider)

    return trace.get_tracer(service_name), metrics.get_meter(service_name)

# 4. FastAPI Auto-instrumentation
# from opentelemetry.instrumentation.fastapi import FastAPIInstrumentor
# from fastapi import FastAPI
#
# app = FastAPI()
# FastAPIInstrumentor.instrument_app(app)

# 5. Environment Variables (Alternative)
# export OTEL_SERVICE_NAME=ml-pipeline
# export OTEL_EXPORTER_OTLP_ENDPOINT=http://localhost:4317
# export OTEL_TRACES_EXPORTER=otlp
# export OTEL_METRICS_EXPORTER=otlp
# export OTEL_LOGS_EXPORTER=otlp
#
# opentelemetry-instrument python app.py

print("SigNoz + OpenTelemetry configured")
print("  UI: http://localhost:3301")
print("  OTLP: http://localhost:4317 (gRPC)")
print("  OTLP: http://localhost:4318 (HTTP)")

MLOps Pipeline Observability

# mlops_observability.py — MLOps Pipeline ด้วย OpenTelemetry
import time
import random
from dataclasses import dataclass, field
from typing import Dict, List
from datetime import datetime

# Simulated OTel interfaces
class SimTracer:
    def start_span(self, name):
        return SimSpan(name)

class SimSpan:
    def __init__(self, name):
        self.name = name
        self.start = time.time()
        self.attributes = {}

    def set_attribute(self, key, value):
        self.attributes[key] = value

    def end(self):
        duration = (time.time() - self.start) * 1000
        self.attributes["duration_ms"] = f"{duration:.0f}"

    def __enter__(self):
        return self

    def __exit__(self, *args):
        self.end()

class SimMeter:
    def __init__(self):
        self.metrics = {}

    def create_histogram(self, name, **kwargs):
        self.metrics[name] = []
        return SimHistogram(name, self.metrics)

    def create_counter(self, name, **kwargs):
        self.metrics[name] = 0
        return SimCounter(name, self.metrics)

class SimHistogram:
    def __init__(self, name, store):
        self.name = name
        self.store = store

    def record(self, value, attributes=None):
        self.store[self.name].append(value)

class SimCounter:
    def __init__(self, name, store):
        self.name = name
        self.store = store

    def add(self, value, attributes=None):
        self.store[self.name] += value

class MLPipelineObservability:
    """MLOps Pipeline with Observability"""

    def __init__(self):
        self.tracer = SimTracer()
        self.meter = SimMeter()

        # Metrics
        self.training_duration = self.meter.create_histogram(
            "ml.training.duration", unit="s")
        self.prediction_latency = self.meter.create_histogram(
            "ml.prediction.latency", unit="ms")
        self.prediction_count = self.meter.create_counter(
            "ml.prediction.count")
        self.data_quality_score = self.meter.create_histogram(
            "ml.data.quality_score")
        self.model_accuracy = self.meter.create_histogram(
            "ml.model.accuracy")

    def data_ingestion(self, source, rows):
        """Data Ingestion Step"""
        with self.tracer.start_span("data_ingestion") as span:
            span.set_attribute("data.source", source)
            span.set_attribute("data.rows", rows)

            time.sleep(0.01)  # Simulate work
            quality = random.uniform(0.85, 0.99)
            self.data_quality_score.record(quality)

            span.set_attribute("data.quality_score", f"{quality:.3f}")
            return {"rows": rows, "quality": quality}

    def feature_engineering(self, data):
        """Feature Engineering Step"""
        with self.tracer.start_span("feature_engineering") as span:
            span.set_attribute("features.input_rows", data["rows"])

            time.sleep(0.01)
            features_count = random.randint(20, 50)

            span.set_attribute("features.count", features_count)
            return {"features": features_count, "rows": data["rows"]}

    def model_training(self, features):
        """Model Training Step"""
        with self.tracer.start_span("model_training") as span:
            start = time.time()

            time.sleep(0.02)  # Simulate training
            accuracy = random.uniform(0.82, 0.95)
            loss = random.uniform(0.05, 0.20)

            duration = time.time() - start
            self.training_duration.record(duration)
            self.model_accuracy.record(accuracy)

            span.set_attribute("model.accuracy", f"{accuracy:.4f}")
            span.set_attribute("model.loss", f"{loss:.4f}")
            span.set_attribute("model.epochs", 100)
            return {"accuracy": accuracy, "loss": loss}

    def model_serving(self, n_requests=100):
        """Model Serving Step"""
        with self.tracer.start_span("model_serving") as span:
            latencies = []
            for _ in range(n_requests):
                latency = random.uniform(5, 50)
                self.prediction_latency.record(latency)
                self.prediction_count.add(1)
                latencies.append(latency)

            avg_latency = sum(latencies) / len(latencies)
            span.set_attribute("serving.requests", n_requests)
            span.set_attribute("serving.avg_latency_ms", f"{avg_latency:.1f}")
            return {"requests": n_requests, "avg_latency": avg_latency}

    def run_pipeline(self):
        """Run Full ML Pipeline"""
        print(f"\n{'='*55}")
        print(f"MLOps Pipeline — {datetime.now().strftime('%H:%M:%S')}")
        print(f"{'='*55}")

        with self.tracer.start_span("ml_pipeline") as pipeline_span:
            # Step 1: Data Ingestion
            data = self.data_ingestion("s3://data/training", 100000)
            print(f"  1. Data Ingestion: {data['rows']:,} rows, "
                  f"quality={data['quality']:.3f}")

            # Step 2: Feature Engineering
            features = self.feature_engineering(data)
            print(f"  2. Features: {features['features']} features")

            # Step 3: Training
            result = self.model_training(features)
            print(f"  3. Training: accuracy={result['accuracy']:.4f}, "
                  f"loss={result['loss']:.4f}")

            # Step 4: Serving
            serving = self.model_serving(100)
            print(f"  4. Serving: {serving['requests']} requests, "
                  f"avg={serving['avg_latency']:.1f}ms")

            pipeline_span.set_attribute("pipeline.status", "success")

        # Metrics Summary
        print(f"\n  Metrics:")
        for name, values in self.meter.metrics.items():
            if isinstance(values, list) and values:
                print(f"    {name}: avg={sum(values)/len(values):.3f}")
            elif isinstance(values, (int, float)):
                print(f"    {name}: {values}")

pipeline = MLPipelineObservability()
pipeline.run_pipeline()

Alerting และ Dashboards

# === SigNoz Alerting Configuration ===

# 1. Alert Rules (ตั้งใน SigNoz UI หรือ API)
alert_rules = {
    "High Prediction Latency": {
        "metric": "ml.prediction.latency",
        "condition": "p99 > 100ms",
        "for": "5m",
        "severity": "warning",
        "notification": ["slack", "pagerduty"],
    },
    "Low Model Accuracy": {
        "metric": "ml.model.accuracy",
        "condition": "avg < 0.80",
        "for": "15m",
        "severity": "critical",
        "notification": ["slack", "pagerduty", "email"],
    },
    "Data Quality Drop": {
        "metric": "ml.data.quality_score",
        "condition": "avg < 0.90",
        "for": "10m",
        "severity": "warning",
        "notification": ["slack"],
    },
    "Training Duration Spike": {
        "metric": "ml.training.duration",
        "condition": "avg > 3600s",
        "for": "1m",
        "severity": "warning",
        "notification": ["slack"],
    },
    "High Error Rate": {
        "metric": "ml.prediction.errors",
        "condition": "rate > 5%",
        "for": "5m",
        "severity": "critical",
        "notification": ["slack", "pagerduty"],
    },
}

# 2. Dashboard Panels
dashboard_panels = [
    {"title": "Pipeline Traces", "type": "trace_list",
     "query": "service.name = ml-pipeline"},
    {"title": "Prediction Latency P50/P95/P99", "type": "timeseries",
     "metric": "ml.prediction.latency"},
    {"title": "Model Accuracy Over Time", "type": "timeseries",
     "metric": "ml.model.accuracy"},
    {"title": "Data Quality Score", "type": "gauge",
     "metric": "ml.data.quality_score"},
    {"title": "Training Duration", "type": "timeseries",
     "metric": "ml.training.duration"},
    {"title": "Predictions per Second", "type": "timeseries",
     "metric": "ml.prediction.count"},
    {"title": "Error Logs", "type": "log_list",
     "query": "severity = ERROR AND service = ml-pipeline"},
]

print("SigNoz Alert Rules:")
for name, rule in alert_rules.items():
    print(f"  [{rule['severity']:>8}] {name}: {rule['condition']}")

print(f"\nDashboard Panels ({len(dashboard_panels)}):")
for panel in dashboard_panels:
    print(f"  [{panel['type']:>12}] {panel['title']}")

Best Practices

  • OpenTelemetry: ใช้ OTel SDK เป็น Standard ไม่ Lock-in กับ Vendor ใดๆ
  • Custom Metrics: สร้าง ML-specific Metrics เช่น Accuracy, Drift, Data Quality
  • Trace Context: ส่ง Trace Context ข้าม Services ติดตาม Request End-to-end
  • Alerts: ตั้ง Alerts สำหรับ Model Drift, Latency Spike, Data Quality Drop
  • Dashboards: สร้าง Dashboard แยกตาม Pipeline Stage Training, Serving, Data
  • Log Correlation: เชื่อม Logs กับ Traces ด้วย Trace ID หา Root Cause เร็ว

SigNoz คืออะไร

Open-source Observability Platform รวม Traces Metrics Logs ในที่เดียว OpenTelemetry Standard ClickHouse Backend ทดแทน Datadog New Relic Self-hosted ฟรี Cloud Version