it

Dagster Pipeline Pub Sub Architecture — สร้าง

Dagster Pipeline Pub Sub Architecture — สร้าง

Dagster + Pub/Sub

Dagster Pipeline Pub Sub Architecture — สร้าง

Dagster Pipeline Pub Sub Architecture Data Orchestrator Asset Pipeline Sensor Schedule GCP Integration Event-driven Monitoring Production

เนื้อหาเกี่ยวข้อง — บทความที่เกี่ยวข้อง: GCP BigQuery ML Container Orchestration

ComponentRoleTechnologyTrigger
Pub/Sub TopicEvent SourceGoogle Cloud Pub/SubPublisher sends message
Dagster SensorEvent Listenerdagster-gcp sensorPoll subscription
Asset PipelineData ProcessingDagster @assetSensor triggers run
IO ManagerData StorageBigQuery / GCS / S3Asset materialization
Dagit UIMonitoringWeb DashboardAlways running

Pub/Sub Sensor Setup

# === Dagster Pub/Sub Sensor ===



# from dagster import sensor, RunRequest, SensorEvaluationContext

# from dagster_gcp import GCSResource

# from google.cloud import pubsub_v1

# import json

#

# @sensor(job=process_events_job, minimum_interval_seconds=30)

# def pubsub_sensor(context: SensorEvaluationContext):

#     subscriber = pubsub_v1.SubscriberClient()

#     subscription_path = "projects/my-project/subscriptions/events-sub"

#

#     response = subscriber.pull(

#         request={"subscription": subscription_path, "max_messages": 10}

#     )

#

#     for msg in response.received_messages:

#         data = json.loads(msg.message.data.decode("utf-8"))

#         yield RunRequest(

#             run_key=msg.message.message_id,

#             run_config={

#                 "ops": {

#                     "process_event": {

#                         "config": {

#                             "event_type": data["type"],

#                             "payload": json.dumps(data["payload"]),

#                             "timestamp": data["timestamp"],

#                         }

#                     }

#                 }

#             },

#         )

#         subscriber.acknowledge(

#             request={"subscription": subscription_path,

#                      "ack_ids": [msg.ack_id]}

#         )



from dataclasses import dataclass



@dataclass

class SensorConfig:

    setting: str

    value: str

    purpose: str

    production_tip: str



configs = [

    SensorConfig("minimum_interval_seconds",

        "30",

        "Poll Pub/Sub ทุก 30 วินาที",

        "ลดเป็น 10s ถ้าต้องการ Latency ต่ำ เพิ่มถ้า Cost สูง"),

    SensorConfig("max_messages",

        "10",

        "ดึง Message สูงสุด 10 ต่อ Poll",

        "เพิ่มถ้า Message เข้ามาเร็ว ลดถ้า Processing หนัก"),

    SensorConfig("run_key",

        "message_id",

        "ป้องกัน Duplicate Run จาก Message เดียวกัน",

        "ใช้ message_id ที่ Unique"),

    SensorConfig("ack_after_process",

        "True",

        "Acknowledge หลังสร้าง RunRequest",

        "ถ้า Run Fail Message ไม่ถูก Re-process ใช้ DLQ แทน"),

    SensorConfig("dead_letter_topic",

        "events-dlq",

        "Message ที่ Fail ส่งไป Dead Letter Queue",

        "ตั้ง Max Delivery Attempts 5 ก่อนส่ง DLQ"),

]



print("=== Sensor Configuration ===")

for c in configs:

    print(f"  [{c.setting}] = {c.value}")

    print(f"    Purpose: {c.purpose}")

    print(f"    Tip: {c.production_tip}")

Asset Pipeline

Dagster Pipeline Pub Sub Architecture — สร้าง
# === Dagster Asset Pipeline ===



# from dagster import asset, AssetIn, Output, MetadataValue

# from dagster import FreshnessPolicy, AutoMaterializePolicy

#

# @asset(

#     group_name="events",

#     io_manager_key="bigquery_io_manager",

#     freshness_policy=FreshnessPolicy(maximum_lag_minutes=60),

# )

# def raw_events(context) -> pd.DataFrame:

#     """Ingest raw events from Pub/Sub"""

#     events = fetch_events_from_staging()

#     context.log.info(f"Ingested {len(events)} events")

#     context.add_output_metadata({

#         "row_count": MetadataValue.int(len(events)),

#         "schema": MetadataValue.json(events.dtypes.to_dict()),

#     })

#     return events

#

# @asset(

#     ins={"raw_events": AssetIn()},

#     group_name="events",

# )

# def cleaned_events(context, raw_events: pd.DataFrame) -> pd.DataFrame:

#     """Clean and validate events"""

#     cleaned = raw_events.dropna(subset=["user_id", "event_type"])

#     cleaned = cleaned[cleaned["timestamp"] > "2024-01-01"]

#     context.log.info(f"Cleaned: {len(raw_events)} → {len(cleaned)}")

