SigNoz Observability Platform
SigNoz รวม Traces, Metrics, Logs ไว้ในที่เดียว ใช้ OpenTelemetry Standard รองรับ ClickHouse Backend ทดแทน Datadog New Relic ได้ Self-hosted ฟรี
MLOps Workflow ต้องการ Observability สำหรับทุกขั้นตอน Data Pipeline, Training, Serving, Monitoring ใช้ SigNoz ติดตาม Performance และหา Root Cause
SigNoz Setup และ OpenTelemetry
# === SigNoz Installation ===
# 1. Docker Compose Installation
# Clone the main branch and start the ClickHouse-backed stack detached.
git clone -b main https://github.com/SigNoz/signoz.git
cd signoz/deploy
docker compose -f docker/clickhouse-setup/docker-compose.yaml up -d
# Open the SigNoz UI at: http://localhost:3301
# 2. OpenTelemetry Python SDK
# pip install opentelemetry-api opentelemetry-sdk
# pip install opentelemetry-exporter-otlp
# pip install opentelemetry-instrumentation-fastapi
# pip install opentelemetry-instrumentation-requests
# 3. Python Instrumentation
# otel_config.py
from opentelemetry import trace, metrics
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import BatchSpanProcessor
from opentelemetry.exporter.otlp.proto.grpc.trace_exporter import OTLPSpanExporter
from opentelemetry.sdk.metrics import MeterProvider
from opentelemetry.sdk.metrics.export import PeriodicExportingMetricReader
from opentelemetry.exporter.otlp.proto.grpc.metric_exporter import OTLPMetricExporter
from opentelemetry.sdk.resources import Resource
def setup_otel(service_name, endpoint="http://localhost:4317"):
    """Configure OpenTelemetry tracing and metrics export over OTLP/gRPC.

    Args:
        service_name: Logical service name, attached as the ``service.name``
            resource attribute on every span and metric.
        endpoint: OTLP gRPC collector endpoint. Defaults to the local SigNoz
            collector port (4317); parameterized so the same helper works for
            non-localhost deployments.

    Returns:
        tuple: ``(tracer, meter)`` obtained from the providers just installed.
    """
    resource = Resource.create({"service.name": service_name})

    # Traces: batch spans in memory and ship them to the collector.
    trace_provider = TracerProvider(resource=resource)
    trace_provider.add_span_processor(
        BatchSpanProcessor(OTLPSpanExporter(endpoint=endpoint))
    )
    trace.set_tracer_provider(trace_provider)

    # Metrics: periodically push readings (every 10 s) to the collector.
    metric_reader = PeriodicExportingMetricReader(
        OTLPMetricExporter(endpoint=endpoint),
        export_interval_millis=10000,
    )
    meter_provider = MeterProvider(resource=resource, metric_readers=[metric_reader])
    metrics.set_meter_provider(meter_provider)

    return trace.get_tracer(service_name), metrics.get_meter(service_name)
# 4. FastAPI Auto-instrumentation
# from opentelemetry.instrumentation.fastapi import FastAPIInstrumentor
# from fastapi import FastAPI
#
# app = FastAPI()
# FastAPIInstrumentor.instrument_app(app)

# 5. Environment Variables (Alternative)
# export OTEL_SERVICE_NAME=ml-pipeline
# export OTEL_EXPORTER_OTLP_ENDPOINT=http://localhost:4317
# export OTEL_TRACES_EXPORTER=otlp
# export OTEL_METRICS_EXPORTER=otlp
# export OTEL_LOGS_EXPORTER=otlp
#
# opentelemetry-instrument python app.py

# One print call, same four lines on stdout as the original prints.
print(
    "SigNoz + OpenTelemetry configured\n"
    " UI: http://localhost:3301\n"
    " OTLP: http://localhost:4317 (gRPC)\n"
    " OTLP: http://localhost:4318 (HTTP)"
)
MLOps Pipeline Observability
# mlops_observability.py — MLOps Pipeline ด้วย OpenTelemetry
import time
import random
from dataclasses import dataclass, field
from typing import Dict, List
from datetime import datetime
# Simulated OTel interfaces (stand-ins so the demo runs without a collector)
class SimTracer:
    """Minimal tracer stand-in: every call hands back a fresh SimSpan."""

    def start_span(self, name):
        return SimSpan(name)
