Prefect Cost Optimization
Prefect Workflow Cost Optimization — ลดค่าใช้จ่าย Cloud ของ Data Pipeline ด้วยเทคนิค Caching, Right-sizing, Scheduling, Spot Instances, Incremental Processing, Serverless, Auto-scaling และ Cost Monitoring
| Strategy | Savings | Effort | Risk | เหมาะกับ |
|---|---|---|---|---|
| Task Caching | 30-70% | ต่ำ | ต่ำ | ทุก Pipeline |
| Spot Instances | 60-90% | ปานกลาง | ปานกลาง | Fault-tolerant |
| Incremental | 50-90% | สูง | ต่ำ | Large Datasets |
| Right-sizing | 20-50% | ต่ำ | ต่ำ | Over-provisioned |
| Serverless | 40-80% | ปานกลาง | ต่ำ | Sporadic Workload |
Caching Strategy
# === Prefect Caching for Cost Reduction ===
from prefect import flow, task
from prefect.tasks import task_input_hash
from datetime import timedelta
import hashlib
# Task Caching — ไม่คำนวณซ้ำถ้า Input เหมือนเดิม
@task(
    cache_key_fn=task_input_hash,
    cache_expiration=timedelta(hours=24),
    retries=3,
    retry_delay_seconds=60,
)
def expensive_api_call(endpoint: str, params: dict) -> dict:
    """Costly API call whose result is cached to avoid repeat charges.

    Identical (endpoint, params) pairs within the 24h cache window are
    served from the Prefect result cache instead of hitting the API again.
    Transient failures are retried 3 times with a 60s delay.
    """
    import httpx

    return httpx.get(endpoint, params=params, timeout=30).json()
# Custom Cache Key — Cache ตาม Date
def daily_cache_key(context, parameters):
    """Cache key that rolls over once per calendar day.

    Combines the task function's name, today's date, and a short MD5
    digest of the parameters, so a task reruns at most once per day
    for the same inputs.
    """
    from datetime import date

    digest = hashlib.md5(str(parameters).encode()).hexdigest()[:8]
    task_name = context.task.fn.__name__
    return f"{task_name}-{date.today()}-{digest}"
@task(cache_key_fn=daily_cache_key, cache_expiration=timedelta(hours=25))
def daily_aggregation(data: list) -> dict:
    """Once-a-day aggregate: total 'amount' plus record count.

    Cached via daily_cache_key; the 25h expiration gives a one-hour
    buffer past the daily key rollover.
    """
    running_total = 0
    for record in data:
        running_total += record.get('amount', 0)
    return {"total": running_total, "count": len(data)}
# Result Storage — เก็บ Cache ใน S3
# from prefect.filesystems import S3
# s3_block = S3(bucket_path="my-bucket/prefect-cache")
# s3_block.save("production-cache")
#
# @task(result_storage=S3.load("production-cache"),
# cache_key_fn=task_input_hash,
# cache_expiration=timedelta(hours=12))
# def cached_transform(data):
# return expensive_transformation(data)
from dataclasses import dataclass


@dataclass
class CacheSaving:
    """Measured caching impact for one pipeline."""

    pipeline: str
    without_cache_min: float  # runtime without caching, minutes
    with_cache_min: float     # runtime with caching, minutes
    api_calls_saved: int
    cost_saved_pct: float


savings = [
    CacheSaving("Daily User ETL", 15.0, 3.0, 500, 65),
    CacheSaving("Hourly Events", 8.0, 2.0, 200, 70),
    CacheSaving("Weekly Reports", 45.0, 10.0, 1000, 75),
    CacheSaving("API Enrichment", 30.0, 5.0, 2000, 80),
]

# Summary report: before/after runtimes and cost impact per pipeline.
print("=== Caching Savings ===")
for entry in savings:
    print(f" [{entry.pipeline}]")
    print(f" Without Cache: {entry.without_cache_min}min | With: {entry.with_cache_min}min")
    print(f" API Calls Saved: {entry.api_calls_saved} | Cost Saved: {entry.cost_saved_pct}%")
