Prefect Testing Strategy
Prefect Workflow Testing QA Unit Integration E2E Data Quality pytest Mock CI/CD Coverage Pipeline Production
| Test Type | Scope | Speed | Frequency | Coverage Target |
|---|---|---|---|---|
| Unit Test | Single Task | เร็วมาก (ms) | ทุก Commit | 70% ของ Tests |
| Integration Test | Full Flow | ปานกลาง (s) | ทุก PR | 20% ของ Tests |
| E2E Test | Production-like | ช้า (min) | ทุก Deploy | 10% ของ Tests |
| Data Quality | Output Data | ปานกลาง | ทุก Run | ทุก Critical Field |
| Performance | Duration/Memory | ช้า | Weekly | SLA Compliance |
Unit Testing
# === Prefect Unit Tests with pytest ===
# from prefect import flow, task
# import pytest
# from unittest.mock import patch, MagicMock
#
# @task
# def extract_data(api_url: str) -> dict:
# import requests
# response = requests.get(api_url)
# response.raise_for_status()
# return response.json()
#
# @task
# def transform_data(raw: dict) -> list:
# records = raw.get("results", [])
# return [
# {"id": r["id"], "name": r["name"].strip().lower(),
# "score": round(r["score"], 2)}
# for r in records if r.get("score", 0) > 0
# ]
#
# @task
# def validate_data(records: list) -> list:
# valid = [r for r in records if r["id"] and r["name"] and r["score"] > 0]
# if len(valid) < len(records) * 0.9:
# raise ValueError(f"Too many invalid: {len(records)-len(valid)}/{len(records)}")
# return valid
# === Tests ===
# def test_transform_data_basic():
# raw = {"results": [
# {"id": 1, "name": " Alice ", "score": 95.123},
# {"id": 2, "name": "Bob", "score": 0}, # filtered out
# {"id": 3, "name": "Carol", "score": 88.7},
# ]}
# result = transform_data.fn(raw)
# assert len(result) == 2
# assert result[0]["name"] == "alice"
# assert result[0]["score"] == 95.12
#
# def test_transform_data_empty():
# assert transform_data.fn({"results": []}) == []
# assert transform_data.fn({}) == []
#
# def test_validate_data_pass():
# records = [{"id": 1, "name": "a", "score": 1}] * 10
# assert len(validate_data.fn(records)) == 10
#
# def test_validate_data_fail():
# records = [{"id": None, "name": "", "score": 0}] * 10
# with pytest.raises(ValueError):
# validate_data.fn(records)
from dataclasses import dataclass


@dataclass
class TestCase:
    """Documentation record describing one unit test for a Prefect task.

    Each instance summarizes: which pytest function, which task it exercises,
    the input scenario, the expected outcome, and the test category.
    """

    # Prevent pytest from collecting this helper class: its name matches the
    # default `Test*` collection pattern, and a dataclass has an __init__,
    # which would otherwise produce a PytestCollectionWarning.
    __test__ = False

    test_name: str    # pytest function name, e.g. "test_transform_basic"
    task_tested: str  # name of the Prefect task under test
    input_desc: str   # human-readable description of the test input
    expected: str     # expected behavior / outcome
    test_type: str    # category, e.g. "Happy Path", "Edge Case", "Error Handling"
# Catalogue of the example unit tests above: one row per test,
# ordered as (name, task, input description, expected outcome, category).
_unit_case_rows = [
    ("test_transform_basic", "transform_data",
     "3 records (1 with score=0)",
     "2 records filtered, names lowercase trimmed, score rounded",
     "Happy Path"),
    ("test_transform_empty", "transform_data",
     "Empty results / Missing key",
     "Empty list (no crash)",
     "Edge Case"),
    ("test_validate_pass", "validate_data",
     "10 valid records",
     "All 10 returned",
     "Happy Path"),
    ("test_validate_fail", "validate_data",
     "10 invalid records (> 10% invalid)",
     "Raises ValueError",
     "Error Handling"),
    ("test_extract_mock", "extract_data",
     "Mock API response",
     "Returns parsed JSON",
     "Mock External"),
    ("test_extract_error", "extract_data",
     "Mock API 500 error",
     "Raises HTTPError",
     "Error Handling"),
]
tests = [TestCase(*row) for row in _unit_case_rows]

# Print a readable summary of every catalogued unit test.
print("=== Unit Test Cases ===")
for case in tests:
    print(f" [{case.test_name}] Task: {case.task_tested}")
    print(f" Input: {case.input_desc}")
    print(f" Expected: {case.expected}")
    print(f" Type: {case.test_type}")
Integration Testing
# === Integration Test with Test Database ===
# @pytest.fixture
# def test_db():
# """Create test PostgreSQL with Testcontainers"""
# from testcontainers.postgres import PostgresContainer
# with PostgresContainer("postgres:16") as pg:
# yield pg.get_connection_url()
#
# def test_full_flow(test_db, mock_api):
# """Integration test: full ETL flow"""
# result = etl_flow(
# api_url=mock_api.url,
# db_url=test_db,
# table="test_output"
# )
# assert result.is_completed()
# # Verify data in DB
# rows = query_db(test_db, "SELECT count(*) FROM test_output")
# assert rows[0][0] > 0
@dataclass
class IntegrationTest:
    """Documentation record describing one integration-test scenario."""

    test: str        # scenario name, e.g. "Full ETL Flow"
    components: str  # components exercised end-to-end
    setup: str       # fixtures / infrastructure the scenario requires
    assertion: str   # condition that must hold for the scenario to pass
