ai

Prefect Workflow กับ GitOps — วิธีใช้ Prefect

Prefect Workflow กับ GitOps — วิธีใช้ Prefect

Prefect Workflow Orchestration

Prefect Workflow กับ GitOps — วิธีใช้ Prefect

Prefect เป็น Workflow Orchestration Framework สำหรับ Python ที่ออกแบบมาให้เขียน Data Pipelines ได้ง่ายเหมือนเขียน Python ปกติ ใช้ Decorator @flow และ @task กำหนด Workflows รองรับ Retry, Caching, Scheduling และ Notifications

เนื้อหาเกี่ยวข้อง — ดูเพิ่มเติมเรื่อง Cimb โทร — คู่มือฉบับสมบูรณ์ 2026

เมื่อรวมกับ GitOps ได้ Workflow Management ที่ Version Controlled ทุกการเปลี่ยนแปลงผ่าน PR มี Review CI/CD Deploy Flows อัตโนมัติ

เนื้อหาเกี่ยวข้อง — Weights Biases Infrastructure as Code

Prefect Flows และ Tasks

# prefect_flows.py — Prefect Data Workflow
# pip install prefect prefect-sqlalchemy prefect-slack

from prefect import flow, task, get_run_logger
from prefect.tasks import task_input_hash
from datetime import timedelta, datetime
from typing import List, Dict
import httpx
import json

# === Tasks ===

@task(retries=3, retry_delay_seconds=10,
      cache_key_fn=task_input_hash, cache_expiration=timedelta(hours=1))
def extract_data(source_url: str) -> List[Dict]:
    """Extract data จาก API"""
    logger = get_run_logger()
    logger.info(f"Extracting from {source_url}")

    response = httpx.get(source_url, timeout=30)
    response.raise_for_status()
    data = response.json()

    logger.info(f"Extracted {len(data)} records")
    return data

@task(retries=2)
def transform_data(raw_data: List[Dict]) -> List[Dict]:
    """Transform และ Clean Data"""
    logger = get_run_logger()

    transformed = []
    for record in raw_data:
        cleaned = {
            "id": record.get("id"),
            "name": record.get("name", "").strip().lower(),
            "email": record.get("email", "").strip().lower(),
            "created_at": record.get("created_at", ""),
            "processed_at": datetime.now().isoformat(),
        }

        # Validation
        if cleaned["id"] and cleaned["email"]:
            transformed.append(cleaned)

    logger.info(f"Transformed {len(transformed)}/{len(raw_data)} records")
    return transformed

@task(retries=2)
def load_data(data: List[Dict], destination: str) -> int:
    """Load Data ไปยัง Destination"""
    logger = get_run_logger()
    logger.info(f"Loading {len(data)} records to {destination}")

    # Simulate loading to database
    loaded = 0
    for record in data:
        # INSERT INTO table VALUES (...)
        loaded += 1

    logger.info(f"Loaded {loaded} records")
    return loaded

@task
def send_notification(flow_name: str, records: int, status: str):
    """ส่ง Notification"""
    logger = get_run_logger()
    message = f"Flow: {flow_name} | Records: {records} | Status: {status}"
    logger.info(f"Notification: {message}")
    # ส่ง Slack, Email, etc.

# === Flows ===

@flow(name="etl-pipeline", log_prints=True)
def etl_pipeline(source_url: str, destination: str = "warehouse"):
    """Main ETL Pipeline"""
    # Extract
    raw_data = extract_data(source_url)

    # Transform
    clean_data = transform_data(raw_data)

    # Load
    loaded = load_data(clean_data, destination)

    # Notify
    send_notification("etl-pipeline", loaded, "success")

    return {"records_loaded": loaded}

@flow(name="multi-source-etl", log_prints=True)
def multi_source_etl():
    """ETL จากหลาย Sources"""
    sources = [
        {"url": "https://api.example.com/users", "dest": "users_table"},
        {"url": "https://api.example.com/orders", "dest": "orders_table"},
        {"url": "https://api.example.com/products", "dest": "products_table"},
    ]

    total = 0
    for source in sources:
        result = etl_pipeline(source["url"], source["dest"])
        total += result.get("records_loaded", 0)

    print(f"Total records loaded: {total}")

# === Deployment ===

# from prefect.deployments import Deployment
# from prefect.server.schemas.schedules import CronSchedule
#
# deployment = Deployment.build_from_flow(
#     flow=etl_pipeline,
#     name="daily-etl",
#     schedule=CronSchedule(cron="0 2 * * *"),  # ทุกวัน 02:00
#     parameters={"source_url": "https://api.example.com/users"},
#     work_queue_name="default",
# )
# deployment.apply()

if __name__ == "__main__":
    etl_pipeline("https://jsonplaceholder.typicode.com/users")

GitOps CI/CD Pipeline

Prefect Workflow กับ GitOps — วิธีใช้ Prefect
# === GitHub Actions — Prefect GitOps Pipeline ===
# .github/workflows/prefect-gitops.yml

name: Prefect GitOps
on:
  push:
    branches: [main]
    paths: ["flows/**", "deployments/**"]
  pull_request:
    branches: [main]
    paths: ["flows/**", "deployments/**"]

env:
  PREFECT_API_URL: }
  PREFECT_API_KEY: }

