Technology

Dagster Pipeline SaaS Architecture

dagster pipeline saas architecture
Dagster Pipeline SaaS Architecture | SiamCafe Blog
2025-08-20· อ. บอม — SiamCafe.net· 11,091 คำ

Dagster SaaS Architecture

Dagster SaaS Architecture Multi-tenant Pipeline Event-driven Scaling Billing Kubernetes Software-defined Assets IO Manager

ComponentRoleSaaS FeatureTool
Partitionแยก Data ตาม TenantMulti-tenant IsolationStaticPartitionsDefinition
Sensorตรวจจับ EventEvent-driven TriggerSQS/Kafka Sensor
IO Managerแยก Storage PathData IsolationS3/GCS IO Manager
Run Launcherกระจาย RunScalable ExecutionK8sRunLauncher
Scheduleตั้ง Cron JobSelf-service ConfigScheduleDefinition
TagsMetadata ต่อ RunBilling TrackingRun Tags

Architecture Design

# === SaaS Pipeline Architecture ===

from dataclasses import dataclass, field

@dataclass
class TenantPlan:
    tenant_id: str
    plan: str
    max_concurrent_runs: int
    max_daily_runs: int
    storage_gb: int
    compute_hours: int
    database_url: str
    s3_prefix: str
    schedule: str
    features: list = field(default_factory=list)

tenants = [
    TenantPlan("acme-corp", "enterprise",
        10, 100, 500, 200,
        "postgresql://db-prod/acme",
        "tenants/acme-corp/",
        "0 */2 * * *",
        ["custom_transforms", "real_time", "sla_99_9"]),
    TenantPlan("startup-inc", "pro",
        3, 30, 50, 50,
        "postgresql://db-prod/startup",
        "tenants/startup-inc/",
        "0 6 * * *",
        ["standard_transforms"]),
    TenantPlan("small-biz", "basic",
        1, 10, 10, 10,
        "postgresql://db-prod/smallbiz",
        "tenants/small-biz/",
        "0 8 * * *",
        []),
]

# Dagster Asset with Tenant Partition
# @asset(
#     partitions_def=StaticPartitionsDefinition(
#         [t.tenant_id for t in tenants]
#     ),
#     required_resource_keys={"database", "storage"}
# )
# def ingest_data(context):
#     tenant_id = context.partition_key
#     config = get_tenant_plan(tenant_id)
#     # Read from tenant database
#     # Write to tenant S3 prefix
#     # Tag run with tenant_id for billing

print("=== Tenant Plans ===")
for t in tenants:
    print(f"\n  [{t.tenant_id}] Plan: {t.plan}")
    print(f"    Concurrent: {t.max_concurrent_runs} | Daily: {t.max_daily_runs}")
    print(f"    Storage: {t.storage_gb}GB | Compute: {t.compute_hours}hr")
    print(f"    Schedule: {t.schedule}")
    print(f"    Features: {t.features}")

Event-driven & Scaling

# === Event-driven Pipeline ===

# Sensor for SQS Events
# @sensor(job=ingest_job, minimum_interval_seconds=30)
# def sqs_sensor(context):
#     messages = sqs_client.receive_message(QueueUrl=QUEUE_URL, MaxNumberOfMessages=10)
#     for msg in messages.get("Messages", []):
#         event = json.loads(msg["Body"])
#         tenant_id = event["tenant_id"]
#         yield RunRequest(
#             run_key=f"{tenant_id}-{event['event_id']}",
#             partition_key=tenant_id,
#             tags={"tenant_id": tenant_id, "event_type": event["type"]}
#         )
#         sqs_client.delete_message(QueueUrl=QUEUE_URL, ReceiptHandle=msg["ReceiptHandle"])

@dataclass
class ScalingStrategy:
    strategy: str
    trigger: str
    action: str
    limit: str
    cost_impact: str

strategies = [
    ScalingStrategy("Horizontal Pod Autoscaling",
        "CPU > 70% หรือ Queue Depth > 100",
        "เพิ่ม Worker Pod จาก 3 → 10",
        "Max 20 Pods ต่อ Tenant Tier",
        "ค่า Compute เพิ่มตาม Pod"),
    ScalingStrategy("Concurrency Control",
        "Tenant ส่ง Run เกิน Limit",
        "Queue Run รอคิว ไม่ Reject",
        "Enterprise 10 / Pro 3 / Basic 1",
        "ไม่มี (จำกัดใน Plan)"),
    ScalingStrategy("Priority Queue",
        "หลาย Tenant ส่ง Run พร้อมกัน",
        "Enterprise ทำก่อน Pro ก่อน Basic",
        "Priority: Enterprise > Pro > Basic",
        "Enterprise จ่ายมากกว่า ได้ Priority"),
    ScalingStrategy("Resource Limits per Run",
        "Run ใช้ Resource มากเกินไป",
        "ตั้ง CPU/Memory Limit ต่อ Pod",
        "Enterprise 4CPU/8GB / Pro 2CPU/4GB / Basic 1CPU/2GB",
        "จำกัดตาม Plan"),
    ScalingStrategy("Auto-retry with Backoff",
        "Run Fail จาก Transient Error",
        "Retry 3 ครั้ง Exponential Backoff",
        "Max 3 Retries ต่อ Run",
        "Retry ไม่คิด Cost เพิ่ม"),
]