class SimSpan:
    """Span stand-in: collects attributes and, when ended (or on context-manager
    exit), stamps its own wall-clock duration in milliseconds."""

    def __init__(self, name):
        self.name = name
        self.start = time.time()
        self.attributes = {}

    def set_attribute(self, key, value):
        self.attributes[key] = value

    def end(self):
        # Duration is stored as a whole-millisecond string attribute.
        elapsed_ms = (time.time() - self.start) * 1000
        self.attributes["duration_ms"] = f"{elapsed_ms:.0f}"

    def __enter__(self):
        return self

    def __exit__(self, *args):
        self.end()
class SimMeter:
    """Meter stand-in: all instruments share one ``metrics`` dict keyed by name."""

    def __init__(self):
        self.metrics = {}

    def create_histogram(self, name, **kwargs):
        # A histogram accumulates individual recordings in a list.
        self.metrics[name] = []
        return SimHistogram(name, self.metrics)

    def create_counter(self, name, **kwargs):
        # A counter keeps a single running total.
        self.metrics[name] = 0
        return SimCounter(name, self.metrics)
class SimHistogram:
    """Histogram stand-in: appends each recorded value into the shared store."""

    def __init__(self, name, store):
        self.name = name
        self.store = store

    def record(self, value, attributes=None):
        # `attributes` is accepted for OTel API parity but unused here.
        self.store[self.name].append(value)
class SimCounter:
    """Counter stand-in: adds each increment onto the shared store's total."""

    def __init__(self, name, store):
        self.name = name
        self.store = store

    def add(self, value, attributes=None):
        # `attributes` is accepted for OTel API parity but unused here.
        self.store[self.name] += value
