SiamCafe.net Blog

Prefect Workflow GitOps Workflow

2026-05-01 · Ajarn Bom — SiamCafe.net · 10,701 words

Prefect Workflow Orchestration

Prefect is a workflow orchestration framework for Python, designed so that writing data pipelines feels like writing ordinary Python. Workflows are defined with the @flow and @task decorators, with built-in support for retries, caching, scheduling, and notifications.

Combined with GitOps, this gives you version-controlled workflow management: every change goes through a reviewed PR, and CI/CD deploys flows automatically.

Prefect Flows และ Tasks

# prefect_flows.py — Prefect Data Workflow
# pip install prefect prefect-sqlalchemy prefect-slack

from prefect import flow, task, get_run_logger
from prefect.tasks import task_input_hash
from datetime import timedelta, datetime
from typing import List, Dict
import httpx
import json

# === Tasks ===

@task(retries=3, retry_delay_seconds=10,
      cache_key_fn=task_input_hash, cache_expiration=timedelta(hours=1))
def extract_data(source_url: str) -> List[Dict]:
    """Extract data from an API"""
    logger = get_run_logger()
    logger.info(f"Extracting from {source_url}")

    response = httpx.get(source_url, timeout=30)
    response.raise_for_status()
    data = response.json()

    logger.info(f"Extracted {len(data)} records")
    return data

@task(retries=2)
def transform_data(raw_data: List[Dict]) -> List[Dict]:
    """Transform and clean the data"""
    logger = get_run_logger()

    transformed = []
    for record in raw_data:
        cleaned = {
            "id": record.get("id"),
            "name": record.get("name", "").strip().lower(),
            "email": record.get("email", "").strip().lower(),
            "created_at": record.get("created_at", ""),
            "processed_at": datetime.now().isoformat(),
        }

        # Validation
        if cleaned["id"] and cleaned["email"]:
            transformed.append(cleaned)

    logger.info(f"Transformed {len(transformed)}/{len(raw_data)} records")
    return transformed

@task(retries=2)
def load_data(data: List[Dict], destination: str) -> int:
    """Load data into the destination"""
    logger = get_run_logger()
    logger.info(f"Loading {len(data)} records to {destination}")

    # Simulate loading to database
    loaded = 0
    for record in data:
        # INSERT INTO table VALUES (...)
        loaded += 1

    logger.info(f"Loaded {loaded} records")
    return loaded

@task
def send_notification(flow_name: str, records: int, status: str):
    """Send a notification"""
    logger = get_run_logger()
    message = f"Flow: {flow_name} | Records: {records} | Status: {status}"
    logger.info(f"Notification: {message}")
    # Send to Slack, email, etc.

# === Flows ===

@flow(name="etl-pipeline", log_prints=True)
def etl_pipeline(source_url: str, destination: str = "warehouse"):
    """Main ETL Pipeline"""
    # Extract
    raw_data = extract_data(source_url)

    # Transform
    clean_data = transform_data(raw_data)

    # Load
    loaded = load_data(clean_data, destination)

    # Notify
    send_notification("etl-pipeline", loaded, "success")

    return {"records_loaded": loaded}

@flow(name="multi-source-etl", log_prints=True)
def multi_source_etl():
    """ETL from multiple sources"""
    sources = [
        {"url": "https://api.example.com/users", "dest": "users_table"},
        {"url": "https://api.example.com/orders", "dest": "orders_table"},
        {"url": "https://api.example.com/products", "dest": "products_table"},
    ]

    total = 0
    for source in sources:
        result = etl_pipeline(source["url"], source["dest"])
        total += result.get("records_loaded", 0)

    print(f"Total records loaded: {total}")

# === Deployment ===

# from prefect.deployments import Deployment
# from prefect.server.schemas.schedules import CronSchedule
#
# deployment = Deployment.build_from_flow(
#     flow=etl_pipeline,
#     name="daily-etl",
#     schedule=CronSchedule(cron="0 2 * * *"),  # every day at 02:00
#     parameters={"source_url": "https://api.example.com/users"},
#     work_queue_name="default",
# )
# deployment.apply()

if __name__ == "__main__":
    etl_pipeline("https://jsonplaceholder.typicode.com/users")

GitOps CI/CD Pipeline

# === GitHub Actions — Prefect GitOps Pipeline ===
# .github/workflows/prefect-gitops.yml

name: Prefect GitOps
on:
  push:
    branches: [main]
    paths: ["flows/**", "deployments/**"]
  pull_request:
    branches: [main]
    paths: ["flows/**", "deployments/**"]

env:
  PREFECT_API_URL: ${{ secrets.PREFECT_API_URL }}
  PREFECT_API_KEY: ${{ secrets.PREFECT_API_KEY }}

jobs:
  test:
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v4
      - uses: actions/setup-python@v5
        with:
          python-version: "3.11"

      - name: Install Dependencies
        run: |
          pip install prefect pytest pytest-asyncio
          pip install -r requirements.txt

      - name: Lint
        run: |
          pip install ruff
          ruff check flows/

      - name: Type Check
        run: |
          pip install mypy
          mypy flows/ --ignore-missing-imports

      - name: Unit Tests
        run: pytest tests/ -v --tb=short

      - name: Flow Validation
        run: |
          python -c "
          from flows.etl_pipeline import etl_pipeline
          from flows.multi_source import multi_source_etl
          print('All flows validated successfully')
          "

  deploy:
    needs: test
    runs-on: ubuntu-latest
    if: github.ref == 'refs/heads/main'
    steps:
      - uses: actions/checkout@v4
      - uses: actions/setup-python@v5
        with:
          python-version: "3.11"

      - name: Install Dependencies
        run: |
          pip install prefect
          pip install -r requirements.txt

      - name: Login to Prefect
        run: prefect cloud login --key $PREFECT_API_KEY

      - name: Deploy Flows
        run: |
          python deployments/deploy_all.py
          echo "All flows deployed"

      - name: Verify Deployments
        run: |
          prefect deployment ls
          echo "Deployments verified"

      - name: Notify
        if: success()
        run: |
          curl -X POST ${{ secrets.SLACK_WEBHOOK_URL }} \
            -H 'Content-Type: application/json' \
            -d '{"text":"Prefect flows deployed via GitOps"}'

# === deployments/deploy_all.py ===
# from prefect.deployments import Deployment
# from prefect.server.schemas.schedules import CronSchedule
# from flows.etl_pipeline import etl_pipeline
# from flows.multi_source import multi_source_etl
#
# # Daily ETL
# Deployment.build_from_flow(
#     flow=etl_pipeline,
#     name="daily-etl",
#     schedule=CronSchedule(cron="0 2 * * *"),
#     parameters={"source_url": "https://api.example.com/users"},
#     work_queue_name="production",
# ).apply()
#
# # Multi-source ETL
# Deployment.build_from_flow(
#     flow=multi_source_etl,
#     name="multi-source-daily",
#     schedule=CronSchedule(cron="0 3 * * *"),
#     work_queue_name="production",
# ).apply()
#
# print("All deployments applied")

Self-hosted Prefect Server

# === Self-hosted Prefect Server with Docker ===

cat > docker-compose-prefect.yml << 'EOF'
version: '3.8'
services:
  postgres:
    image: postgres:16-alpine
    environment:
      POSTGRES_DB: prefect
      POSTGRES_USER: prefect
      POSTGRES_PASSWORD: SecurePass123
    volumes:
      - pg-data:/var/lib/postgresql/data
    healthcheck:
      test: ["CMD-SHELL", "pg_isready -U prefect"]
      interval: 10s
      timeout: 5s
      retries: 5

  prefect-server:
    image: prefecthq/prefect:2-latest
    command: prefect server start --host 0.0.0.0
    ports:
      - "4200:4200"
    environment:
      PREFECT_API_DATABASE_CONNECTION_URL: postgresql+asyncpg://prefect:SecurePass123@postgres:5432/prefect
      PREFECT_SERVER_API_HOST: 0.0.0.0
      PREFECT_API_URL: http://localhost:4200/api
    depends_on:
      postgres:
        condition: service_healthy

  prefect-worker:
    image: prefecthq/prefect:2-latest
    command: prefect worker start --pool default-pool --type process
    environment:
      PREFECT_API_URL: http://prefect-server:4200/api
    volumes:
      - ./flows:/opt/flows
    depends_on:
      - prefect-server

volumes:
  pg-data:
EOF

docker compose -f docker-compose-prefect.yml up -d

# Check the API health
# curl http://localhost:4200/api/health

# Create a work pool
# prefect work-pool create default-pool --type process

# Deploy Flow
# PREFECT_API_URL=http://localhost:4200/api prefect deploy --all

echo "Prefect Server: http://localhost:4200"
echo "  Dashboard: http://localhost:4200/dashboard"

FAQ

What is Prefect?

An open-source workflow orchestration framework for Python for managing data pipelines, ETL, and ML workflows. It ships with a web UI and supports retries, caching, scheduling, and notifications. Workflows are ordinary Python code using @flow and @task.

How does Prefect differ from Airflow?

With Prefect you write ordinary Python with no DAG objects, deployment is simple, workflows can be dynamic, the UI is polished, and a managed Cloud is available. Airflow suits enterprises with complex scheduling needs and has a larger community.

How do you use GitOps with Prefect?

Keep flows in Git, route every change through a PR, have CI/CD test flows automatically, and deploy on merge to main. The result is version control for every workflow.
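
In newer Prefect 2 releases, the same GitOps loop can also be driven declaratively by a prefect.yaml checked into the repo and applied with `prefect deploy`. A sketch, assuming a hypothetical repository URL and the flow entrypoint from this article:

```yaml
# prefect.yaml — read by `prefect deploy`
name: etl-project
prefect-version: 2.16.0

pull:
  - prefect.deployments.steps.git_clone:
      repository: https://github.com/example/etl-flows.git  # assumption
      branch: main

deployments:
  - name: daily-etl
    entrypoint: flows/etl_pipeline.py:etl_pipeline
    schedule:
      cron: "0 2 * * *"
    work_pool:
      name: default-pool
```

With this file in Git, the CI deploy step reduces to `prefect deploy --all`, and workers pull the flow code straight from the repository at run time.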

How do Prefect Cloud and self-hosted differ?

Prefect Cloud is a managed service: no servers to run, plus a dashboard, RBAC, and audit logs, free for up to 3 users. Self-hosted means installing it yourself, with data staying on your own infrastructure and no license cost, which suits data-privacy requirements.

Summary

Prefect is a workflow orchestrator that is as easy to write as ordinary Python. Combined with GitOps, you get version-controlled workflow management with automatic CI/CD deployment. Use task retries and caching for reliability, self-host with Docker when data privacy matters, and set up notifications for failed flows.
