SiamCafe · Blog
Model Registry Log Management ELK — จัดการ ML
บทความ

Model Registry Log Management ELK — จัดการ ML

เผยแพร่ 28 พฤษภาคม 2569

Model Registry ELK

Model Registry Log Management ELK Stack MLflow Elasticsearch Logstash Kibana Filebeat Prediction Monitoring Alert Production

ComponentToolPurposeLog Type
Model RegistryMLflow / W&BVersion Stage MetadataRegister Promote Deploy
Log CollectorFilebeatCollect from ServersAll Application Logs
Log PipelineLogstashParse Filter EnrichStructured Model Events
Storage & SearchElasticsearchIndex Store QueryAll Indexed Logs
DashboardKibanaVisualize AlertDashboard Timeline Charts

Logstash Pipeline

# === Logstash Pipeline for Model Registry ===

# /etc/logstash/conf.d/model-registry.conf
# input {
#   beats {
#     port => 5044
#     tags => ["model-logs"]
#   }
# }
#
# filter {
#   if "model-logs" in [tags] {
#     # Parse MLflow event log
#     grok {
#       match => { "message" => "%{TIMESTAMP_ISO8601:timestamp} %{LOGLEVEL:level} %{DATA:event_type} model=%{DATA:model_name} version=%{INT:model_version} stage=%{WORD:stage} user=%{DATA:user}" }
#     }
#     # Parse prediction log
#     grok {
#       match => { "message" => "%{TIMESTAMP_ISO8601:timestamp} PREDICT request_id=%{UUID:request_id} model=%{DATA:model_name} latency=%{NUMBER:latency_ms:float} status=%{WORD:status}" }
#     }
#     # Add metadata
#     mutate {
#       add_field => { "environment" => "%{[fields][env]}" }
#       convert => { "latency_ms" => "float" }
#       convert => { "model_version" => "integer" }
#     }
#     # Date parsing
#     date {
#       match => [ "timestamp", "ISO8601" ]
#       target => "@timestamp"
#     }
#   }
# }
#
# output {
#   elasticsearch {
#     hosts => ["elasticsearch:9200"]
#     index => "model-logs-%{+YYYY.MM.dd}"
#     ilm_enabled => true
#     ilm_policy => "model-logs-policy"
#   }
# }

from dataclasses import dataclass

@dataclass
class LogField:
    field: str
    type_es: str
    source: str
    purpose: str

fields = [
    LogField("model_name", "keyword", "MLflow / Application Log", "Filter Query ตาม Model"),
    LogField("model_version", "integer", "MLflow Event", "Track Version History"),
    LogField("stage", "keyword", "MLflow Transition Event", "Filter ตาม Stage (Staging/Production)"),
    LogField("event_type", "keyword", "MLflow / Custom", "Filter ตาม Event (Register/Promote/Predict)"),
    LogField("latency_ms", "float", "Prediction Log", "วัด Inference Latency P50 P95 P99"),
    LogField("status", "keyword", "Prediction Log", "Filter Success/Error นับ Error Rate"),
    LogField("request_id", "keyword", "Prediction Log", "Trace แต่ละ Request"),
    LogField("user", "keyword", "MLflow Event", "Track ใครทำอะไร Audit"),
    LogField("environment", "keyword", "Filebeat fields", "Filter Dev/Staging/Production"),
]

print("=== Elasticsearch Field Mapping ===")
for f in fields:
    print(f"  [{f.field}] Type: {f.type_es}")
    print(f"    Source: {f.source}")
    print(f"    Purpose: {f.purpose}")

Kibana Dashboard

# === Kibana Dashboard Panels ===

@dataclass
class KibanaPanel:
    panel: str
    query: str
    viz: str
    insight: str

