
Prefect Workflow Data Pipeline ETL: Building Data Pipelines


An introduction to building ETL (Extract, Transform, Load) data pipelines with Prefect workflows in Python: flows, tasks, scheduling, retries, caching, notifications, observability, Prefect Cloud, and Prefect as an alternative to Airflow.


Orchestrator   Language   Dynamic DAG   Managed cloud       Best for
Prefect        Python     Yes           Prefect Cloud       Modern ETL
Airflow        Python     Limited       MWAA, Astronomer    Enterprise
Dagster        Python     Yes           Dagster Cloud       Data assets
Luigi          Python     No            None                Simple pipelines
dbt            SQL        No            dbt Cloud           SQL transforms
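The "Dynamic DAG" column is roughly about whether the shape of the task graph can be decided while the pipeline runs, for example one extract task per data source discovered at runtime. As an illustration only, the sketch below builds such a graph from a runtime list with Python's standard-library graphlib module rather than any orchestrator; build_etl_graph, execution_order, and the node names are hypothetical.

```python
from graphlib import TopologicalSorter


def build_etl_graph(sources: list[str]) -> dict[str, set[str]]:
    """Build a task graph whose shape depends on runtime input.

    Maps each node to the set of nodes it depends on (its predecessors):
    one extract node per source, one combine node after all extracts,
    and one load node after combine.
    """
    graph: dict[str, set[str]] = {}
    for source in sources:
        graph[f"extract:{source}"] = set()  # extracts have no upstream tasks
    graph["combine"] = {f"extract:{s}" for s in sources}
    graph["load"] = {"combine"}
    return graph


def execution_order(graph: dict[str, set[str]]) -> list[str]:
    """Return one valid order in which the tasks could run."""
    return list(TopologicalSorter(graph).static_order())


if __name__ == "__main__":
    # The list could come from a config file or an API call at runtime,
    # so the number of extract tasks is not fixed in the pipeline code.
    sources = ["api_1", "api_2", "s3_bucket"]
    for name in execution_order(build_etl_graph(sources)):
        print(name)
```

Adding a source to the list adds an extract node without touching the pipeline definition, which is the property the table contrasts.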

Prefect Flows and Tasks

# === Prefect ETL Pipeline ===

# pip install prefect httpx pandas sqlalchemy

from prefect import flow, task
from prefect.tasks import task_input_hash
from datetime import timedelta
import pandas as pd
import json

# Extract: fetch data from the API
@task(retries=3, retry_delay_seconds=60,
      cache_key_fn=task_input_hash,
      cache_expiration=timedelta(hours=1))
def extract_api_data(url: str) -> list:
    """Extract data from REST API"""
    import httpx
    response = httpx.get(url, timeout=30)
    response.raise_for_status()
    data = response.json()
    print(f"Extracted {len(data)} records from {url}")
    return data

# Transform: clean and convert the data
@task
def transform_data(raw_data: list) -> pd.DataFrame:
    """Clean and transform raw data"""
    df = pd.DataFrame(raw_data)
    # Clean
    df = df.dropna(subset=['id', 'name'])
    df['name'] = df['name'].str.strip()
    df['created_at'] = pd.to_datetime(df.get('created_at', pd.Timestamp.now()))
    # Coerce amount to numeric; unparseable or missing values become 0
    if 'amount' in df.columns:
        df['amount'] = pd.to_numeric(df['amount'], errors='coerce').fillna(0)
    print(f"Transformed: {len(df)} records, {len(df.columns)} columns")
    return df

# Load: write into the database
@task(retries=2, retry_delay_seconds=30)
def load_to_database(df: pd.DataFrame, table_name: str) -> int:
    """Load DataFrame to database"""
    # Placeholder: uncomment the lines below and set a real connection string
    # to actually write to the warehouse; as-is, the task only counts rows.
    # from sqlalchemy import create_engine
    # engine = create_engine("postgresql://user:pass@localhost/warehouse")
    # df.to_sql(table_name, engine, if_exists='append', index=False)
    rows = len(df)
    print(f"Loaded {rows} records to {table_name}")
    return rows

# Main ETL Flow
@flow(name="daily-etl-pipeline",
      description="Daily ETL from API to Warehouse",
      log_prints=True)
def etl_pipeline(api_url: str, table_name: str):
    """Main ETL Pipeline"""
    # Extract
    raw_data = extract_api_data(api_url)
    # Transform
    clean_df = transform_data(raw_data)
    # Load
    rows_loaded = load_to_database(clean_df, table_name)
    return {"rows_loaded": rows_loaded, "status": "success"}

# Run locally
# etl_pipeline(
#     api_url="https://api.example.com/data",
#     table_name="daily_data"
# )

print("=== Prefect ETL Pipeline ===")
print("  Flow: daily-etl-pipeline")
print("  Tasks: extract_api_data → transform_data → load_to_database")
print("  Features: Retry, Caching, Logging, Error Handling")
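The cache_key_fn=task_input_hash and cache_expiration=timedelta(hours=1) settings on extract_api_data mean, roughly, that a repeated call with the same url within one hour reuses the stored result instead of calling the API again. The plain-Python sketch below mimics that behavior so it can run without Prefect; it is not Prefect's implementation, and input_hash, cached, and demo are hypothetical names. A fake clock is injected so expiry can be checked without waiting an hour.

```python
import hashlib
import json
import time
from typing import Any, Callable


def input_hash(*args: Any, **kwargs: Any) -> str:
    """Turn call arguments into a stable cache key (illustrative, not Prefect's)."""
    payload = json.dumps([args, kwargs], sort_keys=True, default=str)
    return hashlib.sha256(payload.encode("utf-8")).hexdigest()


def cached(expiration_seconds: float, clock: Callable[[], float] = time.monotonic):
    """Reuse a result for identical inputs until it is expiration_seconds old."""

    def decorator(fn: Callable[..., Any]) -> Callable[..., Any]:
        store: dict[str, tuple[float, Any]] = {}  # key -> (stored_at, result)

        def wrapper(*args: Any, **kwargs: Any) -> Any:
            key = input_hash(*args, **kwargs)
            now = clock()
            entry = store.get(key)
            if entry is not None and now - entry[0] < expiration_seconds:
                return entry[1]  # fresh hit: skip the expensive call
            result = fn(*args, **kwargs)
            store[key] = (now, result)
            return result

        return wrapper

    return decorator


def demo() -> int:
    """Return how many real calls happen across three cached requests."""
    real_calls: list[str] = []
    fake_now = [0.0]  # controllable clock, in seconds

    @cached(expiration_seconds=3600, clock=lambda: fake_now[0])
    def fake_extract(url: str) -> list:
        real_calls.append(url)
        return [{"id": 1, "source": url}]

    fake_extract("https://api.example.com/data")  # miss: real call
    fake_extract("https://api.example.com/data")  # hit within one hour
    fake_now[0] = 4000.0                          # more than an hour later
    fake_extract("https://api.example.com/data")  # expired: real call again
    return len(real_calls)


if __name__ == "__main__":
    print(demo())  # 2
```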

Advanced Features

# === Advanced Prefect Features ===

# Concurrent Tasks (fetch_data, combine_data and load_to_warehouse are placeholders)
# from prefect import flow, task
# from prefect.futures import wait
#
# @task
# def extract_source(source: str) -> list:
#     return fetch_data(source)
#
# @flow
# def multi_source_etl():
#     sources = ["api_1", "api_2", "database_1", "s3_bucket"]
#     futures = [extract_source.submit(s) for s in sources]
#     results = [f.result() for f in futures]
#     combined = combine_data(results)
#     load_to_warehouse(combined)

# Sub-flows (helpers such as extract_database and merge_data are placeholders)
# @flow
# def extract_flow():
#     api_data = extract_api_data("https://api.example.com/users")
#     db_data = extract_database("SELECT * FROM orders")
#     return merge_data(api_data, db_data)
#
# @flow
# def transform_flow(raw_data):
#     cleaned = clean_data(raw_data)
#     validated = validate_data(cleaned)
#     enriched = enrich_data(validated)
#     return enriched
#
# @flow(name="main-pipeline")
# def main_pipeline():
#     raw = extract_flow()
#     transformed = transform_flow(raw)
#     load_to_warehouse(transformed)

# Deployment — prefect.yaml
# deployments:
#   - name: daily-etl
#     entrypoint: flows/etl.py:etl_pipeline
#     schedule:
#       cron: "0 2 * * *"  # Daily 2AM
#       timezone: "Asia/Bangkok"
#     parameters:
#       api_url: "https://api.example.com/data"
#       table_name: "daily_data"
#     work_pool:
#       name: docker-pool
#     tags: ["etl", "daily", "production"]

# Notification
# from prefect.blocks.notifications import SlackWebhook
# slack = SlackWebhook(url="https://hooks.slack.com/...")
# slack.notify("ETL Pipeline completed: 10,000 rows loaded")

from dataclasses import dataclass

@dataclass
class PipelineMetric:
    pipeline: str
    schedule: str
    avg_duration: str
    success_rate: float
    rows_per_run: str
    last_run: str

pipelines = [
    PipelineMetric("Daily User ETL", "0 2 * * *", "5 min", 99.2, "50,000", "2025-01-20 02:00"),
    PipelineMetric("Hourly Events", "0 * * * *", "2 min", 98.5, "200,000", "2025-01-20 14:00"),
    PipelineMetric("Weekly Reports", "0 6 * * MON", "15 min", 100.0, "500,000", "2025-01-20 06:00"),
    PipelineMetric("Real-time Sync", "*/5 * * * *", "30 sec", 97.8, "5,000", "2025-01-20 14:55"),
]

print("\n=== Pipeline Dashboard ===")
for p in pipelines:
    print(f"  [{p.pipeline}] Schedule: {p.schedule}")
    print(f"    Duration: {p.avg_duration} | Success: {p.success_rate}%")
    print(f"    Rows: {p.rows_per_run} | Last: {p.last_run}")
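The extract_source.submit(s) / f.result() pattern in the multi-source flow starts all extracts so they can run concurrently, then waits for each result. The same shape can be sketched with the standard library's concurrent.futures.ThreadPoolExecutor, which suits I/O-bound work such as API calls. This is a standalone analogy rather than Prefect code; extract_source, combine_data, and the sleep are hypothetical placeholders.

```python
import time
from concurrent.futures import ThreadPoolExecutor


def extract_source(source: str) -> list[dict]:
    """Hypothetical I/O-bound extract; the sleep stands in for network latency."""
    time.sleep(0.1)
    return [{"source": source, "rows": len(source)}]


def combine_data(results: list[list[dict]]) -> list[dict]:
    """Flatten per-source results, keeping the order the sources were submitted in."""
    return [row for rows in results for row in rows]


def multi_source_extract(sources: list[str], max_workers: int = 4) -> list[dict]:
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        # submit() returns a Future right away, so all extracts start before any finishes
        futures = [pool.submit(extract_source, s) for s in sources]
        # result() blocks until that future is done and re-raises any exception it hit
        results = [f.result() for f in futures]
    return combine_data(results)


if __name__ == "__main__":
    start = time.perf_counter()
    rows = multi_source_extract(["api_1", "api_2", "database_1", "s3_bucket"])
    elapsed = time.perf_counter() - start
    # With 4 workers the four 0.1 s sleeps overlap, so this takes roughly 0.1 s, not 0.4 s
    print(len(rows), f"{elapsed:.2f}s")
```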

Production Operations

# === Production Deployment ===

# Docker Work Pool (the Docker worker needs the prefect-docker package)
# prefect work-pool create docker-pool --type docker
# prefect worker start --pool docker-pool

# Kubernetes Work Pool (the Kubernetes worker needs the prefect-kubernetes package)
# prefect work-pool create k8s-pool --type kubernetes
# prefect worker start --pool k8s-pool

# Deploy
# prefect deploy --all
# prefect deployment run daily-etl-pipeline/daily-etl

# Monitoring
# prefect server start  # Self-hosted UI at localhost:4200

from dataclasses import dataclass

@dataclass
class InfraConfig:
    component: str
    type_val: str
    config: str
    scaling: str

infra = [
    InfraConfig("Prefect Server", "Self-hosted / Cloud", "2 CPU 4GB RAM", "Single instance"),
    InfraConfig("Worker (Docker)", "Docker Work Pool", "4 CPU 8GB RAM", "Auto-scale 1-5"),
    InfraConfig("Worker (K8s)", "Kubernetes Work Pool", "Job per Flow Run", "HPA Auto"),
    InfraConfig("Database", "PostgreSQL", "16GB RAM SSD", "Primary + Replica"),
    InfraConfig("Object Storage", "S3 / MinIO", "Unlimited", "Auto"),
]

print("Production Infrastructure:")
for i in infra:
    print(f"  [{i.component}] {i.type_val}")
    print(f"    Config: {i.config} | Scaling: {i.scaling}")

best_practices = [
    "Retry: set retries=3 for external API calls",
    "Caching: cache expensive computations to cut runtime",
    "Idempotent: make tasks idempotent so they are safe to re-run",
    "Small Tasks: split work into small tasks that are easy to debug and retry",
    "Notification: send Slack/email alerts when a flow fails",
    "Monitoring: check the dashboard daily for slow or failed flows",
    "Testing: test flows like ordinary Python functions",
    "Version: keep flow code under Git version control",
]

print("\n\nBest Practices:")
for i, b in enumerate(best_practices, 1):
    print(f"  {i}. {b}")
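One way to follow the idempotency advice is to key each row and upsert instead of appending: the commented to_sql call in load_to_database uses if_exists='append', which would insert the same rows again if the task is retried or the flow re-run. The sketch below uses the standard-library sqlite3 module as a stand-in warehouse; the (id, name, amount) schema and load_rows_idempotent are assumptions. On PostgreSQL the equivalent statement would be INSERT ... ON CONFLICT ... DO UPDATE.

```python
import sqlite3


def load_rows_idempotent(conn: sqlite3.Connection, table: str, rows: list[dict]) -> int:
    """Upsert rows keyed on id, so repeating the same load does not duplicate data."""
    # The table name is interpolated into SQL: only pass trusted, hard-coded names.
    conn.execute(
        f"CREATE TABLE IF NOT EXISTS {table} "
        "(id INTEGER PRIMARY KEY, name TEXT, amount REAL)"
    )
    # INSERT OR REPLACE overwrites any existing row with the same primary key.
    conn.executemany(
        f"INSERT OR REPLACE INTO {table} (id, name, amount) "
        "VALUES (:id, :name, :amount)",
        rows,
    )
    conn.commit()
    return len(rows)


if __name__ == "__main__":
    conn = sqlite3.connect(":memory:")
    batch = [
        {"id": 1, "name": "alice", "amount": 120.0},
        {"id": 2, "name": "bob", "amount": 0.0},
    ]
    load_rows_idempotent(conn, "daily_data", batch)
    load_rows_idempotent(conn, "daily_data", batch)  # simulated retry of the same run
    print(conn.execute("SELECT COUNT(*) FROM daily_data").fetchone()[0])  # 2
```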

Tips

  • Retry: always configure retries for external API calls
  • Cache: cache data that rarely changes to save time
  • Idempotent: make tasks safe to re-run without side effects
  • Monitor: check the Prefect dashboard daily and set up alerts
  • Test: test flows with pytest, just like ordinary Python code
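The retry tip corresponds to settings such as retries=3, retry_delay_seconds=60 on extract_api_data: when the task raises, Prefect waits and runs it again, up to three more times. The plain-Python sketch below reproduces that control flow so it can be tested without Prefect; call_with_retries and FlakyCall are hypothetical, and the sleep function is injectable so tests do not actually wait.

```python
import time
from typing import Callable, TypeVar

T = TypeVar("T")


def call_with_retries(
    fn: Callable[[], T],
    retries: int = 3,
    retry_delay_seconds: float = 60,
    sleep: Callable[[float], None] = time.sleep,
) -> T:
    """Run fn; on an exception, wait and retry up to `retries` more times."""
    attempt = 0
    while True:
        try:
            return fn()
        except Exception:
            if attempt >= retries:
                raise  # retries exhausted: surface the last error
            attempt += 1
            sleep(retry_delay_seconds)


class FlakyCall:
    """Hypothetical external call that fails `failures` times, then succeeds."""

    def __init__(self, failures: int) -> None:
        self.failures = failures
        self.calls = 0

    def __call__(self) -> str:
        self.calls += 1
        if self.calls <= self.failures:
            raise ConnectionError(f"temporary failure #{self.calls}")
        return "ok"


if __name__ == "__main__":
    flaky = FlakyCall(failures=2)
    # retry_delay_seconds=0 so the demo finishes instantly
    print(call_with_retries(flaky, retries=3, retry_delay_seconds=0), flaky.calls)  # ok 3
```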

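What is Prefect?

Prefect is a Python workflow orchestration tool: you decorate functions with @flow and @task, and Prefect adds a dashboard, retries, error handling, scheduling, caching, and notifications. It can be self-hosted or run on Prefect Cloud, and is an alternative to Airflow.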


XM Legend · Trader & Forex instructor, 13 years

Founder of SiamCafe since 1997 · Forex trader for more than 13 years, recognized as an XM Legend · Shares knowledge of Forex, IT, AI, and trading drawn from real experience in live markets