Dagster + ELK Log Management
| Component | Role | Dagster Integration | Config |
|---|---|---|---|
| Dagster | Data Orchestrator | Source of Pipeline Logs | dagster.yaml |
| Filebeat | Log Shipper | Reads Dagster log files | filebeat.yml |
| Logstash | Log Processing | Parse JSON, Enrich, Filter | logstash.conf |
| Elasticsearch | Log Storage + Search | Index dagster-logs-* | elasticsearch.yml |
| Kibana | Dashboard + Search | Visualize Pipeline Health | kibana.yml |
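Dagster can also attach handlers to its managed Python loggers directly from dagster.yaml, so asset code does not have to wire up file handlers itself. A minimal sketch, assuming dictConfig-style handler settings (logger names and paths are illustrative):

```yaml
# dagster.yaml — route managed Python loggers to the file Filebeat tails
python_logs:
  python_log_level: INFO
  managed_python_loggers:
    - raw_orders          # example logger name; list your own loggers here
  dagster_handler_config:
    handlers:
      elkFileHandler:
        class: logging.FileHandler
        level: INFO
        filename: /var/log/dagster/pipeline.log
```

Handler entries follow the standard `logging.config.dictConfig` schema, so extra keys like `filename` are passed to the handler's constructor.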
Structured Logging
# === Dagster Structured Logging ===
import json
import logging
from datetime import datetime, timezone
from dagster import asset, OpExecutionContext
# JSON Log Formatter
class JSONFormatter(logging.Formatter):
    def format(self, record):
        log_entry = {
            # timezone-aware replacement for the deprecated datetime.utcnow()
            "timestamp": datetime.now(timezone.utc).isoformat().replace("+00:00", "Z"),
            "level": record.levelname,
            "logger": record.name,
            "message": record.getMessage(),
            "module": record.module,
            "function": record.funcName,
            "line": record.lineno,
        }
        # Add extra fields if present
        for key in ["pipeline_name", "run_id", "asset_name",
                    "step_key", "duration_ms", "row_count"]:
            if hasattr(record, key):
                log_entry[key] = getattr(record, key)
        return json.dumps(log_entry, ensure_ascii=False)
# Setup logger
def get_pipeline_logger(name: str):
    logger = logging.getLogger(name)
    logger.setLevel(logging.INFO)
    if not logger.handlers:  # guard against duplicate handlers on repeated calls
        handler = logging.FileHandler("/var/log/dagster/pipeline.log")
        handler.setFormatter(JSONFormatter())
        logger.addHandler(handler)
    return logger
# Dagster Asset with structured logging
# @asset
# def raw_orders(context: OpExecutionContext):
# logger = get_pipeline_logger("raw_orders")
# context.log.info("Starting raw_orders extraction")
#
# start = datetime.now()
#     # Extract data (extract_from_database is a placeholder for your own query helper)
#     df = extract_from_database("SELECT * FROM orders")
# duration = (datetime.now() - start).total_seconds() * 1000
#
# logger.info("Extraction complete",
# extra={
# "pipeline_name": "daily_orders",
# "run_id": context.run_id,
# "asset_name": "raw_orders",
# "step_key": context.step_key,
# "duration_ms": duration,
# "row_count": len(df),
# })
# return df
print("=== Structured Logging Setup ===")
print(" Format: JSON with standard fields")
print(" Fields: timestamp, level, pipeline_name, run_id, asset_name")
print(" Output: /var/log/dagster/pipeline.log")
print(" Shipper: Filebeat → Logstash → Elasticsearch")
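To sanity-check the formatter before pointing Filebeat at the file, you can send one record to an in-memory stream and parse it back. A self-contained sketch (the logger name and extra field values are made up; the formatter is repeated in trimmed form so the snippet runs on its own):

```python
import io
import json
import logging
from datetime import datetime, timezone

class JSONFormatter(logging.Formatter):
    """Trimmed copy of the formatter above, so this snippet is self-contained."""
    EXTRA_KEYS = ["pipeline_name", "run_id", "asset_name",
                  "step_key", "duration_ms", "row_count"]

    def format(self, record):
        entry = {
            "timestamp": datetime.now(timezone.utc).isoformat(),
            "level": record.levelname,
            "message": record.getMessage(),
        }
        for key in self.EXTRA_KEYS:
            if hasattr(record, key):
                entry[key] = getattr(record, key)
        return json.dumps(entry, ensure_ascii=False)

# Capture one formatted line in memory instead of /var/log/dagster/pipeline.log
stream = io.StringIO()
handler = logging.StreamHandler(stream)
handler.setFormatter(JSONFormatter())
logger = logging.getLogger("demo_raw_orders")
logger.setLevel(logging.INFO)
logger.addHandler(handler)

logger.info("Extraction complete",
            extra={"pipeline_name": "daily_orders", "run_id": "abc123",
                   "asset_name": "raw_orders", "duration_ms": 812.5,
                   "row_count": 10000})

line = json.loads(stream.getvalue())
print(line["level"], line["asset_name"], line["row_count"])
```

Each line in the output file is one JSON document, which is exactly what Filebeat's JSON decoding expects.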
ELK Configuration
# === ELK Stack Config ===
# Filebeat config (filebeat.yml)
# filebeat.inputs:
# - type: log
# paths:
# - /var/log/dagster/*.log
# json.keys_under_root: true
# json.add_error_key: true
# fields:
# service: dagster
# environment: production
#
# output.logstash:
# hosts: ["logstash:5044"]
# Logstash config (logstash.conf)
# input {
# beats { port => 5044 }
# }
# filter {
# if [service] == "dagster" {
# date {
# match => ["timestamp", "ISO8601"]
# target => "@timestamp"
# }
# if [level] == "ERROR" or [level] == "CRITICAL" {
# mutate { add_tag => ["alert"] }
# }
# mutate {
# add_field => {
# "[@metadata][index]" => "dagster-logs-%{+YYYY.MM.dd}"
# }
# }
# }
# }
# output {
# elasticsearch {
# hosts => ["elasticsearch:9200"]
# index => "%{[@metadata][index]}"
# }
# }
# Elasticsearch ILM Policy
# PUT _ilm/policy/dagster-logs-policy
# {
# "policy": {
# "phases": {
# "hot": { "actions": { "rollover": { "max_size": "50gb", "max_age": "7d" } } },
# "warm": { "min_age": "7d", "actions": { "shrink": { "number_of_shards": 1 } } },
# "cold": { "min_age": "30d", "actions": { "freeze": {} } },
# "delete": { "min_age": "90d", "actions": { "delete": {} } }
# }
# }
# }
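The ILM policy above only takes effect once it is attached to the indices. One way, sketched here in the same console style (template name is illustrative; the policy and index pattern mirror the examples above), is an index template:

```
# PUT _index_template/dagster-logs-template
# {
#   "index_patterns": ["dagster-logs-*"],
#   "template": {
#     "settings": {
#       "index.lifecycle.name": "dagster-logs-policy",
#       "index.lifecycle.rollover_alias": "dagster-logs"
#     }
#   }
# }
```

Note that the hot-phase rollover action needs a write alias (`index.lifecycle.rollover_alias`) unless you are writing to a data stream.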
from dataclasses import dataclass
@dataclass
class ELKComponent:
component: str
config_file: str
key_settings: str
purpose: str
components = [
ELKComponent("Filebeat",
"filebeat.yml",
"json.keys_under_root: true, paths: /var/log/dagster/*.log",
"Reads log files, ships them to Logstash"),
ELKComponent("Logstash",
"logstash.conf",
"date filter, tag alert, index naming",
"Parse JSON, Enrich fields, Route to ES"),
ELKComponent("Elasticsearch",
"elasticsearch.yml + ILM Policy",
"dagster-logs-* index, ILM 90 days retention",
"Stores logs, full-text search, aggregation"),
ELKComponent("Kibana",
"kibana.yml + Dashboards",
"Index Pattern dagster-logs-*, Saved Searches",
"Dashboard, Search, Alert, Visualization"),
]
print("=== ELK Configuration ===")
for c in components:
print(f" [{c.component}] Config: {c.config_file}")
print(f" Settings: {c.key_settings}")
print(f" Purpose: {c.purpose}")
Kibana Dashboard
# === Kibana Dashboard Panels ===
@dataclass
class DashboardPanel:
panel: str
visualization: str
query: str
alert: str
panels = [
DashboardPanel("Pipeline Run Status",
"Pie Chart: Success vs Failed vs Running",
"level: (INFO AND message: 'Run completed') OR (ERROR AND message: 'Run failed')",
"Failed Rate > 10% → Alert"),
DashboardPanel("Error Rate Trend",
"Line Chart: Error count per hour",
"level: ERROR | date_histogram @timestamp interval 1h",
"> 50 errors/hr → Warning, > 200 → Critical"),
DashboardPanel("Top 10 Error Messages",
"Data Table: message, count, last_seen",
"level: ERROR | terms message.keyword size 10",
"New Error Pattern → Alert"),
DashboardPanel("Pipeline Duration",
"Bar Chart: avg duration per pipeline",
"duration_ms: * | avg duration_ms per pipeline_name",
"Duration > 2x Average → Warning"),
DashboardPanel("Asset Row Counts",
"Line Chart: row_count per asset over time",
"row_count: * | avg row_count per asset_name",
"Row count drop > 50% → Alert"),
DashboardPanel("Log Volume",
"Area Chart: log count per minute",
"* | date_histogram @timestamp interval 1m",
"Volume spike > 5x normal → Investigate"),
]
print("=== Kibana Dashboard ===")
for p in panels:
print(f" [{p.panel}] Viz: {p.visualization}")
print(f" Query: {p.query}")
print(f" Alert: {p.alert}")
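The "Error Rate Trend" panel boils down to a date-histogram aggregation over ERROR-level documents. A sketch of the equivalent Query DSL body you would POST to `dagster-logs-*/_search` (field names follow the log schema above; depending on your mapping, the term filter may need `level.keyword`):

```python
import json

# Equivalent of: level: ERROR | date_histogram @timestamp interval 1h
error_rate_query = {
    "query": {
        "bool": {
            "filter": [
                {"term": {"level": "ERROR"}},          # may need "level.keyword"
                {"range": {"@timestamp": {"gte": "now-24h"}}},
            ]
        }
    },
    "aggs": {
        "errors_per_hour": {
            "date_histogram": {
                "field": "@timestamp",
                "fixed_interval": "1h",
            }
        }
    },
    "size": 0,  # return only aggregation buckets, not individual hits
}

print(json.dumps(sorted(error_rate_query)))
```

The alert thresholds in the table (> 50/hr warning, > 200/hr critical) would then be evaluated against the bucket counts.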
Tips
- JSON: Always log structured JSON, never plain text
- Fields: Stamp standard fields (run_id, pipeline_name, asset_name) on every log entry
- ILM: Configure Index Lifecycle Management for 90-day retention with automatic deletion
- Alert: Alert on error rate, pipeline duration, and row-count anomalies
- Correlation: Use run_id to tie together logs across assets in the same pipeline run
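The correlation tip can be exercised offline as well: given parsed JSON log entries, grouping by run_id reconstructs one pipeline run across assets. A sketch with made-up sample records (in production these would come back from an Elasticsearch query):

```python
from collections import defaultdict

# Hypothetical parsed log entries, one per asset step
entries = [
    {"run_id": "r1", "asset_name": "raw_orders",   "level": "INFO",  "duration_ms": 800},
    {"run_id": "r1", "asset_name": "clean_orders", "level": "ERROR", "duration_ms": 120},
    {"run_id": "r2", "asset_name": "raw_orders",   "level": "INFO",  "duration_ms": 750},
]

def summarize_runs(entries):
    """Group log entries by run_id: assets touched, total duration, error count."""
    runs = defaultdict(lambda: {"assets": [], "duration_ms": 0, "errors": 0})
    for e in entries:
        run = runs[e["run_id"]]
        run["assets"].append(e["asset_name"])
        run["duration_ms"] += e.get("duration_ms", 0)
        run["errors"] += 1 if e["level"] == "ERROR" else 0
    return dict(runs)

summary = summarize_runs(entries)
print(summary["r1"]["errors"], summary["r1"]["duration_ms"])  # 1 920
```

In Kibana the same correlation is a Discover filter on `run_id: "r1"`, which shows every asset's logs for that run side by side.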
What is Dagster?
A Python data orchestrator built around software-defined assets, with the Dagit web UI, sensors, schedules, partitions, resources, and IO managers, plus integrations such as dbt and Spark pipelines.
Why ship logs to ELK?
Centralized logs with full-text search, Kibana dashboards, and alerts on error rate and pipeline health; retention policies via ILM and run_id correlation make cross-pipeline analysis practical in production.
How do you set up structured logging?
Attach a JSON formatter to Python logging that emits timestamp, level, pipeline_name, run_id, asset_name, duration_ms, and row_count, then ship the log file with Filebeat to Logstash; INFO level is a sensible production default.
How do you build the Kibana dashboard?
Create a dagster-logs-* index pattern, then add panels for run status, error rate trend, top errors, duration, row counts, and log volume using Discover, Lens, and saved searches, with alerts on the critical ones.
Summary
Dagster pipelines ship structured JSON logs through Filebeat and Logstash into Elasticsearch; Kibana provides dashboards and alerts, and ILM handles retention, giving end-to-end production monitoring.
