Technology

A/B Testing ML CQRS Event Sourcing

A/B Testing ML CQRS Event Sourcing | SiamCafe Blog
2025-11-18· อ. บอม — SiamCafe.net· 11,900 คำ

A/B Testing ML + CQRS + Event Sourcing

A/B Testing ML CQRS Event Sourcing Command Query Separation Event Store Projection Model Comparison Production Dashboard

ComponentRoleTechnologyPattern
Experiment RouterAssign User → ModelCustom / LaunchDarklyCQRS Command
Model ServiceServe PredictionMLflow / BentoMLCQRS Command
Event StoreStore All EventsKafka / EventStoreDBEvent Sourcing
ProjectorBuild Read ModelsFlink / Kafka ConsumerCQRS Projection
Analytics DBQuery A/B ResultsClickHouse / PostgreSQLCQRS Query
DashboardVisualize ResultsGrafana / MetabaseCQRS Query

Event Sourcing Design

# === Event Sourcing for A/B Testing ML ===

from dataclasses import dataclass, field
from datetime import datetime
import json

@dataclass
class Event:
    event_type: str
    timestamp: str
    data: dict

@dataclass
class ExperimentCreated(Event):
    event_type: str = "ExperimentCreated"
    # data: experiment_id, name, models, traffic_split, start_date

@dataclass
class UserAssigned(Event):
    event_type: str = "UserAssigned"
    # data: experiment_id, user_id, model_version, assigned_at

@dataclass
class PredictionMade(Event):
    event_type: str = "PredictionMade"
    # data: experiment_id, user_id, model_version, input, output, latency_ms

@dataclass
class UserInteracted(Event):
    event_type: str = "UserInteracted"
    # data: experiment_id, user_id, action (click/view/scroll), item_id

@dataclass
class ConversionRecorded(Event):
    event_type: str = "ConversionRecorded"
    # data: experiment_id, user_id, model_version, conversion_type, amount

# Event Store (simplified)
event_store = []

def append_event(event_type, data):
    event = Event(event_type, datetime.now().isoformat(), data)
    event_store.append(event)
    # In production: publish to Kafka topic
    return event

# Example Event Flow
append_event("ExperimentCreated", {
    "experiment_id": "exp-001",
    "name": "recommendation-model-v2",
    "models": {"control": "rec-v1", "treatment": "rec-v2"},
    "traffic_split": {"control": 50, "treatment": 50},
})

append_event("UserAssigned", {
    "experiment_id": "exp-001",
    "user_id": "user-123",
    "model_version": "rec-v2",
})

append_event("PredictionMade", {
    "experiment_id": "exp-001",
    "user_id": "user-123",
    "model_version": "rec-v2",
    "input": {"user_id": "user-123", "context": "homepage"},
    "output": {"items": [101, 205, 310]},
    "latency_ms": 45,
})

append_event("ConversionRecorded", {
    "experiment_id": "exp-001",
    "user_id": "user-123",
    "model_version": "rec-v2",
    "conversion_type": "purchase",
    "amount": 1500,
})

print("=== Event Store ===")
for e in event_store:
    print(f"  [{e.event_type}] {e.timestamp}")
    print(f"    Data: {json.dumps(e.data, indent=2)[:100]}...")

CQRS Implementation

# === CQRS: Command & Query Separation ===

# Command Side: Write Events
# POST /api/experiments       → ExperimentCreated event
# POST /api/predictions       → UserAssigned + PredictionMade events
# POST /api/interactions      → UserInteracted event
# POST /api/conversions       → ConversionRecorded event

# Query Side: Read Projections
# GET /api/experiments/{id}/results    → Aggregated A/B Results
# GET /api/experiments/{id}/metrics    → Detailed Metrics per Model
# GET /api/experiments/{id}/timeline   → Metric Timeline (Daily)

@dataclass
class ABTestResult:
    experiment_id: str
    model_version: str
    total_users: int
    total_predictions: int
    total_conversions: int
    conversion_rate: float
    avg_revenue: float
    avg_latency_ms: float
    p_value: float

# Projector: Build Read Model from Events
def project_ab_results(events, experiment_id):
    models = {}
    for e in events:
        if e.data.get("experiment_id") != experiment_id:
            continue
        mv = e.data.get("model_version", "unknown")
        if mv not in models:
            models[mv] = {"users": set(), "predictions": 0, "conversions": 0, "revenue": 0, "latency": []}
        if e.event_type == "UserAssigned":
            models[mv]["users"].add(e.data["user_id"])
        elif e.event_type == "PredictionMade":
            models[mv]["predictions"] += 1
            models[mv]["latency"].append(e.data.get("latency_ms", 0))
        elif e.event_type == "ConversionRecorded":
            models[mv]["conversions"] += 1
            models[mv]["revenue"] += e.data.get("amount", 0)

    results = []
    for mv, data in models.items():
        n_users = len(data["users"])
        results.append(ABTestResult(
            experiment_id, mv, n_users,
            data["predictions"], data["conversions"],
            data["conversions"] / max(n_users, 1),
            data["revenue"] / max(data["conversions"], 1),
            sum(data["latency"]) / max(len(data["latency"]), 1),
            0.0  # Calculate with scipy.stats
        ))
    return results

print("=== CQRS Projection ===")
results = project_ab_results(event_store, "exp-001")
for r in results:
    print(f"  [{r.model_version}]")
    print(f"    Users: {r.total_users} | Predictions: {r.total_predictions}")
    print(f"    Conversions: {r.total_conversions} | CR: {r.conversion_rate:.2%}")
    print(f"    Avg Revenue: {r.avg_revenue:,.0f} | Latency: {r.avg_latency_ms:.0f}ms")

Production Monitoring

# === Production Monitoring ===