print("=== Scaling Strategies ===")
for s in strategies:
    print(f"\n  [{s.strategy}]")
    print(f"    Trigger: {s.trigger}")
    print(f"    Action: {s.action}")
    print(f"    Limit: {s.limit}")
    print(f"    Cost: {s.cost_impact}")

Billing & Monitoring

# === Billing Integration ===

@dataclass
class BillingMetric:
    metric: str
    unit: str
    price_basic: str
    price_pro: str
    price_enterprise: str
    tracking: str

metrics = [
    BillingMetric("Pipeline Runs",
        "Per Run",
        "$0.10/run (10 runs/day)",
        "$0.05/run (30 runs/day)",
        "$0.02/run (100 runs/day)",
        "Dagster Run Count per Tenant Tag"),
    BillingMetric("Compute Hours",
        "Per Hour",
        "$0.50/hr (10 hr/mo)",
        "$0.30/hr (50 hr/mo)",
        "$0.15/hr (200 hr/mo)",
        "Pod Duration × CPU from K8s Metrics"),
    BillingMetric("Data Volume",
        "Per GB Processed",
        "$0.10/GB (10 GB/mo)",
        "$0.05/GB (50 GB/mo)",
        "$0.02/GB (500 GB/mo)",
        "IO Manager Bytes Written/Read"),
    BillingMetric("Storage",
        "Per GB Stored",
        "$0.03/GB/mo",
        "$0.02/GB/mo",
        "$0.01/GB/mo",
        "S3 Bucket Size per Tenant Prefix"),
]

print("=== Billing Metrics ===")
for m in metrics:
    print(f"\n  [{m.metric}] Unit: {m.unit}")
    print(f"    Basic: {m.price_basic}")
    print(f"    Pro: {m.price_pro}")
    print(f"    Enterprise: {m.price_enterprise}")
    print(f"    Tracking: {m.tracking}")

เคล็ดลับ

การนำไปใช้งานจริงในองค์กร

สำหรับองค์กรขนาดกลางถึงใหญ่ แนะนำให้ใช้หลัก Three-Tier Architecture คือ Core Layer ที่เป็นแกนกลางของระบบ Distribution Layer ที่ทำหน้าที่กระจาย Traffic และ Access Layer ที่เชื่อมต่อกับผู้ใช้โดยตรง การแบ่ง Layer ชัดเจนช่วยให้การ Troubleshoot ง่ายขึ้นและสามารถ Scale ระบบได้ตามความต้องการ

เรื่อง Network Security ก็สำคัญไม่แพ้กัน ควรติดตั้ง Next-Generation Firewall ที่สามารถ Deep Packet Inspection ได้ ใช้ Network Segmentation แยก VLAN สำหรับแต่ละแผนก ติดตั้ง IDS/IPS เพื่อตรวจจับการโจมตี และทำ Regular Security Audit อย่างน้อยปีละ 2 ครั้ง

Dagster SaaS Architecture คืออะไร

Data Pipeline Multi-tenant Event-driven Scalable Billing Self-service Software-defined Assets IO Manager Partition Sensor Kubernetes

ออกแบบอย่างไร

Partition Tenant ID Resource Config IO Manager S3 Path Sensor SQS Kafka Schedule Config Tag Billing Code Location Custom Logic

Scale อย่างไร

HPA K8sRunLauncher Celery Queue Auto-scaling Concurrency Priority Resource Limits Retry Backoff Connection Pool Cache

Billing Integration ทำอย่างไร

Run Tags Usage Metrics Stripe Chargebee Per Run Per GB Per Compute Hour Dashboard Quota Limit Alert Invoice อัตโนมัติ

สรุป

Dagster SaaS Pipeline Multi-tenant Partition Event-driven Sensor Scaling K8s Billing Stripe IO Manager Monitoring SLA Production

📖 บทความที่เกี่ยวข้อง

Radix UI Primitives SaaS Architectureอ่านบทความ → Dagster Pipeline Docker Container Deployอ่านบทความ → Dagster Pipeline Progressive Deliveryอ่านบทความ → Dagster Pipeline Career Development ITอ่านบทความ → Dagster Pipeline Internal Developer Platformอ่านบทความ →

📚 ดูบทความทั้งหมด →