Prefect Workflow กับ GitOps — วิธีใช้ Prefect
Prefect Workflow Orchestration

Prefect เป็น Workflow Orchestration Framework สำหรับ Python ที่ออกแบบมาให้เขียน Data Pipelines ได้ง่ายเหมือนเขียน Python ปกติ โดยใช้ Decorator @flow และ @task ในการกำหนด Workflows และรองรับ Retry, Caching, Scheduling และ Notifications
เนื้อหาเกี่ยวข้อง — ดูเพิ่มเติมเรื่อง Cimb โทร — คู่มือฉบับสมบูรณ์ 2026
เมื่อรวมกับ GitOps จะได้ Workflow Management ที่อยู่ภายใต้ Version Control: ทุกการเปลี่ยนแปลงต้องผ่าน Pull Request และ Code Review จากนั้น CI/CD จะ Deploy Flows ให้โดยอัตโนมัติ
เนื้อหาเกี่ยวข้อง — Weights Biases Infrastructure as Code
Prefect Flows และ Tasks
# prefect_flows.py — Prefect Data Workflow
# pip install prefect prefect-sqlalchemy prefect-slack
import json
from datetime import datetime, timedelta
from typing import Dict, List, Optional

import httpx
from prefect import flow, get_run_logger, task
from prefect.tasks import task_input_hash
# === Tasks ===
@task(
    retries=3,
    retry_delay_seconds=10,
    cache_key_fn=task_input_hash,
    cache_expiration=timedelta(hours=1),
)
def extract_data(source_url: str) -> List[Dict]:
    """Fetch records from an HTTP JSON endpoint.

    The task is retried up to 3 times (10 s apart), and its result is cached
    for one hour keyed on the task inputs.

    Args:
        source_url: URL fetched with a GET request (30 s timeout).

    Returns:
        The decoded JSON body of the response.

    Raises:
        httpx.HTTPStatusError: if the server answers with an error status.
    """
    log = get_run_logger()
    log.info(f"Extracting from {source_url}")
    resp = httpx.get(source_url, timeout=30)
    # Turn HTTP error statuses into exceptions so the task's retries apply.
    resp.raise_for_status()
    payload = resp.json()
    log.info(f"Extracted {len(payload)} records")
    return payload
def _clean_text(value) -> str:
    """Return *value* as a stripped, lower-cased string; ``None`` becomes ``""``."""
    if value is None:
        return ""
    return str(value).strip().lower()


@task(retries=2)
def transform_data(raw_data: List[Dict]) -> List[Dict]:
    """Normalize raw records and drop those missing an id or an e-mail.

    Each output record has the keys ``id``, ``name``, ``email``,
    ``created_at`` and ``processed_at``. ``name`` and ``email`` are stripped
    and lower-cased. A JSON ``null`` (or a non-string value) is handled
    instead of crashing on ``.strip()``.

    Args:
        raw_data: records as returned by ``extract_data``.

    Returns:
        The cleaned records that passed validation.
    """
    logger = get_run_logger()
    transformed = []
    for record in raw_data:
        cleaned = {
            "id": record.get("id"),
            # .get(key, "") still returns None when the key exists with a
            # null value, so normalise through a None-safe helper.
            "name": _clean_text(record.get("name")),
            "email": _clean_text(record.get("email")),
            "created_at": record.get("created_at", ""),
            "processed_at": datetime.now().isoformat(),
        }
        # Validation: require an id (0 is a valid id, so no truthiness test)
        # and a non-empty e-mail.
        if cleaned["id"] not in (None, "") and cleaned["email"]:
            transformed.append(cleaned)
    logger.info(f"Transformed {len(transformed)}/{len(raw_data)} records")
    return transformed
@task(retries=2)
def load_data(data: List[Dict], destination: str) -> int:
    """Simulate loading *data* into *destination* and report the row count.

    No database is contacted yet: every record is only counted. Replace the
    loop body with a real INSERT when wiring up a warehouse.

    Returns:
        Number of records "loaded" (currently equal to ``len(data)``).
    """
    log = get_run_logger()
    log.info(f"Loading {len(data)} records to {destination}")
    written = 0
    for _row in data:
        # Placeholder for: INSERT INTO table VALUES (...)
        written += 1
    log.info(f"Loaded {written} records")
    return written
@task
def send_notification(flow_name: str, records: int, status: str):
    """Log a one-line run summary through the Prefect run logger.

    Only logging happens here; delivery to Slack or e-mail is not implemented.
    """
    summary = " | ".join(
        (f"Flow: {flow_name}", f"Records: {records}", f"Status: {status}")
    )
    get_run_logger().info(f"Notification: {summary}")
    # TODO: forward `summary` to Slack, e-mail, etc.
# === Flows ===
@flow(name="etl-pipeline", log_prints=True)
def etl_pipeline(source_url: str, destination: str = "warehouse"):
    """Run extract -> transform -> load for one source, then notify.

    Args:
        source_url: endpoint passed to ``extract_data``.
        destination: target name passed to ``load_data``.

    Returns:
        ``{"records_loaded": <count returned by load_data>}``.
    """
    records = transform_data(extract_data(source_url))
    loaded_count = load_data(records, destination)
    # Only reached after the calls above have returned normally.
    send_notification("etl-pipeline", loaded_count, "success")
    return {"records_loaded": loaded_count}
# Sources used when multi_source_etl is run without a `sources` parameter.
DEFAULT_SOURCES: List[Dict[str, str]] = [
    {"url": "https://api.example.com/users", "dest": "users_table"},
    {"url": "https://api.example.com/orders", "dest": "orders_table"},
    {"url": "https://api.example.com/products", "dest": "products_table"},
]


@flow(name="multi-source-etl", log_prints=True)
def multi_source_etl(sources: Optional[List[Dict[str, str]]] = None) -> int:
    """Run ``etl_pipeline`` as a subflow for each source and total the results.

    Args:
        sources: list of ``{"url": ..., "dest": ...}`` mappings. ``None`` (the
            default) uses ``DEFAULT_SOURCES``, so deployments that pass no
            parameters keep their previous behavior.

    Returns:
        Sum of ``records_loaded`` across all sources (also printed).
    """
    if sources is None:
        sources = DEFAULT_SOURCES
    total = 0
    # Subflows run one after another; an exception from one stops the rest.
    for source in sources:
        result = etl_pipeline(source["url"], source["dest"])
        total += result.get("records_loaded", 0)
    print(f"Total records loaded: {total}")
    return total
# === Deployment ===
# Example of registering a daily schedule for etl_pipeline.
# NOTE(review): Deployment.build_from_flow is deprecated in newer Prefect 2.x
# releases and removed in Prefect 3; `etl_pipeline.serve(...)` or
# `etl_pipeline.deploy(...)` replace it — confirm for the installed version.
#
# from prefect.deployments import Deployment
# from prefect.server.schemas.schedules import CronSchedule
#
# deployment = Deployment.build_from_flow(
#     flow=etl_pipeline,
#     name="daily-etl",
#     schedule=CronSchedule(cron="0 2 * * *"),  # every day at 02:00
#     parameters={"source_url": "https://api.example.com/users"},
#     work_queue_name="default",
# )
# deployment.apply()

if __name__ == "__main__":
    # Local smoke run against a public sample API.
    etl_pipeline(source_url="https://jsonplaceholder.typicode.com/users")
GitOps CI/CD Pipeline

# === GitHub Actions — Prefect GitOps Pipeline ===
# .github/workflows/prefect-gitops.yml
name: Prefect GitOps

on:
  push:
    branches: [main]
    paths:
      - "flows/**"
      - "deployments/**"
      - "requirements.txt"
      - ".github/workflows/prefect-gitops.yml"
  pull_request:
    branches: [main]
    paths:
      - "flows/**"
      - "deployments/**"
      - "requirements.txt"
      - ".github/workflows/prefect-gitops.yml"

jobs:
  test:
    # Deliberately no Prefect credentials here: this job runs pull-request
    # code, and tests should not talk to the production Prefect API.
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v4
      - uses: actions/setup-python@v5
        with:
          python-version: "3.11"
      - name: Install Dependencies
        run: |
          pip install prefect pytest pytest-asyncio
          pip install -r requirements.txt
      - name: Lint
        run: |
          pip install ruff
          ruff check flows/
      - name: Type Check
        run: |
          pip install mypy
          mypy flows/ --ignore-missing-imports
      - name: Unit Tests
        run: pytest tests/ -v --tb=short
      - name: Flow Validation
        run: |
          python -c "
          from flows.etl_pipeline import etl_pipeline
          from flows.multi_source import multi_source_etl
          print('All flows validated successfully')
          "

  deploy:
    needs: test
    runs-on: ubuntu-latest
    if: github.event_name == 'push' && github.ref == 'refs/heads/main'
    # PREFECT_API_URL + PREFECT_API_KEY in the environment authenticate the
    # Prefect CLI/client directly. `prefect cloud login --key` without
    # `--workspace` fails in a non-interactive CI shell, so it is not used.
    env:
      PREFECT_API_URL: ${{ secrets.PREFECT_API_URL }}
      PREFECT_API_KEY: ${{ secrets.PREFECT_API_KEY }}
    steps:
      - uses: actions/checkout@v4
      - uses: actions/setup-python@v5
        with:
          python-version: "3.11"
      - name: Install Dependencies
        run: |
          pip install prefect
          pip install -r requirements.txt
      - name: Deploy Flows
        run: |
          python deployments/deploy_all.py
          echo "All flows deployed"
      - name: Verify Deployments
        run: |
          prefect deployment ls
          echo "Deployments verified"
      - name: Notify
        if: success()
        # Secret name is assumed; adjust to the repository's webhook secret.
        env:
          SLACK_WEBHOOK_URL: ${{ secrets.SLACK_WEBHOOK_URL }}
        run: |
          curl -fsS -X POST "$SLACK_WEBHOOK_URL" \
            -H 'Content-Type: application/json' \
            -d '{"text":"Prefect flows deployed via GitOps"}'
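# === deployments/deploy_all.py ===
# NOTE: Deployment.build_from_flow (used below) is deprecated in newer Prefect 2.x
# releases and removed in Prefect 3 — prefer flow.deploy()/flow.serve() or a
# prefect.yaml applied with `prefect deploy --all`.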
# from prefect.deployments import Deployment
# from prefect.server.schemas.schedules import CronSchedule
# from flows.etl_pipeline import etl_pipeline
# from flows.multi_source import multi_source_etl
#
# # Daily ETL
# Deployment.build_from_flow(
# flow=etl_pipeline,
# name="daily-etl",
# schedule=CronSchedule(cron="0 2 * * *"),
# parameters={"source_url": "https://api.example.com/users"},
# work_queue_name="production",
# ).apply()
#
# # Multi-source ETL
# Deployment.build_from_flow(
# flow=multi_source_etl,
# name="multi-source-daily",
# schedule=CronSchedule(cron="0 3 * * *"),
# work_queue_name="production",
# ).apply()
#
# print("All deployments applied")
Self-hosted Prefect Server
# === Self-hosted Prefect Server with Docker ===
# The database password comes from the POSTGRES_PASSWORD environment variable
# (or a .env file next to the compose file) instead of being hard-coded twice.
# Compose aborts with an error if it is unset. If the password contains URL
# special characters it must be URL-encoded in the connection URL.
cat > docker-compose-prefect.yml << 'EOF'
services:
  postgres:
    image: postgres:16-alpine
    environment:
      POSTGRES_DB: prefect
      POSTGRES_USER: prefect
      POSTGRES_PASSWORD: ${POSTGRES_PASSWORD:?POSTGRES_PASSWORD is required}
    volumes:
      - pg-data:/var/lib/postgresql/data
    healthcheck:
      test: ["CMD-SHELL", "pg_isready -U prefect -d prefect"]
      interval: 10s
      timeout: 5s
      retries: 5

  prefect-server:
    image: prefecthq/prefect:2-latest
    command: prefect server start --host 0.0.0.0
    ports:
      - "4200:4200"
    environment:
      PREFECT_API_DATABASE_CONNECTION_URL: postgresql+asyncpg://prefect:${POSTGRES_PASSWORD:?POSTGRES_PASSWORD is required}@postgres:5432/prefect
      PREFECT_SERVER_API_HOST: 0.0.0.0
      PREFECT_API_URL: http://localhost:4200/api
    healthcheck:
      # The worker should only start once the API actually answers.
      test: ["CMD", "python", "-c", "import urllib.request; urllib.request.urlopen('http://localhost:4200/api/health')"]
      interval: 10s
      timeout: 5s
      retries: 10
      start_period: 20s
    depends_on:
      postgres:
        condition: service_healthy

  prefect-worker:
    image: prefecthq/prefect:2-latest
    command: prefect worker start --pool default-pool --type process
    environment:
      PREFECT_API_URL: http://prefect-server:4200/api
    volumes:
      - ./flows:/opt/flows
    depends_on:
      prefect-server:
        condition: service_healthy
    restart: unless-stopped

volumes:
  pg-data:
EOF

docker compose -f docker-compose-prefect.yml up -d

# Verify the API is up
# curl http://localhost:4200/api/health

# Create the work pool
# prefect work-pool create default-pool --type process

# Deploy flows
# PREFECT_API_URL=http://localhost:4200/api prefect deploy --all

echo "Prefect Server: http://localhost:4200"
echo " Dashboard: http://localhost:4200/dashboard"
Best Practices
- Flows in Git: เก็บ Flow Code ใน Git Repository ทุกการเปลี่ยนแปลงผ่าน PR
- Task Retries: ตั้ง Retries สำหรับ Tasks ที่เรียก External APIs ป้องกัน Transient Failures
- Caching: ใช้ Task Caching สำหรับ Data ที่ไม่เปลี่ยนบ่อย ลด API Calls
- Parameters: ใช้ Flow Parameters แทน Hardcode Values ทำให้ Reusable
- Testing: เขียน Unit Tests สำหรับ Tasks และรันทดสอบใน CI ก่อน Deploy
- Monitoring: ใช้ Prefect UI ติดตาม Flow Runs และตั้ง Notifications แจ้งเตือนเมื่อ Flow ล้มเหลว
Prefect คืออะไร
Prefect เป็น Open-source Workflow Orchestration Framework สำหรับ Python ใช้จัดการ Data Pipelines, ETL และ ML Workflows มี Web UI และรองรับ Retry, Caching, Scheduling และ Notifications โดยเขียน Workflow เป็น Python Code ปกติด้วย Decorator @flow และ @task
แนะนำเพิ่มเติม — ระบบเทรดของ iCafeForex
เนื้อหาเกี่ยวข้อง — ทำความเข้าใจ LocalAI Self-hosted SaaS Architecture





