
Dagster Pipeline Log Management with ELK


Log management for Dagster data pipelines with the ELK stack (Elasticsearch, Logstash, Kibana): structured logging and monitoring in production.


| Component | Role | Dagster Integration | Config |
|---|---|---|---|
| Dagster | Data Orchestrator | Source of pipeline logs | dagster.yaml |
| Filebeat | Log Shipper | Reads Dagster log files | filebeat.yml |
| Logstash | Log Processing | Parse JSON, enrich, filter | logstash.conf |
| Elasticsearch | Log Storage + Search | Index `dagster-logs-*` | elasticsearch.yml |
| Kibana | Dashboard + Search | Visualize pipeline health | kibana.yml |

Structured Logging

```python
# === Dagster Structured Logging ===
import json
import logging
from datetime import datetime, timezone


# JSON Log Formatter: one JSON object per line, which Filebeat can parse directly
class JSONFormatter(logging.Formatter):
    def format(self, record):
        log_entry = {
            # Use the record's creation time in UTC, not the time it is formatted
            "timestamp": datetime.fromtimestamp(record.created, tz=timezone.utc).isoformat(),
            "level": record.levelname,
            "logger": record.name,
            "message": record.getMessage(),
            "module": record.module,
            "function": record.funcName,
            "line": record.lineno,
        }
        # Add extra fields if present (keys passed via extra= become record attributes)
        for key in ["pipeline_name", "run_id", "asset_name",
                    "step_key", "duration_ms", "row_count"]:
            if hasattr(record, key):
                log_entry[key] = getattr(record, key)
        return json.dumps(log_entry, ensure_ascii=False)


# Setup logger
def get_pipeline_logger(name: str):
    logger = logging.getLogger(name)
    logger.setLevel(logging.INFO)
    # getLogger returns the same object on every call, so attach the handler
    # only once; otherwise every asset run adds a duplicate handler
    if not logger.handlers:
        # /var/log/dagster must exist and be writable by the Dagster process
        handler = logging.FileHandler("/var/log/dagster/pipeline.log")
        handler.setFormatter(JSONFormatter())
        logger.addHandler(handler)
    return logger


# Dagster Asset with structured logging
# from dagster import asset, OpExecutionContext
#
# @asset
# def raw_orders(context: OpExecutionContext):
#     logger = get_pipeline_logger("raw_orders")
#     context.log.info("Starting raw_orders extraction")
#
#     start = datetime.now()
#     # Extract data (extract_from_database stands in for your own loader)
#     df = extract_from_database("SELECT * FROM orders")
#     duration = (datetime.now() - start).total_seconds() * 1000
#
#     logger.info("Extraction complete",
#         extra={
#             "pipeline_name": "daily_orders",
#             "run_id": context.run_id,
#             "asset_name": "raw_orders",
#             "step_key": context.step_key,
#             "duration_ms": duration,
#             "row_count": len(df),
#         })
#     return df

print("=== Structured Logging Setup ===")
print("  Format: JSON with standard fields")
print("  Fields: timestamp, level, pipeline_name, run_id, asset_name")
print("  Output: /var/log/dagster/pipeline.log")
print("  Shipper: Filebeat → Logstash → Elasticsearch")
```
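To see what one structured log line looks like, here is a minimal, self-contained sketch that repeats a condensed copy of the `JSONFormatter` above and writes to an in-memory buffer instead of `/var/log/dagster/pipeline.log`. The logger name `raw_orders_demo` and the run ID `demo-run-123` are hypothetical placeholders; in a real asset the run ID would come from `context.run_id`.

```python
import io
import json
import logging
from datetime import datetime, timezone

# Condensed copy of the JSONFormatter above so this snippet runs on its own
EXTRA_KEYS = ("pipeline_name", "run_id", "asset_name", "step_key", "duration_ms", "row_count")


class JSONFormatter(logging.Formatter):
    def format(self, record: logging.LogRecord) -> str:
        entry = {
            "timestamp": datetime.fromtimestamp(record.created, tz=timezone.utc).isoformat(),
            "level": record.levelname,
            "logger": record.name,
            "message": record.getMessage(),
        }
        # logging copies every key passed via extra= onto the LogRecord
        for key in EXTRA_KEYS:
            if hasattr(record, key):
                entry[key] = getattr(record, key)
        return json.dumps(entry, ensure_ascii=False)


# Write to an in-memory buffer instead of a log file (demo only)
buffer = io.StringIO()
handler = logging.StreamHandler(buffer)
handler.setFormatter(JSONFormatter())
logger = logging.getLogger("raw_orders_demo")  # hypothetical logger name
logger.setLevel(logging.INFO)
logger.propagate = False  # keep the demo line out of the root logger
logger.addHandler(handler)

# Hypothetical values standing in for context.run_id and a real extraction
logger.info("Extraction complete", extra={
    "pipeline_name": "daily_orders",
    "run_id": "demo-run-123",
    "asset_name": "raw_orders",
    "duration_ms": 842.5,
    "row_count": 1000,
})

record = json.loads(buffer.getvalue().strip())
print(record["asset_name"], record["row_count"])  # raw_orders 1000
```

Fields that were not passed (here `step_key`) are simply left out of the JSON, so Elasticsearch only indexes what each log line actually carries.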

ELK Configuration

```python
# === ELK Stack Config ===

# Filebeat config (filebeat.yml)
# filebeat.inputs:
# - type: log
#   paths:
#     - /var/log/dagster/*.log
#   json.keys_under_root: true
#   json.add_error_key: true
#   fields:
#     service: dagster
#     environment: production
#   # Without fields_under_root, Filebeat nests custom fields under [fields],
#   # and the Logstash condition [service] == "dagster" would never match
#   fields_under_root: true
#
# output.logstash:
#   hosts: ["logstash:5044"]

# Logstash config (logstash.conf)
# input {
#   beats { port => 5044 }
# }
# filter {
#   if [service] == "dagster" {
#     date {
#       match => ["timestamp", "ISO8601"]
#       target => "@timestamp"
#     }
#     if [level] == "ERROR" or [level] == "CRITICAL" {
#       mutate { add_tag => ["alert"] }
#     }
#     mutate {
#       add_field => {
#         "[@metadata][index]" => "dagster-logs-%{+YYYY.MM.dd}"
#       }
#     }
#   }
# }
# output {
#   elasticsearch {
#     hosts => ["elasticsearch:9200"]
#     index => "%{[@metadata][index]}"
#   }
# }

# Elasticsearch ILM Policy
# Logstash already writes one index per day, so there is no rollover alias and
# no rollover action; phase ages count from each daily index's creation.
# The freeze action is deprecated since 7.14, so the cold phase lowers priority instead.
# PUT _ilm/policy/dagster-logs-policy
# {
#   "policy": {
#     "phases": {
#       "hot":    { "actions": { "set_priority": { "priority": 100 } } },
#       "warm":   { "min_age": "7d",  "actions": { "shrink": { "number_of_shards": 1 } } },
#       "cold":   { "min_age": "30d", "actions": { "set_priority": { "priority": 0 } } },
#       "delete": { "min_age": "90d", "actions": { "delete": {} } }
#     }
#   }
# }
#
# Attach the policy to every new dagster-logs-* index with an index template
# PUT _index_template/dagster-logs
# {
#   "index_patterns": ["dagster-logs-*"],
#   "template": { "settings": { "index.lifecycle.name": "dagster-logs-policy" } }
# }

from dataclasses import dataclass


@dataclass
class ELKComponent:
    component: str
    config_file: str
    key_settings: str
    purpose: str


components = [
    ELKComponent("Filebeat",
        "filebeat.yml",
        "json.keys_under_root: true, fields_under_root: true, paths: /var/log/dagster/*.log",
        "Read log files and ship them to Logstash"),
    ELKComponent("Logstash",
        "logstash.conf",
        "date filter, tag alert, index naming",
        "Parse JSON, enrich fields, route to ES"),
    ELKComponent("Elasticsearch",
        "elasticsearch.yml + ILM Policy",
        "dagster-logs-* index, ILM 90 days retention",
        "Store logs, full-text search, aggregation"),
    ELKComponent("Kibana",
        "kibana.yml + Dashboards",
        "Index Pattern dagster-logs-*, Saved Searches",
        "Dashboard, Search, Alert, Visualization"),
]

print("=== ELK Configuration ===")
for c in components:
    print(f"  [{c.component}] Config: {c.config_file}")
    print(f"    Settings: {c.key_settings}")
    print(f"    Purpose: {c.purpose}")
```
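The Logstash filter is compact but easy to misread. The following Python sketch mirrors what it does to a single decoded event: parse the ISO8601 `timestamp` into `@timestamp`, tag ERROR/CRITICAL events with `alert`, and compute the daily index name. It is only an illustration of the logic, not how Logstash executes; `process_event` and the sample event are hypothetical, and the sketch assumes Filebeat has put `service` at the top level (`fields_under_root: true`).

```python
from datetime import datetime, timezone

ALERT_LEVELS = {"ERROR", "CRITICAL"}


def process_event(event: dict) -> dict:
    """Approximate the Logstash filter for one decoded Filebeat event (sketch)."""
    if event.get("service") != "dagster":
        return event  # non-Dagster events pass through untouched
    out = dict(event)  # shallow copy so the caller's dict is not modified
    # date filter: parse ISO8601 "timestamp" into @timestamp, normalised to UTC
    ts = datetime.fromisoformat(out["timestamp"].replace("Z", "+00:00"))
    ts = ts.astimezone(timezone.utc)
    out["@timestamp"] = ts
    # conditional mutate: tag ERROR / CRITICAL events so alerts can filter on tags
    if out.get("level") in ALERT_LEVELS:
        out["tags"] = list(out.get("tags", [])) + ["alert"]
    # %{+YYYY.MM.dd} is rendered from @timestamp in UTC: one index per UTC day
    out["@metadata"] = {"index": f"dagster-logs-{ts:%Y.%m.%d}"}
    return out


# Hypothetical event, shaped like a JSONFormatter line after Filebeat decoding
event = {
    "service": "dagster",
    "level": "ERROR",
    "message": "Run failed",
    "timestamp": "2024-05-01T23:59:59.500000+00:00",
}
result = process_event(event)
print(result["@metadata"]["index"], result["tags"])  # dagster-logs-2024.05.01 ['alert']
```

Because the index date comes from `@timestamp` in UTC, a pipeline running in UTC+7 shortly after local midnight still writes into the previous day's index.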

Kibana Dashboard

```python
# === Kibana Dashboard Panels ===
from dataclasses import dataclass


@dataclass
class DashboardPanel:
    panel: str
    visualization: str
    query: str  # KQL filter, followed by the aggregation after "|"
    alert: str


panels = [
    DashboardPanel("Pipeline Run Status",
        "Pie Chart: Success vs Failed vs Running",
        '(level: INFO and message: "Run completed") or (level: ERROR and message: "Run failed")',
        "Failed Rate > 10% → Alert"),
    DashboardPanel("Error Rate Trend",
        "Line Chart: Error count per hour",
        "level: ERROR | date_histogram @timestamp interval 1h",
        "> 50 errors/hr → Warning, > 200 → Critical"),
    DashboardPanel("Top 10 Error Messages",
        "Data Table: message, count, last_seen",
        "level: ERROR | terms message.keyword size 10",
        "New Error Pattern → Alert"),
    DashboardPanel("Pipeline Duration",
        "Bar Chart: avg duration per pipeline",
        "duration_ms: * | avg duration_ms per pipeline_name",
        "Duration > 2x Average → Warning"),
    DashboardPanel("Asset Row Counts",
        "Line Chart: row_count per asset over time",
        "row_count: * | avg row_count per asset_name",
        "Row count drop > 50% → Alert"),
    DashboardPanel("Log Volume",
        "Area Chart: log count per minute",
        "* | date_histogram @timestamp interval 1m",
        "Volume spike > 5x normal → Investigate"),
]

print("=== Kibana Dashboard ===")
for p in panels:
    print(f"  [{p.panel}] Viz: {p.visualization}")
    print(f"    Query: {p.query}")
    print(f"    Alert: {p.alert}")
```
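The alert column above states thresholds in prose. As a sketch only, the same rules can be written as small Python predicates, which makes the exact comparisons explicit. In a real deployment these would be Kibana alerting rules rather than Python; the function names are hypothetical, and reading "Duration > 2x Average" as "latest run vs. the mean of previous runs" is an assumption.

```python
from statistics import mean


def error_rate_severity(errors_per_hour: int) -> str:
    # Error Rate Trend panel: > 50 errors/hr warning, > 200 critical
    if errors_per_hour > 200:
        return "critical"
    if errors_per_hour > 50:
        return "warning"
    return "ok"


def failed_rate_alert(succeeded: int, failed: int) -> bool:
    # Pipeline Run Status panel: failed runs above 10% of finished runs
    total = succeeded + failed
    return total > 0 and failed / total > 0.10


def duration_alert(history_ms: list[float], latest_ms: float) -> bool:
    # Pipeline Duration panel (assumed reading): latest run > 2x mean of history
    return bool(history_ms) and latest_ms > 2 * mean(history_ms)


def row_count_alert(previous: int, latest: int) -> bool:
    # Asset Row Counts panel: row count dropped by more than 50%
    return previous > 0 and (previous - latest) / previous > 0.5


def volume_spike(baseline_per_min: float, current_per_min: float) -> bool:
    # Log Volume panel: current volume above 5x the normal baseline
    return baseline_per_min > 0 and current_per_min > 5 * baseline_per_min


print(error_rate_severity(120))                      # warning
print(duration_alert([1000, 1200, 1100], 2500))      # True (2500 > 2 * 1100)
print(row_count_alert(10_000, 4_000))                # True (60% drop)
print(failed_rate_alert(45, 5))                      # False (exactly 10%)
```

Note that the thresholds are strict: a failure rate of exactly 10% does not fire, which matches the "> 10%" wording in the panel list.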

Tips

  • JSON: Always use JSON structured logging; avoid plain text.
  • Fields: Define standard fields (run_id, pipeline_name, asset_name) on every log line.
  • ILM: Set up Index Lifecycle Management to keep logs for 90 days and delete them automatically.
  • Alert: Set alerts on error rate, pipeline duration, and row-count anomalies.
  • Correlation: Use run_id to link logs across assets within the same pipeline run.
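The correlation tip can be sketched in a few lines of Python: group the JSON log lines by `run_id` and order each run's entries by time, which is what filtering Kibana on `run_id: "<id>"` and sorting by `@timestamp` does. The function name `group_by_run` and the sample lines are hypothetical, and sorting timestamps as strings assumes every line uses the same ISO8601 format and UTC offset, as the `JSONFormatter` above produces.

```python
import json
from collections import defaultdict


def group_by_run(lines: list[str]) -> dict[str, list[dict]]:
    """Group JSON log lines by run_id, each run's entries in timestamp order."""
    runs: dict[str, list[dict]] = defaultdict(list)
    for line in lines:
        entry = json.loads(line)
        run_id = entry.get("run_id")
        if run_id is None:
            continue  # lines without run_id cannot be correlated
        runs[run_id].append(entry)
    for entries in runs.values():
        # String sort is chronological only because all timestamps share one format/offset
        entries.sort(key=lambda e: e["timestamp"])
    return dict(runs)


# Hypothetical log lines from two runs, out of order, plus one without run_id
sample = [
    '{"timestamp": "2024-05-01T01:00:05+00:00", "run_id": "run-a", "asset_name": "clean_orders", "level": "INFO"}',
    '{"timestamp": "2024-05-01T01:00:00+00:00", "run_id": "run-a", "asset_name": "raw_orders", "level": "INFO"}',
    '{"timestamp": "2024-05-01T01:00:02+00:00", "run_id": "run-b", "asset_name": "raw_orders", "level": "ERROR"}',
    '{"timestamp": "2024-05-01T01:00:03+00:00", "level": "INFO"}',
]
runs = group_by_run(sample)
for run_id, entries in runs.items():
    print(run_id, [e["asset_name"] for e in entries])
# run-a ['raw_orders', 'clean_orders']
# run-b ['raw_orders']
```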

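What is Dagster?

Dagster is a Python data orchestrator built around Software-defined Assets. It provides the Dagit web UI, sensors, schedules, partitions, resources, and IO managers, and integrates with tools such as dbt and Spark for building pipelines.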


XM Legend · Forex trader & instructor for 13 years

Founder of SiamCafe since 1997 · Forex trader for more than 13 years, recognized as an XM Legend · Shares knowledge of Forex, IT, AI, and trading drawn from real experience in live markets