
Dagster Pipeline Pub Sub Architecture

2026-06-01 · อ. บอม — SiamCafe.net · 8,319 words

Dagster + Pub/Sub


Component       | Role            | Technology           | Trigger
Pub/Sub Topic   | Event Source    | Google Cloud Pub/Sub | Publisher sends message
Dagster Sensor  | Event Listener  | dagster-gcp sensor   | Poll subscription
Asset Pipeline  | Data Processing | Dagster @asset       | Sensor triggers run
IO Manager      | Data Storage    | BigQuery / GCS / S3  | Asset materialization
Dagit UI        | Monitoring      | Web Dashboard        | Always running

Pub/Sub Sensor Setup

# === Dagster Pub/Sub Sensor ===

# from dagster import sensor, RunRequest, SensorEvaluationContext
# from google.cloud import pubsub_v1
# import json
#
# @sensor(job=process_events_job, minimum_interval_seconds=30)
# def pubsub_sensor(context: SensorEvaluationContext):
#     subscriber = pubsub_v1.SubscriberClient()
#     subscription_path = "projects/my-project/subscriptions/events-sub"
#
#     response = subscriber.pull(
#         request={"subscription": subscription_path, "max_messages": 10}
#     )
#
#     for msg in response.received_messages:
#         data = json.loads(msg.message.data.decode("utf-8"))
#         yield RunRequest(
#             run_key=msg.message.message_id,
#             run_config={
#                 "ops": {
#                     "process_event": {
#                         "config": {
#                             "event_type": data["type"],
#                             "payload": json.dumps(data["payload"]),
#                             "timestamp": data["timestamp"],
#                         }
#                     }
#                 }
#             },
#         )
#
#     # Acknowledge all pulled messages in one batch (per-message acks inside
#     # the loop also work, but cost one API call per message).
#     if response.received_messages:
#         subscriber.acknowledge(
#             request={
#                 "subscription": subscription_path,
#                 "ack_ids": [m.ack_id for m in response.received_messages],
#             }
#         )
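The run_key in the snippet above is what prevents duplicate runs: Dagster skips any RunRequest whose run_key it has already seen, which matters because Pub/Sub delivers at-least-once. A minimal pure-Python sketch of that dedup behavior (the `launch_once` helper is illustrative, not a Dagster API):

```python
# Sketch: run_key-based deduplication, as Dagster applies it to RunRequests.
# Pulling the same Pub/Sub message twice must not launch a second run.
seen_run_keys = set()
launched = []

def launch_once(run_key: str, payload: dict) -> bool:
    """Launch a run unless this run_key was already processed."""
    if run_key in seen_run_keys:
        return False  # duplicate delivery: skipped
    seen_run_keys.add(run_key)
    launched.append((run_key, payload))
    return True

# Pub/Sub is at-least-once, so message 101 arrives twice.
messages = [
    ("101", {"type": "click"}),
    ("102", {"type": "view"}),
    ("101", {"type": "click"}),  # redelivery of the first message
]
for message_id, data in messages:
    launch_once(run_key=message_id, payload=data)

print(f"launched {len(launched)} runs")  # 2 runs, not 3
```

Using the Pub/Sub message_id as the run_key gives exactly this guarantee for free, since message_id is unique per published message.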

from dataclasses import dataclass

@dataclass
class SensorConfig:
    setting: str
    value: str
    purpose: str
    production_tip: str

configs = [
    SensorConfig("minimum_interval_seconds",
        "30",
        "Poll Pub/Sub every 30 seconds",
        "Lower to 10s if you need low latency; raise it if cost is high"),
    SensorConfig("max_messages",
        "10",
        "Pull at most 10 messages per poll",
        "Increase if messages arrive quickly; decrease if processing is heavy"),
    SensorConfig("run_key",
        "message_id",
        "Prevents duplicate runs from the same message",
        "Use the message_id, which is unique per message"),
    SensorConfig("ack_after_process",
        "True",
        "Acknowledge after creating the RunRequest",
        "If the run fails, the message is not re-processed; use a DLQ instead"),
    SensorConfig("dead_letter_topic",
        "events-dlq",
        "Failed messages are routed to a dead letter queue",
        "Set max delivery attempts to 5 before sending to the DLQ"),
]

print("=== Sensor Configuration ===")
for c in configs:
    print(f"  [{c.setting}] = {c.value}")
    print(f"    Purpose: {c.purpose}")
    print(f"    Tip: {c.production_tip}")
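The dead_letter_topic behavior above can be sketched in plain Python: a message that keeps failing is redelivered until its delivery attempts reach the configured maximum (5 here, mirroring Pub/Sub's maxDeliveryAttempts), then routed to the DLQ instead of being retried forever. Names and counts are illustrative:

```python
MAX_DELIVERY_ATTEMPTS = 5  # mirrors maxDeliveryAttempts on the subscription

dlq = []  # stand-in for the events-dlq topic

def handle_delivery(message: dict) -> str:
    """Route a failing message: retry until attempts are exhausted, then DLQ."""
    message["delivery_attempt"] += 1
    if message["delivery_attempt"] >= MAX_DELIVERY_ATTEMPTS:
        dlq.append(message)
        return "dead-lettered"
    return "retry"

msg = {"message_id": "101", "delivery_attempt": 0}
outcomes = [handle_delivery(msg) for _ in range(5)]
print(outcomes)  # ['retry', 'retry', 'retry', 'retry', 'dead-lettered']
```

In production this routing is done by Pub/Sub itself once a dead-letter policy is attached to the subscription; the sensor never sees poison messages again after they are dead-lettered.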

Asset Pipeline

# === Dagster Asset Pipeline ===

# from dagster import asset, AssetIn, Output, MetadataValue
# from dagster import FreshnessPolicy, AutoMaterializePolicy
# import pandas as pd
#
# @asset(
#     group_name="events",
#     io_manager_key="bigquery_io_manager",
#     freshness_policy=FreshnessPolicy(maximum_lag_minutes=60),
# )
# def raw_events(context) -> pd.DataFrame:
#     """Ingest raw events from Pub/Sub"""
#     events = fetch_events_from_staging()
#     context.log.info(f"Ingested {len(events)} events")
#     context.add_output_metadata({
#         "row_count": MetadataValue.int(len(events)),
#         # dtype objects are not JSON-serializable; convert them to strings
#         "schema": MetadataValue.json({c: str(t) for c, t in events.dtypes.items()}),
#     })
#     return events
#
# @asset(
#     ins={"raw_events": AssetIn()},
#     group_name="events",
# )
# def cleaned_events(context, raw_events: pd.DataFrame) -> pd.DataFrame:
#     """Clean and validate events"""
#     cleaned = raw_events.dropna(subset=["user_id", "event_type"])
#     cleaned = cleaned[cleaned["timestamp"] > "2024-01-01"]
#     context.log.info(f"Cleaned: {len(raw_events)} → {len(cleaned)}")
#     return cleaned
#
# @asset(
#     ins={"cleaned_events": AssetIn()},
#     group_name="analytics",
# )
# def user_activity_summary(context, cleaned_events: pd.DataFrame) -> pd.DataFrame:
#     """Aggregate user activity"""
#     summary = cleaned_events.groupby("user_id").agg(
#         event_count=("event_type", "count"),
#         last_active=("timestamp", "max"),
#     ).reset_index()
#     return summary

@dataclass
class AssetDefinition:
    name: str
    group: str
    depends_on: str
    io_manager: str
    freshness: str
    description: str

assets = [
    AssetDefinition("raw_events",
        "events", "Pub/Sub Sensor",
        "bigquery_io_manager",
        "60 minutes",
        "Raw events from Pub/Sub, ingested into BigQuery"),
    AssetDefinition("cleaned_events",
        "events", "raw_events",
        "bigquery_io_manager",
        "90 minutes",
        "Cleans the data: drops nulls, validates the schema"),
    AssetDefinition("user_activity_summary",
        "analytics", "cleaned_events",
        "bigquery_io_manager",
        "120 minutes",
        "Summarizes activity per user for the dashboard"),
    AssetDefinition("daily_report",
        "reporting", "user_activity_summary",
        "gcs_io_manager",
        "24 hours",
        "Daily report, exported as CSV to GCS"),
]

print("=== Asset Pipeline ===")
for a in assets:
    print(f"\n  [{a.name}] Group: {a.group}")
    print(f"    Depends: {a.depends_on} | IO: {a.io_manager}")
    print(f"    Freshness: {a.freshness}")
    print(f"    Desc: {a.description}")
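The depends_on column above defines the DAG that Dagster materializes. A short sketch that derives the materialization order from those dependencies (a simple Kahn-style topological sort; external triggers such as the Pub/Sub sensor count as having no upstream asset):

```python
# Asset -> the upstream asset it depends on (None = triggered externally).
deps = {
    "raw_events": None,              # triggered by the Pub/Sub sensor
    "cleaned_events": "raw_events",
    "user_activity_summary": "cleaned_events",
    "daily_report": "user_activity_summary",
}

def materialization_order(deps: dict) -> list:
    """Topologically sort assets so each one runs after its upstream asset."""
    order, done = [], set()
    while len(order) < len(deps):
        for asset, upstream in deps.items():
            if asset not in done and (upstream is None or upstream in done):
                order.append(asset)
                done.add(asset)
    return order

print(materialization_order(deps))
# ['raw_events', 'cleaned_events', 'user_activity_summary', 'daily_report']
```

Dagster computes this ordering itself from the @asset dependencies; the sketch only makes explicit what "Sensor triggers run, downstream assets follow" means for this pipeline.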

Monitoring & Alerting

# === Dagster Monitoring ===

@dataclass
class DagsterAlert:
    alert: str
    condition: str
    channel: str
    action: str

alerts = [
    DagsterAlert("Pipeline Run Failed",
        "Run status = FAILURE",
        "Slack #data-alerts + PagerDuty",
        "Check the logs, find the error, fix it, then re-run"),
    DagsterAlert("Asset Stale",
        "Asset not materialized within its freshness policy",
        "Slack #data-alerts",
        "Check whether the sensor is running and whether Pub/Sub has messages"),
    DagsterAlert("Sensor Not Ticking",
        "Sensor has not ticked in 5 minutes",
        "Slack #infra-alerts + PagerDuty",
        "Check the Dagster daemon process and restart it"),
    DagsterAlert("Run Duration Anomaly",
        "Run duration > 2x average",
        "Slack #data-alerts",
        "Check data volume and query performance"),
    DagsterAlert("Pub/Sub Message Backlog",
        "Unacked messages > 1000",
        "Slack + GCP Monitoring",
        "Scale Dagster workers to increase parallelism"),
    DagsterAlert("Data Quality Check Failed",
        "Asset check fails (null rate > 5%)",
        "Slack #data-quality",
        "Inspect the source data and fix it upstream"),
]

print("=== Alerts ===")
for a in alerts:
    print(f"  [{a.alert}]")
    print(f"    Condition: {a.condition}")
    print(f"    Channel: {a.channel}")
    print(f"    Action: {a.action}")
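The "Run Duration Anomaly" rule above (duration greater than 2x the historical average) is simple to implement; the thresholds and run history below are made-up examples:

```python
def is_duration_anomaly(duration_s: float, history: list, factor: float = 2.0) -> bool:
    """Flag a run whose duration exceeds factor x the historical average."""
    if not history:
        return False  # no baseline yet, nothing to compare against
    average = sum(history) / len(history)
    return duration_s > factor * average

recent_runs = [120.0, 130.0, 110.0, 140.0]  # seconds; average = 125s

print(is_duration_anomaly(180.0, recent_runs))  # False: 180 < 2 * 125
print(is_duration_anomaly(300.0, recent_runs))  # True:  300 > 2 * 125
```

In practice the run history comes from Dagster's run storage (visible in Dagit's run timeline), and the check can run in a schedule or an external metrics pipeline.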

Tips

What is Dagster?

Dagster is an open-source Python data orchestrator built around software-defined assets. It ships with the Dagit UI, sensors, schedules, partitions, IO managers, and a type system, and integrates with both GCP and AWS.

How does the Pub/Sub integration work?

A sensor (via dagster-gcp) polls the subscription, here every 30 seconds and up to max_messages per pull. Each message triggers a RunRequest keyed by a unique run_key, messages are acknowledged once the request is created, and deliveries that keep failing are retried and then routed to a dead letter topic.

How do you design the asset pipeline?

Define each step with the @asset decorator; the declared dependencies form the DAG. Materializations are written through IO managers (BigQuery, GCS), freshness is enforced with a FreshnessPolicy, assets are organized into groups with daily partitions, and asset checks guard data quality.

How do you monitor it?

Use the Dagit UI for run history, alert through Slack and PagerDuty, export Prometheus metrics, watch sensor ticks and freshness to catch stale data, run data quality checks, and ship logs to ELK or Loki.
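Staleness follows directly from the FreshnessPolicy: an asset is stale once the time since its last materialization exceeds maximum_lag_minutes. A self-contained sketch of that check (timestamps are made-up):

```python
from datetime import datetime, timedelta

def is_stale(last_materialized: datetime, maximum_lag_minutes: int,
             now: datetime) -> bool:
    """An asset is stale if it has not materialized within its freshness window."""
    return now - last_materialized > timedelta(minutes=maximum_lag_minutes)

now = datetime(2026, 6, 1, 12, 0)

# raw_events has maximum_lag_minutes=60 in this pipeline:
print(is_stale(datetime(2026, 6, 1, 11, 30), 60, now))  # False: 30 min lag
print(is_stale(datetime(2026, 6, 1, 10, 30), 60, now))  # True: 90 min lag
```

Dagster evaluates this itself and surfaces the stale/fresh status in Dagit; the sketch only shows what the "Asset Stale" alert condition is actually testing.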

Summary

An event-driven Dagster pipeline on GCP: a Pub/Sub sensor triggers the asset pipeline, IO managers write to BigQuery and GCS, a FreshnessPolicy keeps assets current, and monitoring and alerting keep the whole thing production-ready.
