ai
Dagster Pipeline Log Management ELK — จัดการ Log
Dagster + ELK Log Management

บทความนี้อธิบายการจัดการ Log ของ Dagster Data Pipeline ด้วย ELK Stack (Elasticsearch, Logstash, Kibana) โดยใช้ Structured Logging เพื่อ Monitoring ระบบ Production
เนื้อหาเกี่ยวข้อง — ทำความเข้าใจ Kotlin Coroutines High Availability HA Setup
| Component | Role | Dagster Integration | Config |
|---|---|---|---|
| Dagster | Data Orchestrator | Source of Pipeline Logs | dagster.yaml |
| Filebeat | Log Shipper | อ่าน Dagster Log Files | filebeat.yml |
| Logstash | Log Processing | Parse JSON, Enrich, Filter | logstash.conf |
| Elasticsearch | Log Storage + Search | Index dagster-logs-* | elasticsearch.yml |
| Kibana | Dashboard + Search | Visualize Pipeline Health | kibana.yml |
Structured Logging
# === Dagster Structured Logging ===
import json
import logging
import os
from datetime import datetime, timezone

from dagster import asset, OpExecutionContext, Definitions
# JSON Log Formatter
class JSONFormatter(logging.Formatter):
    """Render each log record as a single-line JSON document.

    Every document carries the keys ``timestamp``, ``level``, ``logger``,
    ``message``, ``module``, ``function`` and ``line``. Attributes listed in
    ``EXTRA_FIELDS`` are copied in when the caller attached them to the
    record (typically through ``extra={...}``). When the record carries
    exception information, the formatted traceback is stored under
    ``exception``.
    """

    # Optional record attributes that are copied into the JSON document
    # only when they are present on the record.
    EXTRA_FIELDS = (
        "pipeline_name", "run_id", "asset_name",
        "step_key", "duration_ms", "row_count",
    )

    def format(self, record):
        """Return ``record`` serialized as a JSON string.

        Args:
            record: The ``logging.LogRecord`` to serialize.

        Returns:
            A JSON string; non-ASCII characters are written unescaped.
        """
        # Use the moment the event was logged (record.created), not the
        # moment it is formatted, so delayed formatting cannot skew the
        # timestamp. The value is UTC, rendered with a trailing "Z".
        created = datetime.fromtimestamp(record.created, tz=timezone.utc)
        log_entry = {
            "timestamp": created.replace(tzinfo=None).isoformat() + "Z",
            "level": record.levelname,
            "logger": record.name,
            "message": record.getMessage(),
            "module": record.module,
            "function": record.funcName,
            "line": record.lineno,
        }
        # Add extra fields if present
        for key in self.EXTRA_FIELDS:
            if hasattr(record, key):
                log_entry[key] = getattr(record, key)
        # Keep the traceback of logger.exception(...) / exc_info=True calls
        # instead of silently discarding it.
        if record.exc_info:
            log_entry["exception"] = self.formatException(record.exc_info)
        return json.dumps(log_entry, ensure_ascii=False)
# Setup logger
def get_pipeline_logger(name: str, log_path: str = "/var/log/dagster/pipeline.log"):
    """Return the logger called ``name``, writing JSON lines to ``log_path``.

    The logger level is set to INFO. A file handler using ``JSONFormatter``
    is attached only if the logger does not already have a file handler for
    the same path, so calling this function repeatedly (for example once per
    asset execution) does not duplicate log lines or leak file handles.

    Args:
        name: Name passed to ``logging.getLogger``.
        log_path: Destination log file. Defaults to the path used
            throughout this file.

    Returns:
        The configured ``logging.Logger``.

    Raises:
        OSError: If the log file cannot be opened.
    """
    logger = logging.getLogger(name)
    logger.setLevel(logging.INFO)

    # FileHandler stores its target as an absolute path in baseFilename,
    # so compare against the absolute form of the requested path.
    target = os.path.abspath(log_path)
    already_attached = any(
        isinstance(existing, logging.FileHandler)
        and existing.baseFilename == target
        for existing in logger.handlers
    )
    if not already_attached:
        # The formatter emits non-ASCII text unescaped; pin UTF-8 so the
        # write does not depend on the process locale.
        handler = logging.FileHandler(target, encoding="utf-8")
        handler.setFormatter(JSONFormatter())
        logger.addHandler(handler)
    return logger
# Dagster Asset with structured logging
# @asset
# def raw_orders(context: OpExecutionContext):
#     logger = get_pipeline_logger("raw_orders")
#     context.log.info("Starting raw_orders extraction")
#
#     start = datetime.now()
#     # Extract data
#     df = extract_from_database("SELECT * FROM orders")
#     duration = (datetime.now() - start).total_seconds() * 1000
#
#     logger.info("Extraction complete",
#                 extra={
#                     "pipeline_name": "daily_orders",
#                     "run_id": context.run_id,
#                     "asset_name": "raw_orders",
#                     "step_key": context.step_key,
#                     "duration_ms": duration,
#                     "row_count": len(df),
#                 })
#     return df
# Print the structured-logging summary in a single call; the emitted text is
# the same five lines as before.
print("\n".join((
    "=== Structured Logging Setup ===",
    " Format: JSON with standard fields",
    " Fields: timestamp, level, pipeline_name, run_id, asset_name",
    " Output: /var/log/dagster/pipeline.log",
    " Shipper: Filebeat → Logstash → Elasticsearch",
)))
ELK Configuration

# === ELK Stack Config ===
# Filebeat config (filebeat.yml)
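# filebeat.inputs:
#   - type: log
#     paths:
#       - /var/log/dagster/*.log
#     json.keys_under_root: true
#     json.add_error_key: true
#     fields:
#       service: dagster
#       environment: production
#     # Store the custom fields at the event root so the Logstash
#     # condition on [service] below can match them.
#     fields_under_root: true
#
# output.logstash:
#   hosts: ["logstash:5044"]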
# Logstash config (logstash.conf)
# input {
# beats { port => 5044 }
# }
# filter {
# if [service] == "dagster" {
# date {
# match => ["timestamp", "ISO8601"]
# target => "@timestamp"
# }
# if [level] == "ERROR" or [level] == "CRITICAL" {
# mutate { add_tag => ["alert"] }
# }
# mutate {
# add_field => {
# "[@metadata][index]" => "dagster-logs-%{+YYYY.MM.dd}"
# }
# }
# }
# }
# output {
# elasticsearch {
# hosts => ["elasticsearch:9200"]
# index => "%{[@metadata][index]}"
# }
# }
# Elasticsearch ILM Policy
# PUT _ilm/policy/dagster-logs-policy
# {
# "policy": {
# "phases": {
# "hot": { "actions": { "rollover": { "max_size": "50gb", "max_age": "7d" } } },
# "warm": { "min_age": "7d", "actions": { "shrink": { "number_of_shards": 1 } } },
# "cold": { "min_age": "30d", "actions": { "freeze": {} } },
# "delete": { "min_age": "90d", "actions": { "delete": {} } }
# }
# }
# }
from dataclasses import dataclass
@dataclass
class ELKComponent:
    """One entry of the ELK configuration summary printed by this script."""

    component: str     # component name shown in brackets, e.g. "Filebeat"
    config_file: str   # configuration file(s) for the component
    key_settings: str  # short free-text list of the important settings
    purpose: str       # what the component does in the log pipeline
# ELK stack components in the order logs flow through them.
components = [
    ELKComponent(
        component="Filebeat",
        config_file="filebeat.yml",
        key_settings="json.keys_under_root: true, paths: /var/log/dagster/*.log",
        purpose="อ่าน Log File ส่งไป Logstash",
    ),
    ELKComponent(
        component="Logstash",
        config_file="logstash.conf",
        key_settings="date filter, tag alert, index naming",
        purpose="Parse JSON, Enrich fields, Route to ES",
    ),
    ELKComponent(
        component="Elasticsearch",
        config_file="elasticsearch.yml + ILM Policy",
        key_settings="dagster-logs-* index, ILM 90 days retention",
        purpose="เก็บ Log, Full-text Search, Aggregation",
    ),
    ELKComponent(
        component="Kibana",
        config_file="kibana.yml + Dashboards",
        key_settings="Index Pattern dagster-logs-*, Saved Searches",
        purpose="Dashboard, Search, Alert, Visualization",
    ),
]
# Print a three-line summary (config file, settings, purpose) per component.
print("=== ELK Configuration ===")
for c in components:
    print(f" [{c.component}] Config: {c.config_file}")
    print(f" Settings: {c.key_settings}")
    print(f" Purpose: {c.purpose}")
Kibana Dashboard
# === Kibana Dashboard Panels ===
@dataclass
class DashboardPanel:
    """One entry of the Kibana dashboard summary printed by this script."""

    panel: str          # panel title shown in brackets
    visualization: str  # chart type and what it plots
    query: str          # free-text description of the query behind the panel
    alert: str          # alerting rule associated with the panel
# Dashboard panels listed in the order they are printed.
panels = [
    DashboardPanel(
        panel="Pipeline Run Status",
        visualization="Pie Chart: Success vs Failed vs Running",
        query="level: (INFO AND message: 'Run completed') OR (ERROR AND message: 'Run failed')",
        alert="Failed Rate > 10% → Alert",
    ),
    DashboardPanel(
        panel="Error Rate Trend",
        visualization="Line Chart: Error count per hour",
        query="level: ERROR | date_histogram @timestamp interval 1h",
        alert="> 50 errors/hr → Warning, > 200 → Critical",
    ),
    DashboardPanel(
        panel="Top 10 Error Messages",
        visualization="Data Table: message, count, last_seen",
        query="level: ERROR | terms message.keyword size 10",
        alert="New Error Pattern → Alert",
    ),
    DashboardPanel(
        panel="Pipeline Duration",
        visualization="Bar Chart: avg duration per pipeline",
        query="duration_ms: * | avg duration_ms per pipeline_name",
        alert="Duration > 2x Average → Warning",
    ),
    DashboardPanel(
        panel="Asset Row Counts",
        visualization="Line Chart: row_count per asset over time",
        query="row_count: * | avg row_count per asset_name",
        alert="Row count drop > 50% → Alert",
    ),
    DashboardPanel(
        panel="Log Volume",
        visualization="Area Chart: log count per minute",
        query="* | date_histogram @timestamp interval 1m",
        alert="Volume spike > 5x normal → Investigate",
    ),
]
# Print a three-line summary (visualization, query, alert) per panel.
print("=== Kibana Dashboard ===")
for p in panels:
    print(f" [{p.panel}] Viz: {p.visualization}")
    print(f" Query: {p.query}")
    print(f" Alert: {p.alert}")
เคล็ดลับ
- JSON: ใช้ JSON Structured Logging เสมอ อย่าใช้ Plain Text
- Fields: กำหนด Standard Fields (run_id, pipeline_name, asset_name) ในทุก Log
- ILM: ตั้ง Index Lifecycle Management เก็บ 90 วัน ลบอัตโนมัติ
- Alert: ตั้ง Alert สำหรับ Error Rate, Pipeline Duration และ Row Count Anomaly
- Correlation: ใช้ run_id เชื่อมโยง Log ข้าม Asset ใน Pipeline เดียวกัน
Dagster คืออะไร
Dagster คือ Data Orchestrator ที่เขียนด้วย Python ใช้แนวคิด Software-defined Assets มี Dagit Web UI รองรับ Sensor, Schedule, Partition, Resource และ IO Manager และใช้จัดการ Pipeline ร่วมกับ dbt และ Spark ได้
เนื้อหาเกี่ยวข้อง — บทความที่เกี่ยวข้อง: Htmx Alpine.js Stream Processing
อ่านเพิ่ม: Elasticsearch คืออะไร? สอน Elasticsearch ตั้งแต่เริ่มต้น สำห · อ่านเพิ่ม: Prometheus และ Grafana คืออะไร? สอนสร้าง Monitoring Stack สำ · อ่านเพิ่ม: Python Automation คืออะไร? สอนเขียน Script อัตโนมัติ Web Scr
แนะนำเพิ่มเติม — คอร์สเทรด Forex ที่ iCafeForex
เนื้อหาเกี่ยวข้อง — อ่านต่อ: Strength แปลว่าอะไร





