it
Dagster Pipeline Pub/Sub Architecture — สร้าง Event-driven Pipeline ด้วย Dagster + Pub/Sub

บทความนี้อธิบายการออกแบบ Data Pipeline แบบ Event-driven ด้วย Dagster (Data Orchestrator) ร่วมกับ Google Cloud Pub/Sub ครอบคลุม Asset Pipeline, Sensor, Schedule, GCP Integration, Monitoring และแนวทางสำหรับ Production
เนื้อหาเกี่ยวข้อง — บทความที่เกี่ยวข้อง: GCP BigQuery ML Container Orchestration
| Component | Role | Technology | Trigger |
|---|---|---|---|
| Pub/Sub Topic | Event Source | Google Cloud Pub/Sub | Publisher sends message |
| Dagster Sensor | Event Listener | Custom @sensor + google-cloud-pubsub | Poll subscription |
| Asset Pipeline | Data Processing | Dagster @asset | Sensor triggers run |
| IO Manager | Data Storage | BigQuery / GCS / S3 | Asset materialization |
| Dagster UI (เดิมชื่อ Dagit) | Monitoring | Web Dashboard | Always running |
Pub/Sub Sensor Setup
# === Dagster Pub/Sub Sensor ===
# from dagster import sensor, RunRequest, SensorEvaluationContext
# from google.cloud import pubsub_v1
# import json
#
# @sensor(job=process_events_job, minimum_interval_seconds=30)
# def pubsub_sensor(context: SensorEvaluationContext):
#     subscription_path = "projects/my-project/subscriptions/events-sub"
#
#     # `with` closes the subscriber's gRPC channel at the end of every tick.
#     with pubsub_v1.SubscriberClient() as subscriber:
#         response = subscriber.pull(
#             request={"subscription": subscription_path, "max_messages": 10}
#         )
#
#         ack_ids = []
#         for msg in response.received_messages:
#             data = json.loads(msg.message.data.decode("utf-8"))
#             yield RunRequest(
#                 run_key=msg.message.message_id,
#                 run_config={
#                     "ops": {
#                         "process_event": {
#                             "config": {
#                                 "event_type": data["type"],
#                                 "payload": json.dumps(data["payload"]),
#                                 "timestamp": data["timestamp"],
#                             }
#                         }
#                     }
#                 },
#             )
#             ack_ids.append(msg.ack_id)
#
#         # One batched ack per tick. An acked message is never redelivered and
#         # never reaches the dead-letter topic, even if the launched run fails.
#         if ack_ids:
#             subscriber.acknowledge(
#                 request={"subscription": subscription_path, "ack_ids": ack_ids}
#             )
from dataclasses import dataclass
@dataclass
class SensorConfig:
    """One tunable setting of the example Pub/Sub sensor.

    Every field is display text; ``value`` is stored as a string even for
    numeric or boolean settings.
    """

    setting: str  # setting name, e.g. "minimum_interval_seconds"
    value: str  # value used by the example sensor
    purpose: str  # what the setting does
    production_tip: str  # advice for tuning the setting in production


configs = [
    SensorConfig("minimum_interval_seconds",
                 "30",
                 "Poll Pub/Sub ทุก 30 วินาที",
                 "ลดเป็น 10s ถ้าต้องการ Latency ต่ำ เพิ่มถ้า Cost สูง"),
    SensorConfig("max_messages",
                 "10",
                 "ดึง Message สูงสุด 10 ต่อ Poll",
                 "เพิ่มถ้า Message เข้ามาเร็ว ลดถ้า Processing หนัก"),
    SensorConfig("run_key",
                 "message_id",
                 "ป้องกัน Duplicate Run จาก Message เดียวกัน",
                 "ใช้ message_id ที่ Unique"),
    # Pub/Sub dead-letters only messages that were NOT acked within the
    # subscription's max delivery attempts. This setting acks right after the
    # RunRequest is created, so a failed run's message never reaches the DLQ;
    # the tip therefore must not present the DLQ as the recovery path.
    SensorConfig("ack_after_process",
                 "True",
                 "Acknowledge หลังสร้าง RunRequest",
                 "ถ้า Run Fail Message จะไม่ถูก Re-process และไม่เข้า DLQ"
                 " (DLQ รับเฉพาะ Message ที่ไม่ถูก Ack) ให้ Re-publish เอง"
                 " หรือ Ack หลัง Run สำเร็จ"),
    SensorConfig("dead_letter_topic",
                 "events-dlq",
                 "Message ที่ Fail ส่งไป Dead Letter Queue",
                 "ตั้ง Max Delivery Attempts 5 ก่อนส่ง DLQ"),
]

print("=== Sensor Configuration ===")
for c in configs:
    print(f" [{c.setting}] = {c.value}")
    print(f" Purpose: {c.purpose}")
    print(f" Tip: {c.production_tip}")
Asset Pipeline

# === Dagster Asset Pipeline ===
# import pandas as pd
# from dagster import asset, AssetIn, MetadataValue
# from dagster import FreshnessPolicy
#
# @asset(
#     group_name="events",
#     io_manager_key="bigquery_io_manager",
#     freshness_policy=FreshnessPolicy(maximum_lag_minutes=60),
# )
# def raw_events(context) -> pd.DataFrame:
#     """Ingest raw events from Pub/Sub"""
#     events = fetch_events_from_staging()
#     context.log.info(f"Ingested {len(events)} events")
#     context.add_output_metadata({
#         "row_count": MetadataValue.int(len(events)),
#         # numpy dtype objects are not JSON-serializable; record their names
#         "schema": MetadataValue.json(events.dtypes.astype(str).to_dict()),
#     })
#     return events
#
# @asset(
#     ins={"raw_events": AssetIn()},
#     group_name="events",
#     io_manager_key="bigquery_io_manager",
# )
# def cleaned_events(context, raw_events: pd.DataFrame) -> pd.DataFrame:
#     """Clean and validate events"""
#     cleaned = raw_events.dropna(subset=["user_id", "event_type"])
#     cleaned = cleaned[cleaned["timestamp"] > "2024-01-01"]
#     context.log.info(f"Cleaned: {len(raw_events)} → {len(cleaned)}")
#     return cleaned
#
# @asset(
#     ins={"cleaned_events": AssetIn()},
#     group_name="analytics",
#     io_manager_key="bigquery_io_manager",
# )
# def user_activity_summary(context, cleaned_events: pd.DataFrame) -> pd.DataFrame:
#     """Aggregate user activity"""
#     summary = cleaned_events.groupby("user_id").agg(
#         event_count=("event_type", "count"),
#         last_active=("timestamp", "max"),
#     ).reset_index()
#     return summary
@dataclass
class AssetDefinition:
    """Display row describing one asset of the example Dagster pipeline."""

    name: str  # asset key
    group: str  # asset group name
    depends_on: str  # upstream asset or trigger, as display text
    io_manager: str  # IO manager resource key
    freshness: str  # freshness target, as display text
    description: str  # short description


# Each row is (name, group, depends_on, io_manager, freshness, description).
assets = [
    AssetDefinition(*row)
    for row in (
        ("raw_events", "events", "Pub/Sub Sensor",
         "bigquery_io_manager", "60 minutes",
         "Raw events จาก Pub/Sub ingest ไป BigQuery"),
        ("cleaned_events", "events", "raw_events",
         "bigquery_io_manager", "90 minutes",
         "ทำความสะอาดข้อมูล ลบ Null Validate Schema"),
        ("user_activity_summary", "analytics", "cleaned_events",
         "bigquery_io_manager", "120 minutes",
         "สรุป Activity ต่อ User สำหรับ Dashboard"),
        ("daily_report", "reporting", "user_activity_summary",
         "gcs_io_manager", "24 hours",
         "รายงานประจำวัน Export เป็น CSV ไป GCS"),
    )
]

print("=== Asset Pipeline ===")
for entry in assets:
    # One print per asset; sep="\n" yields the same four lines as four prints.
    print(
        f"\n [{entry.name}] Group: {entry.group}",
        f" Depends: {entry.depends_on} | IO: {entry.io_manager}",
        f" Freshness: {entry.freshness}",
        f" Desc: {entry.description}",
        sep="\n",
    )
Monitoring & Alerting
# === Dagster Monitoring ===
@dataclass
class DagsterAlert:
    """An alert rule: trigger condition, notification channel, responder action."""

    alert: str  # alert title
    condition: str  # when the alert fires
    channel: str  # where the notification is sent
    action: str  # what the responder should do


alerts = [
    DagsterAlert(alert="Pipeline Run Failed",
                 condition="Run status = FAILURE",
                 channel="Slack #data-alerts + PagerDuty",
                 action="Check logs ดู Error Fix แล้ว Re-run"),
    DagsterAlert(alert="Asset Stale",
                 condition="Asset ไม่ Materialize ภายใน Freshness Policy",
                 channel="Slack #data-alerts",
                 action="ตรวจ Sensor ทำงานไหม Pub/Sub มี Message ไหม"),
    DagsterAlert(alert="Sensor Not Ticking",
                 condition="Sensor ไม่ Tick ใน 5 นาที",
                 channel="Slack #infra-alerts + PagerDuty",
                 action="ตรวจ Dagster Daemon Process Restart"),
    DagsterAlert(alert="Run Duration Anomaly",
                 condition="Run Duration > 2x Average",
                 channel="Slack #data-alerts",
                 action="ตรวจ Data Volume Query Performance"),
    DagsterAlert(alert="Pub/Sub Message Backlog",
                 condition="Unacked Messages > 1000",
                 channel="Slack + GCP Monitoring",
                 action="Scale Dagster Workers เพิ่ม Parallelism"),
    DagsterAlert(alert="Data Quality Check Failed",
                 condition="Asset Check ไม่ผ่าน (Null Rate > 5%)",
                 channel="Slack #data-quality",
                 action="ตรวจ Source Data Fix Upstream"),
]

print("=== Alerts ===")
for rule in alerts:
    # Joined with newlines and printed once: same output as four prints.
    print("\n".join([
        f" [{rule.alert}]",
        f" Condition: {rule.condition}",
        f" Channel: {rule.channel}",
        f" Action: {rule.action}",
    ]))
เคล็ดลับ
- Sensor: ใช้ run_key ป้องกัน Duplicate Run จาก Message เดียวกัน
- DLQ: ตั้ง Dead Letter Queue สำหรับ Message ที่ Fail
- Freshness: ตั้ง FreshnessPolicy ทุก Asset ตรวจ Stale อัตโนมัติ
- Partition: ใช้ Daily Partition สำหรับ Time Series Data
- IO Manager: ใช้ BigQuery IO Manager สำหรับ GCP Ecosystem
Dagster คืออะไร
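Dagster เป็น Data Orchestrator แบบ Open Source ที่เขียนด้วย Python ใช้แนวคิด Software-defined Assets มี Dagster UI (เดิมชื่อ Dagit) สำหรับ Monitoring รองรับ Sensor, Schedule, Partition, IO Manager และ Type System พร้อม Integration กับ GCP และ AWS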
แนะนำเพิ่มเติม — XM Signal
เนื้อหาเกี่ยวข้อง — ทำความเข้าใจ Nebula Overlay Network DevOps Culture — คู่มือฉบับสมบูรณ์ 2026
เนื้อหาเกี่ยวข้อง — ทำความเข้าใจ REST API Design Performance Tuning เพิ่มความเร็ว — คู่มือฉบับสมบูรณ์ 2026





