Prefect Workflow Data Pipeline ETL — สร้าง Data Pipeline ด้วย Prefect
Prefect Data Pipeline
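Prefect คือเครื่องมือ Workflow Orchestration สำหรับสร้าง Data Pipeline แบบ ETL (Extract, Transform, Load) ด้วย Python โดยเขียนเป็น Flow และ Task พร้อมรองรับ Schedule, Retry, Caching, Notification และ Observability ใช้งานร่วมกับ Prefect Cloud ได้ และเป็นทางเลือกแทน Airflow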
| Orchestrator | Language | Dynamic DAG | Cloud | เหมาะกับ |
|---|---|---|---|---|
| Prefect | Python | ใช่ | Prefect Cloud | Modern ETL |
| Airflow | Python | จำกัด | MWAA, Astronomer | Enterprise |
| Dagster | Python | ใช่ | Dagster Cloud | Data Assets |
| Luigi | Python | ไม่ | ไม่มี | Simple Pipeline |
| dbt | SQL | ไม่ | dbt Cloud | SQL Transform |
Prefect Flow และ Task
# === Prefect ETL Pipeline ===
# pip install prefect httpx pandas sqlalchemy
from prefect import flow, task
from prefect.tasks import task_input_hash
from datetime import timedelta
import pandas as pd
import json
# Extract — ดึงข้อมูลจาก API
@task(
    retries=3,
    retry_delay_seconds=60,
    cache_key_fn=task_input_hash,
    cache_expiration=timedelta(hours=1),
)
def extract_api_data(url: str) -> list:
    """Download the JSON document served at *url* and return it decoded.

    The task is retried up to 3 times, 60 seconds apart. Its result is cached
    by input (``task_input_hash``) for one hour, so a repeat call with the
    same URL inside that window may reuse the earlier result instead of
    re-fetching.

    Args:
        url: Endpoint to GET.

    Returns:
        Whatever ``response.json()`` decodes. It is annotated as a list of
        records, but the actual shape depends on the API (TODO confirm).

    Raises:
        httpx.HTTPStatusError: when the server responds with an error status.
    """
    # Local import: httpx is only needed when this task actually runs.
    import httpx

    resp = httpx.get(url, timeout=30)  # 30-second request timeout
    # Turn 4xx/5xx responses into an exception so the retry policy applies.
    resp.raise_for_status()
    payload = resp.json()
    # NOTE(review): if the API wraps records in an object, len() counts its
    # keys rather than records.
    print(f"Extracted {len(payload)} records from {url}")
    return payload
# Transform — แปลงข้อมูล
@task
def transform_data(raw_data: list) -> pd.DataFrame:
    """Clean and normalise the raw records returned by the extract step.

    Steps:
      * drop rows whose ``id`` or ``name`` is missing;
      * strip surrounding whitespace from ``name``;
      * parse ``created_at`` to datetimes, or stamp every row with the current
        time when the payload has no ``created_at`` field;
      * if an ``amount`` column exists, coerce it to numbers, replacing
        unparseable or missing values with 0.

    An empty payload (``[]``, ``None``, ``[{}]``) yields an empty DataFrame
    instead of raising ``KeyError`` from ``dropna``.

    Args:
        raw_data: Records to build the DataFrame from. Presumably a list of
            dicts, one per record (TODO confirm against the API).

    Returns:
        The cleaned DataFrame. The row index is not reset after rows are
        dropped.
    """
    required = ("id", "name")
    df = pd.DataFrame(raw_data)
    if df.empty and not set(required).issubset(df.columns):
        # pd.DataFrame([]) has no columns at all, so dropna(subset=...) below
        # would raise KeyError on a run with no data. Substitute an empty
        # frame that still carries the key columns.
        df = pd.DataFrame(
            columns=[*df.columns, *(c for c in required if c not in df.columns)]
        )

    # Clean: drop rows missing either key field, then tidy the name.
    df = df.dropna(subset=list(required))
    df["name"] = df["name"].str.strip()
    # When the payload has no created_at column, every row gets "now".
    df["created_at"] = pd.to_datetime(df.get("created_at", pd.Timestamp.now()))

    # Coerce amount to numeric; unparseable or missing values become 0.
    if "amount" in df.columns:
        df["amount"] = pd.to_numeric(df["amount"], errors="coerce").fillna(0)

    print(f"Transformed: {len(df)} records, {len(df.columns)} columns")
    return df
# Load — โหลดเข้า Database
@task(retries=2, retry_delay_seconds=30)
def load_to_database(df: pd.DataFrame, table_name: str) -> int:
    """Report the number of rows destined for *table_name* and return it.

    This is currently a stub: the database write is left commented out below,
    so the task performs no database I/O, only the log line.

    Args:
        df: Cleaned frame produced by the transform step.
        table_name: Target table name (only used in the log message for now).

    Returns:
        The number of rows in *df*.
    """
    # To enable the real load:
    #   from sqlalchemy import create_engine
    #   engine = create_engine("postgresql://user:pass@localhost/warehouse")
    #   df.to_sql(table_name, engine, if_exists='append', index=False)
    # NOTE(review): with retries=2, an append that fails after a partial write
    # could insert duplicate rows on retry. Consider an idempotent load
    # (staging table / upsert) before enabling.
    n_rows = df.shape[0]
    print(f"Loaded {n_rows} records to {table_name}")
    return n_rows
# Main ETL Flow
@flow(
    name="daily-etl-pipeline",
    description="Daily ETL from API to Warehouse",
    log_prints=True,  # route print() output into Prefect's run logs
)
def etl_pipeline(api_url: str, table_name: str):
    """Run extract -> transform -> load once for a single API endpoint.

    Args:
        api_url: Endpoint handed to ``extract_api_data``.
        table_name: Destination table handed to ``load_to_database``.

    Returns:
        ``{"rows_loaded": <int>, "status": "success"}``. No other status is
        ever returned; a failing task surfaces as an exception instead.
    """
    # Inner calls run first, so the order is still extract, transform, load.
    rows_loaded = load_to_database(
        transform_data(extract_api_data(api_url)),
        table_name,
    )
    return dict(rows_loaded=rows_loaded, status="success")
# Run locally (uncomment to execute the flow directly):
# etl_pipeline(
#     api_url="https://api.example.com/data",
#     table_name="daily_data"
# )

# Summary banner: one print of newline-joined lines emits exactly the same
# text as four separate print() calls.
print("\n".join((
    "=== Prefect ETL Pipeline ===",
    " Flow: daily-etl-pipeline",
    " Tasks: extract_api_data → transform_data → load_to_database",
    " Features: Retry, Caching, Logging, Error Handling",
)))
Advanced Features
# === Advanced Prefect Features ===
# Concurrent Tasks
# from prefect import flow, task
# from prefect.futures import wait
#
# @task
# def extract_source(source: str) -> list:
# return fetch_data(source)
#
# @flow
# def multi_source_etl():
# sources = ["api_1", "api_2", "database_1", "s3_bucket"]
# futures = [extract_source.submit(s) for s in sources]
# results = [f.result() for f in futures]
# combined = combine_data(results)
# load_to_warehouse(combined)
# Sub-flows
# @flow
# def extract_flow():
# api_data = extract_api_data("https://api.example.com/users")
# db_data = extract_database("SELECT * FROM orders")
# return merge_data(api_data, db_data)
#
# @flow
# def transform_flow(raw_data):
# cleaned = clean_data(raw_data)
# validated = validate_data(cleaned)
# enriched = enrich_data(validated)
# return enriched
#
# @flow(name="main-pipeline")
# def main_pipeline():
# raw = extract_flow()
# transformed = transform_flow(raw)
# load_to_warehouse(transformed)
# Deployment — prefect.yaml
# deployments:
# - name: daily-etl
# entrypoint: flows/etl.py:etl_pipeline
# schedule:
# cron: "0 2 * * *" # Daily 2AM
# timezone: "Asia/Bangkok"
# parameters:
# api_url: "https://api.example.com/data"
# table_name: "daily_data"
# work_pool:
# name: docker-pool
# tags: ["etl", "daily", "production"]
# Notification
# from prefect.blocks.notifications import SlackWebhook
# slack = SlackWebhook(url="https://hooks.slack.com/...")
# slack.notify("ETL Pipeline completed: 10,000 rows loaded")
from dataclasses import dataclass
@dataclass
class PipelineMetric:
    """One row of the pipeline dashboard: a flow's schedule and run statistics.

    Every field except ``success_rate`` holds pre-formatted display text.
    """

    pipeline: str  # display name, e.g. "Daily User ETL"
    schedule: str  # cron expression, e.g. "0 2 * * *"
    avg_duration: str  # human-readable duration, e.g. "5 min"
    success_rate: float  # percentage; the dashboard prints it with a trailing "%"
    rows_per_run: str  # formatted row count, e.g. "50,000"
    last_run: str  # timestamp text, e.g. "2025-01-20 02:00"
# Sample dashboard data; keyword arguments make each column explicit.
pipelines = [
    PipelineMetric(
        pipeline="Daily User ETL", schedule="0 2 * * *", avg_duration="5 min",
        success_rate=99.2, rows_per_run="50,000", last_run="2025-01-20 02:00",
    ),
    PipelineMetric(
        pipeline="Hourly Events", schedule="0 * * * *", avg_duration="2 min",
        success_rate=98.5, rows_per_run="200,000", last_run="2025-01-20 14:00",
    ),
    PipelineMetric(
        pipeline="Weekly Reports", schedule="0 6 * * MON", avg_duration="15 min",
        success_rate=100.0, rows_per_run="500,000", last_run="2025-01-20 06:00",
    ),
    PipelineMetric(
        pipeline="Real-time Sync", schedule="*/5 * * * *", avg_duration="30 sec",
        success_rate=97.8, rows_per_run="5,000", last_run="2025-01-20 14:55",
    ),
]

# Three lines per pipeline: name + schedule, timing + success, volume + last run.
print("\n=== Pipeline Dashboard ===")
for metric in pipelines:
    print(f" [{metric.pipeline}] Schedule: {metric.schedule}")
    print(f" Duration: {metric.avg_duration} | Success: {metric.success_rate}%")
    print(f" Rows: {metric.rows_per_run} | Last: {metric.last_run}")
Production Operations
# === Production Deployment ===
# Docker Work Pool
# prefect work-pool create docker-pool --type docker
# prefect worker start --pool docker-pool
# Kubernetes Work Pool
# prefect work-pool create k8s-pool --type kubernetes
# prefect worker start --pool k8s-pool
# Deploy
# prefect deploy --all
# prefect deployment run daily-etl-pipeline/daily-etl
# Monitoring
# prefect server start # Self-hosted UI at localhost:4200
@dataclass
class InfraConfig:
    """One row of the production-infrastructure summary; all fields are display text."""

    component: str  # component name, e.g. "Prefect Server"
    type_val: str  # deployment type / product, e.g. "Docker Work Pool"
    config: str  # sizing description, e.g. "2 CPU 4GB RAM"
    scaling: str  # scaling strategy, e.g. "Auto-scale 1-5"
# Sample production topology; keyword arguments make each column explicit.
infra = [
    InfraConfig(component="Prefect Server", type_val="Self-hosted / Cloud",
                config="2 CPU 4GB RAM", scaling="Single instance"),
    InfraConfig(component="Worker (Docker)", type_val="Docker Work Pool",
                config="4 CPU 8GB RAM", scaling="Auto-scale 1-5"),
    InfraConfig(component="Worker (K8s)", type_val="Kubernetes Work Pool",
                config="Job per Flow Run", scaling="HPA Auto"),
    InfraConfig(component="Database", type_val="PostgreSQL",
                config="16GB RAM SSD", scaling="Primary + Replica"),
    InfraConfig(component="Object Storage", type_val="S3 / MinIO",
                config="Unlimited", scaling="Auto"),
]

# Two lines per component: name + type, then sizing + scaling.
print("Production Infrastructure:")
for item in infra:
    print(f" [{item.component}] {item.type_val}")
    print(f" Config: {item.config} | Scaling: {item.scaling}")
# Operational guidelines for running Prefect flows (display text, in Thai/English).
best_practices = [
    "Retry: ตั้ง retries=3 สำหรับ External API calls",
    "Caching: Cache expensive computations ลด Runtime",
    "Idempotent: ทำให้ Task Idempotent รันซ้ำได้ปลอดภัย",
    "Small Tasks: แบ่ง Task เล็กๆ ง่ายต่อ Debug และ Retry",
    "Notification: ตั้ง Slack/Email Alert เมื่อ Flow ล้มเหลว",
    "Monitoring: ดู Dashboard ทุกวัน หา Slow/Failed Flows",
    "Testing: Test Flow เหมือน Python Function ปกติ",
    "Version: ใช้ Git Version Control สำหรับ Flow Code",
]

# Plain string literal: the heading has no placeholders, so an f-prefix is
# redundant (ruff F541). The printed text is unchanged.
print("\n\nBest Practices:")
for number, practice in enumerate(best_practices, start=1):
    print(f" {number}. {practice}")
เคล็ดลับ
- Retry: ตั้ง Retry สำหรับ External API ทุกครั้ง
- Cache: Cache ข้อมูลที่ไม่เปลี่ยนบ่อย ประหยัดเวลา
- Idempotent: ทำ Task ให้รันซ้ำได้โดยไม่มีผลข้างเคียง
- Monitor: ดู Prefect Dashboard ทุกวัน ตั้ง Alert
- Test: Test Flow ด้วย pytest เหมือน Python ปกติ
Prefect คืออะไร
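Prefect คือเครื่องมือ Workflow Orchestration สำหรับ Python ที่ใช้ Decorator @flow และ @task กำหนด Pipeline มี Dashboard สำหรับติดตามการทำงาน รองรับ Retry, Error Handling, Scheduling, Caching และ Notification ใช้งานได้ทั้งแบบ Self-hosted และ Prefect Cloud และเป็นทางเลือกแทน Airflow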