Infrastructure Optimization
# === Infrastructure Cost Optimization ===
# Work Pool Configuration — Right-sizing
# prefect.yaml
# work_pools:
# - name: small-tasks
# type: docker
# base_job_template:
# variables:
# image: python:3.12-slim
# cpu: 0.5
# memory: 1024 # 1GB
# description: "For lightweight ETL tasks"
#
# - name: heavy-compute
# type: docker
# base_job_template:
# variables:
# image: python:3.12
# cpu: 4
# memory: 16384 # 16GB
# description: "For ML training and large transforms"
#
# - name: spot-pool
# type: kubernetes
# base_job_template:
# variables:
# node_selector:
# lifecycle: spot
# tolerations:
# - key: spot
# operator: Equal
# value: "true"
# description: "Spot instances for fault-tolerant work"
# Scheduling — Off-peak pricing
# deployments:
# - name: heavy-etl
# schedule:
# cron: "0 3 * * *" # 3 AM — Off-peak
# timezone: "Asia/Bangkok"
# - name: light-sync
# schedule:
# cron: "*/15 * * * *" # Every 15 min
# work_pool: small-tasks
@dataclass
class InfraCost:
resource: str
current_config: str
optimized_config: str
monthly_before: float
monthly_after: float
saving_pct: float
costs = [
InfraCost("Worker Nodes", "4x m5.xlarge On-demand", "2x m5.large + Spot", 1200, 400, 67),
InfraCost("Database", "db.r5.2xlarge", "db.r5.large + Read Replica", 800, 450, 44),
InfraCost("Storage", "S3 Standard 5TB", "S3 IA 3TB + Lifecycle", 115, 45, 61),
InfraCost("Network", "Cross-region Transfer", "Same-region + VPC Endpoint", 200, 50, 75),
InfraCost("Monitoring", "Full metrics 1s", "Key metrics 60s + Sampling", 150, 40, 73),
]
print("\n=== Infrastructure Cost Optimization ===")
total_before = sum(c.monthly_before for c in costs)
total_after = sum(c.monthly_after for c in costs)
for c in costs:
print(f" [{c.resource}]")
print(f" Before: /mo ({c.current_config})")
print(f" After: /mo ({c.optimized_config})")
print(f" Saving: {c.saving_pct}%")
print(f"\n Total Before: /mo")
print(f" Total After: /mo")
print(f" Total Saving: /mo ({((total_before - total_after) / total_before) * 100:.0f}%)")
Incremental Processing
# === Incremental Processing Strategy ===
# Full Refresh vs Incremental
# Full: SELECT * FROM source_table → Process ALL → Write ALL
# Incremental: SELECT * FROM source WHERE updated_at > last_run → Process NEW → Upsert
# from prefect import flow, task
# from prefect.variables import Variable
# from datetime import datetime
#
# @task
# def get_last_watermark(pipeline_name: str) -> str:
# """Get last processed timestamp"""
# wm = Variable.get(f"watermark_{pipeline_name}", default="2020-01-01T00:00:00")
# return wm
#
# @task
# def extract_incremental(table: str, watermark: str) -> list:
# """Extract only new/changed records"""
# query = f"SELECT * FROM {table} WHERE updated_at > '{watermark}'"
# return execute_query(query)
#
# @task
# def update_watermark(pipeline_name: str, new_watermark: str):
# """Update watermark after successful load"""
# Variable.set(f"watermark_{pipeline_name}", new_watermark)
#
# @flow
# def incremental_pipeline(table: str):
# watermark = get_last_watermark(table)
# new_data = extract_incremental(table, watermark)
# if new_data:
# transformed = transform(new_data)
# load(transformed)
# new_wm = max(r['updated_at'] for r in new_data)
# update_watermark(table, new_wm)
@dataclass
class ProcessingComparison:
    """Full-refresh vs incremental processing stats for one pipeline."""

    pipeline: str
    full_rows: str
    incremental_rows: str
    full_time: str
    incremental_time: str
    cost_reduction: str


comparisons = [
    ProcessingComparison("User Events", "50M rows", "500K rows", "45 min", "3 min", "93%"),
    ProcessingComparison("Orders", "10M rows", "50K rows", "15 min", "1 min", "93%"),
    ProcessingComparison("Product Catalog", "500K rows", "2K rows", "5 min", "10 sec", "97%"),
    ProcessingComparison("Logs", "200M rows", "5M rows", "2 hours", "10 min", "92%"),
]

# Side-by-side report of both strategies for each pipeline.
print("Full Refresh vs Incremental:")
for row in comparisons:
    print(f" [{row.pipeline}]")
    print(f" Full: {row.full_rows} in {row.full_time}")
    print(f" Incremental: {row.incremental_rows} in {row.incremental_time}")
    print(f" Cost Reduction: {row.cost_reduction}")
# Cost Monitoring Dashboard — static snapshot of monthly spend by category
# (figures after applying the optimizations above).
monthly_costs = {
    "Compute (Workers)": "$400",
    "Database (RDS)": "$450",
    "Storage (S3)": "$45",
    "Network Transfer": "$50",
    "Monitoring": "$40",
    "Prefect Cloud": "$100",
    "Total": "$1,085/month",
    "vs Last Month": "-$1,380 (56% reduction)",
}

print("\n\nMonthly Cost Dashboard:")
for label, amount in monthly_costs.items():
    print(f" {label}: {amount}")
เคล็ดลับ
- Cache: ใช้ Task Caching ลด Redundant Computation ทันที
- Incremental: ประมวลผลเฉพาะข้อมูลใหม่ ไม่ Full Refresh
- Right-size: ใช้ Resource เท่าที่จำเป็น ไม่ Over-provision
- Spot: ใช้ Spot Instances สำหรับ Fault-tolerant Pipeline
- Monitor: ติดตามค่าใช้จ่ายทุกวัน ตั้ง Alert เมื่อเกิน Budget
Prefect Cost Optimization คืออะไร
ปรับลดค่าใช้จ่าย Data Pipeline Caching Right-sizing Scheduling Spot Instances Incremental Serverless Auto-scaling Monitoring ทุกวัน
ลดค่า Cloud Compute อย่างไร
Spot 60-90% Right-sizing Instance Auto-scaling Serverless Lambda Fargate Reserved Scheduling Off-peak Region Selection
Caching ช่วยลดค่าใช้จ่ายอย่างไร
Task Caching ไม่คำนวณซ้ำ cache_key_fn Input cache_expiration S3 GCS ลด API Database Compute 30-70% Redundant
Incremental Processing คืออะไร
ประมวลผลเฉพาะข้อมูลใหม่ Watermark Timestamp CDC ลด Volume เวลา Compute Network 50-90% เทียบ Full Refresh
สรุป
Prefect Workflow Cost Optimization ลดค่าใช้จ่าย Caching Incremental Processing Right-sizing Spot Instances Serverless Scheduling Monitoring Cloud Cost Reduction Production