class MLPipelineObservability:
    """MLOps Pipeline with Observability.

    Wraps each pipeline stage (ingestion -> features -> training -> serving)
    in a span and records ML-specific metrics (data quality, accuracy,
    prediction latency/count) through the simulated OTel instruments above.
    All stage work is simulated with sleeps and random values.
    """

    def __init__(self):
        # In production these would be real OTel tracer/meter objects
        # (see setup_otel above); here they are the Sim* stand-ins.
        self.tracer = SimTracer()
        self.meter = SimMeter()
        # Metrics
        self.training_duration = self.meter.create_histogram(
            "ml.training.duration", unit="s")
        self.prediction_latency = self.meter.create_histogram(
            "ml.prediction.latency", unit="ms")
        self.prediction_count = self.meter.create_counter(
            "ml.prediction.count")
        self.data_quality_score = self.meter.create_histogram(
            "ml.data.quality_score")
        self.model_accuracy = self.meter.create_histogram(
            "ml.model.accuracy")

    def data_ingestion(self, source, rows):
        """Data Ingestion Step.

        Simulates reading `rows` records from `source`, records a random
        quality score (0.85-0.99) as a histogram metric, and returns the row
        count plus quality for the next stage.
        """
        with self.tracer.start_span("data_ingestion") as span:
            span.set_attribute("data.source", source)
            span.set_attribute("data.rows", rows)
            time.sleep(0.01)  # Simulate work
            quality = random.uniform(0.85, 0.99)
            self.data_quality_score.record(quality)
            span.set_attribute("data.quality_score", f"{quality:.3f}")
            return {"rows": rows, "quality": quality}

    def feature_engineering(self, data):
        """Feature Engineering Step.

        Simulates deriving 20-50 features from the ingested data; the span
        carries both input row count and resulting feature count.
        """
        with self.tracer.start_span("feature_engineering") as span:
            span.set_attribute("features.input_rows", data["rows"])
            time.sleep(0.01)
            features_count = random.randint(20, 50)
            span.set_attribute("features.count", features_count)
            return {"features": features_count, "rows": data["rows"]}

    def model_training(self, features):
        """Model Training Step.

        Simulates a training run, recording wall-clock duration and a random
        accuracy (0.82-0.95) / loss (0.05-0.20) pair as metrics and span
        attributes.
        """
        with self.tracer.start_span("model_training") as span:
            start = time.time()
            time.sleep(0.02)  # Simulate training
            accuracy = random.uniform(0.82, 0.95)
            loss = random.uniform(0.05, 0.20)
            duration = time.time() - start
            self.training_duration.record(duration)
            self.model_accuracy.record(accuracy)
            span.set_attribute("model.accuracy", f"{accuracy:.4f}")
            span.set_attribute("model.loss", f"{loss:.4f}")
            # NOTE(review): epoch count is hard-coded for the demo.
            span.set_attribute("model.epochs", 100)
            return {"accuracy": accuracy, "loss": loss}

    def model_serving(self, n_requests=100):
        """Model Serving Step.

        Simulates `n_requests` predictions with random 5-50 ms latencies,
        recording each latency and incrementing the prediction counter.
        """
        with self.tracer.start_span("model_serving") as span:
            latencies = []
            for _ in range(n_requests):
                latency = random.uniform(5, 50)
                self.prediction_latency.record(latency)
                self.prediction_count.add(1)
                latencies.append(latency)
            avg_latency = sum(latencies) / len(latencies)
            span.set_attribute("serving.requests", n_requests)
            span.set_attribute("serving.avg_latency_ms", f"{avg_latency:.1f}")
            return {"requests": n_requests, "avg_latency": avg_latency}

    def run_pipeline(self):
        """Run Full ML Pipeline.

        Executes the four stages in order under one parent span, printing a
        progress line per stage and finally a summary of every metric
        collected by the meter (averages for histograms, totals for counters).
        """
        print(f"\n{'='*55}")
        print(f"MLOps Pipeline — {datetime.now().strftime('%H:%M:%S')}")
        print(f"{'='*55}")
        with self.tracer.start_span("ml_pipeline") as pipeline_span:
            # Step 1: Data Ingestion
            data = self.data_ingestion("s3://data/training", 100000)
            print(f" 1. Data Ingestion: {data['rows']:,} rows, "
                  f"quality={data['quality']:.3f}")
            # Step 2: Feature Engineering
            features = self.feature_engineering(data)
            print(f" 2. Features: {features['features']} features")
            # Step 3: Training
            result = self.model_training(features)
            print(f" 3. Training: accuracy={result['accuracy']:.4f}, "
                  f"loss={result['loss']:.4f}")
            # Step 4: Serving
            serving = self.model_serving(100)
            print(f" 4. Serving: {serving['requests']} requests, "
                  f"avg={serving['avg_latency']:.1f}ms")
            pipeline_span.set_attribute("pipeline.status", "success")
        # Metrics Summary: histograms report their average, counters their total.
        print(f"\n Metrics:")
        for name, values in self.meter.metrics.items():
            if isinstance(values, list) and values:
                print(f" {name}: avg={sum(values)/len(values):.3f}")
            elif isinstance(values, (int, float)):
                print(f" {name}: {values}")
# Demo entry point: run one simulated pipeline and print the metrics summary.
pipeline = MLPipelineObservability()
pipeline.run_pipeline()
Alerting และ Dashboards
# === SigNoz Alerting Configuration ===
# 1. Alert Rules (configured in the SigNoz UI or via its API)
alert_rules = {
    "High Prediction Latency": {
        "metric": "ml.prediction.latency",
        "condition": "p99 > 100ms",
        "for": "5m",
        "severity": "warning",
        "notification": ["slack", "pagerduty"],
    },
    "Low Model Accuracy": {
        "metric": "ml.model.accuracy",
        "condition": "avg < 0.80",
        "for": "15m",
        "severity": "critical",
        "notification": ["slack", "pagerduty", "email"],
    },
    "Data Quality Drop": {
        "metric": "ml.data.quality_score",
        "condition": "avg < 0.90",
        "for": "10m",
        "severity": "warning",
        "notification": ["slack"],
    },
    "Training Duration Spike": {
        "metric": "ml.training.duration",
        "condition": "avg > 3600s",
        "for": "1m",
        "severity": "warning",
        "notification": ["slack"],
    },
    "High Error Rate": {
        "metric": "ml.prediction.errors",
        "condition": "rate > 5%",
        "for": "5m",
        "severity": "critical",
        "notification": ["slack", "pagerduty"],
    },
}

