Technology

Dagster Pipeline Performance Tuning เพิ่มความเร็ว

dagster pipeline performance tuning เพมความเรว
Dagster Pipeline Performance Tuning เพิ่มความเร็ว | SiamCafe Blog
2026-03-10· อ. บอม — SiamCafe.net· 10,374 คำ

Dagster Performance Tuning

Dagster Data Pipeline Performance Tuning Asset Optimization Partitioning Concurrency Caching IO Manager Profiling Monitoring Python Orchestrator

OptimizationImpactComplexityเมื่อไหร่ใช้
Partitioningสูงมากต่ำข้อมูลมาก Incremental
Concurrencyสูงต่ำIndependent Assets
IO ManagerสูงปานกลางI/O Bottleneck
Cachingปานกลางต่ำRepeated Computation
Resource PoolปานกลางปานกลางDB Connection Limit

Asset Optimization

# === Dagster Asset Performance ===

# pip install dagster dagster-webserver dagster-postgres

# Before: Slow — Process all data every run
# @asset
# def daily_sales(context):
#     df = pd.read_sql("SELECT * FROM orders", engine)  # ALL data
#     result = df.groupby('date').agg({'amount': 'sum'})
#     result.to_sql('daily_sales', engine, if_exists='replace')
#     return result

# After: Fast — Partitioned by date
# from dagster import asset, DailyPartitionsDefinition
#
# daily_partitions = DailyPartitionsDefinition(start_date="2024-01-01")
#
# @asset(partitions_def=daily_partitions)
# def daily_sales(context):
#     partition_date = context.partition_key
#     df = pd.read_sql(
#         "SELECT * FROM orders WHERE date = %s",
#         engine, params=[partition_date]
#     )  # Only 1 day
#     result = df.groupby('date').agg({'amount': 'sum'})
#     return result

# Concurrency — Run independent assets in parallel
# from dagster import define_asset_job, Definitions
#
# @asset
# def raw_orders(): ...
#
# @asset
# def raw_products(): ...
#
# @asset(deps=[raw_orders, raw_products])
# def enriched_orders(): ...
#
# # raw_orders and raw_products run in PARALLEL
# # enriched_orders waits for both

from dataclasses import dataclass

@dataclass
class AssetPerformance:
    asset: str
    before_sec: float
    after_sec: float
    improvement: str
    technique: str

assets = [
    AssetPerformance("daily_sales", 300, 15, "20x faster", "Partitioning"),
    AssetPerformance("user_features", 180, 45, "4x faster", "Caching"),
    AssetPerformance("product_catalog", 120, 30, "4x faster", "IO Manager"),
    AssetPerformance("enriched_orders", 240, 60, "4x faster", "Concurrency"),
    AssetPerformance("ml_predictions", 600, 120, "5x faster", "Partition + Parallel"),
]

print("=== Asset Performance Improvements ===")
for a in assets:
    print(f"  [{a.asset}] {a.before_sec}s -> {a.after_sec}s ({a.improvement})")
    print(f"    Technique: {a.technique}")

IO Manager และ Caching

# === IO Manager & Caching ===

# Custom IO Manager — Parquet instead of CSV
# from dagster import IOManager, io_manager
# import pyarrow.parquet as pq
#
# class ParquetIOManager(IOManager):
#     def __init__(self, base_path):
#         self.base_path = base_path
#
#     def handle_output(self, context, obj):
#         path = f"{self.base_path}/{context.asset_key.path[-1]}.parquet"
#         obj.to_parquet(path, compression='snappy')
#         context.log.info(f"Wrote {len(obj)} rows to {path}")
#
#     def load_input(self, context):
#         path = f"{self.base_path}/{context.asset_key.path[-1]}.parquet"
#         return pd.read_parquet(path)
#
# @io_manager
# def parquet_io_manager():
#     return ParquetIOManager("/data/assets")

# Freshness Policy — Skip if not stale
# @asset(
#     freshness_policy=FreshnessPolicy(maximum_lag_minutes=60),
#     auto_materialize_policy=AutoMaterializePolicy.eager(),
# )
# def hourly_metrics(context):
#     # Only runs if data is stale (>60 min old)
#     ...

# Resource Pool — Limit DB connections
# from dagster import resource
#
# @resource(config_schema={"pool_size": int})
# def db_pool(context):
#     return create_engine(
#         DB_URL,
#         pool_size=context.resource_config["pool_size"],
#         max_overflow=5,
#     )

io_comparison = {
    "CSV IO Manager": {"write_sec": 45, "read_sec": 30, "size_mb": 500},
    "Parquet IO Manager": {"write_sec": 8, "read_sec": 3, "size_mb": 120},
    "Delta Lake IO Manager": {"write_sec": 12, "read_sec": 5, "size_mb": 100},
    "Snowflake IO Manager": {"write_sec": 20, "read_sec": 15, "size_mb": 0},
}

