
Prefect Workflow Cost Optimization: Cutting Costs

2025-09-30 · Ajarn Bom — SiamCafe.net · 10,376 words



Strategy        Savings  Effort  Risk    Best for
Task Caching    30-70%   Low     Low     Every pipeline
Spot Instances  60-90%   Medium  Medium  Fault-tolerant work
Incremental     50-90%   High    Low     Large datasets
Right-sizing    20-50%   Low     Low     Over-provisioned infra
Serverless      40-80%   Medium  Low     Sporadic workloads
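Stacked strategies compound on the cost that remains rather than simply adding up. A quick sketch of that arithmetic, using illustrative mid-range figures from the table above:

```python
def combined_saving(savings: list[float]) -> float:
    """Combine independent cost reductions (each a fraction, e.g. 0.5 = 50%).

    Each strategy only reduces the cost left over after the previous ones.
    """
    remaining = 1.0
    for s in savings:
        remaining *= (1.0 - s)
    return 1.0 - remaining

# Illustrative mid-range figures: caching 50%, right-sizing 35%, incremental 70%
print(f"{combined_saving([0.50, 0.35]):.1%}")        # caching + right-sizing
print(f"{combined_saving([0.50, 0.35, 0.70]):.1%}")  # plus incremental
```

So two 50%-class strategies together land near 70%, not 100% — useful for keeping savings estimates honest.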

Caching Strategy

# === Prefect Caching for Cost Reduction ===

from prefect import flow, task
from prefect.tasks import task_input_hash
from datetime import timedelta
import hashlib

# Task Caching — skip recomputation when the inputs are unchanged
@task(
    cache_key_fn=task_input_hash,
    cache_expiration=timedelta(hours=24),
    retries=3,
    retry_delay_seconds=60,
)
def expensive_api_call(endpoint: str, params: dict) -> dict:
    """API call ที่มีค่าใช้จ่าย — Cache ผลลัพธ์"""
    import httpx
    response = httpx.get(endpoint, params=params, timeout=30)
    return response.json()

# Custom cache key — keyed by date
def daily_cache_key(context, parameters):
    """Cache key ที่เปลี่ยนวันละครั้ง"""
    from datetime import date
    param_hash = hashlib.md5(str(parameters).encode()).hexdigest()[:8]
    return f"{context.task.fn.__name__}-{date.today()}-{param_hash}"

@task(cache_key_fn=daily_cache_key, cache_expiration=timedelta(hours=25))
def daily_aggregation(data: list) -> dict:
    """Aggregate ที่คำนวณวันละครั้ง"""
    total = sum(item.get('amount', 0) for item in data)
    return {"total": total, "count": len(data)}

# Result Storage — persist the cache in S3
# from prefect.filesystems import S3
# s3_block = S3(bucket_path="my-bucket/prefect-cache")
# s3_block.save("production-cache")
#
# @task(result_storage=S3.load("production-cache"),
#       cache_key_fn=task_input_hash,
#       cache_expiration=timedelta(hours=12))
# def cached_transform(data):
#     return expensive_transformation(data)

from dataclasses import dataclass

@dataclass
class CacheSaving:
    pipeline: str
    without_cache_min: float
    with_cache_min: float
    api_calls_saved: int
    cost_saved_pct: float

savings = [
    CacheSaving("Daily User ETL", 15.0, 3.0, 500, 65),
    CacheSaving("Hourly Events", 8.0, 2.0, 200, 70),
    CacheSaving("Weekly Reports", 45.0, 10.0, 1000, 75),
    CacheSaving("API Enrichment", 30.0, 5.0, 2000, 80),
]

print("=== Caching Savings ===")
for s in savings:
    print(f"  [{s.pipeline}]")
    print(f"    Without Cache: {s.without_cache_min}min | With: {s.with_cache_min}min")
    print(f"    API Calls Saved: {s.api_calls_saved} | Cost Saved: {s.cost_saved_pct}%")

Infrastructure Optimization

# === Infrastructure Cost Optimization ===

# Work Pool Configuration — Right-sizing
# prefect.yaml
# work_pools:
#   - name: small-tasks
#     type: docker
#     base_job_template:
#       variables:
#         image: python:3.12-slim
#         cpu: 0.5
#         memory: 1024  # 1GB
#     description: "For lightweight ETL tasks"
#
#   - name: heavy-compute
#     type: docker
#     base_job_template:
#       variables:
#         image: python:3.12
#         cpu: 4
#         memory: 16384  # 16GB
#     description: "For ML training and large transforms"
#
#   - name: spot-pool
#     type: kubernetes
#     base_job_template:
#       variables:
#         node_selector:
#           lifecycle: spot
#         tolerations:
#           - key: spot
#             operator: Equal
#             value: "true"
#     description: "Spot instances for fault-tolerant work"

# Scheduling — Off-peak pricing
# deployments:
#   - name: heavy-etl
#     schedule:
#       cron: "0 3 * * *"  # 3 AM — Off-peak
#       timezone: "Asia/Bangkok"
#   - name: light-sync
#     schedule:
#       cron: "*/15 * * * *"  # Every 15 min
#     work_pool: small-tasks

@dataclass
class InfraCost:
    resource: str
    current_config: str
    optimized_config: str
    monthly_before: float
    monthly_after: float
    saving_pct: float

costs = [
    InfraCost("Worker Nodes", "4x m5.xlarge On-demand", "2x m5.large + Spot", 1200, 400, 67),
    InfraCost("Database", "db.r5.2xlarge", "db.r5.large + Read Replica", 800, 450, 44),
    InfraCost("Storage", "S3 Standard 5TB", "S3 IA 3TB + Lifecycle", 115, 45, 61),
    InfraCost("Network", "Cross-region Transfer", "Same-region + VPC Endpoint", 200, 50, 75),
    InfraCost("Monitoring", "Full metrics 1s", "Key metrics 60s + Sampling", 150, 40, 73),
]

print("\n=== Infrastructure Cost Optimization ===")
total_before = sum(c.monthly_before for c in costs)
total_after = sum(c.monthly_after for c in costs)
for c in costs:
    print(f"  [{c.resource}]")
    print(f"    Before: /mo ({c.current_config})")
    print(f"    After:  /mo ({c.optimized_config})")
    print(f"    Saving: {c.saving_pct}%")

print(f"\n  Total Before: /mo")
print(f"  Total After:  /mo")
print(f"  Total Saving: /mo ({((total_before - total_after) / total_before) * 100:.0f}%)")

Incremental Processing

# === Incremental Processing Strategy ===

# Full Refresh vs Incremental
# Full: SELECT * FROM source_table → Process ALL → Write ALL
# Incremental: SELECT * FROM source WHERE updated_at > last_run → Process NEW → Upsert

# from prefect import flow, task
# from prefect.variables import Variable
# from datetime import datetime
#
# @task
# def get_last_watermark(pipeline_name: str) -> str:
#     """Get last processed timestamp"""
#     wm = Variable.get(f"watermark_{pipeline_name}", default="2020-01-01T00:00:00")
#     return wm
#
# @task
# def extract_incremental(table: str, watermark: str) -> list:
#     """Extract only new/changed records"""
#     query = f"SELECT * FROM {table} WHERE updated_at > '{watermark}'"
#     return execute_query(query)
#
# @task
# def update_watermark(pipeline_name: str, new_watermark: str):
#     """Update watermark after successful load"""
#     Variable.set(f"watermark_{pipeline_name}", new_watermark)
#
# @flow
# def incremental_pipeline(table: str):
#     watermark = get_last_watermark(table)
#     new_data = extract_incremental(table, watermark)
#     if new_data:
#         transformed = transform(new_data)
#         load(transformed)
#         new_wm = max(r['updated_at'] for r in new_data)
#         update_watermark(table, new_wm)

@dataclass
class ProcessingComparison:
    pipeline: str
    full_rows: str
    incremental_rows: str
    full_time: str
    incremental_time: str
    cost_reduction: str

comparisons = [
    ProcessingComparison("User Events", "50M rows", "500K rows", "45 min", "3 min", "93%"),
    ProcessingComparison("Orders", "10M rows", "50K rows", "15 min", "1 min", "93%"),
    ProcessingComparison("Product Catalog", "500K rows", "2K rows", "5 min", "10 sec", "97%"),
    ProcessingComparison("Logs", "200M rows", "5M rows", "2 hours", "10 min", "92%"),
]

print("Full Refresh vs Incremental:")
for c in comparisons:
    print(f"  [{c.pipeline}]")
    print(f"    Full: {c.full_rows} in {c.full_time}")
    print(f"    Incremental: {c.incremental_rows} in {c.incremental_time}")
    print(f"    Cost Reduction: {c.cost_reduction}")

# Cost Monitoring Dashboard
monthly_costs = {
    "Compute (Workers)": "$400",
    "Database (RDS)": "$450",
    "Storage (S3)": "$45",
    "Network Transfer": "$50",
    "Monitoring": "$40",
    "Prefect Cloud": "$100",
    "Total": "$1,085/month",
    "vs Last Month": "-$1,380 (56% reduction)",
}

print(f"\n\nMonthly Cost Dashboard:")
for k, v in monthly_costs.items():
    print(f"  {k}: {v}")

FAQ

What is Prefect cost optimization?

Reducing data pipeline spend by combining task caching, right-sizing, smart scheduling, spot instances, incremental processing, serverless execution, auto-scaling, and daily cost monitoring.

How do you reduce cloud compute costs?

Run fault-tolerant work on spot instances (60-90% cheaper), right-size instance types, enable auto-scaling, move sporadic workloads to serverless (Lambda, Fargate), buy reserved capacity for steady loads, schedule heavy jobs off-peak, and choose cost-effective regions.

How does caching cut costs?

Task caching skips recomputation when inputs are unchanged: cache_key_fn hashes the inputs, cache_expiration sets the time-to-live, and results can be stored in S3 or GCS. This eliminates redundant API calls, database queries, and compute, typically saving 30-70%.

What is incremental processing?

Processing only new or changed records, tracked with a watermark timestamp or CDC. It cuts data volume, runtime, compute, and network transfer by 50-90% compared with a full refresh.

Summary

Prefect workflow cost optimization combines caching, incremental processing, right-sizing, spot instances, serverless execution, off-peak scheduling, and continuous cost monitoring to cut production cloud spend.