@dataclass
class MonitorMetric:
    metric: str
    source: str
    target: str
    alert: str

metrics = [
    MonitorMetric("Conversion Rate per Model",
        "Projector → Read DB",
        "Treatment CR > Control CR (Significant p<0.05)",
        "Treatment CR < Control CR → P2 Review Model"),
    MonitorMetric("Prediction Latency per Model",
        "Event Store (PredictionMade.latency_ms)",
        "P95 < 100ms ทั้ง Control และ Treatment",
        "P95 > 200ms → P2 Optimize Model"),
    MonitorMetric("Error Rate per Model",
        "Application Logs / Events",
        "< 0.1% ทั้ง Control และ Treatment",
        "> 1% → P1 Stop Experiment Rollback"),
    MonitorMetric("Traffic Split Accuracy",
        "UserAssigned Events Count",
        "50/50 ±2% (หรือตาม Config)",
        "Deviation > 5% → P2 Check Router"),
    MonitorMetric("Event Store Lag",
        "Kafka Consumer Lag",
        "< 1000 events lag",
        "> 10000 → P2 Scale Consumer"),
    MonitorMetric("Sample Size Progress",
        "UserAssigned Event Count",
        "ถึง Required Sample Size ภายใน Duration",
        "< 50% at Halfway → P3 Extend Duration"),
]

print("=== Monitoring ===")
for m in metrics:
    print(f"  [{m.metric}]")
    print(f"    Source: {m.source}")
    print(f"    Target: {m.target}")
    print(f"    Alert: {m.alert}")

เคล็ดลับ

การประยุกต์ใช้ AI ในงานจริง ปี 2026

เทคโนโลยี AI ในปี 2026 ก้าวหน้าไปมากจนสามารถนำไปใช้งานจริงได้หลากหลาย ตั้งแต่ Customer Service ด้วย AI Chatbot ที่เข้าใจบริบทและตอบคำถามได้แม่นยำ Content Generation ที่ช่วยสร้างบทความ รูปภาพ และวิดีโอ ไปจนถึง Predictive Analytics ที่วิเคราะห์ข้อมูลทำนายแนวโน้มธุรกิจ

สำหรับนักพัฒนา การเรียนรู้ AI Framework เป็นสิ่งจำเป็น TensorFlow และ PyTorch ยังคงเป็นตัวเลือกหลัก Hugging Face ทำให้การใช้ Pre-trained Model ง่ายขึ้น LangChain ช่วยสร้าง AI Application ที่ซับซ้อน และ OpenAI API ให้เข้าถึงโมเดลระดับ GPT-4 ได้สะดวก

ข้อควรระวังในการใช้ AI คือ ต้องตรวจสอบผลลัพธ์เสมอเพราะ AI อาจให้ข้อมูลผิดได้ เรื่อง Data Privacy ต้องระวังไม่ส่งข้อมูลลับไปยัง AI Service ภายนอก และเรื่อง Bias ใน AI Model ที่อาจเกิดจากข้อมูลฝึกสอนที่ไม่สมดุล องค์กรควรมี AI Governance Policy กำกับดูแลการใช้งาน

เปรียบเทียบข้อดีและข้อเสีย

ข้อดีข้อเสีย
ประสิทธิภาพสูง ทำงานได้เร็วและแม่นยำ ลดเวลาทำงานซ้ำซ้อนต้องใช้เวลาเรียนรู้เบื้องต้นพอสมควร มี Learning Curve สูง
มี Community ขนาดใหญ่ มีคนช่วยเหลือและแหล่งเรียนรู้มากมายบางฟีเจอร์อาจยังไม่เสถียร หรือมีการเปลี่ยนแปลงบ่อยในเวอร์ชันใหม่
รองรับ Integration กับเครื่องมือและบริการอื่นได้หลากหลายต้นทุนอาจสูงสำหรับ Enterprise License หรือ Cloud Service
เป็น Open Source หรือมีเวอร์ชันฟรีให้เริ่มต้นใช้งานต้องการ Hardware หรือ Infrastructure ที่เพียงพอ

จากตารางเปรียบเทียบจะเห็นว่าข้อดีมีมากกว่าข้อเสียอย่างชัดเจน โดยเฉพาะในแง่ของประสิทธิภาพและความสามารถในการ Scale สำหรับข้อเสียส่วนใหญ่สามารถแก้ไขได้ด้วยการเรียนรู้อย่างเป็นระบบและวางแผนทรัพยากรให้เหมาะสม

A/B Testing สำหรับ ML คืออะไร

ทดสอบ ML Model 2+ เวอร์ชัน User จริง Control Treatment Traffic Split Conversion CTR Revenue p-value Sample Size

CQRS คืออะไร

Command Query Responsibility Segregation แยก Read Write Command Event Store Query Read Model Projection Scale แยก Denormalized

Event Sourcing คืออะไร

เก็บ State เป็น Events Immutable Audit Trail Replay Projection ExperimentCreated UserAssigned PredictionMade ConversionRecorded

Production Setup ทำอย่างไร

Gateway Router Model Event Store Kafka Projector Flink ClickHouse Grafana Monitoring Latency Error Traffic Split Sample Size

สรุป

A/B Testing ML CQRS Event Sourcing Command Query Event Store Kafka Projection ClickHouse Monitoring Conversion Latency Production

📖 บทความที่เกี่ยวข้อง

PHP Pest Testing CQRS Event Sourcingอ่านบทความ → PlanetScale Vitess CQRS Event Sourcingอ่านบทความ → CircleCI Orbs CQRS Event Sourcingอ่านบทความ → SD-WAN Architecture CQRS Event Sourcingอ่านบทความ → JavaScript Bun Runtime CQRS Event Sourcingอ่านบทความ →

📚 ดูบทความทั้งหมด →