# 2. Dashboard Panels
dashboard_panels = [
    {"title": "Pipeline Traces", "type": "trace_list",
     "query": "service.name = ml-pipeline"},
    {"title": "Prediction Latency P50/P95/P99", "type": "timeseries",
     "metric": "ml.prediction.latency"},
    {"title": "Model Accuracy Over Time", "type": "timeseries",
     "metric": "ml.model.accuracy"},
    {"title": "Data Quality Score", "type": "gauge",
     "metric": "ml.data.quality_score"},
    {"title": "Training Duration", "type": "timeseries",
     "metric": "ml.training.duration"},
    {"title": "Predictions per Second", "type": "timeseries",
     "metric": "ml.prediction.count"},
    {"title": "Error Logs", "type": "log_list",
     "query": "severity = ERROR AND service = ml-pipeline"},
]

# Print the rule table in one shot; output matches per-line prints exactly.
print("SigNoz Alert Rules:")
rule_lines = [
    f" [{rule['severity']:>8}] {rule_name}: {rule['condition']}"
    for rule_name, rule in alert_rules.items()
]
print("\n".join(rule_lines))

print(f"\nDashboard Panels ({len(dashboard_panels)}):")
for entry in dashboard_panels:
    print(f" [{entry['type']:>12}] {entry['title']}")
Best Practices
- OpenTelemetry: ใช้ OTel SDK เป็น Standard ไม่ Lock-in กับ Vendor ใดๆ
- Custom Metrics: สร้าง ML-specific Metrics เช่น Accuracy, Drift, Data Quality
- Trace Context: ส่ง Trace Context ข้าม Services ติดตาม Request End-to-end
- Alerts: ตั้ง Alerts สำหรับ Model Drift, Latency Spike, Data Quality Drop
- Dashboards: สร้าง Dashboard แยกตาม Pipeline Stage Training, Serving, Data
- Log Correlation: เชื่อม Logs กับ Traces ด้วย Trace ID หา Root Cause เร็ว
SigNoz คืออะไร
Open-source Observability Platform รวม Traces Metrics Logs ในที่เดียว OpenTelemetry Standard ClickHouse Backend ทดแทน Datadog New Relic Self-hosted ฟรี Cloud Version
Observability ต่างจาก Monitoring อย่างไร
Monitoring ตรวจ Known Unknowns CPU Disk Observability หา Unknown Unknowns Traces ติดตาม Request Metrics วัด Performance Logs รายละเอียด ใช้ร่วมกันหา Root Cause
OpenTelemetry คืออะไร
Open Standard Observability Data Collection รวม Traces Metrics Logs Framework เดียว หลายภาษา Python Java Go Node.js ส่ง Backend ใดก็ได้ CNCF Project นิยมมาก
วิธี Monitor ML Pipeline ทำอย่างไร
OTel Instrument Training Pipeline Training Duration Loss Accuracy Custom Metrics Data Quality Model Drift Prediction Latency Traces Feature Store Model Server Alerts Metrics ผิดปกติ
สรุป
SigNoz ร่วมกับ MLOps Workflow ให้ Observability ครบทุกขั้นตอน OpenTelemetry Standard ไม่ Lock-in Custom Metrics สำหรับ ML Traces ติดตาม Pipeline End-to-end Alerts สำหรับ Drift Latency Quality Dashboards แยกตาม Stage Log Correlation หา Root Cause
