Prefect Workflow Testing Strategy QA

2026-01-16 · อ. บอม, SiamCafe.net · 10,171 words

Prefect Testing Strategy

This article lays out a QA strategy for Prefect workflows: unit, integration, and end-to-end (E2E) tests, data quality checks, pytest with mocks, coverage, and a CI/CD pipeline that gates production deploys.

| Test Type | Scope | Speed | Frequency | Coverage Target |
| --- | --- | --- | --- | --- |
| Unit Test | Single task | Very fast (ms) | Every commit | 70% of tests |
| Integration Test | Full flow | Moderate (s) | Every PR | 20% of tests |
| E2E Test | Production-like | Slow (min) | Every deploy | 10% of tests |
| Data Quality | Output data | Moderate | Every run | Every critical field |
| Performance | Duration/memory | Slow | Weekly | SLA compliance |
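The Data Quality row checks every critical field on every run. The following is a minimal sketch of such a check, assuming records shaped like the output of the transform step shown later (`id`, `name`, `score`). The function name `check_critical_fields` and the per-field rules are illustrative assumptions, not part of Prefect.

```python
from typing import Any

# Hypothetical rules: one predicate per critical field.
CRITICAL_FIELD_RULES = {
    "id": lambda v: isinstance(v, int) and v > 0,
    "name": lambda v: isinstance(v, str) and v.strip() != "",
    "score": lambda v: isinstance(v, (int, float)) and v > 0,
}


def check_critical_fields(records: list[dict[str, Any]]) -> dict[str, int]:
    """Count, per critical field, how many records fail that field's rule."""
    failures = {field: 0 for field in CRITICAL_FIELD_RULES}
    for record in records:
        for field, is_valid in CRITICAL_FIELD_RULES.items():
            # A missing key counts as a failure for that field.
            if field not in record or not is_valid(record[field]):
                failures[field] += 1
    return failures


sample = [
    {"id": 1, "name": "alice", "score": 95.12},
    {"id": 2, "name": "", "score": 88.7},  # empty name
    {"id": 3, "score": -1},                # missing name, non-positive score
]
report = check_critical_fields(sample)
print(report)
```

A flow could run a check like this as its last step and fail the run when any count is non-zero, so bad output never reaches downstream consumers.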

Unit Testing

# === Prefect Unit Tests with pytest ===

# from prefect import flow, task
# import pytest
# from unittest.mock import patch, MagicMock
#
# @task
# def extract_data(api_url: str) -> dict:
#     import requests
#     response = requests.get(api_url)
#     response.raise_for_status()
#     return response.json()
#
# @task
# def transform_data(raw: dict) -> list:
#     records = raw.get("results", [])
#     return [
#         {"id": r["id"], "name": r["name"].strip().lower(),
#          "score": round(r["score"], 2)}
#         for r in records if r.get("score", 0) > 0
#     ]
#
# @task
# def validate_data(records: list) -> list:
#     valid = [r for r in records if r["id"] and r["name"] and r["score"] > 0]
#     if len(valid) < len(records) * 0.9:
#         raise ValueError(f"Too many invalid: {len(records)-len(valid)}/{len(records)}")
#     return valid

# === Tests ===
# def test_transform_data_basic():
#     raw = {"results": [
#         {"id": 1, "name": "  Alice ", "score": 95.123},
#         {"id": 2, "name": "Bob", "score": 0},  # filtered out
#         {"id": 3, "name": "Carol", "score": 88.7},
#     ]}
#     result = transform_data.fn(raw)
#     assert len(result) == 2
#     assert result[0]["name"] == "alice"
#     assert result[0]["score"] == 95.12
#
# def test_transform_data_empty():
#     assert transform_data.fn({"results": []}) == []
#     assert transform_data.fn({}) == []
#
# def test_validate_data_pass():
#     records = [{"id": 1, "name": "a", "score": 1}] * 10
#     assert len(validate_data.fn(records)) == 10
#
# def test_validate_data_fail():
#     records = [{"id": None, "name": "", "score": 0}] * 10
#     with pytest.raises(ValueError):
#         validate_data.fn(records)

from dataclasses import dataclass

@dataclass
class TestCase:
    test_name: str
    task_tested: str
    input_desc: str
    expected: str
    test_type: str

tests = [
    TestCase("test_transform_basic",
        "transform_data",
        "3 records (1 with score=0)",
        "2 records kept (score=0 filtered out), names trimmed and lowercased, scores rounded",
        "Happy Path"),
    TestCase("test_transform_empty",
        "transform_data",
        "Empty results / Missing key",
        "Empty list (no crash)",
        "Edge Case"),
    TestCase("test_validate_pass",
        "validate_data",
        "10 valid records",
        "All 10 returned",
        "Happy Path"),
    TestCase("test_validate_fail",
        "validate_data",
        "10 invalid records (> 10% invalid)",
        "Raises ValueError",
        "Error Handling"),
    TestCase("test_extract_mock",
        "extract_data",
        "Mock API response",
        "Returns parsed JSON",
        "Mock External"),
    TestCase("test_extract_error",
        "extract_data",
        "Mock API 500 error",
        "Raises HTTPError",
        "Error Handling"),
]

print("=== Unit Test Cases ===")
for t in tests:
    print(f"  [{t.test_name}] Task: {t.task_tested}")
    print(f"    Input: {t.input_desc}")
    print(f"    Expected: {t.expected}")
    print(f"    Type: {t.test_type}")
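The case list includes `test_extract_mock` and `test_extract_error`, but the listing does not show them. Here is one possible sketch. To keep it runnable without third-party packages, it makes two assumptions that differ from the article's code: the HTTP client is injected as a parameter instead of calling `requests.get` directly, and `FakeHTTPError` is a hypothetical stand-in for `requests.HTTPError`.

```python
from unittest.mock import MagicMock


class FakeHTTPError(Exception):
    """Hypothetical stand-in for requests.HTTPError (keeps the sketch stdlib-only)."""


def extract_data(api_url: str, http_client) -> dict:
    """Same steps as the article's extract task, with the HTTP client injected."""
    response = http_client.get(api_url, timeout=10)
    response.raise_for_status()
    return response.json()


def test_extract_mock():
    client = MagicMock()
    client.get.return_value.json.return_value = {"results": [{"id": 1}]}

    result = extract_data("https://api.example.test/items", client)

    assert result == {"results": [{"id": 1}]}
    client.get.assert_called_once_with("https://api.example.test/items", timeout=10)


def test_extract_error():
    client = MagicMock()
    # Simulate a 500 response: the status check raises.
    client.get.return_value.raise_for_status.side_effect = FakeHTTPError("500")

    try:
        extract_data("https://api.example.test/items", client)
    except FakeHTTPError:
        pass
    else:
        raise AssertionError("expected FakeHTTPError")
    # The body must not be parsed when the status check fails.
    client.get.return_value.json.assert_not_called()


test_extract_mock()
test_extract_error()
```

With the article's `@task` version, which calls `requests.get` itself, the same two cases would instead patch `requests.get` with `unittest.mock.patch` and invoke the task through `extract_data.fn(url)`.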

Integration Testing

# === Integration Test with Test Database ===

# @pytest.fixture
# def test_db():
#     """Create test PostgreSQL with Testcontainers"""
#     from testcontainers.postgres import PostgresContainer
#     with PostgresContainer("postgres:16") as pg:
#         yield pg.get_connection_url()
#
# def test_full_flow(test_db, mock_api):
#     """Integration test: full ETL flow"""
#     # etl_flow, mock_api and query_db are assumed to be defined elsewhere
#     state = etl_flow(
#         api_url=mock_api.url,
#         db_url=test_db,
#         table="test_output",
#         return_state=True,  # return the final State, not the flow's result
#     )
#     assert state.is_completed()
#     # Verify data in DB
#     rows = query_db(test_db, "SELECT count(*) FROM test_output")
#     assert rows[0][0] > 0

@dataclass
class IntegrationTest:
    test: str
    components: str
    setup: str
    assertion: str

integration_tests = [
    IntegrationTest("Full ETL Flow",
        "API → Transform → PostgreSQL",
        "Mock API + Testcontainers PostgreSQL",
        "Data in DB matches expected count + schema"),
    IntegrationTest("Retry on Failure",
        "API (fail 2x) → Retry → Success",
        "Mock API return 500 twice then 200",
        "Flow completes after retries, data correct"),
    IntegrationTest("Idempotency",
        "Run flow twice with same input",
        "Test DB + same input data",
        "DB has same result (no duplicates)"),
    IntegrationTest("Schema Change",
        "API returns new fields",
        "Mock API with extra/missing fields",
        "Flow handles gracefully, no crash"),
    IntegrationTest("Large Dataset",
        "API returns 100K records",
        "Mock API with large payload",
        "Flow completes within SLA, memory OK"),
]

print("=== Integration Tests ===")
for t in integration_tests:
    print(f"  [{t.test}] Components: {t.components}")
    print(f"    Setup: {t.setup}")
    print(f"    Assert: {t.assertion}")
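The Idempotency case (run the flow twice with the same input, expect no duplicates) can be sketched as follows. This is an illustration under stated assumptions: an in-memory SQLite database stands in for the Testcontainers PostgreSQL instance, and `load_records` is a hypothetical load step, not the article's flow. In PostgreSQL the equivalent of `INSERT OR REPLACE` would be `INSERT ... ON CONFLICT (id) DO UPDATE`.

```python
import sqlite3


def load_records(conn: sqlite3.Connection, records: list[dict]) -> None:
    """Idempotent load: the primary key plus INSERT OR REPLACE prevents duplicates."""
    conn.execute(
        "CREATE TABLE IF NOT EXISTS test_output ("
        "id INTEGER PRIMARY KEY, name TEXT NOT NULL, score REAL NOT NULL)"
    )
    conn.executemany(
        "INSERT OR REPLACE INTO test_output (id, name, score) "
        "VALUES (:id, :name, :score)",
        records,
    )
    conn.commit()


def snapshot(conn: sqlite3.Connection) -> list[tuple]:
    """Return the full table contents in a stable order."""
    return conn.execute(
        "SELECT id, name, score FROM test_output ORDER BY id"
    ).fetchall()


def test_idempotency() -> list[tuple]:
    conn = sqlite3.connect(":memory:")
    records = [
        {"id": 1, "name": "alice", "score": 95.12},
        {"id": 3, "name": "carol", "score": 88.7},
    ]
    load_records(conn, records)
    first = snapshot(conn)
    load_records(conn, records)  # second run with the same input
    second = snapshot(conn)
    conn.close()

    assert first == second              # same result after re-running
    assert len(second) == len(records)  # no duplicate rows
    return second


print(test_idempotency())
```

Comparing full snapshots rather than only row counts also catches a second run that silently changes values.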

CI/CD Pipeline

# === GitHub Actions CI/CD ===

# .github/workflows/test.yml
# name: Prefect Pipeline Tests
# on: [pull_request]
# jobs:
#   unit-tests:
#     runs-on: ubuntu-latest
#     steps:
#       - uses: actions/checkout@v4
#       - uses: actions/setup-python@v5
#         with: { python-version: '3.12' }
#       - run: pip install -e ".[test]"
#       - run: pytest tests/unit/ -v --cov=src --cov-report=xml
#       - uses: codecov/codecov-action@v4
#
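#   integration-tests:
#     needs: unit-tests
#     runs-on: ubuntu-latest
#     services:
#       postgres:
#         image: 'postgres:16'
#         env: { POSTGRES_PASSWORD: test }
#         ports: ['5432:5432']  # expose the service to tests on the runner
#     steps:
#       - uses: actions/checkout@v4
#       - uses: actions/setup-python@v5
#         with: { python-version: '3.12' }
#       - run: pip install -e ".[test]"
#       - run: pytest tests/integration/ -v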

@dataclass
class CIStage:
    stage: str
    trigger: str
    tests: str
    duration: str
    fail_action: str

ci_stages = [
    CIStage("Lint + Type Check",
        "Every Commit (pre-commit)",
        "ruff check + mypy",
        "< 30 seconds",
        "Block commit"),
    CIStage("Unit Tests",
        "Every PR",
        "pytest tests/unit/ --cov",
        "< 2 minutes",
        "Block PR merge"),
    CIStage("Integration Tests",
        "Every PR (after Unit pass)",
        "pytest tests/integration/ with Testcontainers",
        "< 10 minutes",
        "Block PR merge"),
    CIStage("E2E Tests",
        "Before Production Deploy",
        "pytest tests/e2e/ on Staging",
        "< 30 minutes",
        "Block deploy"),
    CIStage("Deploy",
        "After all tests pass + Approval",
        "prefect deploy --all",
        "< 5 minutes",
        "Roll back to previous version"),
]

print("=== CI/CD Pipeline ===")
for s in ci_stages:
    print(f"  [{s.stage}] Trigger: {s.trigger}")
    print(f"    Tests: {s.tests}")
    print(f"    Duration: {s.duration}")
    print(f"    On Fail: {s.fail_action}")
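In the stage list, each stage gates the next, and a failure triggers that stage's fail action. The sketch below reproduces that gating logic as a small local runner. `Stage` and `run_pipeline` are hypothetical names, and the commands are placeholders that only exit with a fixed code; in practice they would be the real commands from the table, such as `ruff check` or `pytest tests/unit/`.

```python
import subprocess
import sys
from dataclasses import dataclass
from typing import Optional


@dataclass
class Stage:
    name: str
    command: list[str]
    fail_action: str


def run_pipeline(stages: list[Stage]) -> tuple[list[str], Optional[str]]:
    """Run stages in order and stop at the first non-zero exit code.

    Returns the names of the stages that passed, plus the fail action of
    the stage that failed (None if every stage passed).
    """
    passed: list[str] = []
    for stage in stages:
        result = subprocess.run(stage.command, capture_output=True)
        if result.returncode != 0:
            return passed, stage.fail_action
        passed.append(stage.name)
    return passed, None


def placeholder(exit_code: int) -> list[str]:
    """Placeholder command: a Python process that exits with the given code."""
    return [sys.executable, "-c", f"raise SystemExit({exit_code})"]


stages = [
    Stage("Lint + Type Check", placeholder(0), "Block commit"),
    Stage("Unit Tests", placeholder(1), "Block PR merge"),  # simulated failure
    Stage("Integration Tests", placeholder(0), "Block PR merge"),
]
passed, action = run_pipeline(stages)
print(f"passed={passed} action={action}")
```

Because the runner returns at the first failure, the slower integration stage never starts when unit tests fail, which is the same behaviour `needs: unit-tests` gives in the GitHub Actions workflow.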

Tips

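What is Prefect?

Prefect is a Pythonic workflow orchestration framework. Flows and tasks are ordinary Python functions decorated with @flow and @task, and the platform adds retries, caching, concurrency control, scheduling, a UI, and notifications. Deployments can run against Prefect Cloud or a self-hosted server.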

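What does the testing strategy include?

Roughly 70% unit tests, 20% integration tests, and 10% E2E tests, plus data quality, performance, idempotency, regression, and contract tests. The tooling is pytest, mocks, Testcontainers, and coverage reporting.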
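Of the test types listed here, the performance check (duration and memory against an SLA) is easy to leave unautomated. The sketch below shows one way to do it with the standard library. The budgets `MAX_SECONDS` and `MAX_PEAK_BYTES` are hypothetical values, `measure` is an illustrative helper, and `transform_records` is a plain-function copy of the article's transform logic so the example runs without Prefect.

```python
import time
import tracemalloc
from typing import Any, Callable

# Hypothetical SLA budgets; real values depend on the pipeline.
MAX_SECONDS = 5.0
MAX_PEAK_BYTES = 200 * 1024 * 1024  # 200 MiB


def transform_records(raw: dict) -> list:
    """Plain-function copy of the article's transform_data logic."""
    return [
        {"id": r["id"], "name": r["name"].strip().lower(),
         "score": round(r["score"], 2)}
        for r in raw.get("results", [])
        if r.get("score", 0) > 0
    ]


def measure(func: Callable[..., Any], *args: Any, **kwargs: Any) -> dict:
    """Run func once; report wall-clock seconds and peak traced memory in bytes."""
    tracemalloc.start()
    started = time.perf_counter()
    try:
        result = func(*args, **kwargs)
    finally:
        elapsed = time.perf_counter() - started
        _, peak = tracemalloc.get_traced_memory()
        tracemalloc.stop()
    return {"result": result, "seconds": elapsed, "peak_bytes": peak}


# 100K synthetic records; every second record has score 0 and is filtered out.
raw = {"results": [
    {"id": i, "name": f"  User{i} ", "score": (i % 2) * 1.2345}
    for i in range(100_000)
]}
stats = measure(transform_records, raw)

assert stats["seconds"] < MAX_SECONDS
assert stats["peak_bytes"] < MAX_PEAK_BYTES
print(f"{len(stats['result'])} records, {stats['seconds']:.3f}s, "
      f"{stats['peak_bytes'] / 1024 / 1024:.1f} MiB peak")
```

`tracemalloc` only counts memory allocated by Python while tracing is active and slows execution noticeably, so the numbers are best read as a regression signal rather than an exact production measurement.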

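How do you write unit tests?

Use pytest and call task.fn() to run a task's underlying function directly. Replace external calls with patch and MagicMock, use fixtures and parametrize for setup and input variations, and cover transforms, edge cases, error handling, and retry logic. Aim for 80%+ coverage.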
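Retry logic is the item in this list that `task.fn()` does not exercise, since calling the underlying function bypasses the orchestration layer where retries happen. The sketch below tests the "fail twice, then succeed" behaviour using `MagicMock` with a `side_effect` sequence. `call_with_retries` is a hypothetical stand-in so the example runs without Prefect; in a real flow the retries would be configured on the task itself, for example with `@task(retries=2, retry_delay_seconds=1)`.

```python
import time
from unittest.mock import MagicMock


def call_with_retries(func, retries: int = 3, delay_seconds: float = 0.0):
    """Call func; on an exception, retry up to `retries` more times."""
    attempt = 0
    while True:
        try:
            return func()
        except Exception:
            if attempt >= retries:
                raise  # retries exhausted: surface the last error
            attempt += 1
            time.sleep(delay_seconds)


def test_retry_then_success():
    # First two calls raise, the third returns a payload.
    flaky_api = MagicMock(side_effect=[
        ConnectionError("500"),
        ConnectionError("500"),
        {"results": []},
    ])
    assert call_with_retries(flaky_api, retries=2) == {"results": []}
    assert flaky_api.call_count == 3


def test_retries_exhausted():
    broken_api = MagicMock(side_effect=ConnectionError("down"))
    try:
        call_with_retries(broken_api, retries=2)
    except ConnectionError:
        pass
    else:
        raise AssertionError("expected ConnectionError")
    assert broken_api.call_count == 3  # 1 initial call + 2 retries


test_retry_then_success()
test_retries_exhausted()
```

Asserting on `call_count` matters as much as asserting on the result: it distinguishes a task that succeeded after retrying from one that never retried at all.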

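How do you set up CI/CD?

Use GitHub Actions with stages for lint, unit, integration, E2E, and deploy. Run lint through pre-commit, use Testcontainers for integration tests, report coverage to Codecov, run E2E tests on staging, and require approval before production, with a rollback if the deploy fails.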

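Summary

Testing Prefect workflows for QA combines pytest-based unit, integration, and E2E tests with mocks and Testcontainers, a CI/CD pipeline with coverage reporting, and data quality and idempotency checks before anything reaches production.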
