SiamCafe · Blog
Prefect Workflow Data Pipeline ETL — สร้าง Data
บทความ

Prefect Workflow Data Pipeline ETL — สร้าง Data

เผยแพร่ 28 พฤษภาคม 2569

Prefect Data Pipeline

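Prefect เป็นเครื่องมือ Workflow Orchestration สำหรับสร้าง Data Pipeline แบบ ETL (Extract, Transform, Load) ด้วย Python โดยกำหนดงานเป็น Flow และ Task รองรับ Schedule, Retry, Caching, Notification และ Observability มีบริการ Prefect Cloud และเป็นทางเลือกแทน Airflow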

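| Orchestrator | Language | Dynamic DAG | Cloud | เหมาะกับ |
|---|---|---|---|---|
| Prefect | Python | ใช่ | Prefect Cloud | Modern ETL |
| Airflow | Python | จำกัด | MWAA, Astronomer | Enterprise |
| Dagster | Python | ใช่ | Dagster Cloud | Data Assets |
| Luigi | Python | ไม่ | ไม่มี | Simple Pipeline |
| dbt | SQL | ไม่ | dbt Cloud | SQL Transform |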

Prefect Flow และ Task

# === Prefect ETL Pipeline ===

# pip install prefect httpx pandas sqlalchemy

from prefect import flow, task
from prefect.tasks import task_input_hash
from datetime import timedelta
import pandas as pd
import json

# Extract — ดึงข้อมูลจาก API
@task(
    retries=3,
    retry_delay_seconds=60,
    cache_key_fn=task_input_hash,
    cache_expiration=timedelta(hours=1),
)
def extract_api_data(url: str) -> list:
    """GET ``url`` and return its decoded JSON body.

    Runs as a Prefect task that is retried up to 3 times, 60 seconds apart.
    Its result is cached for one hour, keyed on the task inputs
    (``task_input_hash``), so repeat calls with the same ``url`` inside that
    window may reuse the cached value.

    Args:
        url: Endpoint to request (30-second timeout).

    Returns:
        The parsed JSON payload. It is annotated as a list; TODO confirm the
        API always returns a JSON array (``len`` below also accepts objects).

    Raises:
        httpx.HTTPStatusError: from ``raise_for_status()`` on an unsuccessful
            status. Transport errors (e.g. timeouts) propagate from ``httpx.get``.
    """
    import httpx  # function-scope import: only needed when the task actually runs

    reply = httpx.get(url, timeout=30)
    reply.raise_for_status()
    payload = reply.json()
    print(f"Extracted {len(payload)} records from {url}")
    return payload

# Transform — แปลงข้อมูล
@task
def transform_data(raw_data: list) -> pd.DataFrame:
    """Clean raw API records into a DataFrame ready for loading.

    Steps:
      1. Drop rows whose ``id`` or ``name`` is missing.
      2. Strip surrounding whitespace from ``name``.
      3. Parse ``created_at`` with ``pd.to_datetime``. If the column is
         absent, stamp every row with the current time instead.
      4. If an ``amount`` column exists, coerce it to numeric. Unparsable
         or missing values become 0.

    Args:
        raw_data: Payload from ``extract_api_data``. Anything ``pd.DataFrame``
            accepts works (presumably a list of dicts; confirm against the API).

    Returns:
        The cleaned DataFrame. A payload with no fields at all (e.g. ``[]``)
        yields an empty DataFrame with ``id``, ``name`` and ``created_at``
        columns instead of raising.

    Raises:
        KeyError: if a non-empty payload lacks the ``id`` or ``name`` field.
    """
    df = pd.DataFrame(raw_data)
    if df.columns.empty:
        # An empty payload (e.g. a day with no new records) produces a frame
        # with no columns, on which dropna(subset=...) raises KeyError. Give it
        # the required columns so the run loads 0 rows instead of failing.
        df = pd.DataFrame(columns=['id', 'name'])

    # Clean
    df = df.dropna(subset=['id', 'name'])
    df['name'] = df['name'].str.strip()
    if 'created_at' in df.columns:
        df['created_at'] = pd.to_datetime(df['created_at'])
    else:
        # No timestamp supplied by the source: use the transform time for every row.
        df['created_at'] = pd.Timestamp.now()

    # Coerce amounts to numbers; unparsable or missing values become 0.
    if 'amount' in df.columns:
        df['amount'] = pd.to_numeric(df['amount'], errors='coerce').fillna(0)
    print(f"Transformed: {len(df)} records, {len(df.columns)} columns")
    return df

# Load — โหลดเข้า Database
@task(retries=2, retry_delay_seconds=30)
def load_to_database(df: pd.DataFrame, table_name: str) -> int:
    """Report how many rows would be loaded into ``table_name``.

    NOTE: this is a placeholder. No database write happens; the task only
    counts the rows and prints a message. A real load would look roughly
    like this (untested sketch)::

        from sqlalchemy import create_engine
        engine = create_engine("postgresql://user:pass@localhost/warehouse")
        df.to_sql(table_name, engine, if_exists='append', index=False)

    The task is retried up to twice. An appending write like the one above
    could therefore insert duplicate rows if a retry follows a write that
    already succeeded, so make the real load idempotent before enabling it.

    Args:
        df: Rows to load.
        table_name: Destination table name (only used in the message today).

    Returns:
        Number of rows in ``df``.
    """
    row_count = df.shape[0]
    print(f"Loaded {row_count} records to {table_name}")
    return row_count

# Main ETL Flow
@flow(
    name="daily-etl-pipeline",
    description="Daily ETL from API to Warehouse",
    log_prints=True,  # capture print() output in the Prefect run logs
)
def etl_pipeline(api_url: str, table_name: str):
    """Run extract -> transform -> load once for a single API endpoint.

    Args:
        api_url: Endpoint passed to ``extract_api_data``.
        table_name: Destination passed to ``load_to_database``.

    Returns:
        ``{"rows_loaded": <int>, "status": "success"}``. The status is fixed:
        when a task fails, the exception propagates out of the flow instead
        of this dict being returned.
    """
    records = extract_api_data(api_url)
    frame = transform_data(records)
    loaded = load_to_database(frame, table_name)
    return dict(rows_loaded=loaded, status="success")

# Run locally
# etl_pipeline(
#     api_url="https://api.example.com/data",
#     table_name="daily_data"
# )

# Summary banner describing the flow defined above.
print("\n".join((
    "=== Prefect ETL Pipeline ===",
    "  Flow: daily-etl-pipeline",
    "  Tasks: extract_api_data → transform_data → load_to_database",
    "  Features: Retry, Caching, Logging, Error Handling",
)))

Advanced Features

# === Advanced Prefect Features ===

# Concurrent Tasks
# from prefect import flow, task
# from prefect.futures import wait
#
# @task
# def extract_source(source: str) -> list:
#     return fetch_data(source)
#
# @flow
# def multi_source_etl():
#     sources = ["api_1", "api_2", "database_1", "s3_bucket"]
#     futures = [extract_source.submit(s) for s in sources]
#     results = [f.result() for f in futures]
#     combined = combine_data(results)
#     load_to_warehouse(combined)

# Sub-flows
# @flow
# def extract_flow():
#     api_data = extract_api_data("https://api.example.com/users")
#     db_data = extract_database("SELECT * FROM orders")
#     return merge_data(api_data, db_data)
#
# @flow
# def transform_flow(raw_data):
#     cleaned = clean_data(raw_data)
#     validated = validate_data(cleaned)
#     enriched = enrich_data(validated)
#     return enriched
#
# @flow(name="main-pipeline")
# def main_pipeline():
#     raw = extract_flow()
#     transformed = transform_flow(raw)
#     load_to_warehouse(transformed)

# Deployment — prefect.yaml
# deployments:
#   - name: daily-etl
#     entrypoint: flows/etl.py:etl_pipeline
#     schedule:
#       cron: "0 2 * * *"  # Daily 2AM
#       timezone: "Asia/Bangkok"
#     parameters:
#       api_url: "https://api.example.com/data"
#       table_name: "daily_data"
#     work_pool:
#       name: docker-pool
#     tags: ["etl", "daily", "production"]

# Notification
# from prefect.blocks.notifications import SlackWebhook
# slack = SlackWebhook(url="https://hooks.slack.com/...")
# slack.notify("ETL Pipeline completed: 10,000 rows loaded")

from dataclasses import dataclass

@dataclass()
class PipelineMetric:
    """One row of the demo pipeline dashboard.

    Every field except ``success_rate`` holds pre-formatted display text.
    """

    pipeline: str        # display name of the pipeline
    schedule: str        # cron expression, e.g. "0 2 * * *"
    avg_duration: str    # human-readable duration, e.g. "5 min"
    success_rate: float  # percentage; printed with a trailing "%"
    rows_per_run: str    # formatted row count, e.g. "50,000"
    last_run: str        # timestamp text, e.g. "2025-01-20 02:00"

# Sample dashboard data: one PipelineMetric per scheduled pipeline.
pipelines = [
    PipelineMetric(pipeline="Daily User ETL", schedule="0 2 * * *", avg_duration="5 min",
                   success_rate=99.2, rows_per_run="50,000", last_run="2025-01-20 02:00"),
    PipelineMetric(pipeline="Hourly Events", schedule="0 * * * *", avg_duration="2 min",
                   success_rate=98.5, rows_per_run="200,000", last_run="2025-01-20 14:00"),
    PipelineMetric(pipeline="Weekly Reports", schedule="0 6 * * MON", avg_duration="15 min",
                   success_rate=100.0, rows_per_run="500,000", last_run="2025-01-20 06:00"),
    PipelineMetric(pipeline="Real-time Sync", schedule="*/5 * * * *", avg_duration="30 sec",
                   success_rate=97.8, rows_per_run="5,000", last_run="2025-01-20 14:55"),
]

print("\n=== Pipeline Dashboard ===")
for p in pipelines:
    # Three indented lines per pipeline: schedule, duration/success, rows/last run.
    print(
        f"  [{p.pipeline}] Schedule: {p.schedule}\n"
        f"    Duration: {p.avg_duration} | Success: {p.success_rate}%\n"
        f"    Rows: {p.rows_per_run} | Last: {p.last_run}"
    )

Production Operations

# === Production Deployment ===

# Docker Work Pool
# prefect work-pool create docker-pool --type docker
# prefect worker start --pool docker-pool

# Kubernetes Work Pool
# prefect work-pool create k8s-pool --type kubernetes
# prefect worker start --pool k8s-pool

# Deploy
# prefect deploy --all
# prefect deployment run daily-etl-pipeline/daily-etl

# Monitoring
# prefect server start  # Self-hosted UI at localhost:4200

@dataclass()
class InfraConfig:
    """One component of the example production deployment (display text only)."""

    component: str  # component name, e.g. "Prefect Server"
    type_val: str   # kind of deployment, e.g. "Docker Work Pool"
    config: str     # sizing description, e.g. "2 CPU 4GB RAM"
    scaling: str    # scaling approach, e.g. "Auto-scale 1-5"

# Example production layout: one InfraConfig per component.
infra = [
    InfraConfig(component="Prefect Server", type_val="Self-hosted / Cloud",
                config="2 CPU 4GB RAM", scaling="Single instance"),
    InfraConfig(component="Worker (Docker)", type_val="Docker Work Pool",
                config="4 CPU 8GB RAM", scaling="Auto-scale 1-5"),
    InfraConfig(component="Worker (K8s)", type_val="Kubernetes Work Pool",
                config="Job per Flow Run", scaling="HPA Auto"),
    InfraConfig(component="Database", type_val="PostgreSQL",
                config="16GB RAM SSD", scaling="Primary + Replica"),
    InfraConfig(component="Object Storage", type_val="S3 / MinIO",
                config="Unlimited", scaling="Auto"),
]

print("Production Infrastructure:")
for i in infra:
    # Two indented lines per component: name/type, then sizing/scaling.
    print(
        f"  [{i.component}] {i.type_val}\n"
        f"    Config: {i.config} | Scaling: {i.scaling}"
    )

# Operational checklist (descriptions in Thai), printed below as a numbered list.
best_practices = [
    "Retry: ตั้ง retries=3 สำหรับ External API calls",
    "Caching: Cache expensive computations ลด Runtime",
    "Idempotent: ทำให้ Task Idempotent รันซ้ำได้ปลอดภัย",
    "Small Tasks: แบ่ง Task เล็กๆ ง่ายต่อ Debug และ Retry",
    "Notification: ตั้ง Slack/Email Alert เมื่อ Flow ล้มเหลว",
    "Monitoring: ดู Dashboard ทุกวัน หา Slow/Failed Flows",
    "Testing: Test Flow เหมือน Python Function ปกติ",
    "Version: ใช้ Git Version Control สำหรับ Flow Code",
]

# Plain string literal: the original f-string had no placeholders (ruff F541).
print("\n\nBest Practices:")
for i, b in enumerate(best_practices, 1):
    print(f"  {i}. {b}")

เคล็ดลับ

  • Retry: ตั้ง Retry สำหรับ External API ทุกครั้ง
  • Cache: Cache ข้อมูลที่ไม่เปลี่ยนบ่อย ประหยัดเวลา
  • Idempotent: ทำ Task ให้รันซ้ำได้โดยไม่มีผลข้างเคียง
  • Monitor: ดู Prefect Dashboard ทุกวัน ตั้ง Alert
  • Test: Test Flow ด้วย pytest เหมือน Python ปกติ

Prefect คืออะไร

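Prefect คือเครื่องมือ Workflow Orchestration สำหรับ Python ที่ใช้ Decorator @flow และ @task กำหนด Pipeline เป็นโค้ด Python ปกติ มี Dashboard สำหรับติดตามการทำงาน รองรับ Retry, Error Handling, Scheduling, Caching และ Notification มีบริการ Prefect Cloud และเป็นทางเลือกแทน Airflow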