SiamCafe.net Blog
Technology

Prefect Workflow Feature Flag Management — จัดการ Workflows ด้วย Feature Flags

prefect workflow feature flag management
Prefect Workflow Feature Flag Management | SiamCafe Blog
2025-09-12· อ. บอม — SiamCafe.net· 1,486 คำ

Prefect คืออะไรและ Feature Flags ทำงานอย่างไร

Prefect เป็น modern workflow orchestration framework สำหรับ Python ใช้สร้างและจัดการ data pipelines, ETL jobs, ML workflows และ automated tasks ข้อดีเทียบกับ Airflow คือ ใช้ pure Python (ไม่ต้องเรียน DSL ใหม่), dynamic workflows, better error handling และ modern UI

Feature Flags (Feature Toggles) คือเทคนิคที่ให้ enable/disable features ของ application โดยไม่ต้อง deploy code ใหม่ ใช้สำหรับ gradual rollout เปิด feature ให้ผู้ใช้บางส่วนก่อน, A/B testing ทดสอบ features ต่างๆ, kill switch ปิด feature ที่มีปัญหาทันที, trunk-based development merge code เข้า main branch โดยซ่อน feature ที่ยังไม่พร้อม และ operational flags ปรับ behavior ของระบบตาม load

การรวม Prefect กับ Feature Flags ช่วยให้ toggle workflow steps โดยไม่ต้อง redeploy, A/B test data pipeline configurations, gradual rollout ของ new data processing logic, emergency disable ของ resource-intensive tasks และ dynamic configuration changes ผ่าน feature flag service

ติดตั้ง Prefect และสร้าง Workflows

เริ่มต้นใช้งาน Prefect

# === ติดตั้ง Prefect ===
pip install prefect

# ตรวจสอบ version
prefect version

# Start Prefect server (local)
prefect server start
# UI: http://localhost:4200

# === สร้าง Workflow แรก ===
# etl_flow.py

from prefect import flow, task, get_run_logger
from prefect.tasks import task_input_hash
from datetime import timedelta
import pandas as pd

@task(retries=3, retry_delay_seconds=60,
      cache_key_fn=task_input_hash,
      cache_expiration=timedelta(hours=1))
def extract_data(source_url: str) -> pd.DataFrame:
    logger = get_run_logger()
    logger.info(f"Extracting from {source_url}")
    
    df = pd.read_csv(source_url)
    logger.info(f"Extracted {len(df)} rows")
    return df

@task
def transform_data(df: pd.DataFrame) -> pd.DataFrame:
    logger = get_run_logger()
    
    df = df.dropna()
    df["processed_at"] = pd.Timestamp.now()
    
    logger.info(f"Transformed: {len(df)} rows")
    return df

@task
def load_data(df: pd.DataFrame, target: str):
    logger = get_run_logger()
    
    df.to_parquet(target, index=False)
    logger.info(f"Loaded {len(df)} rows to {target}")

@flow(name="ETL Pipeline", log_prints=True)
def etl_pipeline(source: str, target: str):
    raw = extract_data(source)
    transformed = transform_data(raw)
    load_data(transformed, target)
    
    print(f"Pipeline complete: {source} -> {target}")

# Run locally
# etl_pipeline("data/input.csv", "data/output.parquet")

# === Deploy as scheduled flow ===
# prefect deployment build etl_flow.py:etl_pipeline \
#     --name "daily-etl" \
#     --cron "0 6 * * *" \
#     --pool default-agent-pool

# prefect deployment apply etl_pipeline-deployment.yaml

# === Docker Deployment ===
# Dockerfile
# FROM python:3.11-slim
# RUN pip install prefect pandas pyarrow
# COPY . /app
# WORKDIR /app
# CMD ["prefect", "agent", "start", "-p", "default-agent-pool"]

# docker-compose.yml
# services:
#   prefect-server:
#     image: prefecthq/prefect:2-latest
#     command: prefect server start --host 0.0.0.0
#     ports: ["4200:4200"]
#
#   prefect-agent:
#     build: .
#     environment:
#       PREFECT_API_URL: http://prefect-server:4200/api
#     depends_on: [prefect-server]

echo "Prefect setup complete"

สร้าง Feature Flag System ด้วย Python

สร้างระบบ Feature Flags

#!/usr/bin/env python3
# feature_flags.py — Feature Flag Management System
import json
import time
import logging
import hashlib
from pathlib import Path
from datetime import datetime
from typing import Any, Dict, Optional, List
from dataclasses import dataclass, field

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger("feature_flags")

