Dagster + Pub/Sub

Dagster Pipeline Pub Sub Architecture Data Orchestrator Asset Pipeline Sensor Schedule GCP Integration Event-driven Monitoring Production

Component | Role | Technology | Trigger
Pub/Sub Topic | Event Source | Google Cloud Pub/Sub | Publisher sends message
Dagster Sensor | Event Listener | dagster-gcp sensor | Poll subscription
Asset Pipeline | Data Processing | Dagster @asset | Sensor triggers run
IO Manager | Data Storage | BigQuery / GCS / S3 | Asset materialization
Dagit UI | Monitoring | Web Dashboard | Always running

Pub/Sub Sensor Setup

# === Dagster Pub/Sub Sensor ===

# from dagster import sensor, RunRequest, SensorEvaluationContext
# from dagster_gcp import GCSResource
# from google.cloud import pubsub_v1
# import json
#
# @sensor(job=process_events_job, minimum_interval_seconds=30)
# def pubsub_sensor(context: SensorEvaluationContext):
#     subscriber = pubsub_v1.SubscriberClient()
#     subscription_path = "projects/my-project/subscriptions/events-sub"
#
#     response = subscriber.pull(
#         request={"subscription": subscription_path, "max_messages": 10}
#     )
#
#     for msg in response.received_messages:
#         data = json.loads(msg.message.data.decode("utf-8"))
#         yield RunRequest(
#             run_key=msg.message.message_id,
#             run_config={
#                 "ops": {
#                     "process_event": {
#                         "config": {
#                             "event_type": data["type"],
#                             "payload": json.dumps(data["payload"]),
#                             "timestamp": data["timestamp"],
#                         }
#                     }
#                 }
#             },
#         )
#         subscriber.acknowledge(
#             request={"subscription": subscription_path,
#                      "ack_ids": [msg.ack_id]}
#         )

from dataclasses import dataclass

@dataclass
class SensorConfig:
    """One row of the sensor configuration summary printed by this script.

    Every field is a plain display string; in the visible part of this file
    the values are only formatted into the printed report.
    """

    # Name of the sensor setting, printed inside square brackets.
    setting: str
    # Value of the setting, kept as text (e.g. "30", "True").
    value: str
    # Short explanation of what the setting is for.
    purpose: str
    # Advice on tuning the setting for production use.
    production_tip: str

# Sensor settings listed in display order. Keyword arguments are used so each
# string is visibly tied to the SensorConfig field it fills.
configs = [
    SensorConfig(
        setting="minimum_interval_seconds",
        value="30",
        purpose="Poll Pub/Sub ทุก 30 วินาที",
        production_tip="ลดเป็น 10s ถ้าต้องการ Latency ต่ำ เพิ่มถ้า Cost สูง",
    ),
    SensorConfig(
        setting="max_messages",
        value="10",
        purpose="ดึง Message สูงสุด 10 ต่อ Poll",
        production_tip="เพิ่มถ้า Message เข้ามาเร็ว ลดถ้า Processing หนัก",
    ),
    SensorConfig(
        setting="run_key",
        value="message_id",
        purpose="ป้องกัน Duplicate Run จาก Message เดียวกัน",
        production_tip="ใช้ message_id ที่ Unique",
    ),
    SensorConfig(
        setting="ack_after_process",
        value="True",
        purpose="Acknowledge หลังสร้าง RunRequest",
        production_tip="ถ้า Run Fail Message ไม่ถูก Re-process ใช้ DLQ แทน",
    ),
    SensorConfig(
        setting="dead_letter_topic",
        value="events-dlq",
        purpose="Message ที่ Fail ส่งไป Dead Letter Queue",
        production_tip="ตั้ง Max Delivery Attempts 5 ก่อนส่ง DLQ",
    ),
]

# Report: a banner followed by a three-line entry per setting.
print("=== Sensor Configuration ===")
for c in configs:
    print(
        f"  [{c.setting}] = {c.value}",
        f"    Purpose: {c.purpose}",
        f"    Tip: {c.production_tip}",
        sep="\n",
    )

Asset Pipeline

# === Dagster Asset Pipeline ===

# import pandas as pd
# from dagster import asset, AssetIn, Output, MetadataValue
# from dagster import FreshnessPolicy, AutoMaterializePolicy
#
# @asset(
#     group_name="events",
#     io_manager_key="bigquery_io_manager",
#     freshness_policy=FreshnessPolicy(maximum_lag_minutes=60),
# )
# def raw_events(context) -> pd.DataFrame:
#     """Ingest raw events from Pub/Sub"""
#     events = fetch_events_from_staging()
#     context.log.info(f"Ingested {len(events)} events")
#     context.add_output_metadata({
#         "row_count": MetadataValue.int(len(events)),
#         "schema": MetadataValue.json(events.dtypes.astype(str).to_dict()),
#     })
#     return events
#
# @asset(
#     ins={"raw_events": AssetIn()},
#     group_name="events",
# )
# def cleaned_events(context, raw_events: pd.DataFrame) -> pd.DataFrame:
#     """Clean and validate events"""
#     cleaned = raw_events.dropna(subset=["user_id", "event_type"])
#     cleaned = cleaned[cleaned["timestamp"] > "2024-01-01"]
#     context.log.info(f"Cleaned: {len(raw_events)} → {len(cleaned)}")
#     return cleaned
#
# @asset(
#     ins={"cleaned_events": AssetIn()},
#     group_name="analytics",
# )
# def user_activity_summary(context, cleaned_events: pd.DataFrame) -> pd.DataFrame:
#     """Aggregate user activity"""
#     summary = cleaned_events.groupby("user_id").agg(
#         event_count=("event_type", "count"),
#         last_active=("timestamp", "max"),
#     ).reset_index()
#     return summary

