Prefect Data Pipeline
Prefect Workflow Data Pipeline ETL Extract Transform Load Python Flow Task Schedule Retry Caching Notification Observability Prefect Cloud Airflow Alternative
| Orchestrator | Language | Dynamic DAG | Cloud | เหมาะกับ |
|---|---|---|---|---|
| Prefect | Python | ใช่ | Prefect Cloud | Modern ETL |
| Airflow | Python | จำกัด | MWAA / Astronomer | Enterprise |
| Dagster | Python | ใช่ | Dagster Cloud | Data Assets |
| Luigi | Python | ไม่ | ไม่มี | Simple Pipeline |
| dbt | SQL | ไม่ | dbt Cloud | SQL Transform |
Prefect Flow และ Task
# === Prefect ETL Pipeline ===
# pip install prefect httpx pandas sqlalchemy
from prefect import flow, task
from prefect.tasks import task_input_hash
from datetime import timedelta
import pandas as pd
import json
# Extract — pull raw records from the source API
@task(retries=3, retry_delay_seconds=60,
      cache_key_fn=task_input_hash,
      cache_expiration=timedelta(hours=1))
def extract_api_data(url: str) -> list:
    """Fetch JSON records from a REST endpoint.

    Retries up to 3 times (60 s apart) and caches the result for
    one hour keyed on the task inputs, so repeated runs within the
    cache window skip the network call.
    """
    import httpx  # imported lazily so the flow file loads without httpx installed

    resp = httpx.get(url, timeout=30)
    resp.raise_for_status()
    records = resp.json()
    print(f"Extracted {len(records)} records from {url}")
    return records
# Transform — clean and normalise the raw records
@task
def transform_data(raw_data: list) -> pd.DataFrame:
    """Clean and transform raw data into an analysis-ready DataFrame.

    Drops rows missing the required 'id'/'name' keys, strips name
    whitespace, normalises 'created_at' to datetimes (falling back to
    "now" if the column is absent), and coerces 'amount' to numeric.
    """
    frame = pd.DataFrame(raw_data)

    # Required-field cleanup: a record without id or name is unusable.
    frame = frame.dropna(subset=['id', 'name'])
    frame['name'] = frame['name'].str.strip()
    # .get returns the scalar default when the column is missing,
    # which pandas broadcasts across all rows on assignment.
    frame['created_at'] = pd.to_datetime(frame.get('created_at', pd.Timestamp.now()))

    # Numeric coercion: unparsable amounts become NaN, then 0.
    if 'amount' in frame.columns:
        frame['amount'] = pd.to_numeric(frame['amount'], errors='coerce').fillna(0)

    print(f"Transformed: {len(frame)} records, {len(frame.columns)} columns")
    return frame
# Load — write the cleaned frame into the warehouse
@task(retries=2, retry_delay_seconds=30)
def load_to_database(df: pd.DataFrame, table_name: str) -> int:
    """Load DataFrame to database; returns the number of rows "loaded".

    The actual write is stubbed out for the tutorial — a real
    implementation would look like:
    # from sqlalchemy import create_engine
    # engine = create_engine("postgresql://user:pass@localhost/warehouse")
    # df.to_sql(table_name, engine, if_exists='append', index=False)
    """
    row_count = df.shape[0]
    print(f"Loaded {row_count} records to {table_name}")
    return row_count
# Main ETL Flow — chains the three tasks end to end
@flow(name="daily-etl-pipeline",
      description="Daily ETL from API to Warehouse",
      log_prints=True)
def etl_pipeline(api_url: str, table_name: str):
    """Main ETL Pipeline: extract from the API, transform, load to the warehouse.

    Returns a small summary dict with the loaded row count.
    """
    extracted = extract_api_data(api_url)            # Extract
    cleaned = transform_data(extracted)              # Transform
    loaded_rows = load_to_database(cleaned, table_name)  # Load
    return {"rows_loaded": loaded_rows, "status": "success"}
# Run locally:
# etl_pipeline(
#     api_url="https://api.example.com/data",
#     table_name="daily_data"
# )

# Summary banner — one print per line keeps stdout identical.
for banner_line in (
    "=== Prefect ETL Pipeline ===",
    " Flow: daily-etl-pipeline",
    " Tasks: extract_api_data → transform_data → load_to_database",
    " Features: Retry, Caching, Logging, Error Handling",
):
    print(banner_line)
Advanced Features
# === Advanced Prefect Features ===
# Concurrent Tasks
# from prefect import flow, task
# from prefect.futures import wait
#
# @task
# def extract_source(source: str) -> list:
# return fetch_data(source)
#
# @flow
# def multi_source_etl():
# sources = ["api_1", "api_2", "database_1", "s3_bucket"]
# futures = [extract_source.submit(s) for s in sources]
# results = [f.result() for f in futures]
# combined = combine_data(results)
# load_to_warehouse(combined)
# Sub-flows
# @flow
# def extract_flow():
# api_data = extract_api_data("https://api.example.com/users")
# db_data = extract_database("SELECT * FROM orders")
# return merge_data(api_data, db_data)
#
# @flow
# def transform_flow(raw_data):
# cleaned = clean_data(raw_data)
# validated = validate_data(cleaned)
# enriched = enrich_data(validated)
# return enriched
#
# @flow(name="main-pipeline")
# def main_pipeline():
# raw = extract_flow()
# transformed = transform_flow(raw)
# load_to_warehouse(transformed)
# Deployment — prefect.yaml
# deployments:
# - name: daily-etl
# entrypoint: flows/etl.py:etl_pipeline
# schedule:
# cron: "0 2 * * *" # Daily 2AM
# timezone: "Asia/Bangkok"
# parameters:
# api_url: "https://api.example.com/data"
# table_name: "daily_data"
# work_pool:
# name: docker-pool
# tags: ["etl", "daily", "production"]
# Notification
# from prefect.blocks.notifications import SlackWebhook
# slack = SlackWebhook(url="https://hooks.slack.com/...")
# slack.notify("ETL Pipeline completed: 10,000 rows loaded")
from dataclasses import dataclass


