Prefect Workflow Capacity Planning คืออะไร
Prefect เป็น modern workflow orchestration framework สำหรับ data engineering และ ML pipelines ที่เขียนด้วย Python ใช้แทน Apache Airflow ด้วย developer experience ที่ดีกว่า Capacity Planning คือการวางแผนทรัพยากร (compute, storage, network) ให้เพียงพอสำหรับ workloads ทั้งปัจจุบันและอนาคต การรวมสองแนวคิดนี้ช่วยให้ทีม data engineering วางแผน infrastructure สำหรับ Prefect workflows อย่างมีประสิทธิภาพ จัดสรร workers, concurrency limits และ resources ให้เหมาะกับ workload patterns
Prefect Architecture
# prefect_arch.py — Prefect architecture
import json
class PrefectArchitecture:
    """Overview of the main moving parts of a Prefect installation.

    Holds a component catalogue (COMPONENTS) and a quick-setup shell
    snippet (SETUP), plus helpers that print them to stdout.
    """

    # Catalogue of Prefect's core components; all values are display strings.
    COMPONENTS = {
        "server": {
            "name": "Prefect Server / Cloud",
            "description": "API server สำหรับ scheduling, monitoring, UI dashboard",
            "options": ["Prefect Cloud (SaaS)", "Prefect Server (self-hosted, OSS)"],
        },
        "worker": {
            "name": "Workers",
            "description": "Process ที่รัน flows — poll work pools สำหรับ scheduled runs",
            "types": ["Process Worker", "Docker Worker", "Kubernetes Worker", "ECS Worker"],
        },
        "work_pool": {
            "name": "Work Pools",
            "description": "กลุ่มของ infrastructure config สำหรับรัน flows",
            "use": "จัดกลุ่ม workflows ตาม resource requirements",
        },
        "flow": {
            "name": "Flows & Tasks",
            "description": "Python functions decorated ด้วย @flow และ @task",
            "features": ["Retries", "Caching", "Concurrency", "Subflows", "Parameters"],
        },
        "deployments": {
            "name": "Deployments",
            "description": "Configuration สำหรับ schedule และรัน flows remotely",
            "schedule": "Cron, Interval, RRule",
        },
    }

    # Shell commands for a minimal setup (printed as text, never executed).
    SETUP = """
# Prefect setup
pip install prefect
# Start Prefect server (self-hosted)
prefect server start
# Or use Prefect Cloud
prefect cloud login
# Create work pool
prefect work-pool create my-pool --type process
# Start worker
prefect worker start --pool my-pool
"""

    def show_components(self):
        """Print the name and description of each architecture component."""
        print("=== Prefect Architecture ===\n")
        # Keys are not displayed, so iterate over the values only.
        for component in self.COMPONENTS.values():
            print(f"[{component['name']}]")
            print(f" {component['description']}")
            print()

    def show_setup(self):
        """Print the quick-setup snippet, truncated to 400 characters."""
        print("=== Quick Setup ===")
        print(self.SETUP[:400])
# Demo run: print the component overview followed by the setup commands.
arch = PrefectArchitecture()
for section in (arch.show_components, arch.show_setup):
    section()
