Prefect Workflow Testing Strategy QA: Testing Data Pipelines with Prefect | SiamCafe Blog

Prefect Testing Strategy

This post outlines a QA and testing strategy for Prefect workflows: unit, integration, and E2E tests, data quality checks, pytest with mocks, CI/CD, and coverage targets for production pipelines.

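Test Type        | Scope           | Speed              | Frequency    | Coverage Target
-----------------|-----------------|--------------------|--------------|---------------------
Unit Test        | Single task     | Very fast (ms)     | Every commit | 70% of tests
Integration Test | Full flow       | Moderate (seconds) | Every PR     | 20% of tests
E2E Test         | Production-like | Slow (minutes)     | Every deploy | 10% of tests
Data Quality     | Output data     | Moderate           | Every run    | Every critical field
Performance      | Duration/memory | Slow               | Weekly       | SLA compliance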
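The Data Quality row calls for checking every critical field on every run. A minimal sketch in plain Python, assuming records shaped like the `transform_data` output (`id`, `name`, `score`); the `CRITICAL_FIELDS` tuple and the unique-id rule are illustrative assumptions, while the positive-score rule mirrors `validate_data`:

```python
from collections import Counter
from typing import Any

# Hypothetical critical fields, mirroring the transform_data output
CRITICAL_FIELDS = ("id", "name", "score")


def check_data_quality(records: list[dict[str, Any]]) -> list[str]:
    """Return human-readable violations; an empty list means the batch passed."""
    problems: list[str] = []
    for i, rec in enumerate(records):
        # Every critical field must be present and non-empty
        for field in CRITICAL_FIELDS:
            if rec.get(field) in (None, ""):
                problems.append(f"row {i}: missing {field}")
        # Same rule as validate_data: scores must be positive
        score = rec.get("score")
        if isinstance(score, (int, float)) and score <= 0:
            problems.append(f"row {i}: non-positive score {score}")
    # Assumed rule: ids must be unique within one run
    counts = Counter(rec.get("id") for rec in records)
    for rec_id, n in counts.items():
        if rec_id is not None and n > 1:
            problems.append(f"id {rec_id}: duplicated {n} times")
    return problems


batch = [
    {"id": 1, "name": "alice", "score": 95.12},
    {"id": 1, "name": "", "score": 0},
]
for problem in check_data_quality(batch):
    print(problem)
```

In a flow, a check like this could run as its own task after `transform_data` and fail the run when the returned list is non-empty.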

Unit Testing

# === Prefect Unit Tests with pytest ===

# import pytest
# import requests
# from prefect import flow, task
# from unittest.mock import patch, MagicMock
#
# @task
# def extract_data(api_url: str) -> dict:
#     response = requests.get(api_url, timeout=10)
#     response.raise_for_status()  # raises requests.HTTPError on 4xx/5xx
#     return response.json()
#
# @task
# def transform_data(raw: dict) -> list:
#     # Keep only positive scores; trim/lowercase names and round scores
#     records = raw.get("results", [])
#     return [
#         {"id": r["id"], "name": r["name"].strip().lower(),
#          "score": round(r["score"], 2)}
#         for r in records if r.get("score", 0) > 0
#     ]
#
# @task
# def validate_data(records: list) -> list:
#     # Fail if more than 10% of the records are invalid
#     valid = [r for r in records if r["id"] and r["name"] and r["score"] > 0]
#     if len(valid) < len(records) * 0.9:
#         raise ValueError(f"Too many invalid: {len(records)-len(valid)}/{len(records)}")
#     return valid

# === Tests ===
# def test_transform_data_basic():
#     raw = {"results": [
#         {"id": 1, "name": " Alice ", "score": 95.123},
#         {"id": 2, "name": "Bob", "score": 0},  # filtered out
#         {"id": 3, "name": "Carol", "score": 88.7},
#     ]}
#     result = transform_data.fn(raw)
#     assert len(result) == 2
#     assert result[0]["name"] == "alice"
#     assert result[0]["score"] == 95.12
#
# def test_transform_data_empty():
#     assert transform_data.fn({"results": []}) == []
#     assert transform_data.fn({}) == []
#
# def test_validate_data_pass():
#     records = [{"id": 1, "name": "a", "score": 1}] * 10
#     assert len(validate_data.fn(records)) == 10
#
# def test_validate_data_fail():
#     records = [{"id": None, "name": "", "score": 0}] * 10
#     with pytest.raises(ValueError):
#         validate_data.fn(records)

from dataclasses import dataclass

@dataclass
class TestCase:
    test_name: str
    task_tested: str
    input_desc: str
    expected: str
    test_type: str

tests = [
    TestCase("test_transform_basic",
             "transform_data",
             "3 records (1 with score=0)",
             "2 records returned (score=0 dropped), names trimmed and lowercased, scores rounded",
             "Happy Path"),
    TestCase("test_transform_empty",
             "transform_data",
             "Empty results / Missing key",
             "Empty list (no crash)",
             "Edge Case"),
    TestCase("test_validate_pass",
             "validate_data",
             "10 valid records",
             "All 10 returned",
             "Happy Path"),
    TestCase("test_validate_fail",
             "validate_data",
             "10 invalid records (> 10% invalid)",
             "Raises ValueError",
             "Error Handling"),
    TestCase("test_extract_mock",
             "extract_data",
             "Mock API response",
             "Returns parsed JSON",
             "Mock External"),
    TestCase("test_extract_error",
             "extract_data",
             "Mock API 500 error",
             "Raises HTTPError",
             "Error Handling"),
]

print("=== Unit Test Cases ===")
for t in tests:
    print(f" [{t.test_name}] Task: {t.task_tested}")
    print(f"    Input: {t.input_desc}")
    print(f"    Expected: {t.expected}")
    print(f"    Type: {t.test_type}")
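The last two cases in the list, `test_extract_mock` and `test_extract_error`, are not shown in the test code. A minimal sketch of both, assuming a stdlib-only stand-in for `extract_data` that uses `urllib.request.urlopen` instead of `requests.get` so it runs without third-party packages; with the real task you would call `extract_data.fn(...)` and patch `requests.get` in the same way. The URL is a placeholder.

```python
import json
import urllib.error
import urllib.request
from unittest.mock import MagicMock, patch


def extract_data(api_url: str) -> dict:
    # Stand-in for the @task body: fetch JSON, raising on HTTP errors.
    # urlopen raises urllib.error.HTTPError for 4xx/5xx responses.
    with urllib.request.urlopen(api_url, timeout=10) as resp:
        return json.loads(resp.read().decode("utf-8"))


def test_extract_mock():
    fake_resp = MagicMock()
    fake_resp.read.return_value = b'{"results": [{"id": 1}]}'
    with patch("urllib.request.urlopen") as mock_urlopen:
        # urlopen is used as a context manager, so wire up __enter__
        mock_urlopen.return_value.__enter__.return_value = fake_resp
        assert extract_data("https://api.example.com/data") == {"results": [{"id": 1}]}
        mock_urlopen.assert_called_once_with("https://api.example.com/data", timeout=10)


def test_extract_error():
    err = urllib.error.HTTPError("https://api.example.com/data", 500, "Server Error", None, None)
    # side_effect makes the mocked urlopen raise instead of returning
    with patch("urllib.request.urlopen", side_effect=err):
        try:
            extract_data("https://api.example.com/data")
        except urllib.error.HTTPError as exc:
            assert exc.code == 500
        else:
            raise AssertionError("expected HTTPError")
```

Under pytest, the `try/except/else` would normally be written as `with pytest.raises(...)`; it is spelled out here only to keep the sketch dependency-free.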

Integration Testing

# === Integration Test with Test Database ===

# @pytest.fixture
# def test_db():
#     """Start a throwaway PostgreSQL container with Testcontainers"""
#     from testcontainers.postgres import PostgresContainer
#     with PostgresContainer("postgres:16") as pg:
#         yield pg.get_connection_url()
#
# def test_full_flow(test_db, mock_api):
#     """Integration test: full ETL flow.
#     etl_flow, the mock_api fixture and query_db are project code not shown here."""
#     state = etl_flow(
#         api_url=mock_api.url,
#         db_url=test_db,
#         table="test_output",
#         return_state=True,  # return the final State, not the flow's return value
#     )
#     assert state.is_completed()
#     # Verify data landed in the DB
#     rows = query_db(test_db, "SELECT count(*) FROM test_output")
#     assert rows[0][0] > 0

@dataclass
class IntegrationTest:
    test: str
    components: str
    setup: str
    assertion: str

integration_tests = [
    IntegrationTest("Full ETL Flow",
                    "API → Transform → PostgreSQL",
                    "Mock API + Testcontainers PostgreSQL",
                    "Data in DB matches expected count + schema"),
    IntegrationTest("Retry on Failure",
                    "API (fail 2x) → Retry → Success",
                    "Mock API returns 500 twice, then 200",
                    "Flow completes after retries, data correct"),
    IntegrationTest("Idempotency",
                    "Run flow twice with same input",
                    "Test DB + same input data",
                    "DB has same result (no duplicates)"),
    IntegrationTest("Schema Change",
                    "API returns new fields",
                    "Mock API with extra/missing fields",
                    "Flow handles gracefully, no crash"),
    IntegrationTest("Large Dataset",
                    "API returns 100K records",
                    "Mock API with large payload",
                    "Flow completes within SLA, memory OK"),
]

print("=== Integration Tests ===")
for t in integration_tests:
    print(f" [{t.test}] Components: {t.components}")
    print(f"    Setup: {t.setup}")
    print(f"    Assert: {t.assertion}")
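The Retry on Failure case depends on a fake API that fails twice before succeeding. In Prefect the retrying itself would come from the task's `retries` option; the sketch below swaps in a hypothetical `call_with_retries` helper and a hypothetical `ServerError` (plain Python, no Prefect) so the `MagicMock(side_effect=[...])` pattern can be shown and run on its own.

```python
from unittest.mock import MagicMock


class ServerError(Exception):
    """Hypothetical error standing in for an HTTP 500 response."""


def call_with_retries(fn, retries: int):
    # Hypothetical helper mimicking @task(retries=...): one initial attempt
    # plus up to `retries` re-attempts; the last error is re-raised.
    for attempt in range(retries + 1):
        try:
            return fn()
        except ServerError:
            if attempt == retries:
                raise


# Fake API: side_effect items are consumed in order, exceptions are raised
fake_api = MagicMock(side_effect=[ServerError(), ServerError(), {"results": [{"id": 1}]}])

result = call_with_retries(fake_api, retries=2)
print(result, fake_api.call_count)  # prints: {'results': [{'id': 1}]} 3
```

Asserting on `call_count` checks that the retries actually happened, not just that the final result is correct.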

CI/CD Pipeline

# === GitHub Actions CI/CD ===

# .github/workflows/test.yml
# name: Prefect Pipeline Tests
# on: [pull_request]
# jobs:
#   unit-tests:
#     runs-on: ubuntu-latest
#     steps:
#       - uses: actions/checkout@v4
#       - uses: actions/setup-python@v5
#         with: { python-version: '3.12' }
#       - run: pip install -e ".[test]"
#       - run: pytest tests/unit/ -v --cov=src --cov-report=xml
#       - uses: codecov/codecov-action@v4
#         with: { token: '${{ secrets.CODECOV_TOKEN }}' }  # v4 expects an upload token
#
#   integration-tests:
#     needs: unit-tests
#     runs-on: ubuntu-latest
#     services:
#       # Only needed if tests connect to localhost:5432 instead of Testcontainers
#       postgres: { image: 'postgres:16', env: { POSTGRES_PASSWORD: test }, ports: ['5432:5432'] }
#     steps:
#       - uses: actions/checkout@v4
#       - uses: actions/setup-python@v5
#         with: { python-version: '3.12' }
#       - run: pip install -e ".[test]"
#       - run: pytest tests/integration/ -v

@dataclass
class CIStage:
    stage: str
    trigger: str
    tests: str
    duration: str
    fail_action: str

ci_stages = [
    CIStage("Lint + Type Check",
            "Every Commit (pre-commit)",
            "ruff check + mypy",
            "< 30 seconds",
            "Block commit"),
    CIStage("Unit Tests",
            "Every PR",
            "pytest tests/unit/ --cov",
            "< 2 minutes",
            "Block PR merge"),
    CIStage("Integration Tests",
            "Every PR (after Unit pass)",
            "pytest tests/integration/ with Testcontainers",
            "< 10 minutes",
            "Block PR merge"),
    CIStage("E2E Tests",
            "Before Production Deploy",
            "pytest tests/e2e/ on Staging",
            "< 30 minutes",
            "Block deploy"),
    CIStage("Deploy",
            "After all tests pass + manual approval",
            "prefect deploy --all",
            "< 5 minutes",
            "Roll back to the previous version"),
]

print("=== CI/CD Pipeline ===")
for s in ci_stages:
    print(f" [{s.stage}] Trigger: {s.trigger}")
    print(f"    Tests: {s.tests}")
    print(f"    Duration: {s.duration}")
    print(f"    On Fail: {s.fail_action}")
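The unit-test job writes `coverage.xml` via `--cov-report=xml`. To make the > 80% coverage target from the tips block a PR merge, pytest-cov's `--cov-fail-under=80` flag is the simplest route; if the gate has to be a separate CI step, a small script reading the Cobertura root `line-rate` attribute could do the same. A sketch, where the `check_coverage` function, the `check_coverage.py` file name, and the 0.80 threshold are assumptions:

```python
from __future__ import annotations

import sys
import xml.etree.ElementTree as ET

THRESHOLD = 0.80  # assumed gate matching the "> 80%" coverage target


def check_coverage(xml_text: str | bytes, threshold: float = THRESHOLD) -> tuple[bool, float]:
    # Cobertura XML (as written by coverage.py) stores overall line coverage
    # as a 0-1 fraction in the root element's line-rate attribute.
    root = ET.fromstring(xml_text)
    rate = float(root.attrib["line-rate"])
    return rate >= threshold, rate


if __name__ == "__main__" and len(sys.argv) > 1:
    # Usage in CI: python check_coverage.py coverage.xml
    with open(sys.argv[1], "rb") as f:
        ok, rate = check_coverage(f.read())
    print(f"line coverage {rate:.1%} (threshold {THRESHOLD:.0%})")
    sys.exit(0 if ok else 1)
```

A non-zero exit code is what makes the CI step fail and block the merge.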

Tips

  • .fn(): Use task.fn() to test task logic without going through the Prefect engine
  • Mock: Mock every external API/DB in unit tests
  • Testcontainers: Use Testcontainers for integration tests
  • Coverage: Target > 80%, focusing on transform logic + error handling
  • Idempotent: Test the idempotency of every flow to prevent duplicates
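The idempotency tip can be checked by loading the same batch twice and asserting that the table does not change. A minimal sketch using stdlib `sqlite3` in place of the Testcontainers PostgreSQL database, with a hypothetical `load_records` step that upserts on `id` (PostgreSQL accepts the same `INSERT ... ON CONFLICT ... DO UPDATE` form):

```python
import sqlite3


def load_records(conn: sqlite3.Connection, records: list[dict]) -> None:
    # Hypothetical load step: upsert keyed on id, so re-running the same
    # batch updates rows in place instead of inserting duplicates.
    conn.executemany(
        """
        INSERT INTO output (id, name, score) VALUES (:id, :name, :score)
        ON CONFLICT(id) DO UPDATE SET name = excluded.name, score = excluded.score
        """,
        records,
    )
    conn.commit()


def test_load_is_idempotent() -> None:
    conn = sqlite3.connect(":memory:")
    conn.execute("CREATE TABLE output (id INTEGER PRIMARY KEY, name TEXT, score REAL)")
    batch = [
        {"id": 1, "name": "alice", "score": 95.12},
        {"id": 3, "name": "carol", "score": 88.7},
    ]
    load_records(conn, batch)
    first = conn.execute("SELECT * FROM output ORDER BY id").fetchall()
    load_records(conn, batch)  # second run with identical input
    second = conn.execute("SELECT * FROM output ORDER BY id").fetchall()
    assert first == second
    assert len(second) == len(batch)
    conn.close()
```

Comparing full table contents, not just row counts, also catches a rerun that silently rewrites values.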

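What is Prefect?

Prefect is a Pythonic workflow orchestration framework. You mark plain Python functions with @flow and @task, and Prefect adds retries, caching, concurrency limits, scheduling, a UI, notifications, and deployment to Prefect Cloud or a self-hosted server.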

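What does the testing strategy include?

A test pyramid of roughly 70% unit, 20% integration, and 10% E2E tests, plus data quality, performance, idempotency, regression, and contract tests, built on pytest, mocks, Testcontainers, and coverage reporting.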

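How do you write unit tests?

Use pytest and call task.fn() directly; replace external dependencies with patch and MagicMock; use fixtures and parametrize; cover transform logic, edge cases, error handling, and retry logic; aim for 80%+ coverage.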
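Parametrizing means one test body driven by a table of input/expected pairs. The sketch below uses the stdlib `unittest` `subTest` in place of `pytest.mark.parametrize` so it runs without pytest, and a plain copy of the `transform_data` body (what `transform_data.fn` would return); the case table is illustrative.

```python
import unittest


def transform_data(raw: dict) -> list:
    # Plain copy of the transform_data task body from the unit-test section
    records = raw.get("results", [])
    return [
        {"id": r["id"], "name": r["name"].strip().lower(), "score": round(r["score"], 2)}
        for r in records
        if r.get("score", 0) > 0
    ]


# (description, input record, expected output)
CASES = [
    ("trims and lowercases name", {"id": 1, "name": "  Alice ", "score": 10},
     [{"id": 1, "name": "alice", "score": 10}]),
    ("rounds score to 2 decimals", {"id": 2, "name": "bob", "score": 3.14159},
     [{"id": 2, "name": "bob", "score": 3.14}]),
    ("drops zero score", {"id": 3, "name": "c", "score": 0}, []),
    ("drops negative score", {"id": 4, "name": "d", "score": -5}, []),
    ("drops missing score", {"id": 5, "name": "e"}, []),
]


class TestTransformData(unittest.TestCase):
    def test_cases(self):
        for desc, record, expected in CASES:
            # Each case is reported separately if it fails
            with self.subTest(desc):
                self.assertEqual(transform_data({"results": [record]}), expected)
```

Run it with `python -m unittest`; pytest also collects `unittest.TestCase` classes, and there each row would typically become a `pytest.mark.parametrize` case instead.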

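How do you set up CI/CD?

Use GitHub Actions to run the stages in order (lint, unit, integration, E2E, deploy): pre-commit hooks for linting, Testcontainers for integration tests, coverage uploaded to Codecov, E2E tests on staging before production, a manual approval gate before deploying, and rollback if the deploy fails.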

Summary

Testing Prefect workflows for production combines pytest unit, integration, and E2E tests, mocks and Testcontainers for external dependencies, CI/CD gates with coverage targets, and data quality and idempotency checks.