Prefect Workflow Orchestration
Prefect เป็น Workflow Orchestration Framework สำหรับ Python ที่ออกแบบมาให้เขียน Data Pipelines ได้ง่ายเหมือนเขียน Python ปกติ ใช้ Decorator @flow และ @task กำหนด Workflows รองรับ Retry, Caching, Scheduling และ Notifications
เมื่อรวมกับ GitOps ได้ Workflow Management ที่ Version Controlled ทุกการเปลี่ยนแปลงผ่าน PR มี Review CI/CD Deploy Flows อัตโนมัติ
Prefect Flows และ Tasks
# prefect_flows.py — Prefect Data Workflow
# pip install prefect prefect-sqlalchemy prefect-slack
from prefect import flow, task, get_run_logger
from prefect.tasks import task_input_hash
from datetime import timedelta, datetime
from typing import List, Dict
import httpx
import json
# === Tasks ===
@task(retries=3, retry_delay_seconds=10,
cache_key_fn=task_input_hash, cache_expiration=timedelta(hours=1))
def extract_data(source_url: str) -> List[Dict]:
"""Extract data จาก API"""
logger = get_run_logger()
logger.info(f"Extracting from {source_url}")
response = httpx.get(source_url, timeout=30)
response.raise_for_status()
data = response.json()
logger.info(f"Extracted {len(data)} records")
return data
@task(retries=2)
def transform_data(raw_data: List[Dict]) -> List[Dict]:
"""Transform และ Clean Data"""
logger = get_run_logger()
transformed = []
for record in raw_data:
cleaned = {
"id": record.get("id"),
"name": record.get("name", "").strip().lower(),
"email": record.get("email", "").strip().lower(),
"created_at": record.get("created_at", ""),
"processed_at": datetime.now().isoformat(),
}
# Validation
if cleaned["id"] and cleaned["email"]:
transformed.append(cleaned)
logger.info(f"Transformed {len(transformed)}/{len(raw_data)} records")
return transformed
@task(retries=2)
def load_data(data: List[Dict], destination: str) -> int:
"""Load Data ไปยัง Destination"""
logger = get_run_logger()
logger.info(f"Loading {len(data)} records to {destination}")
# Simulate loading to database
loaded = 0
for record in data:
# INSERT INTO table VALUES (...)
loaded += 1
logger.info(f"Loaded {loaded} records")
return loaded
@task
def send_notification(flow_name: str, records: int, status: str):
"""ส่ง Notification"""
logger = get_run_logger()
message = f"Flow: {flow_name} | Records: {records} | Status: {status}"
logger.info(f"Notification: {message}")
# ส่ง Slack, Email, etc.
# === Flows ===
@flow(name="etl-pipeline", log_prints=True)
def etl_pipeline(source_url: str, destination: str = "warehouse"):
"""Main ETL Pipeline"""
# Extract
raw_data = extract_data(source_url)
# Transform
clean_data = transform_data(raw_data)
# Load
loaded = load_data(clean_data, destination)
# Notify
send_notification("etl-pipeline", loaded, "success")
return {"records_loaded": loaded}
@flow(name="multi-source-etl", log_prints=True)
def multi_source_etl():
"""ETL จากหลาย Sources"""
sources = [
{"url": "https://api.example.com/users", "dest": "users_table"},
{"url": "https://api.example.com/orders", "dest": "orders_table"},
{"url": "https://api.example.com/products", "dest": "products_table"},
]
total = 0
for source in sources:
result = etl_pipeline(source["url"], source["dest"])
total += result.get("records_loaded", 0)
print(f"Total records loaded: {total}")
# === Deployment ===
# from prefect.deployments import Deployment
# from prefect.server.schemas.schedules import CronSchedule
#
# deployment = Deployment.build_from_flow(
# flow=etl_pipeline,
# name="daily-etl",
# schedule=CronSchedule(cron="0 2 * * *"), # ทุกวัน 02:00
# parameters={"source_url": "https://api.example.com/users"},
# work_queue_name="default",
# )
# deployment.apply()
if __name__ == "__main__":
etl_pipeline("https://jsonplaceholder.typicode.com/users")
GitOps CI/CD Pipeline
# === GitHub Actions — Prefect GitOps Pipeline ===
# .github/workflows/prefect-gitops.yml
name: Prefect GitOps
on:
push:
branches: [main]
paths: ["flows/**", "deployments/**"]
pull_request:
branches: [main]
paths: ["flows/**", "deployments/**"]
env:
PREFECT_API_URL: }
PREFECT_API_KEY: }
jobs:
test:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v4
- uses: actions/setup-python@v5
with:
python-version: "3.11"
- name: Install Dependencies
run: |
pip install prefect pytest pytest-asyncio
pip install -r requirements.txt
- name: Lint
run: |
pip install ruff
ruff check flows/
- name: Type Check
run: |
pip install mypy
mypy flows/ --ignore-missing-imports
- name: Unit Tests
run: pytest tests/ -v --tb=short
- name: Flow Validation
run: |
python -c "
from flows.etl_pipeline import etl_pipeline
from flows.multi_source import multi_source_etl
print('All flows validated successfully')
"
deploy:
needs: test
runs-on: ubuntu-latest
if: github.ref == 'refs/heads/main'
steps:
- uses: actions/checkout@v4
- uses: actions/setup-python@v5
with:
python-version: "3.11"
- name: Install Dependencies
run: |
pip install prefect
pip install -r requirements.txt
- name: Login to Prefect
run: prefect cloud login --key $PREFECT_API_KEY
- name: Deploy Flows
run: |
python deployments/deploy_all.py
echo "All flows deployed"
- name: Verify Deployments
run: |
prefect deployment ls
echo "Deployments verified"
- name: Notify
if: success()
run: |
curl -X POST } \
-H 'Content-Type: application/json' \
-d '{"text":"Prefect flows deployed via GitOps"}'
# === deployments/deploy_all.py ===
# from prefect.deployments import Deployment
# from prefect.server.schemas.schedules import CronSchedule
# from flows.etl_pipeline import etl_pipeline
# from flows.multi_source import multi_source_etl
#
# # Daily ETL
# Deployment.build_from_flow(
# flow=etl_pipeline,
# name="daily-etl",
# schedule=CronSchedule(cron="0 2 * * *"),
# parameters={"source_url": "https://api.example.com/users"},
# work_queue_name="production",
# ).apply()
#
# # Multi-source ETL
# Deployment.build_from_flow(
# flow=multi_source_etl,
# name="multi-source-daily",
# schedule=CronSchedule(cron="0 3 * * *"),
# work_queue_name="production",
# ).apply()
#
# print("All deployments applied")
Self-hosted Prefect Server
# === Self-hosted Prefect Server ด้วย Docker ===
cat > docker-compose-prefect.yml << 'EOF'
version: '3.8'
services:
postgres:
image: postgres:16-alpine
environment:
POSTGRES_DB: prefect
POSTGRES_USER: prefect
POSTGRES_PASSWORD: SecurePass123
volumes:
- pg-data:/var/lib/postgresql/data
healthcheck:
test: ["CMD-SHELL", "pg_isready -U prefect"]
interval: 10s
timeout: 5s
retries: 5
prefect-server:
image: prefecthq/prefect:2-latest
command: prefect server start --host 0.0.0.0
ports:
- "4200:4200"
environment:
PREFECT_API_DATABASE_CONNECTION_URL: postgresql+asyncpg://prefect:SecurePass123@postgres:5432/prefect
PREFECT_SERVER_API_HOST: 0.0.0.0
PREFECT_API_URL: http://localhost:4200/api
depends_on:
postgres:
condition: service_healthy
prefect-worker:
image: prefecthq/prefect:2-latest
command: prefect worker start --pool default-pool --type process
environment:
PREFECT_API_URL: http://prefect-server:4200/api
volumes:
- ./flows:/opt/flows
depends_on:
- prefect-server
volumes:
pg-data:
EOF
docker compose -f docker-compose-prefect.yml up -d
# ตรวจสอบ
# curl http://localhost:4200/api/health
# สร้าง Work Pool
# prefect work-pool create default-pool --type process
# Deploy Flow
# PREFECT_API_URL=http://localhost:4200/api prefect deploy --all
echo "Prefect Server: http://localhost:4200"
echo " Dashboard: http://localhost:4200/dashboard"
Best Practices
- Flows in Git: เก็บ Flow Code ใน Git Repository ทุกการเปลี่ยนแปลงผ่าน PR
- Task Retries: ตั้ง Retries สำหรับ Tasks ที่เรียก External APIs ป้องกัน Transient Failures
- Caching: ใช้ Task Caching สำหรับ Data ที่ไม่เปลี่ยนบ่อย ลด API Calls
- Parameters: ใช้ Flow Parameters แทน Hardcode Values ทำให้ Reusable
- Testing: เขียน Unit Tests สำหรับ Tasks ทดสอบก่อน Deploy
- Monitoring: ใช้ Prefect UI ดู Flow Runs ตั้ง Notifications เมื่อ Flow ล้มเหลว
Prefect คืออะไร
Open-source Workflow Orchestration สำหรับ Python จัดการ Data Pipelines ETL ML Workflows Web UI รองรับ Retry Caching Scheduling Notifications เขียนเป็น Python Code ปกติ @flow @task
Prefect ต่างจาก Airflow อย่างไร
Prefect เขียน Python ปกติไม่ต้อง DAG Objects Deploy ง่าย Dynamic Workflows UI สวย มี Cloud Airflow เหมาะ Enterprise Complex Scheduling Community ใหญ่กว่า
GitOps กับ Prefect ใช้อย่างไร
เก็บ Flows ใน Git ทุกการเปลี่ยนแปลงผ่าน PR CI/CD ทดสอบ Flows อัตโนมัติ Deploy เมื่อ Merge เข้า Main Version Control สำหรับทุก Workflow
Prefect Cloud กับ Self-hosted ต่างกันอย่างไร
Cloud Managed Service ไม่ต้อง Manage Server Dashboard RBAC Audit Log ฟรี 3 Users Self-hosted ติดตั้งเอง ข้อมูลอยู่ Infrastructure เอง ไม่มีค่า License เหมาะ Data Privacy
สรุป
Prefect เป็น Workflow Orchestration ที่เขียนง่ายเหมือน Python ปกติ เมื่อรวมกับ GitOps ได้ Workflow Management ที่ Version Controlled CI/CD Deploy อัตโนมัติ ใช้ Task Retries Caching สำหรับ Reliability Self-hosted ด้วย Docker สำหรับ Data Privacy ตั้ง Notifications เมื่อ Flow ล้มเหลว
