SigNoz Observability กับ MLOps Workflow
SigNoz Observability Platform
SigNoz รวม Traces, Metrics และ Logs ไว้ในที่เดียว ใช้มาตรฐาน OpenTelemetry และใช้ ClickHouse เป็น Backend สามารถใช้ทดแทน Datadog หรือ New Relic ได้ และ Self-host ได้ฟรี
MLOps Workflow ต้องการ Observability ในทุกขั้นตอน ได้แก่ Data Pipeline, Training, Serving และ Monitoring โดยใช้ SigNoz ติดตาม Performance และหา Root Cause ของปัญหา
SigNoz Setup และ OpenTelemetry
# === SigNoz Installation ===
# 1. Docker Compose Installation
# Chained with && so a failed clone or cd aborts the sequence instead of
# running docker compose from the wrong directory.
git clone -b main https://github.com/SigNoz/signoz.git && \
  cd signoz/deploy && \
  docker compose -f docker/clickhouse-setup/docker-compose.yaml up -d
# Open the SigNoz UI: http://localhost:3301
# NOTE(review): the compose file path and UI port may differ between SigNoz
# releases; confirm both against the install docs for the version you clone.
# 2. OpenTelemetry Python SDK
# pip install opentelemetry-api opentelemetry-sdk
# pip install opentelemetry-exporter-otlp
# pip install opentelemetry-instrumentation-fastapi
# pip install opentelemetry-instrumentation-requests
# 3. Python Instrumentation
# otel_config.py
from opentelemetry import trace, metrics
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import BatchSpanProcessor
from opentelemetry.exporter.otlp.proto.grpc.trace_exporter import OTLPSpanExporter
from opentelemetry.sdk.metrics import MeterProvider
from opentelemetry.sdk.metrics.export import PeriodicExportingMetricReader
from opentelemetry.exporter.otlp.proto.grpc.metric_exporter import OTLPMetricExporter
from opentelemetry.sdk.resources import Resource
def setup_otel(service_name, endpoint="http://localhost:4317",
               export_interval_millis=10000):
    """Configure global OpenTelemetry tracing and metrics export over OTLP/gRPC.

    Args:
        service_name: Value of the ``service.name`` resource attribute. It is
            also used as the name of the returned tracer and meter.
        endpoint: OTLP gRPC endpoint shared by the span exporter and the
            metric exporter. Defaults to a local collector on port 4317.
        export_interval_millis: How often, in milliseconds, the periodic
            metric reader exports collected metrics.

    Returns:
        A ``(tracer, meter)`` tuple obtained from the global providers that
        this function installs.
    """
    resource = Resource.create({"service.name": service_name})

    # Traces: spans are batched by BatchSpanProcessor before being exported.
    trace_provider = TracerProvider(resource=resource)
    trace_provider.add_span_processor(
        BatchSpanProcessor(OTLPSpanExporter(endpoint=endpoint))
    )
    trace.set_tracer_provider(trace_provider)

    # Metrics: pushed to the same endpoint on a fixed interval.
    metric_reader = PeriodicExportingMetricReader(
        OTLPMetricExporter(endpoint=endpoint),
        export_interval_millis=export_interval_millis,
    )
    meter_provider = MeterProvider(resource=resource, metric_readers=[metric_reader])
    metrics.set_meter_provider(meter_provider)

    # NOTE(review): per the opentelemetry-python API, the global providers can
    # only be set once per process; a second call presumably keeps the first
    # configuration (with a warning) — confirm if setup_otel may run twice.
    return trace.get_tracer(service_name), metrics.get_meter(service_name)
# 4. FastAPI Auto-instrumentation
# from opentelemetry.instrumentation.fastapi import FastAPIInstrumentor
# from fastapi import FastAPI
#
# app = FastAPI()
# FastAPIInstrumentor.instrument_app(app)
# 5. Environment Variables (Alternative)
# export OTEL_SERVICE_NAME=ml-pipeline
# export OTEL_EXPORTER_OTLP_ENDPOINT=http://localhost:4317
# export OTEL_TRACES_EXPORTER=otlp
# export OTEL_METRICS_EXPORTER=otlp
# export OTEL_LOGS_EXPORTER=otlp
#
# opentelemetry-instrument python app.py
# Report where the SigNoz UI and the OTLP receivers are expected to listen.
print("\n".join((
    "SigNoz + OpenTelemetry configured",
    " UI: http://localhost:3301",
    " OTLP: http://localhost:4317 (gRPC)",
    " OTLP: http://localhost:4318 (HTTP)",
)))
MLOps Pipeline Observability
# mlops_observability.py — MLOps Pipeline ด้วย OpenTelemetry
import time
import random
from dataclasses import dataclass, field
from typing import Dict, List
from datetime import datetime
# Simulated OTel interfaces
class SimTracer:
    """Stand-in for an OpenTelemetry tracer that hands out SimSpan objects."""

    def start_span(self, name):
        """Create a SimSpan called *name* and return it; its timer starts now."""
        new_span = SimSpan(name)
        return new_span
