AB Testing ML Progressive Delivery: Testing ML Models in Production

2025-06-07 · อ. บอม, SiamCafe.net · 1,701 words

What Is A/B Testing for ML Models?

A/B Testing for Machine Learning models is a way to compare the performance of two (or more) versions of an ML model in production. A portion of traffic is sent to the new model (the challenger), the rest goes to the existing model (the champion), and the outcome is measured with predefined metrics.
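The traffic split described above is often done with deterministic hashing, so that a given user always lands on the same model. The following is a minimal sketch under that assumption; the function name `assign_variant`, the bucket count, and the user ID format are illustrative and not part of the original article.

```python
import hashlib


def assign_variant(user_id: str, experiment: str, treatment_pct: float) -> str:
    """Deterministically map a user to 'control' (champion) or 'treatment' (challenger)."""
    # Hash the experiment name together with the user ID so that the same user
    # always gets the same variant, and buckets differ between experiments.
    digest = hashlib.sha256(f"{experiment}:{user_id}".encode("utf-8")).hexdigest()
    bucket = int(digest[:8], 16) % 10_000  # bucket in 0..9999
    # treatment_pct is a percentage (0-100); 20 means buckets 0..1999.
    return "treatment" if bucket < treatment_pct * 100 else "control"


counts = {"control": 0, "treatment": 0}
for i in range(20_000):
    counts[assign_variant(f"user-{i}", "recommendation_model_v2", 20)] += 1
print(counts)  # roughly an 80/20 split
```

Hashing instead of drawing a random number per request keeps each user's experience stable for the whole experiment, which matters when the metric is measured per user.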

Progressive Delivery is a deployment strategy that gradually increases traffic to the new model, from 1% up to 100%, with automated monitoring and rollback if problems are detected. Unlike a traditional deployment, which switches 100% of traffic at once, Progressive Delivery limits the risk that a new model performs poorly in production.

The main use cases are: Model Version Comparison (comparing model v1 against v2); Feature Importance Testing (checking whether new features improve predictions); Algorithm Comparison (comparing different algorithms, such as XGBoost vs LightGBM); Hyperparameter Tuning (testing different hyperparameters in production); and Business Metric Validation (verifying that model improvements translate into real business value).

Setting Up an A/B Testing Framework

Build the A/B testing infrastructure for ML

# === A/B Testing Framework Setup ===

# 1. Project Structure
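# No spaces inside the braces, otherwise brace expansion does not happen.
# k8s/ is included because the VirtualService manifest below is written there.
mkdir -p ml-ab-testing/{models,router,analytics,config,tests,k8s}
cd ml-ab-testing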

# 2. Traffic Router Configuration
cat > config/experiment.yaml << 'EOF'
experiments:
  - name: "recommendation_model_v2"
    description: "Test new recommendation model with transformer architecture"
    status: "running"
    start_date: "2025-01-15"
    
    variants:
      - name: "control"
        model_id: "rec_model_v1"
        endpoint: "http://model-v1:8080/predict"
        traffic_percentage: 80
      
      - name: "treatment"
        model_id: "rec_model_v2"
        endpoint: "http://model-v2:8080/predict"
        traffic_percentage: 20
    
    metrics:
      primary: "click_through_rate"
      secondary:
        - "conversion_rate"
        - "revenue_per_user"
        - "latency_p99"
      guardrail:
        - name: "latency_p99"
          threshold_ms: 200
        - name: "error_rate"
          threshold_pct: 1.0
    
    progressive_rollout:
      phases:
        - traffic_pct: 5
          duration_hours: 24
          auto_advance: true
        - traffic_pct: 20
          duration_hours: 48
          auto_advance: true
        - traffic_pct: 50
          duration_hours: 72
          auto_advance: false  # Manual approval
        - traffic_pct: 100
          duration_hours: 0
          auto_advance: false
    
    rollback_conditions:
      - metric: "error_rate"
        operator: ">"
        value: 2.0
      - metric: "latency_p99"
        operator: ">"
        value: 300
EOF

# 3. Kubernetes Deployment with Istio Traffic Splitting
cat > k8s/virtual-service.yaml << 'EOF'
apiVersion: networking.istio.io/v1beta1
kind: VirtualService
metadata:
  name: ml-model-routing
spec:
  hosts:
    - ml-model.default.svc.cluster.local
  http:
    - route:
        - destination:
            host: ml-model-v1
            port:
              number: 8080
          weight: 80
        - destination:
            host: ml-model-v2
            port:
              number: 8080
          weight: 20
      headers:
        response:
          add:
            x-model-version: "%UPSTREAM_METADATA([\"model_version\"])%"
EOF

echo "A/B testing framework configured"
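A config like experiment.yaml is easy to get subtly wrong, for example variant percentages that do not add up to 100. The sketch below shows one way such a file could be sanity-checked before an experiment starts. It assumes the YAML has already been loaded into a plain dict (the loading step is left out), and `validate_experiment` is a hypothetical helper, not something the original setup defines.

```python
from typing import Any, Dict, List


def validate_experiment(exp: Dict[str, Any]) -> List[str]:
    """Return a list of problems found; an empty list means no issue was detected."""
    problems: List[str] = []

    variants = exp.get("variants", [])
    total = sum(v.get("traffic_percentage", 0) for v in variants)
    if total != 100:
        problems.append(f"variant traffic sums to {total}, expected 100")

    names = [v.get("name") for v in variants]
    if len(set(names)) != len(names):
        problems.append("variant names must be unique")

    phases = exp.get("progressive_rollout", {}).get("phases", [])
    pcts = [p["traffic_pct"] for p in phases]
    if pcts != sorted(pcts):
        problems.append("rollout phases must not decrease in traffic_pct")
    if pcts and pcts[-1] != 100:
        problems.append("final rollout phase should reach 100%")
    return problems


# Same values as the experiment defined in config/experiment.yaml
experiment = {
    "name": "recommendation_model_v2",
    "variants": [
        {"name": "control", "traffic_percentage": 80},
        {"name": "treatment", "traffic_percentage": 20},
    ],
    "progressive_rollout": {
        "phases": [
            {"traffic_pct": 5}, {"traffic_pct": 20},
            {"traffic_pct": 50}, {"traffic_pct": 100},
        ]
    },
}
print(validate_experiment(experiment))  # prints []
```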

Progressive Delivery for ML Models

Implement a progressive delivery pipeline

#!/usr/bin/env python3
# progressive_delivery.py — ML Model Progressive Delivery
import json
import logging
from datetime import datetime, timedelta
from typing import Dict, List, Optional

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger("progressive")

class MLProgressiveDelivery:
    def __init__(self, experiment_name):
        self.experiment = experiment_name
        self.phases = [
            {"pct": 5, "duration_hours": 24, "auto": True},
            {"pct": 20, "duration_hours": 48, "auto": True},
            {"pct": 50, "duration_hours": 72, "auto": False},
            {"pct": 100, "duration_hours": 0, "auto": False},
        ]
        self.current_phase = 0
        self.started_at = datetime.utcnow()
        self.phase_started_at = datetime.utcnow()
    
    def get_current_state(self):
        phase = self.phases[self.current_phase]
        elapsed = (datetime.utcnow() - self.phase_started_at).total_seconds() / 3600
        
        return {
            "experiment": self.experiment,
            "current_phase": self.current_phase + 1,
            "total_phases": len(self.phases),
            "traffic_pct": phase["pct"],
            "elapsed_hours": round(elapsed, 1),
            "required_hours": phase["duration_hours"],
            "auto_advance": phase["auto"],
            "ready_to_advance": elapsed >= phase["duration_hours"],
        }
    
    def check_guardrails(self, metrics):
        """Check if guardrail metrics are within thresholds"""
        guardrails = {
            "error_rate_pct": {"threshold": 1.0, "operator": "<="},
            "latency_p99_ms": {"threshold": 200, "operator": "<="},
            "model_accuracy": {"threshold": 0.85, "operator": ">="},
        }
        
        violations = []
        for metric_name, rule in guardrails.items():
            value = metrics.get(metric_name)
            if value is None:
                continue
            
            if rule["operator"] == "<=" and value > rule["threshold"]:
                violations.append({
                    "metric": metric_name,
                    "value": value,
                    "threshold": rule["threshold"],
                    "operator": rule["operator"],
                })
            elif rule["operator"] == ">=" and value < rule["threshold"]:
                violations.append({
                    "metric": metric_name,
                    "value": value,
                    "threshold": rule["threshold"],
                    "operator": rule["operator"],
                })
        
        return {
            "passed": len(violations) == 0,
            "violations": violations,
            "action": "continue" if not violations else "rollback",
        }
    
    def advance_phase(self, metrics):
        """Attempt to advance to next phase"""
        guardrail_check = self.check_guardrails(metrics)
        
        if not guardrail_check["passed"]:
            return {
                "action": "rollback",
                "reason": "Guardrail violations detected",
                "violations": guardrail_check["violations"],
            }
        
        state = self.get_current_state()
        if not state["ready_to_advance"]:
            return {
                "action": "wait",
                "remaining_hours": state["required_hours"] - state["elapsed_hours"],
            }
        
        if self.current_phase < len(self.phases) - 1:
            self.current_phase += 1
            self.phase_started_at = datetime.utcnow()
            new_phase = self.phases[self.current_phase]
            
            return {
                "action": "advanced",
                "new_phase": self.current_phase + 1,
                "new_traffic_pct": new_phase["pct"],
                "requires_approval": not new_phase["auto"],
            }
        
        return {"action": "complete", "message": "Full rollout achieved"}
    
    def rollback(self):
        """Rollback to champion model"""
        return {
            "action": "rollback",
            "from_phase": self.current_phase + 1,
            "traffic_reverted_to": "100% champion (v1)",
            "timestamp": datetime.utcnow().isoformat(),
        }

pd = MLProgressiveDelivery("recommendation_model_v2")
print("State:", json.dumps(pd.get_current_state(), indent=2))

good_metrics = {"error_rate_pct": 0.5, "latency_p99_ms": 150, "model_accuracy": 0.92}
guardrails = pd.check_guardrails(good_metrics)
print("Guardrails:", json.dumps(guardrails, indent=2))

bad_metrics = {"error_rate_pct": 3.0, "latency_p99_ms": 350, "model_accuracy": 0.80}
bad_check = pd.check_guardrails(bad_metrics)
print("Bad Check:", json.dumps(bad_check, indent=2))
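In the class above, advance_phase moves to the next phase whenever the guardrails pass and the phase duration has elapsed; the "auto" flag is only reported back as requires_approval. If auto_advance: false is meant to hold the rollout until someone approves it (as the "Manual approval" comment in experiment.yaml suggests), a gate could look like the sketch below. The `approved` argument and the "await_approval" action are hypothetical names introduced for illustration.

```python
from typing import Dict, List

PHASES: List[Dict] = [
    {"pct": 5, "auto": True},
    {"pct": 20, "auto": True},
    {"pct": 50, "auto": False},   # leaving this phase needs a human decision
    {"pct": 100, "auto": False},
]


def next_action(phases: List[Dict], current_phase: int, ready: bool,
                guardrails_passed: bool, approved: bool = False) -> str:
    """Decide what to do with the rollout at the end of a monitoring cycle."""
    if not guardrails_passed:
        return "rollback"
    if not ready:                      # phase duration has not elapsed yet
        return "wait"
    if current_phase >= len(phases) - 1:
        return "complete"              # already at the final phase
    if not phases[current_phase]["auto"] and not approved:
        return "await_approval"        # manual gate before increasing traffic
    return "advance"


print(next_action(PHASES, 0, ready=True, guardrails_passed=True))   # advance
print(next_action(PHASES, 2, ready=True, guardrails_passed=True))   # await_approval
print(next_action(PHASES, 2, ready=True, guardrails_passed=True, approved=True))  # advance
```

Checking guardrails before anything else means a violation triggers a rollback even while the rollout is waiting on time or on approval.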

Statistical Analysis for A/B Tests

Analyze A/B test results statistically

#!/usr/bin/env python3
# ab_statistics.py: A/B Test Statistical Analysis
import json
import math
import logging
from statistics import NormalDist

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger("stats")

class ABTestAnalyzer:
    def __init__(self, confidence_level=0.95):
        self.confidence = confidence_level
        # Two-sided critical value derived from the confidence level
        # (about 1.96 for 95% confidence)
        self.z_score = NormalDist().inv_cdf(1 - (1 - confidence_level) / 2)
    
    def calculate_sample_size(self, baseline_rate, mde, power=0.80):
        """Calculate required sample size per variant (mde is an absolute difference)"""
        z_alpha = self.z_score
        z_beta = NormalDist().inv_cdf(power)  # about 0.84 for 80% power
        
        p1 = baseline_rate
        p2 = baseline_rate + mde
        p_avg = (p1 + p2) / 2
        
        numerator = (z_alpha * math.sqrt(2 * p_avg * (1 - p_avg)) + 
                     z_beta * math.sqrt(p1 * (1 - p1) + p2 * (1 - p2))) ** 2
        denominator = (p2 - p1) ** 2
        
        n = math.ceil(numerator / denominator)
        
        return {
            "baseline_rate": baseline_rate,
            "minimum_detectable_effect": mde,
            "confidence_level": self.confidence,
            "power": power,
            "sample_size_per_variant": n,
            "total_sample_size": n * 2,
        }
    
    def analyze_proportions(self, control_conversions, control_total,
                           treatment_conversions, treatment_total):
        """Two-proportion z-test for conversion rates"""
        p_control = control_conversions / control_total
        p_treatment = treatment_conversions / treatment_total
        p_pooled = (control_conversions + treatment_conversions) / (control_total + treatment_total)
        
        se = math.sqrt(p_pooled * (1 - p_pooled) * (1/control_total + 1/treatment_total))
        
        if se == 0:
            z_stat = 0
        else:
            z_stat = (p_treatment - p_control) / se
        
        # Two-tailed p-value approximation
        p_value = 2 * (1 - self._normal_cdf(abs(z_stat)))
        
        # Confidence interval for difference
        se_diff = math.sqrt(
            p_control * (1 - p_control) / control_total +
            p_treatment * (1 - p_treatment) / treatment_total
        )
        diff = p_treatment - p_control
        ci_lower = diff - self.z_score * se_diff
        ci_upper = diff + self.z_score * se_diff
        
        relative_lift = (p_treatment - p_control) / p_control * 100 if p_control > 0 else 0
        
        # Use the analyzer's own alpha instead of a hard-coded 0.05
        alpha = 1 - self.confidence
        
        return {
            "control": {
                "conversions": control_conversions,
                "total": control_total,
                "rate": round(p_control, 4),
            },
            "treatment": {
                "conversions": treatment_conversions,
                "total": treatment_total,
                "rate": round(p_treatment, 4),
            },
            "difference": round(diff, 4),
            "relative_lift_pct": round(relative_lift, 2),
            "z_statistic": round(z_stat, 4),
            "p_value": round(p_value, 4),
            "confidence_interval": [round(ci_lower, 4), round(ci_upper, 4)],
            "statistically_significant": p_value < alpha,
            "recommendation": "deploy_treatment" if (p_value < alpha and diff > 0) else 
                            "keep_control" if (p_value < alpha and diff < 0) else "continue_test",
        }
    
    def _normal_cdf(self, x):
        """Approximation of normal CDF"""
        return 0.5 * (1 + math.erf(x / math.sqrt(2)))
    
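    def sequential_test(self, data_points, spending_function="obrien_fleming"):
        """Sequential testing with alpha spending"""
        n = len(data_points)
        max_n = 10000
        # The information fraction cannot exceed 1
        info_fraction = min(n / max_n, 1.0)
        
        if info_fraction == 0:
            # No data yet: nothing is spent, and this avoids dividing by zero below
            alpha_spent = 0.0
        elif spending_function == "obrien_fleming":
            # O'Brien-Fleming-type spending: very little alpha early, most of it late
            alpha_spent = 2 * (1 - self._normal_cdf(self.z_score / math.sqrt(info_fraction)))
        else:
            # Linear spending: alpha proportional to the information fraction
            alpha_spent = (1 - self.confidence) * info_fraction
        
        return {
            "current_samples": n,
            "max_samples": max_n,
            "info_fraction": round(info_fraction, 4),
            "alpha_spent": round(alpha_spent, 6),
            # Heuristic flag only: an actual early stop also requires the
            # observed p-value to fall below alpha_spent
            "can_stop_early": alpha_spent > 0.01,
        }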

analyzer = ABTestAnalyzer(confidence_level=0.95)

sample = analyzer.calculate_sample_size(baseline_rate=0.05, mde=0.005)
print("Sample Size:", json.dumps(sample, indent=2))

result = analyzer.analyze_proportions(
    control_conversions=520, control_total=10000,
    treatment_conversions=580, treatment_total=10000
)
print("Analysis:", json.dumps(result, indent=2))
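To see what the two-proportion z-test does with the example numbers above (520 of 10,000 conversions for control, 580 of 10,000 for treatment), the arithmetic can be worked by hand. The values below are rounded; Φ denotes the standard normal CDF.

```latex
\[
\hat p_c = \frac{520}{10000} = 0.052, \qquad
\hat p_t = \frac{580}{10000} = 0.058, \qquad
\hat p = \frac{520 + 580}{10000 + 10000} = 0.055
\]
\[
z = \frac{\hat p_t - \hat p_c}
         {\sqrt{\hat p\,(1 - \hat p)\left(\frac{1}{n_c} + \frac{1}{n_t}\right)}}
  = \frac{0.006}{\sqrt{0.055 \cdot 0.945 \cdot 0.0002}}
  \approx 1.86,
\qquad
p = 2\bigl(1 - \Phi(|z|)\bigr) \approx 0.063
\]
```

The relative lift is about 11.5%, yet the p-value stays above 0.05, so at this sample size the recommendation comes out as "continue_test" rather than "deploy_treatment".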

CI/CD Pipeline for ML A/B Testing

Automate ML A/B testing in CI/CD

# === CI/CD Pipeline for ML A/B Testing ===

# 1. GitHub Actions — ML Model A/B Deploy
cat > .github/workflows/ml-ab-deploy.yml << 'EOF'
name: ML Model A/B Deploy

on:
  workflow_dispatch:
    inputs:
      model_version:
        description: 'New model version to test'
        required: true
      initial_traffic_pct:
        description: 'Initial traffic percentage (1-100)'
        required: true
        default: '5'

jobs:
  validate-model:
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v4
      
      - name: Download Model Artifact
        run: |
          aws s3 cp s3://models/${{ github.event.inputs.model_version }}/model.tar.gz .
          tar xzf model.tar.gz
      
      - name: Run Offline Validation
        run: |
          python3 scripts/validate_model.py \
            --model-path ./model \
            --test-data s3://data/test_set.parquet \
            --min-accuracy 0.85 \
            --max-latency-ms 100

  deploy-canary:
    needs: validate-model
    runs-on: ubuntu-latest
    steps:
      - name: Deploy Model as Canary
        run: |
          kubectl set image deployment/ml-model-v2 \
            model=registry/ml-model:${{ github.event.inputs.model_version }}
          
          kubectl rollout status deployment/ml-model-v2 --timeout=300s
      
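      - name: Set Traffic Split
        run: |
          # ... (the commands for this step are missing from the source)
EOF

# 2. Argo Rollouts: canary strategy with analysis steps
cat > k8s/argo-rollout.yaml << 'EOF'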
apiVersion: argoproj.io/v1alpha1
kind: Rollout
metadata:
  name: ml-model-rollout
spec:
  replicas: 4
  strategy:
    canary:
      canaryService: ml-model-canary
      stableService: ml-model-stable
      trafficRouting:
        istio:
          virtualService:
            name: ml-model-vs
      steps:
        - setWeight: 5
        - pause: {duration: 24h}
        - analysis:
            templates:
              - templateName: ml-model-analysis
        - setWeight: 20
        - pause: {duration: 48h}
        - analysis:
            templates:
              - templateName: ml-model-analysis
        - setWeight: 50
        - pause: {}  # Manual approval
        - setWeight: 100
  template:
    spec:
      containers:
        - name: model
          image: registry/ml-model:latest
          ports:
            - containerPort: 8080
EOF

echo "CI/CD pipeline configured"
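The workflow calls scripts/validate_model.py, which the article does not show. As a rough idea of what such an offline gate might look like, here is a sketch that accepts the same command-line flags and exits non-zero when a threshold is missed. Everything inside it is an assumption: `evaluate` is a hypothetical placeholder that returns fixed numbers instead of loading the model and test set.

```python
import argparse
from typing import Dict, List, Optional


def parse_args(argv: Optional[List[str]] = None) -> argparse.Namespace:
    parser = argparse.ArgumentParser(description="Offline model validation gate (sketch)")
    parser.add_argument("--model-path", required=True)
    parser.add_argument("--test-data", required=True)
    parser.add_argument("--min-accuracy", type=float, default=0.85)
    parser.add_argument("--max-latency-ms", type=float, default=100.0)
    return parser.parse_args(argv)


def evaluate(model_path: str, test_data: str) -> Dict[str, float]:
    """Hypothetical placeholder: a real script would load the model and score the test set."""
    return {"accuracy": 0.92, "latency_p99_ms": 80.0}  # illustrative numbers only


def passes_gate(accuracy: float, latency_ms: float,
                min_accuracy: float, max_latency_ms: float) -> bool:
    """The model passes only if it is accurate enough and fast enough."""
    return accuracy >= min_accuracy and latency_ms <= max_latency_ms


def main(argv: Optional[List[str]] = None) -> int:
    args = parse_args(argv)
    result = evaluate(args.model_path, args.test_data)
    ok = passes_gate(result["accuracy"], result["latency_p99_ms"],
                     args.min_accuracy, args.max_latency_ms)
    print("PASS" if ok else "FAIL", result)
    return 0 if ok else 1  # a non-zero exit code fails the CI step


exit_code = main(["--model-path", "./model", "--test-data", "s3://data/test_set.parquet"])
```

In a real script the last line would typically be `sys.exit(main())` so that the CI job stops before the canary deployment when validation fails.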

Monitoring and Decision Making

Monitor the A/B test and make decisions

#!/usr/bin/env python3
# ab_monitor.py — A/B Test Monitoring and Decision
import json
import logging
from datetime import datetime
from typing import Dict

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger("monitor")

class ABTestMonitor:
    def __init__(self):
        self.experiments = {}
    
    def collect_metrics(self, experiment_name):
        """Collect real-time metrics for both variants"""
        return {
            "experiment": experiment_name,
            "timestamp": datetime.utcnow().isoformat(),
            "control": {
                "requests": 80000,
                "predictions": 79500,
                "errors": 400,
                "avg_latency_ms": 45,
                "p99_latency_ms": 120,
                "accuracy": 0.89,
                "ctr": 0.052,
                "conversion_rate": 0.031,
                "revenue_per_user": 2.45,
            },
            "treatment": {
                "requests": 20000,
                "predictions": 19900,
                "errors": 80,
                "avg_latency_ms": 52,
                "p99_latency_ms": 145,
                "accuracy": 0.92,
                "ctr": 0.058,
                "conversion_rate": 0.035,
                "revenue_per_user": 2.78,
            },
        }
    
    def make_decision(self, metrics):
        """Automated decision engine"""
        control = metrics["control"]
        treatment = metrics["treatment"]
        
        # Guardrail checks
        guardrail_ok = True
        issues = []
        
        if treatment["p99_latency_ms"] > 200:
            guardrail_ok = False
            issues.append(f"Latency too high: {treatment['p99_latency_ms']}ms")
        
        error_rate = treatment["errors"] / max(treatment["requests"], 1) * 100
        if error_rate > 1.0:
            guardrail_ok = False
            issues.append(f"Error rate too high: {error_rate:.2f}%")
        
        if not guardrail_ok:
            return {"decision": "rollback", "reason": issues}
        
        # Performance comparison
        ctr_lift = (treatment["ctr"] - control["ctr"]) / control["ctr"] * 100
        conv_lift = (treatment["conversion_rate"] - control["conversion_rate"]) / control["conversion_rate"] * 100
        rev_lift = (treatment["revenue_per_user"] - control["revenue_per_user"]) / control["revenue_per_user"] * 100
        
        min_samples = 10000
        has_enough_data = treatment["requests"] >= min_samples
        
        return {
            "decision": "advance" if (ctr_lift > 0 and has_enough_data) else "wait",
            "guardrails_passed": guardrail_ok,
            "has_sufficient_data": has_enough_data,
            "lifts": {
                "ctr_lift_pct": round(ctr_lift, 2),
                "conversion_lift_pct": round(conv_lift, 2),
                "revenue_lift_pct": round(rev_lift, 2),
            },
            "treatment_samples": treatment["requests"],
            "min_samples_required": min_samples,
        }

monitor = ABTestMonitor()
metrics = monitor.collect_metrics("recommendation_model_v2")
decision = monitor.make_decision(metrics)

print("Metrics Summary:")
print(f"  Control CTR: {metrics['control']['ctr']}")
print(f"  Treatment CTR: {metrics['treatment']['ctr']}")
print(f"\nDecision: {json.dumps(decision, indent=2)}")
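The decision engine above advances whenever the CTR lift is positive and the treatment has at least 10,000 requests; it does not check whether the lift is statistically significant. One possible refinement is sketched below. It assumes click counts are available per variant (here derived from the sample metrics as 80,000 × 0.052 and 20,000 × 0.058), and the `decide` function and its return values are illustrative rather than part of the original monitor.

```python
import math
from statistics import NormalDist


def two_proportion_p_value(x_c: int, n_c: int, x_t: int, n_t: int) -> float:
    """Two-sided p-value of a pooled two-proportion z-test."""
    pooled = (x_c + x_t) / (n_c + n_t)
    se = math.sqrt(pooled * (1 - pooled) * (1 / n_c + 1 / n_t))
    if se == 0:
        return 1.0
    z = (x_t / n_t - x_c / n_c) / se
    return 2 * (1 - NormalDist().cdf(abs(z)))


def decide(clicks_c: int, n_c: int, clicks_t: int, n_t: int,
           alpha: float = 0.05, min_samples: int = 10_000) -> str:
    """Advance only on a significant positive lift; roll back on a significant drop."""
    if n_t < min_samples:
        return "wait"
    p_value = two_proportion_p_value(clicks_c, n_c, clicks_t, n_t)
    if p_value >= alpha:
        return "wait"
    return "advance" if clicks_t / n_t > clicks_c / n_c else "rollback"


# Click counts implied by the sample metrics above (an assumption)
print(decide(4160, 80_000, 1160, 20_000))   # advance
# The 520 vs 580 out of 10,000 example from the statistics section
print(decide(520, 10_000, 580, 10_000))     # wait
```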

FAQ

Q: How does A/B Testing differ from Shadow Testing?

A: A/B Testing sends real traffic to the new model. Users see the new model's results directly, so business metrics (CTR, conversion, revenue) can be measured, but there is risk if the new model performs poorly. Shadow Testing (Dark Launch) sends each request to both the old and the new model, but users only see the old model's results; the new model's outputs are logged for comparison, so users are never served its predictions. Run Shadow Testing first, then move on to an A/B Test.
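The shadow pattern can be sketched in a few lines: both models receive the request, only the champion's answer is returned, and the challenger's answer is logged. The function and model names below are hypothetical stand-ins, and the synchronous call is a simplification; in a real service the challenger would usually be called off the request path so that it cannot add latency.

```python
import logging
from typing import Any, Callable, Dict

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger("shadow")

Model = Callable[[Dict[str, Any]], Any]


def shadow_predict(features: Dict[str, Any], champion: Model, challenger: Model) -> Any:
    """Serve the champion's prediction; run the challenger only for logging."""
    served = champion(features)
    try:
        shadow = challenger(features)
        logger.info("shadow_compare served=%r shadow=%r match=%s",
                    served, shadow, served == shadow)
    except Exception:
        # A failing challenger must never affect what the user receives
        logger.exception("challenger failed in shadow mode")
    return served


# Stand-in models for illustration
def model_v1(features: Dict[str, Any]) -> str:
    return "item-a"


def model_v2(features: Dict[str, Any]) -> str:
    return "item-b"


print(shadow_predict({"user_id": 42}, model_v1, model_v2))  # prints item-a
```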

Q: How much traffic is needed for reliable results?

A: It depends on the baseline conversion rate and the Minimum Detectable Effect (MDE). As a rough guide, with a baseline CTR of 5% and a target improvement of 10% relative (0.5% absolute), you need roughly 30,000-50,000 samples per variant at 95% confidence and 80% power. With a baseline CTR of 1%, you need many more: 150,000+ per variant. Use a calculator or the calculate_sample_size function in this article. Do not stop the test before reaching the required sample size, because the results will not be reliable.
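These rough figures can be checked with the same formula used by calculate_sample_size. The standalone sketch below takes the MDE as a relative change, which is an assumption made here to match the wording of the answer; the function name is illustrative.

```python
import math
from statistics import NormalDist


def sample_size_per_variant(baseline: float, relative_mde: float,
                            alpha: float = 0.05, power: float = 0.80) -> int:
    """Samples needed per variant to detect a relative lift in a conversion rate."""
    z_alpha = NormalDist().inv_cdf(1 - alpha / 2)  # two-sided test
    z_beta = NormalDist().inv_cdf(power)
    p1 = baseline
    p2 = baseline * (1 + relative_mde)
    p_avg = (p1 + p2) / 2
    numerator = (z_alpha * math.sqrt(2 * p_avg * (1 - p_avg))
                 + z_beta * math.sqrt(p1 * (1 - p1) + p2 * (1 - p2))) ** 2
    return math.ceil(numerator / (p2 - p1) ** 2)


n_5pct = sample_size_per_variant(0.05, 0.10)  # baseline CTR 5%, 10% relative lift
n_1pct = sample_size_per_variant(0.01, 0.10)  # baseline CTR 1%, 10% relative lift
print(n_5pct, n_1pct)
```

The first value lands at the low end of the 30,000-50,000 range, and the second comes out above 150,000, which shows how quickly the requirement grows as the baseline rate shrinks.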

Q: Is Progressive Delivery different from Canary Release?

A: Canary Release is one part of Progressive Delivery. Canary Release focuses on deploying the new version to a small subset first, checking that there are no errors, and then rolling out to everyone. Progressive Delivery is broader: it combines canary, A/B testing, feature flags, automated analysis, and automated rollback into a complete framework for managing gradual rollouts. For ML models, Progressive Delivery is recommended because statistical analysis supports the decision.

Q: Which tools are recommended for ML A/B Testing?

A: Argo Rollouts is an open source progressive delivery controller for Kubernetes that supports canary, blue-green, and A/B testing and provides analysis templates. Flagger is another open source option for Kubernetes that works with Istio, Linkerd, and App Mesh. LaunchDarkly is a feature flag platform that supports ML experiments. Optimizely is an enterprise A/B testing platform. Statsig is an ML-focused experimentation platform. For self-hosted setups, Argo Rollouts plus custom analysis scripts is recommended; for a managed service, Statsig or LaunchDarkly.