#     return cleaned

#

# @asset(

#     ins={"cleaned_events": AssetIn()},

#     group_name="analytics",

# )

# def user_activity_summary(context, cleaned_events: pd.DataFrame) -> pd.DataFrame:

#     """Aggregate user activity"""

#     summary = cleaned_events.groupby("user_id").agg(

#         event_count=("event_type", "count"),

#         last_active=("timestamp", "max"),

#     ).reset_index()

#     return summary



@dataclass

class AssetDefinition:

    name: str

    group: str

    depends_on: str

    io_manager: str

    freshness: str

    description: str



assets = [

    AssetDefinition("raw_events",

        "events", "Pub/Sub Sensor",

        "bigquery_io_manager",

        "60 minutes",

        "Raw events จาก Pub/Sub ingest ไป BigQuery"),

    AssetDefinition("cleaned_events",

        "events", "raw_events",

        "bigquery_io_manager",

        "90 minutes",

        "ทำความสะอาดข้อมูล ลบ Null Validate Schema"),

    AssetDefinition("user_activity_summary",

        "analytics", "cleaned_events",

        "bigquery_io_manager",

        "120 minutes",

        "สรุป Activity ต่อ User สำหรับ Dashboard"),

    AssetDefinition("daily_report",

        "reporting", "user_activity_summary",

        "gcs_io_manager",

        "24 hours",

        "รายงานประจำวัน Export เป็น CSV ไป GCS"),

]



print("=== Asset Pipeline ===")

for a in assets:

    print(f"\n  [{a.name}] Group: {a.group}")

    print(f"    Depends: {a.depends_on} | IO: {a.io_manager}")

    print(f"    Freshness: {a.freshness}")

    print(f"    Desc: {a.description}")

Monitoring & Alerting

# === Dagster Monitoring ===



@dataclass

class DagsterAlert:

    alert: str

    condition: str

    channel: str

    action: str



alerts = [

    DagsterAlert("Pipeline Run Failed",

        "Run status = FAILURE",

        "Slack #data-alerts + PagerDuty",

        "Check logs ดู Error Fix แล้ว Re-run"),

    DagsterAlert("Asset Stale",

        "Asset ไม่ Materialize ภายใน Freshness Policy",

        "Slack #data-alerts",

        "ตรวจ Sensor ทำงานไหม Pub/Sub มี Message ไหม"),

    DagsterAlert("Sensor Not Ticking",

        "Sensor ไม่ Tick ใน 5 นาที",

        "Slack #infra-alerts + PagerDuty",

        "ตรวจ Dagster Daemon Process Restart"),

    DagsterAlert("Run Duration Anomaly",

        "Run Duration > 2x Average",

        "Slack #data-alerts",

        "ตรวจ Data Volume Query Performance"),

    DagsterAlert("Pub/Sub Message Backlog",

        "Unacked Messages > 1000",

        "Slack + GCP Monitoring",

        "Scale Dagster Workers เพิ่ม Parallelism"),

    DagsterAlert("Data Quality Check Failed",

        "Asset Check ไม่ผ่าน (Null Rate > 5%)",

        "Slack #data-quality",

        "ตรวจ Source Data Fix Upstream"),

]



print("=== Alerts ===")

for a in alerts:

    print(f"  [{a.alert}]")

    print(f"    Condition: {a.condition}")

    print(f"    Channel: {a.channel}")

    print(f"    Action: {a.action}")

เคล็ดลับ

  • Sensor: ใช้ run_key ป้องกัน Duplicate Run จาก Message เดียวกัน
  • DLQ: ตั้ง Dead Letter Queue สำหรับ Message ที่ Fail
  • Freshness: ตั้ง FreshnessPolicy ทุก Asset ตรวจ Stale อัตโนมัติ
  • Partition: ใช้ Daily Partition สำหรับ Time Series Data
  • IO Manager: ใช้ BigQuery IO Manager สำหรับ GCP Ecosystem

Dagster คืออะไร

Data Orchestrator Python Software-defined Assets Dagit UI Sensor Schedule Partition IO Manager Type System GCP AWS Integration Open Source

แนะนำเพิ่มเติม — XM Signal

เนื้อหาเกี่ยวข้อง — ทำความเข้าใจ Nebula Overlay Network DevOps Culture — คู่มือฉบับสมบูรณ์ 2026

เนื้อหาเกี่ยวข้อง — ทำความเข้าใจ REST API Design Performance Tuning เพิ่มความเร็ว — คู่มือฉบับสมบูรณ์ 2026

XM Legend · เทรดเดอร์ & ผู้สอน Forex 13 ปี

ผู้ก่อตั้ง SiamCafe ตั้งแต่ปี 1997 · เทรดเดอร์สาย Forex มากกว่า 13 ปี ได้รับการยกย่องเป็น XM Legend · แบ่งปันความรู้ Forex, ไอที, AI และการเทรด จากประสบการณ์จริงในตลาดจริง