@dataclass
class FeatureFlag:
    name: str
    enabled: bool = False
    description: str = ""
    rollout_percentage: int = 100
    allowed_users: List[str] = field(default_factory=list)
    metadata: Dict[str, Any] = field(default_factory=dict)
    created_at: str = field(default_factory=lambda: datetime.utcnow().isoformat())
    updated_at: str = field(default_factory=lambda: datetime.utcnow().isoformat())

class FeatureFlagManager:
    def __init__(self, storage_path="feature_flags.json"):
        self.storage_path = Path(storage_path)
        self.flags: Dict[str, FeatureFlag] = {}
        self._load()
    
    def _load(self):
        if self.storage_path.exists():
            data = json.loads(self.storage_path.read_text())
            for name, flag_data in data.items():
                self.flags[name] = FeatureFlag(**flag_data)
    
    def _save(self):
        data = {}
        for name, flag in self.flags.items():
            data[name] = {
                "name": flag.name,
                "enabled": flag.enabled,
                "description": flag.description,
                "rollout_percentage": flag.rollout_percentage,
                "allowed_users": flag.allowed_users,
                "metadata": flag.metadata,
                "created_at": flag.created_at,
                "updated_at": flag.updated_at,
            }
        self.storage_path.write_text(json.dumps(data, indent=2))
    
    def create(self, name: str, enabled: bool = False,
               description: str = "", rollout_percentage: int = 100):
        flag = FeatureFlag(
            name=name, enabled=enabled,
            description=description,
            rollout_percentage=rollout_percentage,
        )
        self.flags[name] = flag
        self._save()
        logger.info(f"Created flag: {name} (enabled={enabled})")
        return flag
    
    def is_enabled(self, name: str, user_id: Optional[str] = None) -> bool:
        flag = self.flags.get(name)
        if not flag:
            return False
        
        if not flag.enabled:
            return False
        
        if user_id and flag.allowed_users:
            if user_id in flag.allowed_users:
                return True
            if flag.rollout_percentage < 100:
                return False
        
        if flag.rollout_percentage < 100 and user_id:
            hash_val = int(hashlib.md5(
                f"{name}:{user_id}".encode()
            ).hexdigest(), 16)
            return (hash_val % 100) < flag.rollout_percentage
        
        return flag.rollout_percentage >= 100
    
    def toggle(self, name: str, enabled: bool):
        if name in self.flags:
            self.flags[name].enabled = enabled
            self.flags[name].updated_at = datetime.utcnow().isoformat()
            self._save()
            logger.info(f"Toggled {name}: enabled={enabled}")
    
    def set_rollout(self, name: str, percentage: int):
        if name in self.flags:
            self.flags[name].rollout_percentage = max(0, min(100, percentage))
            self.flags[name].updated_at = datetime.utcnow().isoformat()
            self._save()
            logger.info(f"Set rollout {name}: {percentage}%")
    
    def delete(self, name: str):
        if name in self.flags:
            del self.flags[name]
            self._save()
            logger.info(f"Deleted flag: {name}")
    
    def list_flags(self):
        return [
            {
                "name": f.name,
                "enabled": f.enabled,
                "rollout": f.rollout_percentage,
                "description": f.description,
            }
            for f in self.flags.values()
        ]

# ff = FeatureFlagManager()
# ff.create("new_etl_pipeline", enabled=True, rollout_percentage=25,
#           description="New optimized ETL pipeline")
# ff.create("ml_model_v2", enabled=False, description="ML model version 2")
# 
# if ff.is_enabled("new_etl_pipeline", user_id="user123"):
#     print("Using new pipeline")
# else:
#     print("Using old pipeline")

รวม Prefect กับ Feature Flags

ใช้ Feature Flags ควบคุม Prefect workflows

#!/usr/bin/env python3
# prefect_with_flags.py — Prefect Workflows with Feature Flags
from prefect import flow, task, get_run_logger
from prefect.blocks.system import JSON
import json
import time
from datetime import datetime

# Import feature flag manager
# from feature_flags import FeatureFlagManager

