SiamCafe · Blog
Dagster Pipeline Performance Tuning
บทความ

Dagster Pipeline Performance Tuning

เผยแพร่ 28 พฤษภาคม 2569

Dagster Performance Tuning

Dagster Data Pipeline Performance Tuning Asset Optimization Partitioning Concurrency Caching IO Manager Profiling Monitoring Python Orchestrator

OptimizationImpactComplexityเมื่อไหร่ใช้
Partitioningสูงมากต่ำข้อมูลมาก Incremental
Concurrencyสูงต่ำIndependent Assets
IO ManagerสูงปานกลางI/O Bottleneck
Cachingปานกลางต่ำRepeated Computation
Resource PoolปานกลางปานกลางDB Connection Limit

Asset Optimization

=== Dagster Asset Performance ===

pip install dagster dagster-webserver dagster-postgres

Before: Slow — Process all data every run

@asset

def daily_sales(context):

df = pd.read_sql("SELECT * FROM orders", engine) # ALL data

result = df.groupby('date').agg({'amount': 'sum'})

result.to_sql('daily_sales', engine, if_exists='replace')

return result

After: Fast — Partitioned by date

from dagster import asset, DailyPartitionsDefinition

daily_partitions = DailyPartitionsDefinition(start_date="2024-01-01")

@asset(partitions_def=daily_partitions)

def daily_sales(context):

partition_date = context.partition_key

df = pd.read_sql(

"SELECT * FROM orders WHERE date = %s",

engine, params=[partition_date]

) # Only 1 day

result = df.groupby('date').agg({'amount': 'sum'})

return result

Concurrency — Run independent assets in parallel

from dagster import define_asset_job, Definitions

@asset

def raw_orders(): ...

@asset

def raw_products(): ...

@asset(deps=[raw_orders, raw_products])

def enriched_orders(): ...

# raw_orders and raw_products run in PARALLEL

# enriched_orders waits for both

from dataclasses import dataclass

@dataclass

class AssetPerformance:

asset: str

before_sec: float

after_sec: float

improvement: str

technique: str

assets = [

AssetPerformance("daily_sales", 300, 15, "20x faster", "Partitioning"),

AssetPerformance("user_features", 180, 45, "4x faster", "Caching"),

AssetPerformance("product_catalog", 120, 30, "4x faster", "IO Manager"),

AssetPerformance("enriched_orders", 240, 60, "4x faster", "Concurrency"),

AssetPerformance("ml_predictions", 600, 120, "5x faster", "Partition + Parallel"),

]

print("=== Asset Performance Improvements ===")

for a in assets:

print(f" [{a.asset}] {a.before_sec}s -> {a.after_sec}s ({a.improvement})")

print(f" Technique: {a.technique}")

IO Manager และ Caching

=== IO Manager & Caching ===

Custom IO Manager — Parquet instead of CSV

from dagster import IOManager, io_manager

import pyarrow.parquet as pq

class ParquetIOManager(IOManager):

def __init__(self, base_path):

self.base_path = base_path

def handle_output(self, context, obj):

path = f"{self.base_path}/{context.asset_key.path[-1]}.parquet"

obj.to_parquet(path, compression='snappy')

context.log.info(f"Wrote {len(obj)} rows to {path}")

def load_input(self, context):

path = f"{self.base_path}/{context.asset_key.path[-1]}.parquet"

return pd.read_parquet(path)

@io_manager

def parquet_io_manager():

return ParquetIOManager("/data/assets")

Freshness Policy — Skip if not stale

@asset(

freshness_policy=FreshnessPolicy(maximum_lag_minutes=60),

auto_materialize_policy=AutoMaterializePolicy.eager(),

)

def hourly_metrics(context):

# Only runs if data is stale (>60 min old)

...

Resource Pool — Limit DB connections

from dagster import resource

@resource(config_schema={"pool_size": int})

def db_pool(context):

return create_engine(

DB_URL,

pool_size=context.resource_config["pool_size"],

max_overflow=5,

)

io_comparison = {

"CSV IO Manager": {"write_sec": 45, "read_sec": 30, "size_mb": 500},

"Parquet IO Manager": {"write_sec": 8, "read_sec": 3, "size_mb": 120},

"Delta Lake IO Manager": {"write_sec": 12, "read_sec": 5, "size_mb": 100},

"Snowflake IO Manager": {"write_sec": 20, "read_sec": 15, "size_mb": 0},

}

