Dagster Performance Tuning
Dagster Data Pipeline Performance Tuning Asset Optimization Partitioning Concurrency Caching IO Manager Profiling Monitoring Python Orchestrator
| Optimization | Impact | Complexity | เมื่อไหร่ใช้ |
|---|---|---|---|
| Partitioning | สูงมาก | ต่ำ | ข้อมูลมาก Incremental |
| Concurrency | สูง | ต่ำ | Independent Assets |
| IO Manager | สูง | ปานกลาง | I/O Bottleneck |
| Caching | ปานกลาง | ต่ำ | Repeated Computation |
| Resource Pool | ปานกลาง | ปานกลาง | DB Connection Limit |
Asset Optimization
# === Dagster Asset Performance ===
# pip install dagster dagster-webserver dagster-postgres
# Before: Slow — Process all data every run
# @asset
# def daily_sales(context):
# df = pd.read_sql("SELECT * FROM orders", engine) # ALL data
# result = df.groupby('date').agg({'amount': 'sum'})
# result.to_sql('daily_sales', engine, if_exists='replace')
# return result
# After: Fast — Partitioned by date
# from dagster import asset, DailyPartitionsDefinition
#
# daily_partitions = DailyPartitionsDefinition(start_date="2024-01-01")
#
# @asset(partitions_def=daily_partitions)
# def daily_sales(context):
# partition_date = context.partition_key
# df = pd.read_sql(
# "SELECT * FROM orders WHERE date = %s",
# engine, params=[partition_date]
# ) # Only 1 day
# result = df.groupby('date').agg({'amount': 'sum'})
# return result
# Concurrency — Run independent assets in parallel
# from dagster import define_asset_job, Definitions
#
# @asset
# def raw_orders(): ...
#
# @asset
# def raw_products(): ...
#
# @asset(deps=[raw_orders, raw_products])
# def enriched_orders(): ...
#
# # raw_orders and raw_products run in PARALLEL
# # enriched_orders waits for both
from dataclasses import dataclass
@dataclass
class AssetPerformance:
asset: str
before_sec: float
after_sec: float
improvement: str
technique: str
assets = [
AssetPerformance("daily_sales", 300, 15, "20x faster", "Partitioning"),
AssetPerformance("user_features", 180, 45, "4x faster", "Caching"),
AssetPerformance("product_catalog", 120, 30, "4x faster", "IO Manager"),
AssetPerformance("enriched_orders", 240, 60, "4x faster", "Concurrency"),
AssetPerformance("ml_predictions", 600, 120, "5x faster", "Partition + Parallel"),
]
print("=== Asset Performance Improvements ===")
for a in assets:
print(f" [{a.asset}] {a.before_sec}s -> {a.after_sec}s ({a.improvement})")
print(f" Technique: {a.technique}")
IO Manager และ Caching
# === IO Manager & Caching ===
# Custom IO Manager — Parquet instead of CSV
# from dagster import IOManager, io_manager
# import pyarrow.parquet as pq
#
# class ParquetIOManager(IOManager):
# def __init__(self, base_path):
# self.base_path = base_path
#
# def handle_output(self, context, obj):
# path = f"{self.base_path}/{context.asset_key.path[-1]}.parquet"
# obj.to_parquet(path, compression='snappy')
# context.log.info(f"Wrote {len(obj)} rows to {path}")
#
# def load_input(self, context):
# path = f"{self.base_path}/{context.asset_key.path[-1]}.parquet"
# return pd.read_parquet(path)
#
# @io_manager
# def parquet_io_manager():
# return ParquetIOManager("/data/assets")
# Freshness Policy — Skip if not stale
# @asset(
# freshness_policy=FreshnessPolicy(maximum_lag_minutes=60),
# auto_materialize_policy=AutoMaterializePolicy.eager(),
# )
# def hourly_metrics(context):
# # Only runs if data is stale (>60 min old)
# ...
# Resource Pool — Limit DB connections
# from dagster import resource
#
# @resource(config_schema={"pool_size": int})
# def db_pool(context):
# return create_engine(
# DB_URL,
# pool_size=context.resource_config["pool_size"],
# max_overflow=5,
# )
io_comparison = {
"CSV IO Manager": {"write_sec": 45, "read_sec": 30, "size_mb": 500},
"Parquet IO Manager": {"write_sec": 8, "read_sec": 3, "size_mb": 120},
"Delta Lake IO Manager": {"write_sec": 12, "read_sec": 5, "size_mb": 100},
"Snowflake IO Manager": {"write_sec": 20, "read_sec": 15, "size_mb": 0},
}
print("\nIO Manager Comparison (1M rows):")
for name, perf in io_comparison.items():
print(f" [{name}]")
print(f" Write: {perf['write_sec']}s | Read: {perf['read_sec']}s | Size: {perf['size_mb']}MB")
Monitoring และ Profiling
# === Monitoring & Profiling ===
# Dagster UI — Built-in Monitoring
# dagster dev # Start UI at localhost:3000
# - Asset Lineage Graph
# - Run Timeline
# - Asset Materialization History
# - Partition Status Matrix
# Custom Logging
# @asset
# def heavy_computation(context):
# import time
# start = time.time()
#
# context.log.info("Starting data load...")
# df = load_data()
# context.log.info(f"Loaded {len(df)} rows in {time.time()-start:.1f}s")
#
# start2 = time.time()
# result = transform(df)
# context.log.info(f"Transformed in {time.time()-start2:.1f}s")
#
# context.add_output_metadata({
# "row_count": len(result),
# "duration_sec": time.time() - start,
# "memory_mb": result.memory_usage(deep=True).sum() / 1024**2,
# })
# return result
# Prometheus Metrics
# from prometheus_client import Counter, Histogram
# ASSET_DURATION = Histogram('dagster_asset_duration_seconds',
# 'Asset materialization duration',
# ['asset_name'])
# ASSET_ROWS = Counter('dagster_asset_rows_total',
# 'Total rows processed', ['asset_name'])
pipeline_metrics = {
"Total Pipeline Duration": "12 min (was 45 min)",
"Longest Asset": "ml_predictions: 2 min",
"Parallelism": "4 assets concurrent",
"Partitions/Day": "365 (daily for 1 year)",
"Cache Hit Rate": "78% (skipped stale checks)",
"Memory Peak": "2.1 GB (was 8.5 GB)",
"Failed Runs (7d)": "2 (both auto-retried)",
}
print("Pipeline Metrics:")
for k, v in pipeline_metrics.items():
print(f" {k}: {v}")
เคล็ดลับ
- Partition: แบ่ง Partition ตามวันหรือ Key ลด Data ที่ Process
- Parquet: ใช้ Parquet แทน CSV เร็วกว่า 5-10x เล็กกว่า 4x
- Freshness: ใช้ FreshnessPolicy ไม่ทำงานซ้ำถ้า Data ไม่เปลี่ยน
- Pool: จำกัด Connection Pool ไม่ให้ DB Overload
- Metadata: Log Duration และ Row Count ทุก Asset
การนำไปใช้งานจริงในองค์กร
สำหรับองค์กรขนาดกลางถึงใหญ่ แนะนำให้ใช้หลัก Three-Tier Architecture คือ Core Layer ที่เป็นแกนกลางของระบบ Distribution Layer ที่ทำหน้าที่กระจาย Traffic และ Access Layer ที่เชื่อมต่อกับผู้ใช้โดยตรง การแบ่ง Layer ชัดเจนช่วยให้การ Troubleshoot ง่ายขึ้นและสามารถ Scale ระบบได้ตามความต้องการ
เรื่อง Network Security ก็สำคัญไม่แพ้กัน ควรติดตั้ง Next-Generation Firewall ที่สามารถ Deep Packet Inspection ได้ ใช้ Network Segmentation แยก VLAN สำหรับแต่ละแผนก ติดตั้ง IDS/IPS เพื่อตรวจจับการโจมตี และทำ Regular Security Audit อย่างน้อยปีละ 2 ครั้ง
Dagster คืออะไร
Open Source Data Orchestrator Python Software-defined Assets Type System UI Partitioning Scheduling Sensor IO Manager dbt Spark
Performance Tuning ทำอย่างไร
Profiling หา Bottleneck Partitioning Concurrency IO Manager Caching Resource Pool Lazy Loading ลด Memory
Partitioning ช่วยเรื่อง Performance อย่างไร
แบ่งงานเล็ก Process เฉพาะที่เปลี่ยน เร็วขึ้น Retry เฉพาะ Fail Backfill Memory น้อย Parallelism หลาย Partition พร้อมกัน
Dagster กับ Airflow ต่างกันอย่างไร
Dagster Assets Data Type System UI Testing Airflow DAG Tasks Workflow Community Plugin ทั้งสอง Python Modern vs General
สรุป
Dagster Pipeline Performance Tuning Asset Partitioning Concurrency IO Manager Parquet Caching FreshnessPolicy Resource Pool Monitoring Profiling Metadata Python Data Orchestrator