# Catalogue of integration-test scenarios, ordered as
# (scenario, components, setup, assertion).
_integration_rows = [
    ("Full ETL Flow",
     "API → Transform → PostgreSQL",
     "Mock API + Testcontainers PostgreSQL",
     "Data in DB matches expected count + schema"),
    ("Retry on Failure",
     "API (fail 2x) → Retry → Success",
     "Mock API return 500 twice then 200",
     "Flow completes after retries, data correct"),
    ("Idempotency",
     "Run flow twice with same input",
     "Test DB + same input data",
     "DB has same result (no duplicates)"),
    ("Schema Change",
     "API returns new fields",
     "Mock API with extra/missing fields",
     "Flow handles gracefully, no crash"),
    ("Large Dataset",
     "API returns 100K records",
     "Mock API with large payload",
     "Flow completes within SLA, memory OK"),
]
integration_tests = [IntegrationTest(*row) for row in _integration_rows]

# Print a readable summary of every integration scenario.
print("=== Integration Tests ===")
for scenario in integration_tests:
    print(f" [{scenario.test}] Components: {scenario.components}")
    print(f" Setup: {scenario.setup}")
    print(f" Assert: {scenario.assertion}")
CI/CD Pipeline
# === GitHub Actions CI/CD ===
# .github/workflows/test.yml
# name: Prefect Pipeline Tests
# on: [pull_request]
# jobs:
# unit-tests:
# runs-on: ubuntu-latest
# steps:
# - uses: actions/checkout@v4
# - uses: actions/setup-python@v5
# with: { python-version: '3.12' }
# - run: pip install -e ".[test]"
# - run: pytest tests/unit/ -v --cov=src --cov-report=xml
# - uses: codecov/codecov-action@v4
#
# integration-tests:
# needs: unit-tests
# runs-on: ubuntu-latest
# services:
# postgres: { image: 'postgres:16', env: {POSTGRES_PASSWORD: test} }
# steps:
# - uses: actions/checkout@v4
# - run: pip install -e ".[test]"
# - run: pytest tests/integration/ -v
@dataclass
class CIStage:
    """Documentation record describing one stage of the CI/CD pipeline."""

    stage: str        # stage name, e.g. "Unit Tests"
    trigger: str      # event that runs the stage
    tests: str        # checks / commands executed in the stage
    duration: str     # expected wall-clock budget
    fail_action: str  # what happens when the stage fails
# Catalogue of CI/CD pipeline stages, ordered as
# (stage, trigger, tests, duration, action on failure).
_pipeline_rows = [
    ("Lint + Type Check", "Every Commit (pre-commit)",
     "ruff check + mypy", "< 30 seconds", "Block commit"),
    ("Unit Tests", "Every PR",
     "pytest tests/unit/ --cov", "< 2 minutes", "Block PR merge"),
    ("Integration Tests", "Every PR (after Unit pass)",
     "pytest tests/integration/ with Testcontainers", "< 10 minutes",
     "Block PR merge"),
    ("E2E Tests", "Before Production Deploy",
     "pytest tests/e2e/ on Staging", "< 30 minutes", "Block deploy"),
    ("Deploy", "After all tests pass + Approval",
     "prefect deploy --all", "< 5 minutes", "Rollback previous version"),
]
ci_stages = [CIStage(*row) for row in _pipeline_rows]

# Print a readable summary of the pipeline, stage by stage.
print("=== CI/CD Pipeline ===")
for stage in ci_stages:
    print(f" [{stage.stage}] Trigger: {stage.trigger}")
    print(f" Tests: {stage.tests}")
    print(f" Duration: {stage.duration}")
    print(f" On Fail: {stage.fail_action}")
เคล็ดลับ
- .fn(): ใช้ task.fn() ทดสอบ Task Logic โดยไม่ผ่าน Prefect Engine
- Mock: Mock External API/DB ทุกตัวใน Unit Test
- Testcontainers: ใช้ Testcontainers สำหรับ Integration Test
- Coverage: Target > 80% เน้น Transform Logic + Error Handling
- Idempotent: ทดสอบ Idempotency ทุก Flow ป้องกัน Duplicate
Prefect คืออะไร
Python Workflow Orchestration @flow @task Retry Caching Concurrency Schedule UI Notifications Deploy Cloud Self-hosted Pythonic
Testing Strategy มีอะไร
Unit 70% Integration 20% E2E 10% Data Quality Performance Idempotency Regression Contract pytest Mock Testcontainers Coverage
Unit Test เขียนอย่างไร
pytest task.fn() Mock patch MagicMock fixtures parametrize Transform Edge Case Error Handling Retry Logic Coverage 80%+
CI/CD ตั้งอย่างไร
GitHub Actions Lint Unit Integration E2E Deploy pre-commit Testcontainers Coverage codecov Staging Production Rollback Approval
สรุป
Prefect Workflow Testing QA pytest Unit Integration E2E Mock Testcontainers CI/CD Coverage Data Quality Idempotency Production
