Technology

Dagster Pipeline Multi-tenant Design

dagster pipeline multi tenant design
Dagster Pipeline Multi-tenant Design | SiamCafe Blog
2025-08-01· อ. บอม — SiamCafe.net· 11,513 คำ

Dagster Multi-tenant

Dagster Data Pipeline Multi-tenant Partition Resource Configuration IO Manager Software-defined Assets Orchestrator

ApproachIsolation LevelComplexityCostBest For
Partition per TenantData Levelต่ำต่ำTenant จำนวันนี้อย-ปานกลาง
Resource Config per TenantConnection Levelปานกลางต่ำแยก Database ต่าง Tenant
Code Location per TenantCode LevelสูงปานกลางTenant ต้องการ Custom Logic
Branch DeploymentEnvironment LevelสูงสูงTenant ต้องการ Full Isolation

Architecture Design

# === Multi-tenant Dagster Pipeline ===

from dataclasses import dataclass, field

# Tenant Configuration
@dataclass
class TenantConfig:
    tenant_id: str
    database_url: str
    s3_bucket: str
    s3_prefix: str
    schema_name: str
    schedule: str
    enabled: bool = True

TENANTS = [
    TenantConfig("tenant_a",
        "postgresql://db1.example.com/tenant_a",
        "data-lake-prod", "tenant_a/",
        "tenant_a", "0 6 * * *", True),
    TenantConfig("tenant_b",
        "postgresql://db2.example.com/tenant_b",
        "data-lake-prod", "tenant_b/",
        "tenant_b", "0 7 * * *", True),
    TenantConfig("tenant_c",
        "postgresql://db1.example.com/tenant_c",
        "data-lake-prod", "tenant_c/",
        "tenant_c", "0 8 * * *", True),
]

# Dagster Partition Definition
# from dagster import StaticPartitionsDefinition
#
# tenant_partitions = StaticPartitionsDefinition(
#     [t.tenant_id for t in TENANTS if t.enabled]
# )

# Dagster Asset with Partition
# @asset(partitions_def=tenant_partitions)
# def raw_orders(context):
#     tenant_id = context.partition_key
#     config = get_tenant_config(tenant_id)
#     df = read_from_database(config.database_url, "orders")
#     write_to_s3(df, f"s3://{config.s3_bucket}/{config.s3_prefix}raw/orders/")
#     context.log.info(f"Loaded {len(df)} orders for {tenant_id}")
#     return df

def get_tenant_config(tenant_id: str) -> TenantConfig:
    for t in TENANTS:
        if t.tenant_id == tenant_id:
            return t
    raise ValueError(f"Unknown tenant: {tenant_id}")

print("=== Tenant Configs ===")
for t in TENANTS:
    print(f"  [{t.tenant_id}] DB: {t.database_url}")
    print(f"    S3: s3://{t.s3_bucket}/{t.s3_prefix}")
    print(f"    Schedule: {t.schedule} | Enabled: {t.enabled}")

Resource & IO Manager

# === Tenant-aware Resource Configuration ===

# from dagster import resource, IOManager, io_manager, InputContext, OutputContext

# @resource(config_schema={"tenant_id": str})
# def tenant_database(context):
#     tenant_id = context.resource_config["tenant_id"]
#     config = get_tenant_config(tenant_id)
#     return create_engine(config.database_url)

# class TenantS3IOManager(IOManager):
#     def __init__(self, bucket: str):
#         self.bucket = bucket
#
#     def _get_path(self, context) -> str:
#         tenant_id = context.partition_key
#         asset_key = "/".join(context.asset_key.path)
#         return f"s3://{self.bucket}/{tenant_id}/{asset_key}.parquet"
#
#     def handle_output(self, context: OutputContext, obj):
#         path = self._get_path(context)
#         obj.to_parquet(path)
#         context.log.info(f"Wrote to {path}")
#
#     def load_input(self, context: InputContext):
#         path = self._get_path(context)
#         return pd.read_parquet(path)

# @io_manager(config_schema={"bucket": str})
# def tenant_s3_io_manager(context):
#     return TenantS3IOManager(context.resource_config["bucket"])

@dataclass
class IOManagerDesign:
    name: str
    storage: str
    path_pattern: str
    isolation: str
    use_case: str

io_managers = [
    IOManagerDesign("S3 IO Manager",
        "AWS S3 / GCS",
        "s3://bucket/{tenant_id}/{asset_key}.parquet",
        "Path-based: แต่ละ Tenant มี Prefix แยก",
        "Data Lake, Large Dataset"),
    IOManagerDesign("Database IO Manager",
        "PostgreSQL / MySQL / Snowflake",
        "{schema_name}.{table_name}",
        "Schema-based: แต่ละ Tenant มี Schema แยก",
        "Warehouse, Structured Data"),
    IOManagerDesign("Local File IO Manager",
        "Local Filesystem",
        "/data/{tenant_id}/{asset_key}/",
        "Directory-based",
        "Development, Testing"),
]

print("=== IO Manager Designs ===")
for io in io_managers:
    print(f"\n  [{io.name}] Storage: {io.storage}")
    print(f"    Path: {io.path_pattern}")
    print(f"    Isolation: {io.isolation}")
    print(f"    Use: {io.use_case}")

Monitoring & Operations

