Model Registry Log Management with ELK: Managing ML Models
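Model Registry and ELK
A model registry such as MLflow produces lifecycle events (register, promote, deploy), and the serving layer produces prediction logs. Shipping both into the ELK Stack (Filebeat, Logstash, Elasticsearch, Kibana) gives a single place to monitor production models, trace individual predictions, and raise alerts.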
| Component | Tool | Purpose | Logs / Output |
|---|---|---|---|
| Model Registry | MLflow / W&B | Track versions, stages, metadata | Register, promote, deploy events |
| Log Collector | Filebeat | Collect logs from servers | All application logs |
| Log Pipeline | Logstash | Parse, filter, enrich | Structured model events |
| Storage & Search | Elasticsearch | Index, store, query | All indexed logs |
| Dashboard | Kibana | Visualize, alert | Dashboards, timelines, charts |
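Filebeat can only ship what the application writes, so registry and serving code need to emit one event per line in a fixed layout. The sketch below is a minimal example, assuming the key=value line formats that the Logstash grok patterns in this article parse; the helper names `registry_event` and `prediction_event` are hypothetical.

```python
from datetime import datetime, timezone
from typing import Optional
import uuid


def _now() -> str:
    # ISO 8601 UTC timestamp with milliseconds, e.g. 2024-05-01T12:00:00.123+00:00
    return datetime.now(timezone.utc).isoformat(timespec="milliseconds")


def registry_event(event_type: str, model: str, version: int, stage: str,
                   user: str, level: str = "INFO") -> str:
    """Lifecycle line: '<ts> <LEVEL> <event_type> model=... version=... stage=... user=...'."""
    return (f"{_now()} {level} {event_type} model={model} "
            f"version={version} stage={stage} user={user}")


def prediction_event(model: str, latency_ms: float, status: str,
                     request_id: Optional[str] = None) -> str:
    """Prediction line: '<ts> PREDICT request_id=<uuid> model=... latency=... status=...'."""
    rid = request_id or str(uuid.uuid4())  # one request_id per prediction, for tracing
    return (f"{_now()} PREDICT request_id={rid} model={model} "
            f"latency={latency_ms:.2f} status={status}")


if __name__ == "__main__":
    # In a real deployment these lines would go to a file or stdout that Filebeat tails.
    print(registry_event("transitioned", "churn", 3, "Production", "alice"))
    print(prediction_event("churn", 12.3456, "ok"))
```

Writing timestamps in UTC with an explicit offset lets the Logstash `date` filter set `@timestamp` without guessing the server's time zone.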
Logstash Pipeline
```python
# === Logstash Pipeline for Model Registry ===
# /etc/logstash/conf.d/model-registry.conf
# input {
#   beats {
#     port => 5044
#     tags => ["model-logs"]
#   }
# }
#
# filter {
#   if "model-logs" in [tags] {
#     # One grok with both patterns: the first match wins (break_on_match), so a line
#     # is not tagged _grokparsefailure just because the other pattern does not apply.
#     grok {
#       match => { "message" => [
#         # Prediction log
#         "%{TIMESTAMP_ISO8601:timestamp} (?<event_type>PREDICT) request_id=%{UUID:request_id} model=%{DATA:model_name} latency=%{NUMBER:latency_ms:float} status=%{WORD:status}",
#         # MLflow event log (NOTSPACE, not DATA, at the end: a trailing DATA matches an empty string)
#         "%{TIMESTAMP_ISO8601:timestamp} %{LOGLEVEL:level} %{DATA:event_type} model=%{DATA:model_name} version=%{INT:model_version:int} stage=%{WORD:stage} user=%{NOTSPACE:user}"
#       ] }
#     }
#     # Add metadata; lowercase event_type so "PREDICT" is stored as "predict"
#     mutate {
#       add_field => { "environment" => "%{[fields][env]}" }
#       lowercase => [ "event_type" ]
#     }
#     # Use the log's own timestamp as @timestamp
#     date {
#       match => [ "timestamp", "ISO8601" ]
#       target => "@timestamp"
#     }
#   }
# }
#
# output {
#   elasticsearch {
#     hosts => ["elasticsearch:9200"]
#     # With ILM enabled, events are written to the rollover alias and "index" is ignored
#     ilm_enabled => true
#     ilm_rollover_alias => "model-logs"
#     ilm_policy => "model-logs-policy"
#   }
# }
from dataclasses import dataclass


@dataclass
class LogField:
    field: str    # Elasticsearch field name
    type_es: str  # Elasticsearch mapping type
    source: str   # where the value comes from
    purpose: str  # what the field is used for


fields = [
    LogField("model_name", "keyword", "MLflow / application log", "Filter and query by model"),
    LogField("model_version", "integer", "MLflow event", "Track version history"),
    LogField("stage", "keyword", "MLflow transition event", "Filter by stage (Staging/Production)"),
    LogField("event_type", "keyword", "MLflow / custom", "Filter by event (register/promote/predict)"),
    LogField("latency_ms", "float", "Prediction log", "Measure inference latency (P50, P95, P99)"),
    LogField("status", "keyword", "Prediction log", "Filter success/error and compute error rate"),
    LogField("request_id", "keyword", "Prediction log", "Trace individual requests"),
    LogField("user", "keyword", "MLflow event", "Audit who did what"),
    LogField("environment", "keyword", "Filebeat fields.env", "Filter Dev/Staging/Production"),
]

print("=== Elasticsearch Field Mapping ===")
for f in fields:
    print(f"  [{f.field}] Type: {f.type_es}")
    print(f"    Source: {f.source}")
    print(f"    Purpose: {f.purpose}")
```
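Before deploying the pipeline, it can help to check sample log lines locally. The sketch below approximates the two grok patterns with Python's `re` module; it is a simplified stand-in (its timestamp and UUID sub-patterns are narrower than the real grok definitions), not Logstash itself, and the `parse` helper is hypothetical.

```python
import re
from typing import Optional

# Simplified stand-in for %{TIMESTAMP_ISO8601}
TS = r"(?P<timestamp>\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}(?:\.\d+)?(?:Z|[+-]\d{2}:?\d{2})?)"

PREDICT_RE = re.compile(
    TS + r" (?P<event_type>PREDICT) request_id=(?P<request_id>[0-9a-fA-F-]{36})"
    r" model=(?P<model_name>.*?) latency=(?P<latency_ms>\d+(?:\.\d+)?) status=(?P<status>\w+)"
)
REGISTRY_RE = re.compile(
    TS + r" (?P<level>[A-Z]+) (?P<event_type>.*?) model=(?P<model_name>.*?)"
    r" version=(?P<model_version>\d+) stage=(?P<stage>\w+) user=(?P<user>\S+)"
)


def parse(line: str) -> Optional[dict]:
    """Try the prediction pattern first, then the registry pattern (like break_on_match)."""
    for pattern in (PREDICT_RE, REGISTRY_RE):
        m = pattern.match(line)
        if m:
            event = m.groupdict()
            event["event_type"] = event["event_type"].lower()  # mirrors mutate lowercase
            if "latency_ms" in event:
                event["latency_ms"] = float(event["latency_ms"])  # mirrors :float
            if "model_version" in event:
                event["model_version"] = int(event["model_version"])  # mirrors :int
            return event
    return None  # Logstash would tag such a line with _grokparsefailure


print(parse("2024-05-01T12:00:00.123Z INFO transitioned model=churn version=3 stage=Production user=alice"))
```

Feeding a handful of real lines through a check like this catches format drift in the application before it turns into parse failures in Elasticsearch.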
Kibana Dashboard
```python
# === Kibana Dashboard Panels ===
from dataclasses import dataclass


@dataclass
class KibanaPanel:
    panel: str
    query: str    # KQL filter, then "| ..." shorthand for the aggregation set in the visualization
    viz: str
    insight: str


panels = [
    KibanaPanel("Model Events Timeline",
                "event_type: (registered OR transitioned OR deployed) | timechart",
                "Timeline / bar chart of events per day",
                "Track deployment frequency and the model lifecycle"),
    KibanaPanel("Prediction Latency",
                "event_type: predict | percentiles(latency_ms, 50, 95, 99)",
                "Line chart of P50/P95/P99 over time per model",
                "Catch latency regressions after a new deploy"),
    KibanaPanel("Error Rate per Model",
                "status: error | count / total count * 100 | per model_name",
                "Bar chart of % errors per model",
                "Find models with a high error rate"),
    KibanaPanel("Model Version Comparison",
                "model_name: X | split by model_version | avg(latency_ms)",
                "Table comparing latency and error rate across versions",
                "Compare performance between versions"),
    KibanaPanel("Throughput per Model",
                "event_type: predict | count per 1m per model_name",
                "Stacked area chart of predictions/min per model",
                "Understand traffic patterns for capacity planning"),
    KibanaPanel("Stage Transition Audit",
                "event_type: transitioned | table user model_name stage @timestamp",
                "Data table sorted by timestamp",
                "Audit trail of who changed a stage and when"),
]

print("=== Kibana Dashboard ===")
for p in panels:
    print(f"  [{p.panel}]")
    print(f"    Query: {p.query}")
    print(f"    Viz: {p.viz}")
    print(f"    Insight: {p.insight}")
```
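The Prediction Latency and Error Rate per Model panels come down to Elasticsearch aggregations. Below is a sketch of an equivalent `_search` request body plus a helper that turns the response into per-model error rates. It assumes the field names and keyword mappings listed earlier and that failed predictions are logged with `status=error`; the interval, time range, and bucket size are illustrative choices, and the function names are hypothetical.

```python
import json


def latency_and_error_query(interval: str = "1h", since: str = "now-24h") -> dict:
    """Request body for GET model-logs*/_search: per-model latency percentiles and errors."""
    return {
        "size": 0,  # only aggregations, no hits
        "query": {"bool": {"filter": [
            {"term": {"event_type": "predict"}},
            {"range": {"@timestamp": {"gte": since}}},
        ]}},
        "aggs": {
            "per_model": {
                "terms": {"field": "model_name", "size": 50},
                "aggs": {
                    "over_time": {
                        "date_histogram": {"field": "@timestamp", "fixed_interval": interval},
                        "aggs": {
                            "latency": {"percentiles": {"field": "latency_ms",
                                                        "percents": [50, 95, 99]}},
                            "errors": {"filter": {"term": {"status": "error"}}},
                        },
                    }
                },
            }
        },
    }


def error_rates(response: dict) -> dict:
    """Error rate per model over the whole queried range, from a search response."""
    rates = {}
    for model in response["aggregations"]["per_model"]["buckets"]:
        total = model["doc_count"]
        errors = sum(b["errors"]["doc_count"] for b in model["over_time"]["buckets"])
        rates[model["key"]] = errors / total if total else 0.0
    return rates


print(json.dumps(latency_and_error_query(), indent=2))
```

The body can be pasted into Kibana Dev Tools after `GET model-logs*/_search`; nesting the percentiles under a `date_histogram` gives the per-interval series that the line chart plots.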
ILM & Alerting
```python
# === Index Lifecycle Management + Alerts ===
# ILM Policy
# PUT _ilm/policy/model-logs-policy
# {
#   "policy": {
#     "phases": {
#       "hot": { "actions": { "rollover": { "max_size": "50gb", "max_age": "1d" } } },
#       "warm": { "min_age": "7d", "actions": { "shrink": { "number_of_shards": 1 }, "forcemerge": { "max_num_segments": 1 } } },
#       "cold": { "min_age": "30d", "actions": { "set_priority": { "priority": 0 } } },
#       "delete": { "min_age": "365d", "actions": { "delete": {} } }
#     }
#   }
# }
# The "freeze" action was removed in Elasticsearch 8.0; on 7.x it can still be used in the cold phase.
from dataclasses import dataclass


@dataclass
class AlertRule:
    alert: str
    condition: str
    severity: str
    action: str


alerts = [
    AlertRule("Model Deploy Failed",
              "event_type:transitioned AND status:failed count > 0 in 5m",
              "P1 Critical",
              "PagerDuty + Slack → check MLflow logs and model artifacts"),
    AlertRule("High Error Rate",
              "status:error / total > 5% in 10m per model",
              "P1 Critical",
              "PagerDuty → roll back the model, check input data"),
    AlertRule("Latency Regression",
              "P99(latency_ms) > 2x baseline in 15m per model",
              "P2 Warning",
              "Slack → check model size and GPU load, scale instances"),
    AlertRule("No Predictions",
              "event_type:predict count = 0 in 30m per model (production)",
              "P2 Warning",
              "Slack → check serving endpoint health and the load balancer"),
    AlertRule("Disk Usage High",
              "Elasticsearch disk > 80%",
              "P3 Info",
              "Email → review the ILM policy, delete old indices, or expand storage"),
]

print("=== Alert Rules ===")
for a in alerts:
    print(f"  [{a.alert}] Severity: {a.severity}")
    print(f"    Condition: {a.condition}")
    print(f"    Action: {a.action}")
```
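The prediction-based rules above can be expressed as a small check over one time window of parsed prediction events. This is a minimal sketch of the logic only, using the 5% error threshold and 2x P99 baseline from the rule table; the `Prediction` type, the function names, and the `ok`/`error` status values are assumptions, and in practice Kibana alerting rules or Watcher would run the equivalent queries on a schedule, each rule with its own window length.

```python
import math
from collections import defaultdict
from dataclasses import dataclass
from typing import Dict, List


@dataclass
class Prediction:
    model_name: str
    latency_ms: float
    status: str  # assumed values: "ok" or "error"


def p99(values: List[float]) -> float:
    """Nearest-rank 99th percentile."""
    ordered = sorted(values)
    rank = math.ceil(0.99 * len(ordered))
    return ordered[rank - 1]


def evaluate(window: List[Prediction], baseline_p99_ms: float,
             max_error_rate: float = 0.05, latency_factor: float = 2.0) -> List[str]:
    """Names of the alert rules that fire for one model's window of predictions."""
    if not window:
        return ["No Predictions"]
    fired = []
    error_rate = sum(p.status == "error" for p in window) / len(window)
    if error_rate > max_error_rate:
        fired.append("High Error Rate")
    if p99([p.latency_ms for p in window]) > latency_factor * baseline_p99_ms:
        fired.append("Latency Regression")
    return fired


def evaluate_all(predictions: List[Prediction],
                 baselines: Dict[str, float]) -> Dict[str, List[str]]:
    """Evaluate every production model that has a stored P99 baseline (ms)."""
    by_model: Dict[str, List[Prediction]] = defaultdict(list)
    for p in predictions:
        by_model[p.model_name].append(p)
    # Iterating the baselines (not the events) lets silent models trigger "No Predictions".
    return {model: evaluate(by_model.get(model, []), baseline)
            for model, baseline in baselines.items()}
```

Driving the loop from the stored baselines rather than from the events is what makes the "No Predictions" rule possible: a model that logs nothing has no events to group.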
Tips
- ILM: Define a Hot/Warm/Cold/Delete ILM policy to keep storage costs down.
- keyword: Use the keyword type for fields you filter or aggregate on that don't need full-text search.
- request_id: Include a request_id in every prediction log so each request can be traced.
- Baseline: Record a performance baseline before each deploy to compare against afterwards.
- Audit: Log every stage transition for compliance audits.
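One way to apply the keyword tip is an explicit composable index template, so Elasticsearch does not fall back to dynamic mapping (which maps strings as `text` with a `keyword` sub-field). The sketch below builds the request body for `PUT _index_template/model-logs` from the field types in the mapping code earlier; the template name, index pattern, and the choice to attach the ILM policy here are assumptions, and the Logstash Elasticsearch output may install its own template unless template management is turned off.

```python
import json

# Field -> Elasticsearch type, copied from the field mapping list
FIELD_TYPES = {
    "model_name": "keyword",
    "model_version": "integer",
    "stage": "keyword",
    "event_type": "keyword",
    "latency_ms": "float",
    "status": "keyword",
    "request_id": "keyword",
    "user": "keyword",
    "environment": "keyword",
}


def index_template(pattern: str = "model-logs-*",
                   rollover_alias: str = "model-logs",
                   policy: str = "model-logs-policy") -> dict:
    """Body for PUT _index_template/<name> with explicit mappings and ILM settings."""
    return {
        "index_patterns": [pattern],
        "template": {
            "settings": {
                "index.lifecycle.name": policy,
                "index.lifecycle.rollover_alias": rollover_alias,
            },
            "mappings": {
                "properties": {
                    "@timestamp": {"type": "date"},
                    **{name: {"type": es_type} for name, es_type in FIELD_TYPES.items()},
                }
            },
        },
    }


print(json.dumps(index_template(), indent=2))
```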
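What is a Model Registry?
A model registry manages ML model versions (for example in MLflow or W&B). For each version it tracks the stage (Staging, Production), metadata, metrics, lineage, and artifacts, and it supports approving promotions and rolling back to an earlier version.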
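To make these concepts concrete, here is a toy in-memory registry. `ToyRegistry` is a hypothetical class for illustration, not MLflow's or W&B's API. It numbers versions, moves them between stages, keeps a single Production version, supports rollback, and records an audit message for every change in the same key=value layout as the registry events in the Logstash pipeline (a logging formatter would add the timestamp and level).

```python
from dataclasses import dataclass, field
from typing import Dict, List, Optional


@dataclass
class ModelVersion:
    version: int
    stage: str = "None"  # None -> Staging -> Production -> Archived
    metrics: Dict[str, float] = field(default_factory=dict)


class ToyRegistry:
    """Hypothetical in-memory model registry, for illustration only."""

    def __init__(self) -> None:
        self.models: Dict[str, List[ModelVersion]] = {}
        self.audit: List[str] = []  # one key=value message per change

    def register(self, name: str, metrics: Dict[str, float], user: str) -> ModelVersion:
        versions = self.models.setdefault(name, [])
        mv = ModelVersion(version=len(versions) + 1, metrics=metrics)
        versions.append(mv)
        self._log("registered", name, mv, user)
        return mv

    def transition(self, name: str, version: int, stage: str, user: str) -> None:
        if stage == "Production":
            # Keep a single Production version: archive whichever one is live now.
            for mv in self.models[name]:
                if mv.stage == "Production" and mv.version != version:
                    mv.stage = "Archived"
                    self._log("transitioned", name, mv, user)
        target = self.models[name][version - 1]
        target.stage = stage
        self._log("transitioned", name, target, user)

    def production(self, name: str) -> Optional[ModelVersion]:
        return next((mv for mv in self.models[name] if mv.stage == "Production"), None)

    def rollback(self, name: str, user: str) -> ModelVersion:
        """Re-promote the newest archived version."""
        archived = [mv for mv in self.models[name] if mv.stage == "Archived"]
        if not archived:
            raise ValueError(f"no archived version of {name} to roll back to")
        previous = max(archived, key=lambda mv: mv.version)
        self.transition(name, previous.version, "Production", user)
        return previous

    def _log(self, event_type: str, name: str, mv: ModelVersion, user: str) -> None:
        self.audit.append(f"{event_type} model={name} version={mv.version} "
                          f"stage={mv.stage} user={user}")


if __name__ == "__main__":
    reg = ToyRegistry()
    reg.register("churn", {"auc": 0.81}, "alice")
    reg.transition("churn", 1, "Production", "alice")
    reg.register("churn", {"auc": 0.83}, "alice")
    reg.transition("churn", 2, "Production", "alice")
    reg.rollback("churn", "bob")  # version 2 misbehaves: bring version 1 back
    print("\n".join(reg.audit))
```

Every stage change, including the automatic archiving during a promotion or rollback, produces its own audit line, which is what the Stage Transition Audit panel relies on.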