Dagster + Pub/Sub
Dagster Pipeline Pub/Sub Architecture Data Orchestrator Asset Pipeline Sensor Schedule GCP Integration Event-driven Monitoring Production
| Component | Role | Technology | Trigger |
|---|---|---|---|
| Pub/Sub Topic | Event Source | Google Cloud Pub/Sub | Publisher sends message |
| Dagster Sensor | Event Listener | dagster-gcp sensor | Poll subscription |
| Asset Pipeline | Data Processing | Dagster @asset | Sensor triggers run |
| IO Manager | Data Storage | BigQuery / GCS / S3 | Asset materialization |
| Dagit UI | Monitoring | Web Dashboard | Always running |
Pub/Sub Sensor Setup
# === Dagster Pub/Sub Sensor ===
# from dagster import sensor, RunRequest, SensorEvaluationContext
# from dagster_gcp import GCSResource
# from google.cloud import pubsub_v1
# import json
#
# @sensor(job=process_events_job, minimum_interval_seconds=30)
# def pubsub_sensor(context: SensorEvaluationContext):
# subscriber = pubsub_v1.SubscriberClient()
# subscription_path = "projects/my-project/subscriptions/events-sub"
#
# response = subscriber.pull(
# request={"subscription": subscription_path, "max_messages": 10}
# )
#
# for msg in response.received_messages:
# data = json.loads(msg.message.data.decode("utf-8"))
# yield RunRequest(
# run_key=msg.message.message_id,
# run_config={
# "ops": {
# "process_event": {
# "config": {
# "event_type": data["type"],
# "payload": json.dumps(data["payload"]),
# "timestamp": data["timestamp"],
# }
# }
# }
# },
# )
# subscriber.acknowledge(
# request={"subscription": subscription_path,
# "ack_ids": [msg.ack_id]}
# )
from dataclasses import dataclass
@dataclass(frozen=True)
class SensorConfig:
    """One tunable Pub/Sub-sensor setting together with its production guidance.

    Instances are read-only reference records that are only printed below,
    so the dataclass is frozen: accidental mutation raises and instances
    become hashable.
    """

    setting: str          # option name, e.g. "minimum_interval_seconds"
    value: str            # value used in this pipeline (kept as text for display)
    purpose: str          # what the setting controls
    production_tip: str   # how to tune it for production workloads
# Sensor tuning reference table: (setting, value, purpose, production tip).
_sensor_rows = [
    ("minimum_interval_seconds", "30",
     "Poll Pub/Sub ทุก 30 วินาที",
     "ลดเป็น 10s ถ้าต้องการ Latency ต่ำ เพิ่มถ้า Cost สูง"),
    ("max_messages", "10",
     "ดึง Message สูงสุด 10 ต่อ Poll",
     "เพิ่มถ้า Message เข้ามาเร็ว ลดถ้า Processing หนัก"),
    ("run_key", "message_id",
     "ป้องกัน Duplicate Run จาก Message เดียวกัน",
     "ใช้ message_id ที่ Unique"),
    ("ack_after_process", "True",
     "Acknowledge หลังสร้าง RunRequest",
     "ถ้า Run Fail Message ไม่ถูก Re-process ใช้ DLQ แทน"),
    ("dead_letter_topic", "events-dlq",
     "Message ที่ Fail ส่งไป Dead Letter Queue",
     "ตั้ง Max Delivery Attempts 5 ก่อนส่ง DLQ"),
]
configs = [SensorConfig(*row) for row in _sensor_rows]

# Dump the sensor configuration table to stdout.
print("=== Sensor Configuration ===")
for cfg in configs:
    print(f" [{cfg.setting}] = {cfg.value}")
    print(f" Purpose: {cfg.purpose}")
    print(f" Tip: {cfg.production_tip}")
Asset Pipeline
# === Dagster Asset Pipeline ===
# from dagster import asset, AssetIn, Output, MetadataValue
# from dagster import FreshnessPolicy, AutoMaterializePolicy
# import pandas as pd
#
# @asset(
# group_name="events",
# io_manager_key="bigquery_io_manager",
# freshness_policy=FreshnessPolicy(maximum_lag_minutes=60),
# )
# def raw_events(context) -> pd.DataFrame:
# """Ingest raw events from Pub/Sub"""
# events = fetch_events_from_staging()
# context.log.info(f"Ingested {len(events)} events")
# context.add_output_metadata({
# "row_count": MetadataValue.int(len(events)),
# "schema": MetadataValue.json(events.dtypes.to_dict()),
# })
# return events
#
# @asset(
# ins={"raw_events": AssetIn()},
# group_name="events",
# )
# def cleaned_events(context, raw_events: pd.DataFrame) -> pd.DataFrame:
# """Clean and validate events"""
# cleaned = raw_events.dropna(subset=["user_id", "event_type"])
# cleaned = cleaned[cleaned["timestamp"] > "2024-01-01"]
# context.log.info(f"Cleaned: {len(raw_events)} → {len(cleaned)}")
# return cleaned
#
# @asset(
# ins={"cleaned_events": AssetIn()},
# group_name="analytics",
# )
# def user_activity_summary(context, cleaned_events: pd.DataFrame) -> pd.DataFrame:
# """Aggregate user activity"""
# summary = cleaned_events.groupby("user_id").agg(
# event_count=("event_type", "count"),
# last_active=("timestamp", "max"),
# ).reset_index()
# return summary
@dataclass(frozen=True)
class AssetDefinition:
    """Descriptive record of one Dagster asset in the pipeline.

    Frozen because these records are static documentation data that is
    only iterated and printed; freezing prevents accidental mutation and
    makes instances hashable.
    """

    name: str         # asset name as declared with @asset
    group: str        # Dagster asset group (e.g. "events", "analytics")
    depends_on: str   # upstream asset or trigger feeding this asset
    io_manager: str   # IO manager key used for materialization
    freshness: str    # FreshnessPolicy maximum lag, as display text
    description: str  # human-readable summary of the asset