panels = [
    KibanaPanel("Model Events Timeline",
        "event_type: (registered OR transitioned OR deployed) | timechart",
        "Timeline / Bar Chart events per day",
        "ดู Deployment Frequency Model Lifecycle"),
    KibanaPanel("Prediction Latency",
        "event_type: predict | percentiles(latency_ms, 50, 95, 99)",
        "Line Chart P50 P95 P99 over time per model",
        "ตรวจ Latency Regression หลัง Deploy ใหม่"),
    KibanaPanel("Error Rate per Model",
        "status: error | count / total count * 100 | per model_name",
        "Bar Chart % Error per Model",
        "หา Model ที่มี Error Rate สูง"),
    KibanaPanel("Model Version Comparison",
        "model_name: X | split by model_version | avg(latency_ms)",
        "Table comparing versions Latency Error Rate",
        "เปรียบเทียบ Performance ระหว่าง Version"),
    KibanaPanel("Throughput per Model",
        "event_type: predict | count per 1m per model_name",
        "Stacked Area Chart predictions/min per model",
        "ดู Traffic Pattern Capacity Planning"),
    KibanaPanel("Stage Transition Audit",
        "event_type: transitioned | table user model stage timestamp",
        "Data Table sorted by timestamp",
        "Audit Trail ใครเปลี่ยน Stage เมื่อไหร่"),
]

print("=== Kibana Dashboard ===")
for p in panels:
    print(f"  [{p.panel}]")
    print(f"    Query: {p.query}")
    print(f"    Viz: {p.viz}")
    print(f"    Insight: {p.insight}")

ILM & Alerting

# === Index Lifecycle Management + Alerts ===

# ILM Policy
# PUT _ilm/policy/model-logs-policy
# {
#   "policy": {
#     "phases": {
#       "hot":   { "actions": { "rollover": { "max_size": "50GB", "max_age": "1d" } } },
#       "warm":  { "min_age": "7d", "actions": { "shrink": { "number_of_shards": 1 }, "forcemerge": { "max_num_segments": 1 } } },
#       "cold":  { "min_age": "30d", "actions": { "freeze": {} } },
#       "delete": { "min_age": "365d", "actions": { "delete": {} } }
#     }
#   }
# }

@dataclass
class AlertRule:
    alert: str
    condition: str
    severity: str
    action: str

alerts = [
    AlertRule("Model Deploy Failed",
        "event_type:transitioned AND status:failed count > 0 in 5m",
        "P1 Critical",
        "PagerDuty + Slack → Check MLflow Logs Model Artifacts"),
    AlertRule("High Error Rate",
        "status:error / total > 5% in 10m per model",
        "P1 Critical",
        "PagerDuty → Rollback Model Check Input Data"),
    AlertRule("Latency Regression",
        "P99(latency_ms) > 2x baseline in 15m per model",
        "P2 Warning",
        "Slack → Check Model Size GPU Load Scale Instance"),
    AlertRule("No Predictions",
        "event_type:predict count = 0 in 30m per model (production)",
        "P2 Warning",
        "Slack → Check Serving Endpoint Health Load Balancer"),
    AlertRule("Disk Usage High",
        "Elasticsearch disk > 80%",
        "P3 Info",
        "Email → Review ILM Policy Delete Old Indices Expand Storage"),
]

print("=== Alert Rules ===")
for a in alerts:
    print(f"  [{a.alert}] Severity: {a.severity}")
    print(f"    Condition: {a.condition}")
    print(f"    Action: {a.action}")

เคล็ดลับ

  • ILM: ตั้ง ILM Policy Hot/Warm/Cold/Delete ประหยัด Storage
  • keyword: ใช้ keyword type สำหรับ Field ที่ Filter ไม่ต้อง Full-text
  • request_id: ใส่ request_id ทุก Prediction Log สำหรับ Trace
  • Baseline: เก็บ Performance Baseline ก่อน Deploy ใช้เปรียบเทียบ
  • Audit: Log ทุก Stage Transition สำหรับ Compliance Audit

Model Registry คืออะไร

ML Model Version Management MLflow W&B Stage Staging Production Metadata Metrics Lineage Artifacts Approve Rollback