class SimpleFeatureFlags:
    """Inline feature flag for demo"""
    def __init__(self):
        self.flags = {
            "use_new_transformer": {"enabled": True, "rollout": 50},
            "enable_data_validation": {"enabled": True, "rollout": 100},
            "use_gpu_processing": {"enabled": False, "rollout": 0},
            "send_slack_notifications": {"enabled": True, "rollout": 100},
            "enable_caching": {"enabled": True, "rollout": 75},
        }
    
    def is_enabled(self, name, user_id=None):
        flag = self.flags.get(name, {})
        return flag.get("enabled", False)

ff = SimpleFeatureFlags()

@task
def extract(source: str):
    logger = get_run_logger()
    logger.info(f"Extracting from {source}")
    
    data = {"rows": 10000, "source": source, "timestamp": datetime.utcnow().isoformat()}
    return data

@task
def transform_v1(data: dict):
    logger = get_run_logger()
    logger.info("Using transform V1 (legacy)")
    
    data["transform"] = "v1"
    data["rows"] = int(data["rows"] * 0.95)
    time.sleep(0.1)
    return data

@task
def transform_v2(data: dict):
    logger = get_run_logger()
    logger.info("Using transform V2 (optimized)")
    
    data["transform"] = "v2"
    data["rows"] = int(data["rows"] * 0.98)
    data["quality_score"] = 0.95
    time.sleep(0.05)
    return data

@task
def validate_data(data: dict):
    logger = get_run_logger()
    
    checks = {
        "has_rows": data.get("rows", 0) > 0,
        "has_source": bool(data.get("source")),
        "has_timestamp": bool(data.get("timestamp")),
    }
    
    passed = all(checks.values())
    logger.info(f"Validation: {'PASSED' if passed else 'FAILED'} {checks}")
    
    data["validation"] = {"passed": passed, "checks": checks}
    return data

@task
def load(data: dict, target: str):
    logger = get_run_logger()
    logger.info(f"Loading {data['rows']} rows to {target}")
    
    data["loaded_at"] = datetime.utcnow().isoformat()
    data["target"] = target
    return data

@task
def notify_slack(message: str):
    logger = get_run_logger()
    logger.info(f"Slack notification: {message}")

@flow(name="Feature-Flagged ETL")
def flagged_etl_pipeline(source: str, target: str):
    logger = get_run_logger()
    
    # Step 1: Extract
    data = extract(source)
    
    # Step 2: Transform (feature flag decides version)
    if ff.is_enabled("use_new_transformer"):
        logger.info("Feature flag: using new transformer")
        data = transform_v2(data)
    else:
        logger.info("Feature flag: using legacy transformer")
        data = transform_v1(data)
    
    # Step 3: Validate (conditional on feature flag)
    if ff.is_enabled("enable_data_validation"):
        data = validate_data(data)
    else:
        logger.info("Data validation skipped (flag disabled)")
    
    # Step 4: Load
    result = load(data, target)
    
    # Step 5: Notify (conditional)
    if ff.is_enabled("send_slack_notifications"):
        notify_slack(f"ETL complete: {data['rows']} rows loaded to {target}")
    
    return result

@flow(name="Dynamic Config Pipeline")
def dynamic_pipeline(config_name: str = "default"):
    logger = get_run_logger()
    
    configs = {
        "default": {"batch_size": 1000, "parallel": False, "retries": 3},
        "high_performance": {"batch_size": 10000, "parallel": True, "retries": 1},
        "safe_mode": {"batch_size": 100, "parallel": False, "retries": 5},
    }
    
    config = configs.get(config_name, configs["default"])
    logger.info(f"Using config: {config_name} = {config}")
    
    # Pipeline uses config dynamically
    for i in range(0, 50000, config["batch_size"]):
        logger.info(f"Processing batch {i}-{i+config['batch_size']}")
    
    return {"config": config_name, "status": "complete"}

# flagged_etl_pipeline("data/input.csv", "data/output.parquet")
# dynamic_pipeline("high_performance")

Monitoring และ Observability

Monitor workflows และ feature flags

#!/usr/bin/env python3
# monitoring.py — Workflow and Feature Flag Monitoring
import json
import time
import logging
from datetime import datetime, timedelta
from typing import Dict, List
from collections import defaultdict

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger("monitoring")