Workflow Examples
# workflows.py — Prefect workflow examples
import json
class PrefectWorkflows:
    """Printable Prefect workflow examples: an ETL flow and its deployment.

    The examples are stored as plain text constants and are displayed
    (truncated) by the show_* helpers; they are never executed here.
    """

    # Example ETL pipeline source (display text only).
    ETL_FLOW = """
# etl_flow.py — ETL pipeline with Prefect
from prefect import flow, task
from prefect.tasks import task_input_hash
from datetime import timedelta
import pandas as pd
@task(retries=3, retry_delay_seconds=60, cache_key_fn=task_input_hash, cache_expiration=timedelta(hours=1))
def extract(source_url: str) -> pd.DataFrame:
'''Extract data from source'''
df = pd.read_csv(source_url)
print(f"Extracted {len(df)} rows")
return df
@task(retries=2)
def transform(df: pd.DataFrame) -> pd.DataFrame:
'''Clean and transform data'''
df = df.dropna()
df['processed_at'] = pd.Timestamp.now()
df['revenue'] = df['quantity'] * df['price']
print(f"Transformed: {len(df)} rows")
return df
@task(retries=3, retry_delay_seconds=30)
def load(df: pd.DataFrame, table_name: str):
'''Load data to warehouse'''
from sqlalchemy import create_engine
engine = create_engine("postgresql://user:pass@localhost/warehouse")
df.to_sql(table_name, engine, if_exists='append', index=False)
print(f"Loaded {len(df)} rows to {table_name}")
@flow(name="Daily ETL Pipeline", log_prints=True)
def etl_pipeline(source_url: str, table_name: str = "sales"):
raw_data = extract(source_url)
clean_data = transform(raw_data)
load(clean_data, table_name)
return {"rows": len(clean_data), "table": table_name}
# Run locally
if __name__ == "__main__":
etl_pipeline("https://data.example.com/sales.csv")
"""

    # Example deployment config, both Python API and prefect.yaml style
    # (display text only).
    DEPLOYMENT = """
# deploy.py — Prefect deployment configuration
from prefect import flow
from prefect.deployments import Deployment
from prefect.server.schemas.schedules import CronSchedule
# Method 1: Python deployment
deployment = Deployment.build_from_flow(
flow=etl_pipeline,
name="daily-etl",
work_pool_name="production-pool",
schedule=CronSchedule(cron="0 6 * * *"), # Daily at 6 AM
parameters={"source_url": "https://data.example.com/sales.csv"},
tags=["etl", "production"],
)
deployment.apply()
# Method 2: prefect.yaml
# prefect.yaml
deployments:
- name: daily-etl
entrypoint: etl_flow.py:etl_pipeline
work_pool:
name: production-pool
schedule:
cron: "0 6 * * *"
parameters:
source_url: "https://data.example.com/sales.csv"
"""

    def show_etl(self):
        """Print the ETL flow example, truncated to 600 characters."""
        print("=== ETL Flow ===")
        print(self.ETL_FLOW[:600])

    def show_deployment(self):
        """Print the deployment example, truncated to 500 characters."""
        print("\n=== Deployment ===")
        print(self.DEPLOYMENT[:500])
# Demo run: show the ETL example, then the deployment example.
wf = PrefectWorkflows()
for section in (wf.show_etl, wf.show_deployment):
    section()