class SimSpan:
    """Minimal stand-in for an OpenTelemetry span.

    Attributes are collected in a plain dict. When the span ends, the elapsed
    time in milliseconds is stored under ``"duration_ms"``. The span can be
    used as a context manager, and leaving the ``with`` block ends it.
    """

    def __init__(self, name):
        self.name = name
        # Wall-clock start time (epoch seconds), kept for callers that read it.
        self.start = time.time()
        # Duration is measured with a monotonic clock: unlike time.time(), it
        # cannot jump backwards (or forwards) when the system clock is adjusted.
        self._t0 = time.perf_counter()
        self._ended = False
        self.attributes = {}

    def set_attribute(self, key, value):
        """Store *value* under *key*, replacing any previous value."""
        self.attributes[key] = value

    def end(self):
        """Finish the span and record its duration; later calls are no-ops."""
        if self._ended:
            # Ending twice (explicit end() plus __exit__) must not overwrite
            # the duration that was measured the first time.
            return
        self._ended = True
        elapsed_ms = (time.perf_counter() - self._t0) * 1000
        # Numeric value so it can be compared/aggregated, not a formatted string.
        self.attributes["duration_ms"] = round(elapsed_ms, 3)

    def __enter__(self):
        return self

    def __exit__(self, *exc_info):
        # Always end the span; returning None lets any exception propagate.
        self.end()
class SimMeter:
    """Stand-in for an OpenTelemetry meter backed by a plain dict.

    ``self.metrics`` maps each instrument name to its recorded data. For
    histograms this is a list of values, and for counters a running total.
    """

    def __init__(self):
        self.metrics = {}

    def create_histogram(self, name, **kwargs):
        """Return a SimHistogram that appends recorded values to ``metrics[name]``.

        Extra keyword arguments (e.g. ``unit``) are accepted for API
        compatibility and ignored. If *name* already exists, its stored values
        are kept rather than being reset.
        """
        self.metrics.setdefault(name, [])
        return SimHistogram(name, self.metrics)

    def create_counter(self, name, **kwargs):
        """Return a SimCounter that adds to the running total ``metrics[name]``.

        Extra keyword arguments are accepted and ignored. If *name* already
        exists, its current total is kept rather than being reset to 0.
        """
        self.metrics.setdefault(name, 0)
        return SimCounter(name, self.metrics)
class SimHistogram:
    """Histogram stand-in that appends every recorded value to a shared store."""

    def __init__(self, name, store):
        self.name, self.store = name, store

    def record(self, value, attributes=None):
        """Append *value* to ``store[name]``; *attributes* is accepted but ignored."""
        bucket = self.store[self.name]
        bucket.append(value)
class SimCounter:
    """Monotonic counter stand-in that adds increments to a shared store.

    The OpenTelemetry metrics API defines Counter increments as non-negative.
    Negative values are rejected here, so a simulated counter can never
    decrease.
    """

    def __init__(self, name, store):
        self.name = name
        self.store = store

    def add(self, value, attributes=None):
        """Add *value* to ``store[name]``; *attributes* is accepted but ignored.

        Raises:
            ValueError: if *value* is negative.
        """
        if value < 0:
            raise ValueError(
                f"Counter {self.name!r} only accepts non-negative increments, got {value!r}"
            )
        self.store[self.name] += value