class WorkflowMonitor:
    def __init__(self):
        self.runs = []
        self.flag_evaluations = defaultdict(lambda: {"true": 0, "false": 0})
    
    def record_run(self, flow_name, status, duration_ms, metadata=None):
        self.runs.append({
            "flow_name": flow_name,
            "status": status,
            "duration_ms": duration_ms,
            "timestamp": datetime.utcnow().isoformat(),
            "metadata": metadata or {},
        })
    
    def record_flag_evaluation(self, flag_name, result):
        key = "true" if result else "false"
        self.flag_evaluations[flag_name][key] += 1
    
    def get_flow_stats(self, hours=24):
        cutoff = datetime.utcnow() - timedelta(hours=hours)
        recent = [r for r in self.runs]
        
        if not recent:
            return {}
        
        stats = defaultdict(lambda: {
            "total": 0, "success": 0, "failed": 0,
            "durations": [],
        })
        
        for run in recent:
            s = stats[run["flow_name"]]
            s["total"] += 1
            if run["status"] == "success":
                s["success"] += 1
            else:
                s["failed"] += 1
            s["durations"].append(run["duration_ms"])
        
        result = {}
        for name, s in stats.items():
            result[name] = {
                "total_runs": s["total"],
                "success_rate": round(s["success"] / s["total"] * 100, 1),
                "avg_duration_ms": round(sum(s["durations"]) / len(s["durations"]), 0),
                "p95_duration_ms": round(sorted(s["durations"])[int(len(s["durations"]) * 0.95)], 0),
                "failed_runs": s["failed"],
            }
        
        return result
    
    def get_flag_stats(self):
        result = {}
        for flag_name, counts in self.flag_evaluations.items():
            total = counts["true"] + counts["false"]
            result[flag_name] = {
                "total_evaluations": total,
                "true_count": counts["true"],
                "false_count": counts["false"],
                "true_percentage": round(counts["true"] / max(total, 1) * 100, 1),
            }
        return result
    
    def generate_dashboard(self, output="workflow_dashboard.html"):
        flow_stats = self.get_flow_stats()
        flag_stats = self.get_flag_stats()
        
        html = f"""
Prefect Workflow Feature Flag Management — | SiamCafe


Updated: {datetime.utcnow().strftime('%Y-%m-%d %H:%M UTC')}

Flow Statistics

""" for name, stats in flow_stats.items(): color = "success" if stats["success_rate"] > 95 else "error" html += f"""

{name}

{stats['success_rate']}%

Total: {stats['total_runs']} | Failed: {stats['failed_runs']}

Avg: {stats['avg_duration_ms']}ms | P95: {stats['p95_duration_ms']}ms

""" html += """

Feature Flag Usage

""" for name, stats in flag_stats.items(): html += f""" """ html += "
FlagEvaluationsTrue %TrueFalse
{name}{stats['total_evaluations']} {stats['true_percentage']}% {stats['true_count']} {stats['false_count']}
" from pathlib import Path Path(output).write_text(html) return output # monitor = WorkflowMonitor() # monitor.record_run("ETL Pipeline", "success", 5200) # monitor.record_flag_evaluation("use_new_transformer", True) # monitor.generate_dashboard()

Production Deployment และ Best Practices

แนวทางใช้งาน production

# === Prefect + Feature Flags Production Setup ===

# 1. Prefect Cloud / Self-Hosted
# ===================================
# Option A: Prefect Cloud (managed)
# pip install prefect
# prefect cloud login --key YOUR_API_KEY

# Option B: Self-hosted
# docker-compose.yml
# services:
#   prefect-server:
#     image: prefecthq/prefect:2-latest
#     command: prefect server start --host 0.0.0.0
#     ports: ["4200:4200"]
#     environment:
#       PREFECT_SERVER_DATABASE_CONNECTION_URL: postgresql+asyncpg://user:pass@db:5432/prefect
#   
#   db:
#     image: postgres:16
#     environment:
#       POSTGRES_USER: user
#       POSTGRES_PASSWORD: pass
#       POSTGRES_DB: prefect
#     volumes: ["pgdata:/var/lib/postgresql/data"]
#
# volumes:
#   pgdata:

# 2. Feature Flag Services
# ===================================
# Option A: LaunchDarkly (enterprise)
# pip install launchdarkly-server-sdk
# import ldclient
# ldclient.set_config(ldclient.Config("sdk-key"))
# client = ldclient.get()
# flag_value = client.variation("my-flag", {"key": "user123"}, False)

# Option B: Unleash (open source)
# pip install UnleashClient
# from UnleashClient import UnleashClient
# client = UnleashClient(url="http://unleash:4242/api", app_name="my-app")
# client.initialize_client()
# if client.is_enabled("my-feature"):
#     do_new_thing()