@dataclass
class AssetDefinition:
    """One row of the asset pipeline summary printed by this script.

    Every field is a plain display string; in the visible part of this file
    the values are only formatted into the printed report.
    """

    # Asset name, printed inside square brackets.
    name: str
    # Group label shown next to the asset name.
    group: str
    # Name of whatever the asset depends on (another asset or a trigger).
    depends_on: str
    # Name of the IO manager associated with the asset.
    io_manager: str
    # Freshness window as human-readable text (e.g. "60 minutes").
    freshness: str
    # Free-text description of the asset.
    description: str

# Assets listed in pipeline order; each entry names its upstream dependency in
# ``depends_on``. Keyword arguments make the six string fields unambiguous.
assets = [
    AssetDefinition(
        name="raw_events",
        group="events",
        depends_on="Pub/Sub Sensor",
        io_manager="bigquery_io_manager",
        freshness="60 minutes",
        description="Raw events จาก Pub/Sub ingest ไป BigQuery",
    ),
    AssetDefinition(
        name="cleaned_events",
        group="events",
        depends_on="raw_events",
        io_manager="bigquery_io_manager",
        freshness="90 minutes",
        description="ทำความสะอาดข้อมูล ลบ Null Validate Schema",
    ),
    AssetDefinition(
        name="user_activity_summary",
        group="analytics",
        depends_on="cleaned_events",
        io_manager="bigquery_io_manager",
        freshness="120 minutes",
        description="สรุป Activity ต่อ User สำหรับ Dashboard",
    ),
    AssetDefinition(
        name="daily_report",
        group="reporting",
        depends_on="user_activity_summary",
        io_manager="gcs_io_manager",
        freshness="24 hours",
        description="รายงานประจำวัน Export เป็น CSV ไป GCS",
    ),
]

# Report: a banner, then a four-line entry per asset. The leading "\n" in the
# first line puts a blank line before every entry.
print("=== Asset Pipeline ===")
for a in assets:
    print(
        f"\n  [{a.name}] Group: {a.group}",
        f"    Depends: {a.depends_on} | IO: {a.io_manager}",
        f"    Freshness: {a.freshness}",
        f"    Desc: {a.description}",
        sep="\n",
    )

Monitoring & Alerting

# === Dagster Monitoring ===

@dataclass
class DagsterAlert:
    """One row of the monitoring/alerting summary printed by this script.

    Every field is a plain display string; in the visible part of this file
    the values are only formatted into the printed report.
    """

    # Alert title, printed inside square brackets.
    alert: str
    # Human-readable condition under which the alert fires.
    condition: str
    # Where the alert is delivered (e.g. a Slack channel, PagerDuty).
    channel: str
    # Suggested follow-up when the alert fires.
    action: str

# Alert catalogue in display order. Keyword arguments tie each string to the
# DagsterAlert field it fills.
alerts = [
    DagsterAlert(
        alert="Pipeline Run Failed",
        condition="Run status = FAILURE",
        channel="Slack #data-alerts + PagerDuty",
        action="Check logs ดู Error Fix แล้ว Re-run",
    ),
    DagsterAlert(
        alert="Asset Stale",
        condition="Asset ไม่ Materialize ภายใน Freshness Policy",
        channel="Slack #data-alerts",
        action="ตรวจ Sensor ทำงานไหม Pub/Sub มี Message ไหม",
    ),
    DagsterAlert(
        alert="Sensor Not Ticking",
        condition="Sensor ไม่ Tick ใน 5 นาที",
        channel="Slack #infra-alerts + PagerDuty",
        action="ตรวจ Dagster Daemon Process Restart",
    ),
    DagsterAlert(
        alert="Run Duration Anomaly",
        condition="Run Duration > 2x Average",
        channel="Slack #data-alerts",
        action="ตรวจ Data Volume Query Performance",
    ),
    DagsterAlert(
        alert="Pub/Sub Message Backlog",
        condition="Unacked Messages > 1000",
        channel="Slack + GCP Monitoring",
        action="Scale Dagster Workers เพิ่ม Parallelism",
    ),
    DagsterAlert(
        alert="Data Quality Check Failed",
        condition="Asset Check ไม่ผ่าน (Null Rate > 5%)",
        channel="Slack #data-quality",
        action="ตรวจ Source Data Fix Upstream",
    ),
]

# Report: a banner followed by a four-line entry per alert.
print("=== Alerts ===")
for a in alerts:
    print(
        f"  [{a.alert}]",
        f"    Condition: {a.condition}",
        f"    Channel: {a.channel}",
        f"    Action: {a.action}",
        sep="\n",
    )

เคล็ดลับ

  • Sensor: ใช้ run_key ป้องกัน Duplicate Run จาก Message เดียวกัน
  • DLQ: ตั้ง Dead Letter Queue สำหรับ Message ที่ Fail
  • Freshness: ตั้ง FreshnessPolicy ทุก Asset ตรวจ Stale อัตโนมัติ
  • Partition: ใช้ Daily Partition สำหรับ Time Series Data
  • IO Manager: ใช้ BigQuery IO Manager สำหรับ GCP Ecosystem

Dagster คืออะไร

Data Orchestrator Python Software-defined Assets Dagit UI Sensor Schedule Partition IO Manager Type System GCP AWS Integration Open Source