class MLPipelineObservability:
    """Simulated ML pipeline instrumented with spans and metrics.

    Each stage method runs inside its own span from ``self.tracer`` and
    records values on instruments created from ``self.meter``.
    ``run_pipeline`` calls the stages in order inside an ``ml_pipeline`` span.
    It prints a per-stage report followed by a metrics summary. Stage outputs
    come from ``random``, so results differ between runs.
    """

    def __init__(self):
        self.tracer = SimTracer()
        self.meter = SimMeter()
        # Metric instruments, one per quantity the pipeline reports.
        self.training_duration = self.meter.create_histogram(
            "ml.training.duration", unit="s")
        self.prediction_latency = self.meter.create_histogram(
            "ml.prediction.latency", unit="ms")
        self.prediction_count = self.meter.create_counter(
            "ml.prediction.count")
        self.data_quality_score = self.meter.create_histogram(
            "ml.data.quality_score")
        self.model_accuracy = self.meter.create_histogram(
            "ml.model.accuracy")

    def data_ingestion(self, source, rows):
        """Simulate loading *rows* rows from *source* and scoring their quality.

        Returns:
            dict with ``rows`` and a random ``quality`` score drawn from
            ``random.uniform(0.85, 0.99)``.
        """
        with self.tracer.start_span("data_ingestion") as span:
            span.set_attribute("data.source", source)
            span.set_attribute("data.rows", rows)
            time.sleep(0.01)  # Simulate work
            quality = random.uniform(0.85, 0.99)
            self.data_quality_score.record(quality)
            # Numeric attribute (not a pre-formatted string) so span queries
            # can compare it numerically.
            span.set_attribute("data.quality_score", round(quality, 3))
            return {"rows": rows, "quality": quality}

    def feature_engineering(self, data):
        """Simulate feature extraction on the output of data_ingestion().

        Returns:
            dict with a random ``features`` count (20-50) and the input ``rows``.
        """
        with self.tracer.start_span("feature_engineering") as span:
            span.set_attribute("features.input_rows", data["rows"])
            time.sleep(0.01)
            features_count = random.randint(20, 50)
            span.set_attribute("features.count", features_count)
            return {"features": features_count, "rows": data["rows"]}

    def model_training(self, features):
        """Simulate a training run and record its duration and accuracy.

        *features* is accepted for pipeline symmetry but not used by the
        simulation.

        Returns:
            dict with random ``accuracy`` and ``loss`` values.
        """
        with self.tracer.start_span("model_training") as span:
            # perf_counter is monotonic, so the measured duration cannot go
            # negative if the wall clock is adjusted during training.
            start = time.perf_counter()
            time.sleep(0.02)  # Simulate training
            accuracy = random.uniform(0.82, 0.95)
            loss = random.uniform(0.05, 0.20)
            duration = time.perf_counter() - start
            self.training_duration.record(duration)  # seconds, matches unit="s"
            self.model_accuracy.record(accuracy)
            span.set_attribute("model.accuracy", round(accuracy, 4))
            span.set_attribute("model.loss", round(loss, 4))
            span.set_attribute("model.epochs", 100)
            return {"accuracy": accuracy, "loss": loss}

    def model_serving(self, n_requests=100):
        """Simulate serving *n_requests* predictions with random latencies (5-50 ms).

        Returns:
            dict with ``requests`` and ``avg_latency`` in ms. ``avg_latency``
            is 0.0 when no requests are served.
        """
        with self.tracer.start_span("model_serving") as span:
            latencies = []
            for _ in range(n_requests):
                latency = random.uniform(5, 50)
                self.prediction_latency.record(latency)
                self.prediction_count.add(1)
                latencies.append(latency)
            # Guard the zero-request case, which would otherwise divide by zero.
            avg_latency = sum(latencies) / len(latencies) if latencies else 0.0
            span.set_attribute("serving.requests", n_requests)
            span.set_attribute("serving.avg_latency_ms", round(avg_latency, 1))
            return {"requests": n_requests, "avg_latency": avg_latency}

    def run_pipeline(self):
        """Run all stages in order inside an ``ml_pipeline`` span and print a report.

        The span's ``pipeline.status`` attribute is ``"success"`` when every
        stage finishes. If a stage raises, the attribute is set to
        ``"error"``, ``pipeline.error`` holds the exception's repr, and the
        exception is re-raised.
        """
        print(f"\n{'='*55}")
        print(f"MLOps Pipeline — {datetime.now().strftime('%H:%M:%S')}")
        print(f"{'='*55}")
        with self.tracer.start_span("ml_pipeline") as pipeline_span:
            try:
                # Step 1: Data Ingestion
                data = self.data_ingestion("s3://data/training", 100000)
                print(f" 1. Data Ingestion: {data['rows']:,} rows, "
                      f"quality={data['quality']:.3f}")
                # Step 2: Feature Engineering
                features = self.feature_engineering(data)
                print(f" 2. Features: {features['features']} features")
                # Step 3: Training
                result = self.model_training(features)
                print(f" 3. Training: accuracy={result['accuracy']:.4f}, "
                      f"loss={result['loss']:.4f}")
                # Step 4: Serving
                serving = self.model_serving(100)
                print(f" 4. Serving: {serving['requests']} requests, "
                      f"avg={serving['avg_latency']:.1f}ms")
            except Exception as exc:
                # Record the failure on the pipeline span, then propagate it.
                pipeline_span.set_attribute("pipeline.status", "error")
                pipeline_span.set_attribute("pipeline.error", repr(exc))
                raise
            pipeline_span.set_attribute("pipeline.status", "success")

        # Metrics summary: histograms (lists) show their mean, counters their total.
        print("\n Metrics:")
        for name, values in self.meter.metrics.items():
            if isinstance(values, list) and values:
                print(f" {name}: avg={sum(values)/len(values):.3f}")
            elif isinstance(values, (int, float)):
                print(f" {name}: {values}")