Capacity Planning Framework
# capacity.py — Capacity planning for Prefect
import json
import random
class CapacityPlanning:
    """Capacity-planning reference for Prefect.

    Exposes planning dimensions (DIMENSIONS), a t-shirt sizing guide
    (SIZING), and a simple worker-count calculator.
    """

    # Resource dimensions to plan for, each with its driving factors and a
    # rule-of-thumb formula (display strings only).
    DIMENSIONS = {
        "compute": {
            "name": "Compute (CPU/Memory)",
            "factors": ["จำนวน concurrent flows", "CPU/Memory ต่อ flow", "Peak hours"],
            "formula": "Workers needed = ceil(peak_concurrent_flows / flows_per_worker)",
        },
        "storage": {
            "name": "Storage",
            "factors": ["Flow logs size", "Task result cache", "Artifact storage"],
            "formula": "Storage = (avg_log_size × daily_runs × retention_days) + cache_size",
        },
        "network": {
            "name": "Network",
            "factors": ["Data transfer per flow", "API calls to Prefect server", "External API calls"],
            "formula": "Bandwidth = avg_data_per_flow × concurrent_flows",
        },
        "database": {
            "name": "Database (Prefect Server)",
            "factors": ["Flow runs history", "Task runs history", "Concurrent connections"],
            "formula": "DB size grows ~1MB per 1000 flow runs",
        },
    }

    # Sizing tiers keyed by team scale (display strings only).
    SIZING = {
        "small": {
            "name": "Small Team (1-5 flows/day)",
            "workers": "1 worker (2 vCPU, 4GB RAM)",
            "server": "Prefect Cloud Free tier",
            "db": "SQLite (built-in)",
            "cost": "~$0/month (Cloud free) or $20/month (self-hosted VM)",
        },
        "medium": {
            "name": "Medium Team (10-50 flows/day)",
            "workers": "2-3 workers (4 vCPU, 8GB RAM each)",
            "server": "Prefect Cloud Pro or self-hosted",
            "db": "PostgreSQL (managed)",
            "cost": "~$100-500/month",
        },
        "large": {
            "name": "Large Team (100+ flows/day)",
            "workers": "5-20 workers (auto-scaling K8s)",
            "server": "Prefect Cloud Enterprise or self-hosted HA",
            "db": "PostgreSQL HA (RDS Multi-AZ)",
            "cost": "~$500-5,000/month",
        },
    }

    def show_dimensions(self):
        """Print each planning dimension with its rule-of-thumb formula."""
        print("=== Capacity Planning Dimensions ===\n")
        for dim in self.DIMENSIONS.values():
            print(f"[{dim['name']}]")
            print(f" Formula: {dim['formula']}")
            print()

    def show_sizing(self):
        """Print the sizing guide: workers, server option and cost per tier."""
        print("=== Sizing Guide ===")
        for size in self.SIZING.values():
            print(f"\n[{size['name']}]")
            print(f" Workers: {size['workers']}")
            print(f" Server: {size['server']}")
            print(f" Cost: {size['cost']}")

    def calculator(self, daily_flows=50, avg_duration_min=15,
                   peak_hour_pct=0.4, flows_per_worker=4):
        """Estimate the number of workers needed for a given workload.

        Was previously hard-coded to one example; the inputs are now keyword
        parameters whose defaults reproduce the original example exactly.

        Args:
            daily_flows: total flow runs per day.
            avg_duration_min: average runtime of one flow run, in minutes.
            peak_hour_pct: fraction of daily runs that start in the peak hour.
            flows_per_worker: concurrent flow runs one worker can handle.

        Returns:
            dict with 'peak_concurrent' (estimated overlapping flow runs at
            peak) and 'workers_needed' (worker count, at least 1).
        """
        print("\n=== Capacity Calculator Example ===")
        # 60 / avg_duration_min back-to-back runs fit in the peak hour, so the
        # peak-hour starts divided by that slot count is the overlap estimate.
        peak_concurrent = int(daily_flows * peak_hour_pct / (60 / avg_duration_min))
        # Ceiling division via -(-a // b); never report fewer than one worker.
        workers_needed = max(1, -(-peak_concurrent // flows_per_worker))
        print(f" Daily flows: {daily_flows}")
        print(f" Avg duration: {avg_duration_min} min")
        print(f" Peak hour: {peak_hour_pct*100:.0f}% of daily flows")
        print(f" Peak concurrent: ~{peak_concurrent} flows")
        print(f" Flows per worker: {flows_per_worker}")
        print(f" Workers needed: {workers_needed}")
        return {"peak_concurrent": peak_concurrent, "workers_needed": workers_needed}
# Demo run: dimensions, sizing guide, then the worked calculator example.
cap = CapacityPlanning()
for section in (cap.show_dimensions, cap.show_sizing, cap.calculator):
    section()
Concurrency & Performance
# performance.py — Concurrency and performance tuning
import json
import random
class PerformanceTuning:
    """Concurrency controls and performance-tuning notes for Prefect flows."""

    # Example code for Prefect concurrency controls (display text only).
    CONCURRENCY = """
# concurrency.py — Prefect concurrency controls
from prefect import flow, task
from prefect.concurrency.sync import concurrency
# 1. Task-level concurrency (limit parallel tasks)
@task(tags=["database"])
def query_database(query: str):
with concurrency("database-connections", occupy=1):
# Only N concurrent database queries
result = execute_query(query)
return result
# 2. Global concurrency limits (via Prefect UI/CLI)
# prefect concurrency-limit create database-connections --limit 5
# prefect concurrency-limit create api-calls --limit 10
# 3. Work pool concurrency
# Set in work pool config: max concurrent flow runs per worker
# 4. Task concurrency with tags
@task(tags=["api-call"])
def call_external_api(url: str):
# Prefect will limit concurrent tasks with same tag
return requests.get(url).json()
# 5. Rate limiting
from prefect.concurrency.sync import rate_limit
@task
def rate_limited_api_call(url: str):
rate_limit("external-api", occupy=1) # 10 per minute
return requests.get(url).json()
"""

    # Cost/performance optimization levers with estimated savings
    # (display strings only).
    OPTIMIZATION_TIPS = {
        "caching": {
            "name": "Task Caching",
            "description": "Cache task results เพื่อไม่ต้อง re-compute",
            "saving": "ลด compute time 50-90% สำหรับ repeated inputs",
        },
        "parallel": {
            "name": "Parallel Task Execution",
            "description": "ใช้ .submit() สำหรับ concurrent task execution",
            "saving": "ลด wall time ตามจำนวน parallel tasks",
        },
        "resource_class": {
            "name": "Right-size Workers",
            "description": "ใช้ worker size ที่เหมาะกับ workload — ไม่ over-provision",
            "saving": "ลดค่า infrastructure 30-50%",
        },
        "scheduling": {
            "name": "Smart Scheduling",
            "description": "กระจาย flows ไม่ให้ peak พร้อมกัน",
            "saving": "ลด peak workers needed 20-40%",
        },
    }

    def show_concurrency(self):
        """Print the concurrency-controls example, truncated to 500 chars."""
        print("=== Concurrency Controls ===")
        print(self.CONCURRENCY[:500])

    def show_tips(self):
        """Print each optimization lever with its estimated saving."""
        print("\n=== Optimization Tips ===")
        for tip in self.OPTIMIZATION_TIPS.values():
            print(f" [{tip['name']}] {tip['description']} → {tip['saving']}")

    def performance_dashboard(self):
        """Print a mock per-flow health table.

        Metrics are randomly generated demo values, not real telemetry.
        """
        print("\n=== Performance Dashboard ===")
        flows = [
            {"name": "daily-etl", "avg_min": random.uniform(5, 20), "success": random.uniform(95, 100), "runs_today": random.randint(1, 5)},
            {"name": "hourly-sync", "avg_min": random.uniform(2, 8), "success": random.uniform(90, 100), "runs_today": random.randint(20, 24)},
            {"name": "ml-training", "avg_min": random.uniform(30, 120), "success": random.uniform(85, 100), "runs_today": random.randint(1, 3)},
            {"name": "report-gen", "avg_min": random.uniform(3, 10), "success": random.uniform(95, 100), "runs_today": random.randint(5, 10)},
        ]
        for entry in flows:
            # Anything at or below 95% success is flagged for attention.
            health = "OK" if entry["success"] > 95 else "WARN"
            print(f" [{health:>4}] {entry['name']:<20} Avg: {entry['avg_min']:>5.1f}min | Success: {entry['success']:.1f}% | Runs: {entry['runs_today']}")
# Demo run: concurrency controls, tuning tips, then the mock dashboard.
perf = PerformanceTuning()
for section in (perf.show_concurrency, perf.show_tips, perf.performance_dashboard):
    section()
Monitoring & Alerting
# monitoring.py — Prefect monitoring and alerting
import json
class PrefectMonitoring:
    """Monitoring reference for Prefect: automations, key metrics, alerts."""

    # Example automation payloads, as configured via UI/API
    # (display text only).
    AUTOMATIONS = """
# Prefect Automations (via UI or API)
# 1. Alert on flow failure
{
"name": "Alert on ETL Failure",
"trigger": {
"type": "event",
"expect": ["prefect.flow-run.Failed"],
"match": {"prefect.resource.name": "daily-etl"}
},
"actions": [
{"type": "send-notification", "block_document_id": "slack-webhook-id"}
]
}
# 2. Alert on long-running flows
{
"name": "Alert on Long Running Flow",
"trigger": {
"type": "event",
"expect": ["prefect.flow-run.Running"],
"after": {"seconds": 3600}
},
"actions": [
{"type": "send-notification", "block_document_id": "pagerduty-id"}
]
}
"""

    # Key health metrics with target thresholds (display strings only).
    METRICS = {
        "flow_success_rate": "เปอร์เซ็นต์ flow runs ที่สำเร็จ (target: > 95%)",
        "avg_duration": "เวลาเฉลี่ยต่อ flow run (track trend)",
        "queue_time": "เวลาที่ flow รอใน queue ก่อนรัน (ถ้าสูง = ต้องเพิ่ม workers)",
        "worker_utilization": "% ที่ workers ทำงาน (70-80% = optimal)",
        "task_retry_rate": "อัตราการ retry ของ tasks (สูง = มีปัญหา)",
        "concurrency_saturation": "% ที่ concurrency limit ถูกใช้ (> 90% = ต้องเพิ่ม limit)",
    }

    def show_automations(self):
        """Print the automations example, truncated to 500 characters."""
        print("=== Prefect Automations ===")
        print(self.AUTOMATIONS[:500])

    def show_metrics(self):
        """Print each key metric with its description."""
        print("\n=== Key Metrics ===")
        for metric_name, description in self.METRICS.items():
            print(f" [{metric_name}] {description}")

    def capacity_alerts(self):
        """Print the recommended capacity alert rules and responses."""
        print("\n=== Capacity Alerts ===")
        alerts = [
            {"name": "Queue time > 5 min", "action": "เพิ่ม workers หรือ scale up", "severity": "Warning"},
            {"name": "Worker CPU > 80%", "action": "เพิ่ม worker size หรือจำนวน", "severity": "Warning"},
            {"name": "Flow failure rate > 10%", "action": "ตรวจสอบ logs, fix bugs", "severity": "Critical"},
            {"name": "Disk > 80%", "action": "เพิ่ม storage, cleanup old logs", "severity": "Warning"},
        ]
        for alert in alerts:
            print(f" [{alert['severity']:>8}] {alert['name']} → {alert['action']}")
# Demo run: automations, key metrics, then the capacity alert rules.
mon = PrefectMonitoring()
for section in (mon.show_automations, mon.show_metrics, mon.capacity_alerts):
    section()
FAQ - คำถามที่พบบ่อย
Q: Prefect กับ Airflow อันไหนดี?
A: Prefect: Pythonic, dynamic workflows, easy setup, Cloud option, better DX | Airflow: mature, large ecosystem, community ใหญ่, more operators | ใช้ Prefect เมื่อ: ทีมเล็ก-กลาง, Python-first, ต้องการ simplicity | ใช้ Airflow เมื่อ: enterprise, existing Airflow infrastructure, complex DAGs | โดยรวม Prefect 2.x ง่ายกว่า Airflow มาก — decorator-based, ไม่ต้องเขียน DAG definition
Q: ต้องใช้ workers กี่ตัว?
A: คำนวณ: Workers = ceil(peak_concurrent_flows / flows_per_worker) ตัวอย่าง: 50 flows/day, peak 40% in 1 hour, avg 15 min/flow → peak concurrent ~5 flows → 2 workers (4 flows/worker) เริ่มจาก 1-2 workers แล้ว monitor queue time — ถ้า > 5 min ให้เพิ่ม Auto-scaling: ใช้ Kubernetes worker + HPA
Q: Prefect Cloud กับ self-hosted อันไหนดี?
A: Prefect Cloud: ไม่ต้องดูแล server, UI ดี, automations, RBAC, audit logs, มี free tier | Self-hosted: ฟรี, full control, data ไม่ออกนอก network | แนะนำ: เริ่มจาก Cloud (free tier) → ย้ายไป self-hosted เมื่อต้องการ control/compliance | Cloud pricing: Free (3 users) → Pro ($50/user/month) → Enterprise (custom)
Q: Capacity planning ต้องดูอะไรบ้าง?
A: 1) จำนวน flows/day และ growth rate 2) Peak concurrent flows (กี่ flows รันพร้อมกัน) 3) Resource ต่อ flow (CPU, memory, duration) 4) Storage สำหรับ logs, results, artifacts 5) Database size (Prefect server) ทบทวนทุก quarter — adjust ตาม actual usage + growth projection
