Dagster Pipeline Performance Tuning
Dagster Performance Tuning
Dagster Data Pipeline Performance Tuning Asset Optimization Partitioning Concurrency Caching IO Manager Profiling Monitoring Python Orchestrator
| Optimization | Impact | Complexity | เมื่อไหร่ใช้ |
|---|---|---|---|
| Partitioning | สูงมาก | ต่ำ | ข้อมูลมาก Incremental |
| Concurrency | สูง | ต่ำ | Independent Assets |
| IO Manager | สูง | ปานกลาง | I/O Bottleneck |
| Caching | ปานกลาง | ต่ำ | Repeated Computation |
| Resource Pool | ปานกลาง | ปานกลาง | DB Connection Limit |
Asset Optimization
=== Dagster Asset Performance ===
pip install dagster dagster-webserver dagster-postgres
Before: Slow — Process all data every run
@asset
def daily_sales(context):
df = pd.read_sql("SELECT * FROM orders", engine) # ALL data
result = df.groupby('date').agg({'amount': 'sum'})
result.to_sql('daily_sales', engine, if_exists='replace')
return result
After: Fast — Partitioned by date
from dagster import asset, DailyPartitionsDefinition
daily_partitions = DailyPartitionsDefinition(start_date="2024-01-01")
@asset(partitions_def=daily_partitions)
def daily_sales(context):
partition_date = context.partition_key
df = pd.read_sql(
"SELECT * FROM orders WHERE date = %s",
engine, params=[partition_date]
) # Only 1 day
result = df.groupby('date').agg({'amount': 'sum'})
return result
Concurrency — Run independent assets in parallel
from dagster import define_asset_job, Definitions
@asset
def raw_orders(): ...
@asset
def raw_products(): ...
@asset(deps=[raw_orders, raw_products])
def enriched_orders(): ...
# raw_orders and raw_products run in PARALLEL
# enriched_orders waits for both
from dataclasses import dataclass
@dataclass
class AssetPerformance:
asset: str
before_sec: float
after_sec: float
improvement: str
technique: str
assets = [
AssetPerformance("daily_sales", 300, 15, "20x faster", "Partitioning"),
AssetPerformance("user_features", 180, 45, "4x faster", "Caching"),
AssetPerformance("product_catalog", 120, 30, "4x faster", "IO Manager"),
AssetPerformance("enriched_orders", 240, 60, "4x faster", "Concurrency"),
AssetPerformance("ml_predictions", 600, 120, "5x faster", "Partition + Parallel"),
]
print("=== Asset Performance Improvements ===")
for a in assets:
print(f" [{a.asset}] {a.before_sec}s -> {a.after_sec}s ({a.improvement})")
print(f" Technique: {a.technique}")
IO Manager และ Caching
=== IO Manager & Caching ===
Custom IO Manager — Parquet instead of CSV
from dagster import IOManager, io_manager
import pyarrow.parquet as pq
class ParquetIOManager(IOManager):
def __init__(self, base_path):
self.base_path = base_path
def handle_output(self, context, obj):
path = f"{self.base_path}/{context.asset_key.path[-1]}.parquet"
obj.to_parquet(path, compression='snappy')
context.log.info(f"Wrote {len(obj)} rows to {path}")
def load_input(self, context):
path = f"{self.base_path}/{context.asset_key.path[-1]}.parquet"
return pd.read_parquet(path)
@io_manager
def parquet_io_manager():
return ParquetIOManager("/data/assets")
Freshness Policy — Skip if not stale
@asset(
freshness_policy=FreshnessPolicy(maximum_lag_minutes=60),
auto_materialize_policy=AutoMaterializePolicy.eager(),
)
def hourly_metrics(context):
# Only runs if data is stale (>60 min old)
...
Resource Pool — Limit DB connections
from dagster import resource
@resource(config_schema={"pool_size": int})
def db_pool(context):
return create_engine(
DB_URL,
pool_size=context.resource_config["pool_size"],
max_overflow=5,
)
io_comparison = {
"CSV IO Manager": {"write_sec": 45, "read_sec": 30, "size_mb": 500},
"Parquet IO Manager": {"write_sec": 8, "read_sec": 3, "size_mb": 120},
"Delta Lake IO Manager": {"write_sec": 12, "read_sec": 5, "size_mb": 100},
"Snowflake IO Manager": {"write_sec": 20, "read_sec": 15, "size_mb": 0},
}
print("\nIO Manager Comparison (1M rows):")
for name, perf in io_comparison.items():
print(f" [{name}]")
print(f" Write: {perf['write_sec']}s | Read: {perf['read_sec']}s | Size: {perf['size_mb']}MB")
Monitoring และ Profiling
=== Monitoring & Profiling ===
Dagster UI — Built-in Monitoring
dagster dev # Start UI at localhost:3000
- Asset Lineage Graph
- Run Timeline
- Asset Materialization History
- Partition Status Matrix
Custom Logging
@asset
def heavy_computation(context):
import time
start = time.time()
context.log.info("Starting data load...")
df = load_data()
context.log.info(f"Loaded {len(df)} rows in {time.time()-start:.1f}s")
start2 = time.time()
result = transform(df)
context.log.info(f"Transformed in {time.time()-start2:.1f}s")
context.add_output_metadata({
"row_count": len(result),
"duration_sec": time.time() - start,
"memory_mb": result.memory_usage(deep=True).sum() / 1024**2,
})
return result
Prometheus Metrics
from prometheus_client import Counter, Histogram
ASSET_DURATION = Histogram('dagster_asset_duration_seconds',
'Asset materialization duration',
['asset_name'])
ASSET_ROWS = Counter('dagster_asset_rows_total',
'Total rows processed', ['asset_name'])
pipeline_metrics = {
"Total Pipeline Duration": "12 min (was 45 min)",
"Longest Asset": "ml_predictions: 2 min",
"Parallelism": "4 assets concurrent",
"Partitions/Day": "365 (daily for 1 year)",
"Cache Hit Rate": "78% (skipped stale checks)",
"Memory Peak": "2.1 GB (was 8.5 GB)",
"Failed Runs (7d)": "2 (both auto-retried)",
}
print("Pipeline Metrics:")
for k, v in pipeline_metrics.items():
print(f" {k}: {v}")
เคล็ดลับ
- Partition: แบ่ง Partition ตามวันหรือ Key ลด Data ที่ Process
- Parquet: ใช้ Parquet แทน CSV เร็วกว่า 5-10x เล็กกว่า 4x
- Freshness: ใช้ FreshnessPolicy ไม่ทำงานซ้ำถ้า Data ไม่เปลี่ยน
- Pool: จำกัด Connection Pool ไม่ให้ DB Overload
- Metadata: Log Duration และ Row Count ทุก Asset
การนำไปใช้งานจริงในองค์กร
สำหรับองค์กรขนาดกลางถึงใหญ่ แนะนำให้ใช้หลัก Three-Tier Architecture คือ Core Layer ที่เป็นแกนกลางของระบบ Distribution Layer ที่ทำหน้าที่กระจาย Traffic และ Access Layer ที่เชื่อมต่อกับผู้ใช้โดยตรง การแบ่ง Layer ชัดเจนช่วยให้การ Troubleshoot ง่ายขึ้นและสามารถ Scale ระบบได้ตามความต้องการ
เรื่อง Network Security ก็สำคัญไม่แพ้กัน ควรติดตั้ง Next-Generation Firewall ที่สามารถ Deep Packet Inspection ได้ ใช้ Network Segmentation แยก VLAN สำหรับแต่ละแผนก ติดตั้ง IDS/IPS เพื่อตรวจจับการโจมตี และทำ Regular Security Audit อย่างน้อยปีละ 2 ครั้ง
Dagster คืออะไร
Open Source Data Orchestrator Python Software-defined Assets Type System UI Partitioning Scheduling Sensor IO Manager dbt Spark
Performance Tuning ทำอย่างไร
Profiling หา Bottleneck Partitioning Concurrency IO Manager Caching Resource Pool Lazy Loading ลด Memory
Partitioning ช่วยเรื่อง Performance อย่างไร
แบ่งงานเล็ก Process เฉพาะที่เปลี่ยน เร็วขึ้น Retry เฉพาะ Fail Backfill Memory น้อย Parallelism หลาย Partition พร้อมกัน
Dagster กับ Airflow ต่างกันอย่างไร
Dagster Assets Data Type System UI Testing Airflow DAG Tasks Workflow Community Plugin ทั้งสอง Python Modern vs General
สรุป
Dagster Pipeline Performance Tuning Asset Partitioning Concurrency IO Manager Parquet Caching FreshnessPolicy Resource Pool Monitoring Profiling Metadata Python Data Orchestrator