
Prefect Workflow Data Pipeline ETL

2025-09-12 · อ. บอม — SiamCafe.net · 8,268 words

Prefect Data Pipeline

Prefect is a Python workflow orchestrator for ETL (Extract, Transform, Load) pipelines. You compose steps as flows and tasks and get scheduling, retries, caching, notifications, and observability built in, either self-hosted or on Prefect Cloud, which makes it a popular alternative to Airflow.

| Orchestrator | Language | Dynamic DAG | Cloud | Best for |
|---|---|---|---|---|
| Prefect | Python | Yes | Prefect Cloud | Modern ETL |
| Airflow | Python | Limited | MWAA, Astronomer | Enterprise |
| Dagster | Python | Yes | Dagster Cloud | Data assets |
| Luigi | Python | No | None | Simple pipelines |
| dbt | SQL | No | dbt Cloud | SQL transforms |

Prefect Flow and Task

# === Prefect ETL Pipeline ===

# pip install prefect httpx pandas sqlalchemy

from prefect import flow, task
from prefect.tasks import task_input_hash
from datetime import timedelta
import pandas as pd
import json

# Extract: pull data from the API
@task(retries=3, retry_delay_seconds=60,
      cache_key_fn=task_input_hash,
      cache_expiration=timedelta(hours=1))
def extract_api_data(url: str) -> list:
    """Extract data from REST API"""
    import httpx
    response = httpx.get(url, timeout=30)
    response.raise_for_status()
    data = response.json()
    print(f"Extracted {len(data)} records from {url}")
    return data

# Transform: clean and reshape the data
@task
def transform_data(raw_data: list) -> pd.DataFrame:
    """Clean and transform raw data"""
    df = pd.DataFrame(raw_data)
    # Clean
    df = df.dropna(subset=['id', 'name'])
    df['name'] = df['name'].str.strip()
    if 'created_at' in df.columns:
        df['created_at'] = pd.to_datetime(df['created_at'], errors='coerce')
    else:
        df['created_at'] = pd.Timestamp.now()
    # Coerce numeric columns
    if 'amount' in df.columns:
        df['amount'] = pd.to_numeric(df['amount'], errors='coerce').fillna(0)
    print(f"Transformed: {len(df)} records, {len(df.columns)} columns")
    return df

# Load: write to the database
@task(retries=2, retry_delay_seconds=30)
def load_to_database(df: pd.DataFrame, table_name: str) -> int:
    """Load DataFrame to database"""
    # from sqlalchemy import create_engine
    # engine = create_engine("postgresql://user:pass@localhost/warehouse")
    # df.to_sql(table_name, engine, if_exists='append', index=False)
    rows = len(df)
    print(f"Loaded {rows} records to {table_name}")
    return rows

# Main ETL Flow
@flow(name="daily-etl-pipeline",
      description="Daily ETL from API to Warehouse",
      log_prints=True)
def etl_pipeline(api_url: str, table_name: str):
    """Main ETL Pipeline"""
    # Extract
    raw_data = extract_api_data(api_url)
    # Transform
    clean_df = transform_data(raw_data)
    # Load
    rows_loaded = load_to_database(clean_df, table_name)
    return {"rows_loaded": rows_loaded, "status": "success"}

# Run locally
# etl_pipeline(
#     api_url="https://api.example.com/data",
#     table_name="daily_data"
# )

print("=== Prefect ETL Pipeline ===")
print("  Flow: daily-etl-pipeline")
print("  Tasks: extract_api_data → transform_data → load_to_database")
print("  Features: Retry, Caching, Logging, Error Handling")

Advanced Features

# === Advanced Prefect Features ===

# Concurrent Tasks
# from prefect import flow, task
# from prefect.futures import wait
#
# @task
# def extract_source(source: str) -> list:
#     return fetch_data(source)
#
# @flow
# def multi_source_etl():
#     sources = ["api_1", "api_2", "database_1", "s3_bucket"]
#     futures = [extract_source.submit(s) for s in sources]
#     results = [f.result() for f in futures]
#     combined = combine_data(results)
#     load_to_warehouse(combined)
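The `.submit()` fan-out above behaves much like a thread-pool map: all extracts start concurrently and `.result()` gathers them in order. A runnable stdlib-only sketch of the same pattern (the `extract_source` body here is a stand-in for a real fetch):

```python
from concurrent.futures import ThreadPoolExecutor

def extract_source(source: str) -> list:
    # Stand-in for a real fetch; returns one record per source
    return [{"source": source, "rows": 100}]

def multi_source_etl(sources):
    # Submit all extracts concurrently, then gather results in submit order
    with ThreadPoolExecutor(max_workers=4) as pool:
        futures = [pool.submit(extract_source, s) for s in sources]
        results = [f.result() for f in futures]
    # Combine: flatten the per-source batches into one list
    return [rec for batch in results for rec in batch]

batch = multi_source_etl(["api_1", "api_2", "database_1", "s3_bucket"])
print(len(batch))  # → 4
```

With Prefect, the task runner plays the role of the executor, so the flow body stays this simple while gaining retries and observability per task run.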

# Sub-flows
# @flow
# def extract_flow():
#     api_data = extract_api_data("https://api.example.com/users")
#     db_data = extract_database("SELECT * FROM orders")
#     return merge_data(api_data, db_data)
#
# @flow
# def transform_flow(raw_data):
#     cleaned = clean_data(raw_data)
#     validated = validate_data(cleaned)
#     enriched = enrich_data(validated)
#     return enriched
#
# @flow(name="main-pipeline")
# def main_pipeline():
#     raw = extract_flow()
#     transformed = transform_flow(raw)
#     load_to_warehouse(transformed)

# Deployment: prefect.yaml
# deployments:
#   - name: daily-etl
#     entrypoint: flows/etl.py:etl_pipeline
#     schedule:
#       cron: "0 2 * * *"  # Daily 2AM
#       timezone: "Asia/Bangkok"
#     parameters:
#       api_url: "https://api.example.com/data"
#       table_name: "daily_data"
#     work_pool:
#       name: docker-pool
#     tags: ["etl", "daily", "production"]

# Notification
# from prefect.blocks.notifications import SlackWebhook
# slack = SlackWebhook(url="https://hooks.slack.com/...")
# slack.notify("ETL Pipeline completed: 10,000 rows loaded")
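If you prefer not to depend on Prefect's notification block, a Slack incoming webhook can be called with the standard library alone; a hedged sketch (the webhook URL is a placeholder, and `build_slack_payload`/`notify_slack` are illustrative names):

```python
import json
import urllib.request

def build_slack_payload(message: str) -> bytes:
    # Slack incoming webhooks accept a JSON body with a "text" field
    return json.dumps({"text": message}).encode("utf-8")

def notify_slack(webhook_url: str, message: str) -> None:
    req = urllib.request.Request(
        webhook_url,
        data=build_slack_payload(message),
        headers={"Content-Type": "application/json"},
    )
    with urllib.request.urlopen(req, timeout=10) as resp:
        resp.read()

# notify_slack("https://hooks.slack.com/services/...",
#              "ETL Pipeline completed: 10,000 rows loaded")
```

Call it from the end of the flow, or from an error handler, to mirror what the Prefect block does.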

from dataclasses import dataclass

@dataclass
class PipelineMetric:
    pipeline: str
    schedule: str
    avg_duration: str
    success_rate: float
    rows_per_run: str
    last_run: str

pipelines = [
    PipelineMetric("Daily User ETL", "0 2 * * *", "5 min", 99.2, "50,000", "2025-01-20 02:00"),
    PipelineMetric("Hourly Events", "0 * * * *", "2 min", 98.5, "200,000", "2025-01-20 14:00"),
    PipelineMetric("Weekly Reports", "0 6 * * MON", "15 min", 100.0, "500,000", "2025-01-20 06:00"),
    PipelineMetric("Real-time Sync", "*/5 * * * *", "30 sec", 97.8, "5,000", "2025-01-20 14:55"),
]

print("\n=== Pipeline Dashboard ===")
for p in pipelines:
    print(f"  [{p.pipeline}] Schedule: {p.schedule}")
    print(f"    Duration: {p.avg_duration} | Success: {p.success_rate}%")
    print(f"    Rows: {p.rows_per_run} | Last: {p.last_run}")

Production Operations

# === Production Deployment ===

# Docker Work Pool
# prefect work-pool create docker-pool --type docker
# prefect worker start --pool docker-pool

# Kubernetes Work Pool
# prefect work-pool create k8s-pool --type kubernetes
# prefect worker start --pool k8s-pool

# Deploy
# prefect deploy --all
# prefect deployment run daily-etl-pipeline/daily-etl

# Monitoring
# prefect server start  # Self-hosted UI at localhost:4200

@dataclass
class InfraConfig:
    component: str
    type_val: str
    config: str
    scaling: str

infra = [
    InfraConfig("Prefect Server", "Self-hosted / Cloud", "2 CPU 4GB RAM", "Single instance"),
    InfraConfig("Worker (Docker)", "Docker Work Pool", "4 CPU 8GB RAM", "Auto-scale 1-5"),
    InfraConfig("Worker (K8s)", "Kubernetes Work Pool", "Job per Flow Run", "HPA Auto"),
    InfraConfig("Database", "PostgreSQL", "16GB RAM SSD", "Primary + Replica"),
    InfraConfig("Object Storage", "S3 / MinIO", "Unlimited", "Auto"),
]

print("Production Infrastructure:")
for i in infra:
    print(f"  [{i.component}] {i.type_val}")
    print(f"    Config: {i.config} | Scaling: {i.scaling}")

best_practices = [
    "Retry: set retries=3 for external API calls",
    "Caching: cache expensive computations to cut runtime",
    "Idempotent: make tasks idempotent so re-runs are safe",
    "Small tasks: split work into small tasks for easier debugging and retries",
    "Notification: set up Slack/Email alerts for failed flows",
    "Monitoring: check the dashboard daily for slow or failed flows",
    "Testing: test flows like ordinary Python functions",
    "Version: keep flow code in Git version control",
]

print("\n\nBest Practices:")
for i, b in enumerate(best_practices, 1):
    print(f"  {i}. {b}")
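The "Idempotent" practice above means a re-run must not duplicate rows. One common approach is an upsert keyed on the record id; a minimal sqlite3 sketch (the table and column names are illustrative):

```python
import sqlite3

def load_idempotent(conn, records):
    """Upsert records by id so re-running the load task is safe."""
    conn.execute(
        "CREATE TABLE IF NOT EXISTS daily_data (id INTEGER PRIMARY KEY, name TEXT)"
    )
    conn.executemany(
        "INSERT INTO daily_data (id, name) VALUES (?, ?) "
        "ON CONFLICT(id) DO UPDATE SET name = excluded.name",
        [(r["id"], r["name"]) for r in records],
    )
    conn.commit()

conn = sqlite3.connect(":memory:")
records = [{"id": 1, "name": "alice"}, {"id": 2, "name": "bob"}]
load_idempotent(conn, records)
load_idempotent(conn, records)  # re-run: same rows, no duplicates
count = conn.execute("SELECT COUNT(*) FROM daily_data").fetchone()[0]
print(count)  # → 2
```

The same idea carries over to PostgreSQL with `INSERT ... ON CONFLICT`, so a retried or replayed flow run converges to the same table state.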

Tips

What is Prefect?

Prefect is a Python workflow orchestration tool: you mark functions with @flow and @task and get retries, error handling, scheduling, caching, notifications, and a dashboard out of the box. It runs self-hosted or on Prefect Cloud and is a popular alternative to Airflow.

How does Prefect differ from Airflow?

Flows are plain Python functions, so there is no static DAG file: workflows can be fully dynamic at runtime and are easy to test. Retries and caching are built in, and Prefect Cloud removes the need to run your own scheduler.

How do you build an ETL pipeline?

Wrap each step in a @task: extract from APIs, databases, or files; transform by cleaning, validating, and aggregating; load into the warehouse. Compose the tasks in a @flow, then add a cron schedule, retries, and failure notifications.

How do you deploy to production?

Use Prefect Cloud or a self-hosted server, register flows with `prefect deploy`, attach a cron schedule, and run them on a Docker or Kubernetes work pool with a worker. Monitor runs in the dashboard and wire up Slack or email notifications.

Summary

Prefect lets you build ETL data pipelines in plain Python with flows and tasks, adding retries, caching, scheduling, and notifications, and runs in production on Docker or Kubernetes with full monitoring, dashboards, and observability.