print("\nIO Manager Comparison (1M rows):")
for name, perf in io_comparison.items():
    print(f"  [{name}]")
    print(f"    Write: {perf['write_sec']}s | Read: {perf['read_sec']}s | Size: {perf['size_mb']}MB")

Monitoring และ Profiling

# === Monitoring & Profiling ===

# Dagster UI — Built-in Monitoring
# dagster dev  # Start UI at localhost:3000
# - Asset Lineage Graph
# - Run Timeline
# - Asset Materialization History
# - Partition Status Matrix

# Custom Logging
# @asset
# def heavy_computation(context):
#     import time
#     start = time.time()
#
#     context.log.info("Starting data load...")
#     df = load_data()
#     context.log.info(f"Loaded {len(df)} rows in {time.time()-start:.1f}s")
#
#     start2 = time.time()
#     result = transform(df)
#     context.log.info(f"Transformed in {time.time()-start2:.1f}s")
#
#     context.add_output_metadata({
#         "row_count": len(result),
#         "duration_sec": time.time() - start,
#         "memory_mb": result.memory_usage(deep=True).sum() / 1024**2,
#     })
#     return result

# Prometheus Metrics
# from prometheus_client import Counter, Histogram
# ASSET_DURATION = Histogram('dagster_asset_duration_seconds',
#                            'Asset materialization duration',
#                            ['asset_name'])
# ASSET_ROWS = Counter('dagster_asset_rows_total',
#                      'Total rows processed', ['asset_name'])

pipeline_metrics = {
    "Total Pipeline Duration": "12 min (was 45 min)",
    "Longest Asset": "ml_predictions: 2 min",
    "Parallelism": "4 assets concurrent",
    "Partitions/Day": "365 (daily for 1 year)",
    "Cache Hit Rate": "78% (skipped stale checks)",
    "Memory Peak": "2.1 GB (was 8.5 GB)",
    "Failed Runs (7d)": "2 (both auto-retried)",
}

print("Pipeline Metrics:")
for k, v in pipeline_metrics.items():
    print(f"  {k}: {v}")

เคล็ดลับ

การนำไปใช้งานจริงในองค์กร

สำหรับองค์กรขนาดกลางถึงใหญ่ แนะนำให้ใช้หลัก Three-Tier Architecture คือ Core Layer ที่เป็นแกนกลางของระบบ Distribution Layer ที่ทำหน้าที่กระจาย Traffic และ Access Layer ที่เชื่อมต่อกับผู้ใช้โดยตรง การแบ่ง Layer ชัดเจนช่วยให้การ Troubleshoot ง่ายขึ้นและสามารถ Scale ระบบได้ตามความต้องการ

เรื่อง Network Security ก็สำคัญไม่แพ้กัน ควรติดตั้ง Next-Generation Firewall ที่สามารถ Deep Packet Inspection ได้ ใช้ Network Segmentation แยก VLAN สำหรับแต่ละแผนก ติดตั้ง IDS/IPS เพื่อตรวจจับการโจมตี และทำ Regular Security Audit อย่างน้อยปีละ 2 ครั้ง

Dagster คืออะไร

Open Source Data Orchestrator Python Software-defined Assets Type System UI Partitioning Scheduling Sensor IO Manager dbt Spark

Performance Tuning ทำอย่างไร

Profiling หา Bottleneck Partitioning Concurrency IO Manager Caching Resource Pool Lazy Loading ลด Memory

Partitioning ช่วยเรื่อง Performance อย่างไร

แบ่งงานเล็ก Process เฉพาะที่เปลี่ยน เร็วขึ้น Retry เฉพาะ Fail Backfill Memory น้อย Parallelism หลาย Partition พร้อมกัน

Dagster กับ Airflow ต่างกันอย่างไร

Dagster Assets Data Type System UI Testing Airflow DAG Tasks Workflow Community Plugin ทั้งสอง Python Modern vs General

สรุป

Dagster Pipeline Performance Tuning Asset Partitioning Concurrency IO Manager Parquet Caching FreshnessPolicy Resource Pool Monitoring Profiling Metadata Python Data Orchestrator

📖 บทความที่เกี่ยวข้อง

Tailwind CSS v4 Performance Tuning เพิ่มความเร็วอ่านบทความ → Qwik Resumability Performance Tuning เพิ่มความเร็วอ่านบทความ → Azure Service Bus Performance Tuning เพิ่มความเร็วอ่านบทความ → Kubernetes Operator Performance Tuning เพิ่มความเร็วอ่านบทความ → WordPress WooCommerce Performance Tuning เพิ่มความเร็วอ่านบทความ →

📚 ดูบทความทั้งหมด →