@dataclass
class PipelineMetric:
    """Snapshot of one pipeline's schedule and run statistics for the dashboard."""

    pipeline: str       # human-readable pipeline name
    schedule: str       # cron expression, e.g. "0 2 * * *"
    avg_duration: str   # display string such as "5 min" (not a timedelta)
    success_rate: float # percentage of successful runs, e.g. 99.2
    rows_per_run: str   # pre-formatted row count, e.g. "50,000"
    last_run: str       # timestamp string of the most recent run
# Sample metrics rendered on the dashboard below.
_metric_rows = (
    ("Daily User ETL", "0 2 * * *", "5 min", 99.2, "50,000", "2025-01-20 02:00"),
    ("Hourly Events", "0 * * * *", "2 min", 98.5, "200,000", "2025-01-20 14:00"),
    ("Weekly Reports", "0 6 * * MON", "15 min", 100.0, "500,000", "2025-01-20 06:00"),
    ("Real-time Sync", "*/5 * * * *", "30 sec", 97.8, "5,000", "2025-01-20 14:55"),
)
pipelines = [PipelineMetric(*row) for row in _metric_rows]

print("\n=== Pipeline Dashboard ===")
for metric in pipelines:
    print(f" [{metric.pipeline}] Schedule: {metric.schedule}")
    print(f" Duration: {metric.avg_duration} | Success: {metric.success_rate}%")
    print(f" Rows: {metric.rows_per_run} | Last: {metric.last_run}")
Production Operations
# === Production Deployment ===
# Docker Work Pool
# prefect work-pool create docker-pool --type docker
# prefect worker start --pool docker-pool
# Kubernetes Work Pool
# prefect work-pool create k8s-pool --type kubernetes
# prefect worker start --pool k8s-pool
# Deploy
# prefect deploy --all
# prefect deployment run daily-etl-pipeline/daily-etl
# Monitoring
# prefect server start # Self-hosted UI at localhost:4200
@dataclass
class InfraConfig:
    """One row of the production-infrastructure table printed below."""

    component: str  # infrastructure component name, e.g. "Prefect Server"
    type_val: str   # deployment flavour (self-hosted, work-pool type, ...)
    config: str     # sizing summary (CPU / RAM / storage)
    scaling: str    # scaling strategy for this component
# Reference sizing for a production Prefect deployment.
infra = [
    InfraConfig(*spec)
    for spec in (
        ("Prefect Server", "Self-hosted / Cloud", "2 CPU 4GB RAM", "Single instance"),
        ("Worker (Docker)", "Docker Work Pool", "4 CPU 8GB RAM", "Auto-scale 1-5"),
        ("Worker (K8s)", "Kubernetes Work Pool", "Job per Flow Run", "HPA Auto"),
        ("Database", "PostgreSQL", "16GB RAM SSD", "Primary + Replica"),
        ("Object Storage", "S3 / MinIO", "Unlimited", "Auto"),
    )
]

print("Production Infrastructure:")
for item in infra:
    print(f" [{item.component}] {item.type_val}")
    print(f" Config: {item.config} | Scaling: {item.scaling}")
# Operational best practices (display strings, printed as a numbered list).
best_practices = [
    "Retry: ตั้ง retries=3 สำหรับ External API calls",
    "Caching: Cache expensive computations ลด Runtime",
    "Idempotent: ทำให้ Task Idempotent รันซ้ำได้ปลอดภัย",
    "Small Tasks: แบ่ง Task เล็กๆ ง่ายต่อ Debug และ Retry",
    "Notification: ตั้ง Slack/Email Alert เมื่อ Flow ล้มเหลว",
    "Monitoring: ดู Dashboard ทุกวัน หา Slow/Failed Flows",
    "Testing: Test Flow เหมือน Python Function ปกติ",
    "Version: ใช้ Git Version Control สำหรับ Flow Code",
]

print("\n\nBest Practices:")
for idx, tip in enumerate(best_practices, start=1):
    print(f" {idx}. {tip}")
เคล็ดลับ
- Retry: ตั้ง Retry สำหรับ External API ทุกครั้ง
- Cache: Cache ข้อมูลที่ไม่เปลี่ยนบ่อย ประหยัดเวลา
- Idempotent: ทำ Task ให้รันซ้ำได้โดยไม่มีผลข้างเคียง
- Monitor: ดู Prefect Dashboard ทุกวัน ตั้ง Alert
- Test: Test Flow ด้วย pytest เหมือน Python ปกติ
Prefect คืออะไร
Workflow Orchestration Python @flow @task Dashboard Retry Error Handling Scheduling Caching Notification Prefect Cloud Alternative Airflow
Prefect ต่างจาก Airflow อย่างไร
Python ธรรมดา ไม่ต้อง DAG Dynamic Workflow Runtime Testing ง่าย Prefect Cloud ไม่ต้อง Setup Retry Caching Built-in
สร้าง ETL Pipeline อย่างไร
Extract API Database File Transform Clean Validate Aggregate Load Warehouse @task แต่ละขั้น @flow รวม Schedule Cron Retry Notification
Production Deployment ทำอย่างไร
Prefect Cloud Self-hosted prefect deploy Schedule Cron Work Pool Docker Kubernetes Agent Dashboard Monitoring Notification Slack Email
สรุป
Prefect Workflow Data Pipeline ETL Python Flow Task Retry Caching Schedule Notification Production Docker Kubernetes Monitoring Dashboard Observability
