Dagster Multi-tenant
Dagster Data Pipeline Multi-tenant Partition Resource Configuration IO Manager Software-defined Assets Orchestrator
| Approach | Isolation Level | Complexity | Cost | Best For |
|---|---|---|---|---|
| Partition per Tenant | Data Level | ต่ำ | ต่ำ | Tenant จำนวันนี้อย-ปานกลาง |
| Resource Config per Tenant | Connection Level | ปานกลาง | ต่ำ | แยก Database ต่าง Tenant |
| Code Location per Tenant | Code Level | สูง | ปานกลาง | Tenant ต้องการ Custom Logic |
| Branch Deployment | Environment Level | สูง | สูง | Tenant ต้องการ Full Isolation |
Architecture Design
# === Multi-tenant Dagster Pipeline ===
from dataclasses import dataclass, field
# Tenant Configuration
@dataclass
class TenantConfig:
tenant_id: str
database_url: str
s3_bucket: str
s3_prefix: str
schema_name: str
schedule: str
enabled: bool = True
TENANTS = [
TenantConfig("tenant_a",
"postgresql://db1.example.com/tenant_a",
"data-lake-prod", "tenant_a/",
"tenant_a", "0 6 * * *", True),
TenantConfig("tenant_b",
"postgresql://db2.example.com/tenant_b",
"data-lake-prod", "tenant_b/",
"tenant_b", "0 7 * * *", True),
TenantConfig("tenant_c",
"postgresql://db1.example.com/tenant_c",
"data-lake-prod", "tenant_c/",
"tenant_c", "0 8 * * *", True),
]
# Dagster Partition Definition
# from dagster import StaticPartitionsDefinition
#
# tenant_partitions = StaticPartitionsDefinition(
# [t.tenant_id for t in TENANTS if t.enabled]
# )
# Dagster Asset with Partition
# @asset(partitions_def=tenant_partitions)
# def raw_orders(context):
# tenant_id = context.partition_key
# config = get_tenant_config(tenant_id)
# df = read_from_database(config.database_url, "orders")
# write_to_s3(df, f"s3://{config.s3_bucket}/{config.s3_prefix}raw/orders/")
# context.log.info(f"Loaded {len(df)} orders for {tenant_id}")
# return df
def get_tenant_config(tenant_id: str) -> TenantConfig:
for t in TENANTS:
if t.tenant_id == tenant_id:
return t
raise ValueError(f"Unknown tenant: {tenant_id}")
print("=== Tenant Configs ===")
for t in TENANTS:
print(f" [{t.tenant_id}] DB: {t.database_url}")
print(f" S3: s3://{t.s3_bucket}/{t.s3_prefix}")
print(f" Schedule: {t.schedule} | Enabled: {t.enabled}")
Resource & IO Manager
# === Tenant-aware Resource Configuration ===
# from dagster import resource, IOManager, io_manager, InputContext, OutputContext
# @resource(config_schema={"tenant_id": str})
# def tenant_database(context):
# tenant_id = context.resource_config["tenant_id"]
# config = get_tenant_config(tenant_id)
# return create_engine(config.database_url)
# class TenantS3IOManager(IOManager):
# def __init__(self, bucket: str):
# self.bucket = bucket
#
# def _get_path(self, context) -> str:
# tenant_id = context.partition_key
# asset_key = "/".join(context.asset_key.path)
# return f"s3://{self.bucket}/{tenant_id}/{asset_key}.parquet"
#
# def handle_output(self, context: OutputContext, obj):
# path = self._get_path(context)
# obj.to_parquet(path)
# context.log.info(f"Wrote to {path}")
#
# def load_input(self, context: InputContext):
# path = self._get_path(context)
# return pd.read_parquet(path)
# @io_manager(config_schema={"bucket": str})
# def tenant_s3_io_manager(context):
# return TenantS3IOManager(context.resource_config["bucket"])
@dataclass
class IOManagerDesign:
name: str
storage: str
path_pattern: str
isolation: str
use_case: str
io_managers = [
IOManagerDesign("S3 IO Manager",
"AWS S3 / GCS",
"s3://bucket/{tenant_id}/{asset_key}.parquet",
"Path-based: แต่ละ Tenant มี Prefix แยก",
"Data Lake, Large Dataset"),
IOManagerDesign("Database IO Manager",
"PostgreSQL / MySQL / Snowflake",
"{schema_name}.{table_name}",
"Schema-based: แต่ละ Tenant มี Schema แยก",
"Warehouse, Structured Data"),
IOManagerDesign("Local File IO Manager",
"Local Filesystem",
"/data/{tenant_id}/{asset_key}/",
"Directory-based",
"Development, Testing"),
]
print("=== IO Manager Designs ===")
for io in io_managers:
print(f"\n [{io.name}] Storage: {io.storage}")
print(f" Path: {io.path_pattern}")
print(f" Isolation: {io.isolation}")
print(f" Use: {io.use_case}")
Monitoring & Operations
# === Multi-tenant Monitoring ===
@dataclass
class MonitoringRule:
rule: str
metric: str
threshold: str
action: str
scope: str
rules = [
MonitoringRule("Pipeline Failure",
"Run Status = FAILURE",
"1 Failure per Tenant",
"Alert Slack + PagerDuty + Auto-retry 1x",
"Per Tenant (ไม่กระทบ Tenant อื่น)"),
MonitoringRule("SLA Breach",
"Pipeline Duration > Expected",
"Tenant A: 2hr, Tenant B: 1hr, Tenant C: 30min",
"Alert Team + Investigate",
"Per Tenant (SLA ต่างกัน)"),
MonitoringRule("Data Quality",
"Row Count, Null Percentage, Schema Match",
"Row Count < 80% ของวันก่อน = Alert",
"Alert + Hold Pipeline + Manual Review",
"Per Tenant Per Asset"),
MonitoringRule("Resource Usage",
"CPU Memory Duration Cost",
"ตาม Budget ของแต่ละ Tenant",
"Alert + Throttle ถ้าเกิน Budget",
"Per Tenant (Cost Tracking)"),
MonitoringRule("Data Freshness",
"Last Materialization Time",
"Data เก่ากว่า 24hr = Alert",
"Re-trigger Pipeline + Alert",
"Per Tenant Per Asset"),
]
print("=== Monitoring Rules ===")
for r in rules:
print(f" [{r.rule}] Metric: {r.metric}")
print(f" Threshold: {r.threshold}")
print(f" Action: {r.action}")
print(f" Scope: {r.scope}")
เคล็ดลับ
- Partition: ใช้ StaticPartitionsDefinition สำหรับ Tenant ID
- IO Manager: แยก Storage Path ตาม Tenant เสมอ
- Isolation: Data แต่ละ Tenant ต้องแยกกัน 100% ป้องกัน Data Leak
- Testing: ทดสอบ Pipeline กับ Mock Tenant ก่อน Production
- Monitoring: Alert แยกตาม Tenant ถ้า A พัง B ต้องไม่กระทบ
การนำไปใช้งานจริงในองค์กร
สำหรับองค์กรขนาดกลางถึงใหญ่ แนะนำให้ใช้หลัก Three-Tier Architecture คือ Core Layer ที่เป็นแกนกลางของระบบ Distribution Layer ที่ทำหน้าที่กระจาย Traffic และ Access Layer ที่เชื่อมต่อกับผู้ใช้โดยตรง การแบ่ง Layer ชัดเจนช่วยให้การ Troubleshoot ง่ายขึ้นและสามารถ Scale ระบบได้ตามความต้องการ
เรื่อง Network Security ก็สำคัญไม่แพ้กัน ควรติดตั้ง Next-Generation Firewall ที่สามารถ Deep Packet Inspection ได้ ใช้ Network Segmentation แยก VLAN สำหรับแต่ละแผนก ติดตั้ง IDS/IPS เพื่อตรวจจับการโจมตี และทำ Regular Security Audit อย่างน้อยปีละ 2 ครั้ง
เปรียบเทียบข้อดีและข้อเสีย
จากตารางเปรียบเทียบจะเห็นว่าข้อดีมีมากกว่าข้อเสียอย่างชัดเจน โดยเฉพาะในแง่ของประสิทธิภาพและความสามารถในการ Scale สำหรับข้อเสียส่วนใหญ่สามารถแก้ไขได้ด้วยการเรียนรู้อย่างเป็นระบบและวางแผนทรัพยากรให้เหมาะสม
Dagster คืออะไร
Data Orchestrator Software-defined Assets Type System Dagit UI Sensor Schedule Partition IO Manager dbt Spark Snowflake Testing Framework
Multi-tenant Design คืออะไร
หลาย Tenant Infrastructure เดียวกัน แยก Data Config Access Partition Resource Code Location Branch Deployment ลด Cost
ออกแบบอย่างไร
Partition Key Tenant ID Resource Config Database IO Manager S3 Path Tag Sensor Trigger Environment Secret Manager Schema
Best Practices คืออะไร
Data Isolation Access Control Testing Monitoring Alert Retry Scaling Kubernetes Documentation Cost Tracking Billing SLA
สรุป
Dagster Multi-tenant Pipeline Partition Resource IO Manager Data Isolation Monitoring SLA Cost Tracking Software-defined Assets Orchestrator
