Model Registry ELK
Model Registry Log Management ELK Stack MLflow Elasticsearch Logstash Kibana Filebeat Prediction Monitoring Alert Production
| Component | Tool | Purpose | Log Type |
|---|---|---|---|
| Model Registry | MLflow / W&B | Version Stage Metadata | Register Promote Deploy |
| Log Collector | Filebeat | Collect from Servers | All Application Logs |
| Log Pipeline | Logstash | Parse Filter Enrich | Structured Model Events |
| Storage & Search | Elasticsearch | Index Store Query | All Indexed Logs |
| Dashboard | Kibana | Visualize Alert | Dashboard Timeline Charts |
Logstash Pipeline
# === Logstash Pipeline for Model Registry ===
# /etc/logstash/conf.d/model-registry.conf
# input {
# beats {
# port => 5044
# tags => ["model-logs"]
# }
# }
#
# filter {
# if "model-logs" in [tags] {
# # Parse MLflow event log
# grok {
# match => { "message" => "%{TIMESTAMP_ISO8601:timestamp} %{LOGLEVEL:level} %{DATA:event_type} model=%{DATA:model_name} version=%{INT:model_version} stage=%{WORD:stage} user=%{DATA:user}" }
# }
# # Parse prediction log
# grok {
# match => { "message" => "%{TIMESTAMP_ISO8601:timestamp} PREDICT request_id=%{UUID:request_id} model=%{DATA:model_name} latency=%{NUMBER:latency_ms:float} status=%{WORD:status}" }
# }
# # Add metadata
# mutate {
# add_field => { "environment" => "%{[fields][env]}" }
# convert => { "latency_ms" => "float" }
# convert => { "model_version" => "integer" }
# }
# # Date parsing
# date {
# match => [ "timestamp", "ISO8601" ]
# target => "@timestamp"
# }
# }
# }
#
# output {
# elasticsearch {
# hosts => ["elasticsearch:9200"]
# index => "model-logs-%{+YYYY.MM.dd}"
# ilm_enabled => true
# ilm_policy => "model-logs-policy"
# }
# }
from dataclasses import dataclass
@dataclass
class LogField:
field: str
type_es: str
source: str
purpose: str
fields = [
LogField("model_name", "keyword", "MLflow / Application Log", "Filter Query ตาม Model"),
LogField("model_version", "integer", "MLflow Event", "Track Version History"),
LogField("stage", "keyword", "MLflow Transition Event", "Filter ตาม Stage (Staging/Production)"),
LogField("event_type", "keyword", "MLflow / Custom", "Filter ตาม Event (Register/Promote/Predict)"),
LogField("latency_ms", "float", "Prediction Log", "วัด Inference Latency P50 P95 P99"),
LogField("status", "keyword", "Prediction Log", "Filter Success/Error นับ Error Rate"),
LogField("request_id", "keyword", "Prediction Log", "Trace แต่ละ Request"),
LogField("user", "keyword", "MLflow Event", "Track ใครทำอะไร Audit"),
LogField("environment", "keyword", "Filebeat fields", "Filter Dev/Staging/Production"),
]
print("=== Elasticsearch Field Mapping ===")
for f in fields:
print(f" [{f.field}] Type: {f.type_es}")
print(f" Source: {f.source}")
print(f" Purpose: {f.purpose}")
Kibana Dashboard
# === Kibana Dashboard Panels ===
@dataclass
class KibanaPanel:
panel: str
query: str
viz: str
insight: str
panels = [
KibanaPanel("Model Events Timeline",
"event_type: (registered OR transitioned OR deployed) | timechart",
"Timeline / Bar Chart events per day",
"ดู Deployment Frequency Model Lifecycle"),
KibanaPanel("Prediction Latency",
"event_type: predict | percentiles(latency_ms, 50, 95, 99)",
"Line Chart P50 P95 P99 over time per model",
"ตรวจ Latency Regression หลัง Deploy ใหม่"),
KibanaPanel("Error Rate per Model",
"status: error | count / total count * 100 | per model_name",
"Bar Chart % Error per Model",
"หา Model ที่มี Error Rate สูง"),
KibanaPanel("Model Version Comparison",
"model_name: X | split by model_version | avg(latency_ms)",
"Table comparing versions Latency Error Rate",
"เปรียบเทียบ Performance ระหว่าง Version"),
KibanaPanel("Throughput per Model",
"event_type: predict | count per 1m per model_name",
"Stacked Area Chart predictions/min per model",
"ดู Traffic Pattern Capacity Planning"),
KibanaPanel("Stage Transition Audit",
"event_type: transitioned | table user model stage timestamp",
"Data Table sorted by timestamp",
"Audit Trail ใครเปลี่ยน Stage เมื่อไหร่"),
]
print("=== Kibana Dashboard ===")
for p in panels:
print(f" [{p.panel}]")
print(f" Query: {p.query}")
print(f" Viz: {p.viz}")
print(f" Insight: {p.insight}")
ILM & Alerting
# === Index Lifecycle Management + Alerts ===
# ILM Policy
# PUT _ilm/policy/model-logs-policy
# {
# "policy": {
# "phases": {
# "hot": { "actions": { "rollover": { "max_size": "50GB", "max_age": "1d" } } },
# "warm": { "min_age": "7d", "actions": { "shrink": { "number_of_shards": 1 }, "forcemerge": { "max_num_segments": 1 } } },
# "cold": { "min_age": "30d", "actions": { "freeze": {} } },
# "delete": { "min_age": "365d", "actions": { "delete": {} } }
# }
# }
# }
@dataclass
class AlertRule:
alert: str
condition: str
severity: str
action: str
alerts = [
AlertRule("Model Deploy Failed",
"event_type:transitioned AND status:failed count > 0 in 5m",
"P1 Critical",
"PagerDuty + Slack → Check MLflow Logs Model Artifacts"),
AlertRule("High Error Rate",
"status:error / total > 5% in 10m per model",
"P1 Critical",
"PagerDuty → Rollback Model Check Input Data"),
AlertRule("Latency Regression",
"P99(latency_ms) > 2x baseline in 15m per model",
"P2 Warning",
"Slack → Check Model Size GPU Load Scale Instance"),
AlertRule("No Predictions",
"event_type:predict count = 0 in 30m per model (production)",
"P2 Warning",
"Slack → Check Serving Endpoint Health Load Balancer"),
AlertRule("Disk Usage High",
"Elasticsearch disk > 80%",
"P3 Info",
"Email → Review ILM Policy Delete Old Indices Expand Storage"),
]
print("=== Alert Rules ===")
for a in alerts:
print(f" [{a.alert}] Severity: {a.severity}")
print(f" Condition: {a.condition}")
print(f" Action: {a.action}")
เคล็ดลับ
- ILM: ตั้ง ILM Policy Hot/Warm/Cold/Delete ประหยัด Storage
- keyword: ใช้ keyword type สำหรับ Field ที่ Filter ไม่ต้อง Full-text
- request_id: ใส่ request_id ทุก Prediction Log สำหรับ Trace
- Baseline: เก็บ Performance Baseline ก่อน Deploy ใช้เปรียบเทียบ
- Audit: Log ทุก Stage Transition สำหรับ Compliance Audit
Model Registry คืออะไร
ML Model Version Management MLflow W&B Stage Staging Production Metadata Metrics Lineage Artifacts Approve Rollback
ELK Stack คืออะไร
Elasticsearch Logstash Kibana Filebeat Log Management Search Index Dashboard Alert Full-text Scalable Real-time ML Anomaly
Log Pipeline ออกแบบอย่างไร
Filebeat Logstash Grok Parse Elasticsearch Index ILM Hot Warm Cold Delete keyword float date Enrich Filter Mapping
Monitoring & Alert ตั้งอย่างไร
Kibana Dashboard Timeline Latency Error Rate Throughput Stage Audit Alert Deploy Failed High Error No Predictions Disk Slack PagerDuty
สรุป
Model Registry Log Management ELK MLflow Elasticsearch Logstash Kibana Filebeat ILM Prediction Latency Error Alert Dashboard Production