# Build the simulated pipeline and run every stage once. These are module-level
# statements, so this executes (and prints) whenever the script runs or is imported.
pipeline = MLPipelineObservability()
pipeline.run_pipeline()
Alerting และ Dashboards
# === SigNoz Alerting Configuration ===
# 1. Alert Rules (configure in the SigNoz UI or via its API)
# Descriptive spec only. Each entry maps an alert name to the metric it
# watches, a threshold "condition", how long the condition must hold ("for"),
# a severity label, and the notification channels. Nothing visible here
# submits these rules to SigNoz; they are only printed below.
# NOTE(review): "ml.prediction.errors" is not one of the instruments created by
# MLPipelineObservability (training duration, prediction latency, prediction
# count, data quality score, model accuracy). The serving code must emit an
# errors counter before the "High Error Rate" alert can have any data.
alert_rules = {
    "High Prediction Latency": {
        "metric": "ml.prediction.latency",
        "condition": "p99 > 100ms",
        "for": "5m",
        "severity": "warning",
        "notification": ["slack", "pagerduty"],
    },
    "Low Model Accuracy": {
        "metric": "ml.model.accuracy",
        "condition": "avg < 0.80",
        "for": "15m",
        "severity": "critical",
        "notification": ["slack", "pagerduty", "email"],
    },
    "Data Quality Drop": {
        "metric": "ml.data.quality_score",
        "condition": "avg < 0.90",
        "for": "10m",
        "severity": "warning",
        "notification": ["slack"],
    },
    # Threshold is in seconds, matching the histogram's unit="s".
    "Training Duration Spike": {
        "metric": "ml.training.duration",
        "condition": "avg > 3600s",
        "for": "1m",
        "severity": "warning",
        "notification": ["slack"],
    },
    "High Error Rate": {
        "metric": "ml.prediction.errors",
        "condition": "rate > 5%",
        "for": "5m",
        "severity": "critical",
        "notification": ["slack", "pagerduty"],
    },
}
# 2. Dashboard Panels
# Each panel has a title, a visualization type, and either a "metric" to plot
# or a "query" filter for traces/logs. Filters use the "service.name" resource
# attribute, which is the key set by setup_otel() and by OTEL_SERVICE_NAME.
dashboard_panels = [
    {"title": "Pipeline Traces", "type": "trace_list",
     "query": "service.name = ml-pipeline"},
    {"title": "Prediction Latency P50/P95/P99", "type": "timeseries",
     "metric": "ml.prediction.latency"},
    {"title": "Model Accuracy Over Time", "type": "timeseries",
     "metric": "ml.model.accuracy"},
    {"title": "Data Quality Score", "type": "gauge",
     "metric": "ml.data.quality_score"},
    {"title": "Training Duration", "type": "timeseries",
     "metric": "ml.training.duration"},
    # The counter is cumulative; the panel should plot its rate.
    {"title": "Predictions per Second", "type": "timeseries",
     "metric": "ml.prediction.count"},
    # Log records carry their level in the OTel "severity_text" field and the
    # service in the same "service.name" resource attribute as the traces.
    {"title": "Error Logs", "type": "log_list",
     "query": "severity_text = ERROR AND service.name = ml-pipeline"},
]
# List each alert rule (severity right-aligned) and then each dashboard panel.
print("SigNoz Alert Rules:")
for name, rule in alert_rules.items():
    print(" [{0[severity]:>8}] {1}: {0[condition]}".format(rule, name))
print("\nDashboard Panels ({}):".format(len(dashboard_panels)))
for panel in dashboard_panels:
    print(" [{0[type]:>12}] {0[title]}".format(panel))
Best Practices
- OpenTelemetry: ใช้ OTel SDK เป็น Standard ไม่ Lock-in กับ Vendor ใดๆ
- Custom Metrics: สร้าง ML-specific Metrics เช่น Accuracy, Drift, Data Quality
- Trace Context: ส่งต่อ Trace Context ข้าม Services เพื่อติดตาม Request แบบ End-to-end
- Alerts: ตั้ง Alerts สำหรับ Model Drift, Latency Spike, Data Quality Drop
- Dashboards: สร้าง Dashboard แยกตาม Pipeline Stage เช่น Data, Training และ Serving
- Log Correlation: เชื่อม Logs กับ Traces ด้วย Trace ID เพื่อหา Root Cause ได้เร็วขึ้น
SigNoz คืออะไร
SigNoz เป็น Open-source Observability Platform ที่รวม Traces, Metrics และ Logs ไว้ในที่เดียว ใช้มาตรฐาน OpenTelemetry และ ClickHouse เป็น Backend ใช้ทดแทน Datadog หรือ New Relic ได้ Self-host ได้ฟรี และมีเวอร์ชัน Cloud ให้เลือกใช้ด้วย