jobs:
  test:
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v4
      - uses: actions/setup-python@v5
        with:
          python-version: "3.11"

      - name: Install Dependencies
        run: |
          pip install prefect pytest pytest-asyncio
          pip install -r requirements.txt

      - name: Lint
        run: |
          pip install ruff
          ruff check flows/

      - name: Type Check
        run: |
          pip install mypy
          mypy flows/ --ignore-missing-imports

      - name: Unit Tests
        run: pytest tests/ -v --tb=short

      - name: Flow Validation
        run: |
          python -c "
          from flows.etl_pipeline import etl_pipeline
          from flows.multi_source import multi_source_etl
          print('All flows validated successfully')
          "

  deploy:
    needs: test
    runs-on: ubuntu-latest
    if: github.ref == 'refs/heads/main'
    steps:
      - uses: actions/checkout@v4
      - uses: actions/setup-python@v5
        with:
          python-version: "3.11"

      - name: Install Dependencies
        run: |
          pip install prefect
          pip install -r requirements.txt

      - name: Login to Prefect
        run: prefect cloud login --key $PREFECT_API_KEY

      - name: Deploy Flows
        run: |
          python deployments/deploy_all.py
          echo "All flows deployed"

      - name: Verify Deployments
        run: |
          prefect deployment ls
          echo "Deployments verified"

      - name: Notify
        if: success()
        run: |
          curl -X POST } \
            -H 'Content-Type: application/json' \
            -d '{"text":"Prefect flows deployed via GitOps"}'

# === deployments/deploy_all.py ===
# from prefect.deployments import Deployment
# from prefect.server.schemas.schedules import CronSchedule
# from flows.etl_pipeline import etl_pipeline
# from flows.multi_source import multi_source_etl
#
# # Daily ETL
# Deployment.build_from_flow(
#     flow=etl_pipeline,
#     name="daily-etl",
#     schedule=CronSchedule(cron="0 2 * * *"),
#     parameters={"source_url": "https://api.example.com/users"},
#     work_queue_name="production",
# ).apply()
#
# # Multi-source ETL
# Deployment.build_from_flow(
#     flow=multi_source_etl,
#     name="multi-source-daily",
#     schedule=CronSchedule(cron="0 3 * * *"),
#     work_queue_name="production",
# ).apply()
#
# print("All deployments applied")

Self-hosted Prefect Server

# === Self-hosted Prefect Server ด้วย Docker ===

cat > docker-compose-prefect.yml << 'EOF'
version: '3.8'
services:
  postgres:
    image: postgres:16-alpine
    environment:
      POSTGRES_DB: prefect
      POSTGRES_USER: prefect
      POSTGRES_PASSWORD: SecurePass123
    volumes:
      - pg-data:/var/lib/postgresql/data
    healthcheck:
      test: ["CMD-SHELL", "pg_isready -U prefect"]
      interval: 10s
      timeout: 5s
      retries: 5

  prefect-server:
    image: prefecthq/prefect:2-latest
    command: prefect server start --host 0.0.0.0
    ports:
      - "4200:4200"
    environment:
      PREFECT_API_DATABASE_CONNECTION_URL: postgresql+asyncpg://prefect:SecurePass123@postgres:5432/prefect
      PREFECT_SERVER_API_HOST: 0.0.0.0
      PREFECT_API_URL: http://localhost:4200/api
    depends_on:
      postgres:
        condition: service_healthy

  prefect-worker:
    image: prefecthq/prefect:2-latest
    command: prefect worker start --pool default-pool --type process
    environment:
      PREFECT_API_URL: http://prefect-server:4200/api
    volumes:
      - ./flows:/opt/flows
    depends_on:
      - prefect-server

volumes:
  pg-data:
EOF

docker compose -f docker-compose-prefect.yml up -d

# ตรวจสอบ
# curl http://localhost:4200/api/health

# สร้าง Work Pool
# prefect work-pool create default-pool --type process

# Deploy Flow
# PREFECT_API_URL=http://localhost:4200/api prefect deploy --all

echo "Prefect Server: http://localhost:4200"
echo "  Dashboard: http://localhost:4200/dashboard"

Best Practices

  • Flows in Git: เก็บ Flow Code ใน Git Repository ทุกการเปลี่ยนแปลงผ่าน PR
  • Task Retries: ตั้ง Retries สำหรับ Tasks ที่เรียก External APIs ป้องกัน Transient Failures
  • Caching: ใช้ Task Caching สำหรับ Data ที่ไม่เปลี่ยนบ่อย ลด API Calls
  • Parameters: ใช้ Flow Parameters แทน Hardcode Values ทำให้ Reusable
  • Testing: เขียน Unit Tests สำหรับ Tasks ทดสอบก่อน Deploy
  • Monitoring: ใช้ Prefect UI ดู Flow Runs ตั้ง Notifications เมื่อ Flow ล้มเหลว

Prefect คืออะไร

Open-source Workflow Orchestration สำหรับ Python จัดการ Data Pipelines ETL ML Workflows Web UI รองรับ Retry Caching Scheduling Notifications เขียนเป็น Python Code ปกติ @flow @task

แนะนำเพิ่มเติม — ระบบเทรดของ iCafeForex

เนื้อหาเกี่ยวข้อง — ทำความเข้าใจ LocalAI Self-hosted SaaS Architecture

XM Legend · เทรดเดอร์ & ผู้สอน Forex 13 ปี

ผู้ก่อตั้ง SiamCafe ตั้งแต่ปี 1997 · เทรดเดอร์สาย Forex มากกว่า 13 ปี ได้รับการยกย่องเป็น XM Legend · แบ่งปันความรู้ Forex, ไอที, AI และการเทรด จากประสบการณ์จริงในตลาดจริง