Dagster Pipeline Log Management ELK

2026-05-19 · อ. บอม — SiamCafe.net · 10,677 words

Dagster + ELK Log Management

A production guide to centralizing Dagster data-pipeline logs in the ELK stack (Elasticsearch, Logstash, Kibana): structured JSON logging, log shipping, and monitoring dashboards.

| Component | Role | Dagster Integration | Config |
|---|---|---|---|
| Dagster | Data orchestrator | Source of pipeline logs | dagster.yaml |
| Filebeat | Log shipper | Reads Dagster log files | filebeat.yml |
| Logstash | Log processing | Parse JSON, enrich, filter | logstash.conf |
| Elasticsearch | Log storage + search | Indexes dagster-logs-* | elasticsearch.yml |
| Kibana | Dashboard + search | Visualizes pipeline health | kibana.yml |

Structured Logging

# === Dagster Structured Logging ===

import json
import logging
from datetime import datetime
from dagster import asset, OpExecutionContext, Definitions

# JSON Log Formatter
class JSONFormatter(logging.Formatter):
    def format(self, record):
        log_entry = {
            "timestamp": datetime.utcnow().isoformat() + "Z",
            "level": record.levelname,
            "logger": record.name,
            "message": record.getMessage(),
            "module": record.module,
            "function": record.funcName,
            "line": record.lineno,
        }
        # Add extra fields if present
        for key in ["pipeline_name", "run_id", "asset_name",
                     "step_key", "duration_ms", "row_count"]:
            if hasattr(record, key):
                log_entry[key] = getattr(record, key)
        return json.dumps(log_entry, ensure_ascii=False)

# Setup logger (guard against attaching duplicate handlers when the
# same logger name is requested more than once)
def get_pipeline_logger(name: str):
    logger = logging.getLogger(name)
    logger.setLevel(logging.INFO)
    if not logger.handlers:
        handler = logging.FileHandler("/var/log/dagster/pipeline.log")
        handler.setFormatter(JSONFormatter())
        logger.addHandler(handler)
    return logger

# Dagster Asset with structured logging
# @asset
# def raw_orders(context: OpExecutionContext):
#     logger = get_pipeline_logger("raw_orders")
#     context.log.info("Starting raw_orders extraction")
#     
#     start = datetime.now()
#     # Extract data (extract_from_database is a placeholder for your own query helper)
#     df = extract_from_database("SELECT * FROM orders")
#     duration = (datetime.now() - start).total_seconds() * 1000
#     
#     logger.info("Extraction complete",
#         extra={
#             "pipeline_name": "daily_orders",
#             "run_id": context.run_id,
#             "asset_name": "raw_orders",
#             "step_key": context.step_key,
#             "duration_ms": duration,
#             "row_count": len(df),
#         })
#     return df

print("=== Structured Logging Setup ===")
print("  Format: JSON with standard fields")
print("  Fields: timestamp, level, pipeline_name, run_id, asset_name")
print("  Output: /var/log/dagster/pipeline.log")
print("  Shipper: Filebeat → Logstash → Elasticsearch")
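
To sanity-check the formatter before wiring it into Dagster, the standalone sketch below redefines a trimmed-down JSONFormatter (the full class is above), builds a LogRecord by hand, attaches the same attributes that `extra={...}` would, and parses the emitted JSON back:

```python
import json
import logging

# Trimmed-down stand-in for the JSONFormatter above, just to verify the
# extra-field passthrough: logging attaches `extra` keys as attributes
# directly on the LogRecord, which is what hasattr/getattr picks up.
class JSONFormatter(logging.Formatter):
    EXTRA_KEYS = ("pipeline_name", "run_id", "asset_name",
                  "step_key", "duration_ms", "row_count")

    def format(self, record):
        entry = {"level": record.levelname, "message": record.getMessage()}
        for key in self.EXTRA_KEYS:
            if hasattr(record, key):
                entry[key] = getattr(record, key)
        return json.dumps(entry, ensure_ascii=False)

record = logging.LogRecord(
    name="raw_orders", level=logging.INFO, pathname="pipeline.py",
    lineno=1, msg="Extraction complete", args=(), exc_info=None)
record.pipeline_name = "daily_orders"   # what extra={"pipeline_name": ...} does
record.row_count = 1234

line = JSONFormatter().format(record)
parsed = json.loads(line)
print(parsed["pipeline_name"], parsed["row_count"])
```

Each line in the log file is one self-contained JSON object, which is exactly what Filebeat's JSON decoding expects downstream.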

ELK Configuration

# === ELK Stack Config ===

# Filebeat config (filebeat.yml)
# filebeat.inputs:
# - type: log
#   paths:
#     - /var/log/dagster/*.log
#   json.keys_under_root: true
#   json.add_error_key: true
#   fields:
#     service: dagster
#     environment: production
#
# output.logstash:
#   hosts: ["logstash:5044"]

# Logstash config (logstash.conf)
# input {
#   beats { port => 5044 }
# }
# filter {
#   if [service] == "dagster" {
#     date {
#       match => ["timestamp", "ISO8601"]
#       target => "@timestamp"
#     }
#     if [level] == "ERROR" or [level] == "CRITICAL" {
#       mutate { add_tag => ["alert"] }
#     }
#     mutate {
#       add_field => {
#         "[@metadata][index]" => "dagster-logs-%{+YYYY.MM.dd}"
#       }
#     }
#   }
# }
# output {
#   elasticsearch {
#     hosts => ["elasticsearch:9200"]
#     index => "%{[@metadata][index]}"
#   }
# }
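
The filter logic above can be illustrated in pure Python. This sketch (not Logstash itself, just a model of what the config does) parses a JSON log line, tags ERROR/CRITICAL events for alerting, and computes the daily index name the same way `%{+YYYY.MM.dd}` would:

```python
import json
from datetime import datetime

# Mirrors the Logstash filter above: date parse, alert tagging, index naming.
def route_log_line(line: str) -> dict:
    event = json.loads(line)
    # ISO8601 with a trailing Z, as emitted by the JSONFormatter
    ts = datetime.fromisoformat(event["timestamp"].replace("Z", "+00:00"))
    event["@timestamp"] = ts.isoformat()
    # Equivalent of: if [level] == "ERROR" or [level] == "CRITICAL" { add_tag }
    event["tags"] = ["alert"] if event.get("level") in ("ERROR", "CRITICAL") else []
    # Equivalent of: "dagster-logs-%{+YYYY.MM.dd}"
    event["_index"] = f"dagster-logs-{ts:%Y.%m.%d}"
    return event

sample = '{"timestamp": "2026-05-19T08:30:00Z", "level": "ERROR", "message": "Run failed"}'
event = route_log_line(sample)
print(event["_index"], event["tags"])
```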

# Elasticsearch ILM Policy
# PUT _ilm/policy/dagster-logs-policy
# {
#   "policy": {
#     "phases": {
#       "hot":    { "actions": { "rollover": { "max_size": "50gb", "max_age": "7d" } } },
#       "warm":   { "min_age": "7d",  "actions": { "shrink": { "number_of_shards": 1 } } },
#       "cold":   { "min_age": "30d", "actions": { "freeze": {} } },
#       "delete": { "min_age": "90d", "actions": { "delete": {} } }
#     }
#   }
# }
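
As a quick mental model of the retention policy, this hypothetical helper maps an index's age in days to the ILM phase it should be in (same 7d/30d/90d thresholds as the policy; the 50gb rollover condition is ignored here):

```python
# Hypothetical helper mirroring the ILM policy above, for reasoning only:
# Elasticsearch itself drives phase transitions, not application code.
def ilm_phase(age_days: int) -> str:
    if age_days >= 90:
        return "delete"   # dropped after 90 days
    if age_days >= 30:
        return "cold"     # frozen, rarely searched
    if age_days >= 7:
        return "warm"     # shrunk to 1 shard
    return "hot"          # actively written

for age in (1, 10, 45, 120):
    print(age, ilm_phase(age))
```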

from dataclasses import dataclass

@dataclass
class ELKComponent:
    component: str
    config_file: str
    key_settings: str
    purpose: str

components = [
    ELKComponent("Filebeat",
        "filebeat.yml",
        "json.keys_under_root: true, paths: /var/log/dagster/*.log",
        "Read log files, ship to Logstash"),
    ELKComponent("Logstash",
        "logstash.conf",
        "date filter, tag alert, index naming",
        "Parse JSON, enrich fields, route to ES"),
    ELKComponent("Elasticsearch",
        "elasticsearch.yml + ILM Policy",
        "dagster-logs-* index, ILM 90-day retention",
        "Store logs, full-text search, aggregations"),
    ELKComponent("Kibana",
        "kibana.yml + Dashboards",
        "Index pattern dagster-logs-*, saved searches",
        "Dashboards, search, alerts, visualization"),
]

print("=== ELK Configuration ===")
for c in components:
    print(f"  [{c.component}] Config: {c.config_file}")
    print(f"    Settings: {c.key_settings}")
    print(f"    Purpose: {c.purpose}")

Kibana Dashboard

# === Kibana Dashboard Panels ===

@dataclass
class DashboardPanel:
    panel: str
    visualization: str
    query: str
    alert: str

panels = [
    DashboardPanel("Pipeline Run Status",
        "Pie Chart: Success vs Failed vs Running",
        "level: (INFO AND message: 'Run completed') OR (ERROR AND message: 'Run failed')",
        "Failed Rate > 10% → Alert"),
    DashboardPanel("Error Rate Trend",
        "Line Chart: Error count per hour",
        "level: ERROR | date_histogram @timestamp interval 1h",
        "> 50 errors/hr → Warning, > 200 → Critical"),
    DashboardPanel("Top 10 Error Messages",
        "Data Table: message, count, last_seen",
        "level: ERROR | terms message.keyword size 10",
        "New Error Pattern → Alert"),
    DashboardPanel("Pipeline Duration",
        "Bar Chart: avg duration per pipeline",
        "duration_ms: * | avg duration_ms per pipeline_name",
        "Duration > 2x Average → Warning"),
    DashboardPanel("Asset Row Counts",
        "Line Chart: row_count per asset over time",
        "row_count: * | avg row_count per asset_name",
        "Row count drop > 50% → Alert"),
    DashboardPanel("Log Volume",
        "Area Chart: log count per minute",
        "* | date_histogram @timestamp interval 1m",
        "Volume spike > 5x normal → Investigate"),
]

print("=== Kibana Dashboard ===")
for p in panels:
    print(f"  [{p.panel}] Viz: {p.visualization}")
    print(f"    Query: {p.query}")
    print(f"    Alert: {p.alert}")
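
For readers who prefer raw queries over the Kibana UI, here is an illustrative sketch of the "Error Rate Trend" panel as an Elasticsearch query body you could POST to `dagster-logs-*/_search`. The field names match the structured log schema above; depending on your mapping you may need `level.keyword` instead of `level` for the term query:

```python
import json

# Hourly error histogram, equivalent to the Error Rate Trend panel:
# size 0 skips the hits and returns only the aggregation buckets.
error_rate_query = {
    "size": 0,
    "query": {"term": {"level": "ERROR"}},
    "aggs": {
        "errors_per_hour": {
            "date_histogram": {"field": "@timestamp", "fixed_interval": "1h"}
        }
    },
}

print(json.dumps(error_rate_query)[:60])
```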

Tips

What is Dagster?

Dagster is a Python data orchestrator built around software-defined assets. It ships with the Dagit web UI and provides sensors, schedules, partitions, resources, and IO managers, and it integrates with tools such as dbt and Spark pipelines.

Why ship logs to ELK?

Centralized logs give you full-text search, Kibana dashboards, and alerts on error rate and pipeline health. Retention policies keep storage costs under control, and correlating logs across pipelines makes production analysis far easier than grepping files on individual machines.

How do you set up structured logging?

Attach a JSON formatter to the Python logging module so every record carries timestamp, level, pipeline_name, run_id, asset_name, duration_ms, and row_count. Filebeat ships the log file to Logstash; INFO is a sensible default level for production.

How do you build the Kibana dashboard?

Create an index pattern for dagster-logs-*, then add panels for run status, error rate trend, top errors, duration, row counts, and log volume. Pair alerts with saved searches, and use Lens and Discover for ad-hoc exploration.

Summary

Shipping Dagster pipeline logs to the ELK stack (Elasticsearch, Logstash, Kibana) via structured JSON logging and Filebeat gives you dashboards, alerts, and ILM-based retention for production monitoring.