print("\nIO Manager Comparison (1M rows):")

for name, perf in io_comparison.items():

print(f" [{name}]")

print(f" Write: {perf['write_sec']}s | Read: {perf['read_sec']}s | Size: {perf['size_mb']}MB")

Monitoring และ Profiling

=== Monitoring & Profiling ===

Dagster UI — Built-in Monitoring

dagster dev # Start UI at localhost:3000

  • Asset Lineage Graph
  • Run Timeline
  • Asset Materialization History
  • Partition Status Matrix

Custom Logging

@asset

def heavy_computation(context):

import time

start = time.time()

context.log.info("Starting data load...")

df = load_data()

context.log.info(f"Loaded {len(df)} rows in {time.time()-start:.1f}s")

start2 = time.time()

result = transform(df)

context.log.info(f"Transformed in {time.time()-start2:.1f}s")

context.add_output_metadata({

"row_count": len(result),

"duration_sec": time.time() - start,

"memory_mb": result.memory_usage(deep=True).sum() / 1024**2,

})

return result

Prometheus Metrics

from prometheus_client import Counter, Histogram

ASSET_DURATION = Histogram('dagster_asset_duration_seconds',

'Asset materialization duration',

['asset_name'])

ASSET_ROWS = Counter('dagster_asset_rows_total',

'Total rows processed', ['asset_name'])

pipeline_metrics = {

"Total Pipeline Duration": "12 min (was 45 min)",

"Longest Asset": "ml_predictions: 2 min",

"Parallelism": "4 assets concurrent",

"Partitions/Day": "365 (daily for 1 year)",

"Cache Hit Rate": "78% (skipped stale checks)",

"Memory Peak": "2.1 GB (was 8.5 GB)",

"Failed Runs (7d)": "2 (both auto-retried)",

}

print("Pipeline Metrics:")

for k, v in pipeline_metrics.items():

print(f" {k}: {v}")

เคล็ดลับ

  • Partition: แบ่ง Partition ตามวันหรือ Key ลด Data ที่ Process
  • Parquet: ใช้ Parquet แทน CSV เร็วกว่า 5-10x เล็กกว่า 4x
  • Freshness: ใช้ FreshnessPolicy ไม่ทำงานซ้ำถ้า Data ไม่เปลี่ยน
  • Pool: จำกัด Connection Pool ไม่ให้ DB Overload
  • Metadata: Log Duration และ Row Count ทุก Asset

การนำไปใช้งานจริงในองค์กร

สำหรับองค์กรขนาดกลางถึงใหญ่ แนะนำให้ใช้หลัก Three-Tier Architecture คือ Core Layer ที่เป็นแกนกลางของระบบ Distribution Layer ที่ทำหน้าที่กระจาย Traffic และ Access Layer ที่เชื่อมต่อกับผู้ใช้โดยตรง การแบ่ง Layer ชัดเจนช่วยให้การ Troubleshoot ง่ายขึ้นและสามารถ Scale ระบบได้ตามความต้องการ

เรื่อง Network Security ก็สำคัญไม่แพ้กัน ควรติดตั้ง Next-Generation Firewall ที่สามารถ Deep Packet Inspection ได้ ใช้ Network Segmentation แยก VLAN สำหรับแต่ละแผนก ติดตั้ง IDS/IPS เพื่อตรวจจับการโจมตี และทำ Regular Security Audit อย่างน้อยปีละ 2 ครั้ง

Dagster คืออะไร

Open Source Data Orchestrator Python Software-defined Assets Type System UI Partitioning Scheduling Sensor IO Manager dbt Spark

Performance Tuning ทำอย่างไร

Profiling หา Bottleneck Partitioning Concurrency IO Manager Caching Resource Pool Lazy Loading ลด Memory

Partitioning ช่วยเรื่อง Performance อย่างไร

แบ่งงานเล็ก Process เฉพาะที่เปลี่ยน เร็วขึ้น Retry เฉพาะ Fail Backfill Memory น้อย Parallelism หลาย Partition พร้อมกัน

Dagster กับ Airflow ต่างกันอย่างไร

Dagster Assets Data Type System UI Testing Airflow DAG Tasks Workflow Community Plugin ทั้งสอง Python Modern vs General

สรุป

Dagster Pipeline Performance Tuning Asset Partitioning Concurrency IO Manager Parquet Caching FreshnessPolicy Resource Pool Monitoring Profiling Metadata Python Data Orchestrator