Model Registry Log Management ELK

2025-08-30 · อ. บอม, SiamCafe.net · 10,239 words

Model Registry ELK

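This article covers log management for a model registry with the ELK Stack. MLflow serves as the registry; Filebeat, Logstash, Elasticsearch and Kibana collect, parse, store and visualize the logs, and the same data drives prediction monitoring and alerting in production.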

| Component | Tool | Purpose | Log Type |
|---|---|---|---|
| Model Registry | MLflow / W&B | Version, Stage, Metadata | Register, Promote, Deploy |
| Log Collector | Filebeat | Collect from Servers | All Application Logs |
| Log Pipeline | Logstash | Parse, Filter, Enrich | Structured Model Events |
| Storage & Search | Elasticsearch | Index, Store, Query | All Indexed Logs |
| Dashboard | Kibana | Visualize, Alert | Dashboard, Timeline, Charts |

Logstash Pipeline

# === Logstash Pipeline for Model Registry ===

# /etc/logstash/conf.d/model-registry.conf
# input {
#   beats {
#     port => 5044
#     tags => ["model-logs"]
#   }
# }
#
# filter {
#   if "model-logs" in [tags] {
#     # One grok with two patterns: the first pattern that matches wins, so a
#     # line is parsed as a prediction log or as an MLflow event log without
#     # the other pattern tagging it with _grokparsefailure.
#     grok {
#       match => {
#         "message" => [
#           "%{TIMESTAMP_ISO8601:timestamp} (?<event_type>PREDICT) request_id=%{UUID:request_id} model=%{DATA:model_name} latency=%{NUMBER:latency_ms:float} status=%{WORD:status}",
#           "%{TIMESTAMP_ISO8601:timestamp} %{LOGLEVEL:level} %{DATA:event_type} model=%{DATA:model_name} version=%{INT:model_version:int} stage=%{WORD:stage} user=%{NOTSPACE:user}"
#         ]
#       }
#     }
#     # Lowercase event_type so "PREDICT" matches queries on event_type: predict
#     mutate {
#       lowercase => [ "event_type" ]
#     }
#     # Add metadata only when Filebeat supplied fields.env
#     if [fields][env] {
#       mutate {
#         add_field => { "environment" => "%{[fields][env]}" }
#       }
#     }
#     # Date parsing
#     date {
#       match => [ "timestamp", "ISO8601" ]
#       target => "@timestamp"
#     }
#   }
# }
#
# output {
#   elasticsearch {
#     hosts => ["elasticsearch:9200"]
#     # With ILM enabled the plugin writes through the rollover alias and
#     # ignores "index", so the alias replaces the daily index name.
#     ilm_enabled => true
#     ilm_rollover_alias => "model-logs"
#     ilm_pattern => "{now/d}-000001"
#     ilm_policy => "model-logs-policy"
#   }
# }

from dataclasses import dataclass

@dataclass
class LogField:
    field: str
    type_es: str
    source: str
    purpose: str

fields = [
    LogField("model_name", "keyword", "MLflow / Application Log", "Filter and query by model"),
    LogField("model_version", "integer", "MLflow Event", "Track version history"),
    LogField("stage", "keyword", "MLflow Transition Event", "Filter by stage (Staging/Production)"),
    LogField("event_type", "keyword", "MLflow / Custom", "Filter by event (Register/Promote/Predict)"),
    LogField("latency_ms", "float", "Prediction Log", "Measure inference latency (P50, P95, P99)"),
    LogField("status", "keyword", "Prediction Log", "Filter success/error and count the error rate"),
    LogField("request_id", "keyword", "Prediction Log", "Trace each request"),
    LogField("user", "keyword", "MLflow Event", "Track who did what (audit)"),
    LogField("environment", "keyword", "Filebeat fields", "Filter Dev/Staging/Production"),
]

print("=== Elasticsearch Field Mapping ===")
for f in fields:
    print(f"  [{f.field}] Type: {f.type_es}")
    print(f"    Source: {f.source}")
    print(f"    Purpose: {f.purpose}")
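Before shipping the pipeline, it can help to check sample log lines against the two grok patterns. The sketch below mirrors them with plain Python regular expressions. The regexes are simplified approximations (the real `TIMESTAMP_ISO8601` and `UUID` grok patterns accept more variants), the sample lines are hypothetical, and setting `event_type` to `predict` for prediction lines is an assumption made so the field matches the dashboard queries.

```python
import re
from typing import Optional

# Simplified stand-ins for the grok patterns (assumption: not exact equivalents).
TS = r"(?P<timestamp>\d{4}-\d{2}-\d{2}[T ]\d{2}:\d{2}:\d{2}(?:\.\d+)?(?:Z|[+-]\d{2}:?\d{2})?)"
UUID = r"[0-9a-fA-F]{8}(?:-[0-9a-fA-F]{4}){3}-[0-9a-fA-F]{12}"

REGISTRY_EVENT = re.compile(
    TS + r" (?P<level>[A-Z]+) (?P<event_type>\S+) model=(?P<model_name>\S+)"
    r" version=(?P<model_version>\d+) stage=(?P<stage>\w+) user=(?P<user>\S+)"
)
PREDICTION = re.compile(
    TS + r" PREDICT request_id=(?P<request_id>" + UUID + r") model=(?P<model_name>\S+)"
    r" latency=(?P<latency_ms>\d+(?:\.\d+)?) status=(?P<status>\w+)"
)


def parse_line(line: str) -> Optional[dict]:
    """Return the extracted fields, or None when neither pattern matches."""
    m = PREDICTION.match(line)
    if m:
        doc = m.groupdict()
        doc["event_type"] = "predict"  # assumption: normalized event name
        doc["latency_ms"] = float(doc["latency_ms"])
        return doc
    m = REGISTRY_EVENT.match(line)
    if m:
        doc = m.groupdict()
        doc["model_version"] = int(doc["model_version"])
        return doc
    return None


# Hypothetical sample lines in the shape the patterns expect.
samples = [
    "2025-08-30T10:15:00Z INFO transitioned model=churn-xgb version=7 stage=Production user=alice",
    "2025-08-30T10:15:02Z PREDICT request_id=123e4567-e89b-12d3-a456-426614174000 model=churn-xgb latency=12.5 status=success",
    "not a model log line",
]
for s in samples:
    print(parse_line(s))
```

A line that returns `None` here would most likely end up tagged `_grokparsefailure` in Logstash, so this kind of check is a cheap way to catch format drift in application logs.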

Kibana Dashboard

# === Kibana Dashboard Panels ===

from dataclasses import dataclass

@dataclass
class KibanaPanel:
    panel: str
    query: str
    viz: str
    insight: str

panels = [
    KibanaPanel("Model Events Timeline",
        "event_type: (registered OR transitioned OR deployed) | timechart",
        "Timeline / Bar Chart events per day",
        "See deployment frequency and model lifecycle"),
    KibanaPanel("Prediction Latency",
        "event_type: predict | percentiles(latency_ms, 50, 95, 99)",
        "Line Chart P50 P95 P99 over time per model",
        "Detect latency regression after a new deploy"),
    KibanaPanel("Error Rate per Model",
        "status: error | count / total count * 100 | per model_name",
        "Bar Chart % Error per Model",
        "Find models with a high error rate"),
    KibanaPanel("Model Version Comparison",
        "model_name: X | split by model_version | avg(latency_ms)",
        "Table comparing versions Latency Error Rate",
        "Compare performance between versions"),
    KibanaPanel("Throughput per Model",
        "event_type: predict | count per 1m per model_name",
        "Stacked Area Chart predictions/min per model",
        "See traffic patterns for capacity planning"),
    KibanaPanel("Stage Transition Audit",
        "event_type: transitioned | table user model stage timestamp",
        "Data Table sorted by timestamp",
        "Audit trail of who changed a stage and when"),
]

print("=== Kibana Dashboard ===")
for p in panels:
    print(f"  [{p.panel}]")
    print(f"    Query: {p.query}")
    print(f"    Viz: {p.viz}")
    print(f"    Insight: {p.insight}")
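The query strings in the panel list are shorthand, not literal Kibana syntax. As one possible translation, the sketch below builds an Elasticsearch search body for the "Prediction Latency" panel: P50/P95/P99 of `latency_ms` over time, split by model. It assumes the field names and types from the mapping above (`model_name` as `keyword`, `latency_ms` as `float`); the function name, interval and time window are illustrative choices.

```python
import json


def latency_percentiles_query(interval: str = "5m", window: str = "now-24h") -> dict:
    """Build a search body: latency percentiles over time, per model."""
    return {
        "size": 0,  # aggregations only, no individual hits
        "query": {
            "bool": {
                "filter": [
                    {"term": {"event_type": "predict"}},
                    {"range": {"@timestamp": {"gte": window}}},
                ]
            }
        },
        "aggs": {
            "per_model": {
                "terms": {"field": "model_name", "size": 20},
                "aggs": {
                    "over_time": {
                        "date_histogram": {
                            "field": "@timestamp",
                            "fixed_interval": interval,
                        },
                        "aggs": {
                            "latency": {
                                "percentiles": {
                                    "field": "latency_ms",
                                    "percents": [50, 95, 99],
                                }
                            }
                        },
                    }
                },
            }
        },
    }


body = latency_percentiles_query()
print(json.dumps(body, indent=2))
```

The printed JSON could be sent to the `_search` endpoint of the model log indices (for example `model-logs-*`) to sanity-check the numbers a Kibana line chart would show.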

ILM & Alerting

# === Index Lifecycle Management + Alerts ===

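# ILM Policy
# Note: the "freeze" action used in the cold phase is deprecated in recent
# Elasticsearch versions and is a no-op in 8.x.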
# PUT _ilm/policy/model-logs-policy
# {
#   "policy": {
#     "phases": {
#       "hot":   { "actions": { "rollover": { "max_size": "50GB", "max_age": "1d" } } },
#       "warm":  { "min_age": "7d", "actions": { "shrink": { "number_of_shards": 1 }, "forcemerge": { "max_num_segments": 1 } } },
#       "cold":  { "min_age": "30d", "actions": { "freeze": {} } },
#       "delete": { "min_age": "365d", "actions": { "delete": {} } }
#     }
#   }
# }

from dataclasses import dataclass

@dataclass
class AlertRule:
    alert: str
    condition: str
    severity: str
    action: str

alerts = [
    AlertRule("Model Deploy Failed",
        "event_type:transitioned AND status:failed count > 0 in 5m",
        "P1 Critical",
        "PagerDuty + Slack → Check MLflow Logs Model Artifacts"),
    AlertRule("High Error Rate",
        "status:error / total > 5% in 10m per model",
        "P1 Critical",
        "PagerDuty → Rollback Model Check Input Data"),
    AlertRule("Latency Regression",
        "P99(latency_ms) > 2x baseline in 15m per model",
        "P2 Warning",
        "Slack → Check Model Size GPU Load Scale Instance"),
    AlertRule("No Predictions",
        "event_type:predict count = 0 in 30m per model (production)",
        "P2 Warning",
        "Slack → Check Serving Endpoint Health Load Balancer"),
    AlertRule("Disk Usage High",
        "Elasticsearch disk > 80%",
        "P3 Info",
        "Email → Review ILM Policy Delete Old Indices Expand Storage"),
]

print("=== Alert Rules ===")
for a in alerts:
    print(f"  [{a.alert}] Severity: {a.severity}")
    print(f"    Condition: {a.condition}")
    print(f"    Action: {a.action}")
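The conditions above are written as prose, so it can be useful to see how three of them evaluate on a window of prediction events. The following is a minimal in-memory sketch, not Kibana alerting itself: the `Prediction` class, the function names, the nearest-rank P99 method and the sample data are all assumptions for illustration. The thresholds (5% error rate, 2x baseline P99) come from the rule list.

```python
from dataclasses import dataclass


@dataclass
class Prediction:
    model_name: str
    latency_ms: float
    status: str  # "success" or "error"


def error_rate_pct(events):
    """Percentage of events in the window whose status is 'error'."""
    errors = sum(1 for e in events if e.status == "error")
    return 100.0 * errors / len(events)


def p99_latency(events):
    """Nearest-rank 99th percentile of latency_ms."""
    ordered = sorted(e.latency_ms for e in events)
    rank = (99 * len(ordered) + 99) // 100  # ceil(0.99 * n) in integer math
    return ordered[rank - 1]


def evaluate(events, baseline_p99_ms):
    """Return the names of the alert rules that fire for one model's window."""
    if not events:
        return ["No Predictions"]
    fired = []
    if error_rate_pct(events) > 5.0:
        fired.append("High Error Rate")
    if p99_latency(events) > 2 * baseline_p99_ms:
        fired.append("Latency Regression")
    return fired


# Hypothetical 10-minute window: 90 fast successes and 10 slow errors.
window = (
    [Prediction("churn-xgb", 10.0, "success")] * 90
    + [Prediction("churn-xgb", 50.0, "error")] * 10
)
print(evaluate(window, baseline_p99_ms=20.0))
```

In a real deployment the window would come from an Elasticsearch query per model and the baseline from a longer historical period; the rule list uses different window lengths per rule (10m, 15m, 30m), which this sketch collapses into a single window for brevity.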

Tips

What is a Model Registry?

A model registry manages ML model versions. Tools such as MLflow and W&B track each version's stage (Staging, Production), metadata, metrics, lineage and artifacts, and support approval and rollback.

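What is the ELK Stack?

The ELK Stack is Elasticsearch, Logstash and Kibana, usually paired with Filebeat, used for log management: indexing and full-text search, dashboards and alerts. It scales horizontally, works in near real time, and offers ML-based anomaly detection.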

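How should the log pipeline be designed?

Filebeat ships logs to Logstash, which parses them with grok, filters and enriches them, and writes them to an Elasticsearch index. Map fields explicitly (keyword, float, date) and manage retention with ILM through the hot, warm, cold and delete phases.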
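To make the explicit mapping concrete, here is a small sketch that builds a composable index template body from the field types listed in this article. The `build_index_template` name and the default index pattern are assumptions; the body follows the `index_patterns` / `template.mappings.properties` structure of Elasticsearch index templates, and ILM settings are deliberately left out.

```python
import json

# Field name -> Elasticsearch type, taken from the field list in this article.
FIELD_TYPES = {
    "model_name": "keyword",
    "model_version": "integer",
    "stage": "keyword",
    "event_type": "keyword",
    "latency_ms": "float",
    "status": "keyword",
    "request_id": "keyword",
    "user": "keyword",
    "environment": "keyword",
}


def build_index_template(pattern: str = "model-logs-*") -> dict:
    """Return an index template body with explicit field mappings."""
    properties = {name: {"type": es_type} for name, es_type in FIELD_TYPES.items()}
    properties["@timestamp"] = {"type": "date"}
    return {
        "index_patterns": [pattern],
        "template": {"mappings": {"properties": properties}},
    }


template = build_index_template()
print(json.dumps(template, indent=2))
```

Explicit `keyword` types matter here because the dashboards filter and aggregate on exact values such as `model_name` and `status`; leaving these to dynamic mapping would typically produce `text` fields with a `.keyword` sub-field instead.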

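How should monitoring and alerts be set up?

Build a Kibana dashboard covering the event timeline, latency, error rate, throughput and stage audit. Alert on failed deploys, high error rate, missing predictions and disk usage, and route the alerts to Slack and PagerDuty.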

Summary

Log management for a model registry with ELK combines MLflow with Filebeat, Logstash, Elasticsearch and Kibana, uses ILM to manage index retention, and adds dashboards and alerts on prediction latency and errors in production.