# === Multi-tenant Monitoring ===

@dataclass
class MonitoringRule:
    rule: str
    metric: str
    threshold: str
    action: str
    scope: str

rules = [
    MonitoringRule("Pipeline Failure",
        "Run Status = FAILURE",
        "1 Failure per Tenant",
        "Alert Slack + PagerDuty + Auto-retry 1x",
        "Per Tenant (ไม่กระทบ Tenant อื่น)"),
    MonitoringRule("SLA Breach",
        "Pipeline Duration > Expected",
        "Tenant A: 2hr, Tenant B: 1hr, Tenant C: 30min",
        "Alert Team + Investigate",
        "Per Tenant (SLA ต่างกัน)"),
    MonitoringRule("Data Quality",
        "Row Count, Null Percentage, Schema Match",
        "Row Count < 80% ของวันก่อน = Alert",
        "Alert + Hold Pipeline + Manual Review",
        "Per Tenant Per Asset"),
    MonitoringRule("Resource Usage",
        "CPU Memory Duration Cost",
        "ตาม Budget ของแต่ละ Tenant",
        "Alert + Throttle ถ้าเกิน Budget",
        "Per Tenant (Cost Tracking)"),
    MonitoringRule("Data Freshness",
        "Last Materialization Time",
        "Data เก่ากว่า 24hr = Alert",
        "Re-trigger Pipeline + Alert",
        "Per Tenant Per Asset"),
]

print("=== Monitoring Rules ===")
for r in rules:
    print(f"  [{r.rule}] Metric: {r.metric}")
    print(f"    Threshold: {r.threshold}")
    print(f"    Action: {r.action}")
    print(f"    Scope: {r.scope}")

เคล็ดลับ

การนำไปใช้งานจริงในองค์กร

สำหรับองค์กรขนาดกลางถึงใหญ่ แนะนำให้ใช้หลัก Three-Tier Architecture คือ Core Layer ที่เป็นแกนกลางของระบบ Distribution Layer ที่ทำหน้าที่กระจาย Traffic และ Access Layer ที่เชื่อมต่อกับผู้ใช้โดยตรง การแบ่ง Layer ชัดเจนช่วยให้การ Troubleshoot ง่ายขึ้นและสามารถ Scale ระบบได้ตามความต้องการ

เรื่อง Network Security ก็สำคัญไม่แพ้กัน ควรติดตั้ง Next-Generation Firewall ที่สามารถ Deep Packet Inspection ได้ ใช้ Network Segmentation แยก VLAN สำหรับแต่ละแผนก ติดตั้ง IDS/IPS เพื่อตรวจจับการโจมตี และทำ Regular Security Audit อย่างน้อยปีละ 2 ครั้ง

เปรียบเทียบข้อดีและข้อเสีย

ข้อดีข้อเสีย
ประสิทธิภาพสูง ทำงานได้เร็วและแม่นยำ ลดเวลาทำงานซ้ำซ้อนต้องใช้เวลาเรียนรู้เบื้องต้นพอสมควร มี Learning Curve สูง
มี Community ขนาดใหญ่ มีคนช่วยเหลือและแหล่งเรียนรู้มากมายบางฟีเจอร์อาจยังไม่เสถียร หรือมีการเปลี่ยนแปลงบ่อยในเวอร์ชันใหม่
รองรับ Integration กับเครื่องมือและบริการอื่นได้หลากหลายต้นทุนอาจสูงสำหรับ Enterprise License หรือ Cloud Service
เป็น Open Source หรือมีเวอร์ชันฟรีให้เริ่มต้นใช้งานต้องการ Hardware หรือ Infrastructure ที่เพียงพอ

จากตารางเปรียบเทียบจะเห็นว่าข้อดีมีมากกว่าข้อเสียอย่างชัดเจน โดยเฉพาะในแง่ของประสิทธิภาพและความสามารถในการ Scale สำหรับข้อเสียส่วนใหญ่สามารถแก้ไขได้ด้วยการเรียนรู้อย่างเป็นระบบและวางแผนทรัพยากรให้เหมาะสม

Dagster คืออะไร

Data Orchestrator Software-defined Assets Type System Dagit UI Sensor Schedule Partition IO Manager dbt Spark Snowflake Testing Framework

Multi-tenant Design คืออะไร

หลาย Tenant Infrastructure เดียวกัน แยก Data Config Access Partition Resource Code Location Branch Deployment ลด Cost

ออกแบบอย่างไร

Partition Key Tenant ID Resource Config Database IO Manager S3 Path Tag Sensor Trigger Environment Secret Manager Schema

Best Practices คืออะไร

Data Isolation Access Control Testing Monitoring Alert Retry Scaling Kubernetes Documentation Cost Tracking Billing SLA

สรุป

Dagster Multi-tenant Pipeline Partition Resource IO Manager Data Isolation Monitoring SLA Cost Tracking Software-defined Assets Orchestrator

📖 บทความที่เกี่ยวข้อง

Dagster Pipeline Career Development ITอ่านบทความ → QuestDB Time Series Multi-tenant Designอ่านบทความ → Python httpx Multi-tenant Designอ่านบทความ → Dagster Pipeline Progressive Deliveryอ่านบทความ → Dagster Pipeline Docker Container Deployอ่านบทความ →

📚 ดูบทความทั้งหมด →