# Option C: Simple Redis-based flags
# import redis
# r = redis.Redis()
# r.set("flag:new_pipeline", "true")
# r.set("flag:new_pipeline:rollout", "50")
# enabled = r.get("flag:new_pipeline") == b"true"

# 3. Best Practices
# ===================================
# Feature Flag Naming:
# - Use descriptive names: "enable_new_data_transformer_v2"
# - Prefix by team: "data_team.new_etl"
# - Prefix by type: "ops.enable_caching", "release.new_ui"

# Lifecycle Management:
# - Set expiration dates for temporary flags
# - Review and clean up old flags monthly
# - Document each flag with description and owner
# - Track flag age and usage

# Testing:
# - Test both enabled and disabled paths
# - Include flag state in test configurations
# - Use deterministic rollout for reproducible tests

# 4. Prefect Deployment Best Practices
# ===================================
# - Use work pools for resource isolation
# - Set appropriate retries and timeouts
# - Use caching for expensive tasks
# - Monitor flow runs with alerts
# - Tag deployments for easy filtering
# - Use parameters for configurable flows

# prefect work-pool create --type process "production-pool"
# prefect deploy --all

# 5. Alerting
# ===================================
# Prefect automations (built-in):
# - Alert on flow run failure
# - Alert on stuck runs (exceeded timeout)
# - Alert on SLA violations

# Custom alerts:
# from prefect.blocks.notifications import SlackWebhook
# slack = SlackWebhook(url="https://hooks.slack.com/...")
# slack.notify("Flow failed!")

echo "Production setup documented"

FAQ คำถามที่พบบ่อย

Q: Prefect กับ Airflow ต่างกันอย่างไร?

A: Prefect ใช้ pure Python decorators (@flow, @task) เขียนง่ายกว่า Airflow ที่ใช้ DAG objects Prefect รองรับ dynamic workflows (สร้าง tasks at runtime) Airflow ต้อง define DAG structure ล่วงหน้า Prefect มี better error handling (retry, caching) built-in Airflow มี ecosystem ใหญ่กว่าและ battle-tested ใน production มากกว่า สำหรับ projects ใหม่ Prefect เรียนรู้ง่ายกว่า สำหรับ enterprise ที่ใช้ Airflow อยู่แล้วไม่จำเป็นต้อง migrate

Q: Feature Flags เพิ่ม technical debt ไหม?

A: ถ้าไม่จัดการดี จะเพิ่ม technical debt มาก ปัญหาที่พบบ่อยคือ stale flags ที่ไม่เคยถูกลบ, nested flags ที่ซับซ้อน, untested flag combinations สำหรับป้องกัน ตั้ง expiration date สำหรับ temporary flags, review flags ทุกเดือน, จำกัดจำนวน active flags (แนะนำไม่เกิน 20-30 flags), ทำ flag cleanup เป็น routine และ document ทุก flag

Q: จะทำ gradual rollout อย่างไร?

A: ใช้ percentage-based rollout เริ่มจาก 5% -> 25% -> 50% -> 100% hash user ID กับ flag name เพื่อให้ user ได้รับ experience เดิมทุกครั้ง (consistent bucketing) monitor metrics ที่แต่ละ stage ก่อนเพิ่ม percentage ถ้ามีปัญหา rollback โดย set percentage เป็น 0 ทันที สำหรับ data pipelines ใช้ percentage ของ data batches แทน users

Q: ใช้ Feature Flag service ไหนดี?

A: สำหรับ enterprise ที่ต้องการ full features LaunchDarkly เป็น leader แต่แพง (~$10/seat/month) สำหรับ open source Unleash ดีที่สุด self-hosted ได้ มี gradual rollout, A/B testing, metrics สำหรับ projects เล็ก ใช้ environment variables หรือ config file เพียงพอ สำหรับ Prefect workflows ที่ต้องการ simple toggle ใช้ Redis-based flags หรือ Prefect Variables ก็เพียงพอ

📖 บทความที่เกี่ยวข้อง

Prometheus Federation Feature Flag Managementอ่านบทความ → Payload CMS Feature Flag Managementอ่านบทความ → Python Pydantic Feature Flag Managementอ่านบทความ → Btrfs Filesystem Feature Flag Managementอ่านบทความ → Prefect Workflow Hexagonal Architectureอ่านบทความ →

📚 ดูบทความทั้งหมด →