# Asset catalog: (name, group, depends_on, io_manager, freshness, description).
_asset_rows = [
    ("raw_events", "events", "Pub/Sub Sensor",
     "bigquery_io_manager", "60 minutes",
     "Raw events จาก Pub/Sub ingest ไป BigQuery"),
    ("cleaned_events", "events", "raw_events",
     "bigquery_io_manager", "90 minutes",
     "ทำความสะอาดข้อมูล ลบ Null Validate Schema"),
    ("user_activity_summary", "analytics", "cleaned_events",
     "bigquery_io_manager", "120 minutes",
     "สรุป Activity ต่อ User สำหรับ Dashboard"),
    ("daily_report", "reporting", "user_activity_summary",
     "gcs_io_manager", "24 hours",
     "รายงานประจำวัน Export เป็น CSV ไป GCS"),
]
assets = [AssetDefinition(*row) for row in _asset_rows]

# Print the asset pipeline overview.
print("=== Asset Pipeline ===")
for asset_def in assets:
    print(f"\n [{asset_def.name}] Group: {asset_def.group}")
    print(f" Depends: {asset_def.depends_on} | IO: {asset_def.io_manager}")
    print(f" Freshness: {asset_def.freshness}")
    print(f" Desc: {asset_def.description}")
Monitoring & Alerting
# === Dagster Monitoring ===
@dataclass(frozen=True)
class DagsterAlert:
    """One monitoring alert rule: what fires it, where it goes, what to do.

    Frozen: alert definitions are static reference data that is only
    printed below, so immutability guards against accidental edits and
    makes instances hashable.
    """

    alert: str      # short alert name
    condition: str  # condition that triggers the alert
    channel: str    # notification channel(s), e.g. Slack / PagerDuty
    action: str     # recommended operator response
# Alert rulebook: (alert, condition, channel, action).
_alert_rows = [
    ("Pipeline Run Failed",
     "Run status = FAILURE",
     "Slack #data-alerts + PagerDuty",
     "Check logs ดู Error Fix แล้ว Re-run"),
    ("Asset Stale",
     "Asset ไม่ Materialize ภายใน Freshness Policy",
     "Slack #data-alerts",
     "ตรวจ Sensor ทำงานไหม Pub/Sub มี Message ไหม"),
    ("Sensor Not Ticking",
     "Sensor ไม่ Tick ใน 5 นาที",
     "Slack #infra-alerts + PagerDuty",
     "ตรวจ Dagster Daemon Process Restart"),
    ("Run Duration Anomaly",
     "Run Duration > 2x Average",
     "Slack #data-alerts",
     "ตรวจ Data Volume Query Performance"),
    ("Pub/Sub Message Backlog",
     "Unacked Messages > 1000",
     "Slack + GCP Monitoring",
     "Scale Dagster Workers เพิ่ม Parallelism"),
    ("Data Quality Check Failed",
     "Asset Check ไม่ผ่าน (Null Rate > 5%)",
     "Slack #data-quality",
     "ตรวจ Source Data Fix Upstream"),
]
alerts = [DagsterAlert(*row) for row in _alert_rows]

# Print the alerting runbook.
print("=== Alerts ===")
for rule in alerts:
    print(f" [{rule.alert}]")
    print(f" Condition: {rule.condition}")
    print(f" Channel: {rule.channel}")
    print(f" Action: {rule.action}")
เคล็ดลับ
- Sensor: ใช้ run_key ป้องกัน Duplicate Run จาก Message เดียวกัน
- DLQ: ตั้ง Dead Letter Queue สำหรับ Message ที่ Fail
- Freshness: ตั้ง FreshnessPolicy ทุก Asset ตรวจ Stale อัตโนมัติ
- Partition: ใช้ Daily Partition สำหรับ Time Series Data
- IO Manager: ใช้ BigQuery IO Manager สำหรับ GCP Ecosystem
Dagster คืออะไร
Data Orchestrator Python Software-defined Assets Dagit UI Sensor Schedule Partition IO Manager Type System GCP AWS Integration Open Source
Pub/Sub Integration ทำอย่างไร
Sensor Poll Subscription Message Trigger RunRequest run_key Acknowledge DLQ Dead Letter dagster-gcp 30 วินาที max_messages Retry
Asset Pipeline ออกแบบอย่างไร
@asset Decorator Dependencies DAG Materialization IO Manager BigQuery GCS FreshnessPolicy Asset Group Partition Daily Check Quality
Monitor อย่างไร
Dagit UI Run History Alerting Slack PagerDuty Prometheus Metrics Sensor Tick Freshness Stale Data Quality Check Logging ELK Loki
สรุป
Dagster Pipeline Pub/Sub Architecture Asset Pipeline Sensor Event-driven GCP BigQuery IO Manager FreshnessPolicy Monitoring Alerting Production
