A/B Testing ML + CQRS + Event Sourcing
A/B Testing ML CQRS Event Sourcing Command Query Separation Event Store Projection Model Comparison Production Dashboard
| Component | Role | Technology | Pattern |
|---|---|---|---|
| Experiment Router | Assign User → Model | Custom / LaunchDarkly | CQRS Command |
| Model Service | Serve Prediction | MLflow / BentoML | CQRS Command |
| Event Store | Store All Events | Kafka / EventStoreDB | Event Sourcing |
| Projector | Build Read Models | Flink / Kafka Consumer | CQRS Projection |
| Analytics DB | Query A/B Results | ClickHouse / PostgreSQL | CQRS Query |
| Dashboard | Visualize Results | Grafana / Metabase | CQRS Query |
Event Sourcing Design
# === Event Sourcing for A/B Testing ML ===
from dataclasses import dataclass, field
from datetime import datetime
import json
@dataclass
class Event:
event_type: str
timestamp: str
data: dict
@dataclass
class ExperimentCreated(Event):
event_type: str = "ExperimentCreated"
# data: experiment_id, name, models, traffic_split, start_date
@dataclass
class UserAssigned(Event):
event_type: str = "UserAssigned"
# data: experiment_id, user_id, model_version, assigned_at
@dataclass
class PredictionMade(Event):
event_type: str = "PredictionMade"
# data: experiment_id, user_id, model_version, input, output, latency_ms
@dataclass
class UserInteracted(Event):
event_type: str = "UserInteracted"
# data: experiment_id, user_id, action (click/view/scroll), item_id
@dataclass
class ConversionRecorded(Event):
event_type: str = "ConversionRecorded"
# data: experiment_id, user_id, model_version, conversion_type, amount
# Event Store (simplified)
event_store = []
def append_event(event_type, data):
event = Event(event_type, datetime.now().isoformat(), data)
event_store.append(event)
# In production: publish to Kafka topic
return event
# Example Event Flow
append_event("ExperimentCreated", {
"experiment_id": "exp-001",
"name": "recommendation-model-v2",
"models": {"control": "rec-v1", "treatment": "rec-v2"},
"traffic_split": {"control": 50, "treatment": 50},
})
append_event("UserAssigned", {
"experiment_id": "exp-001",
"user_id": "user-123",
"model_version": "rec-v2",
})
append_event("PredictionMade", {
"experiment_id": "exp-001",
"user_id": "user-123",
"model_version": "rec-v2",
"input": {"user_id": "user-123", "context": "homepage"},
"output": {"items": [101, 205, 310]},
"latency_ms": 45,
})
append_event("ConversionRecorded", {
"experiment_id": "exp-001",
"user_id": "user-123",
"model_version": "rec-v2",
"conversion_type": "purchase",
"amount": 1500,
})
print("=== Event Store ===")
for e in event_store:
print(f" [{e.event_type}] {e.timestamp}")
print(f" Data: {json.dumps(e.data, indent=2)[:100]}...")
CQRS Implementation
# === CQRS: Command & Query Separation ===
# Command Side: Write Events
# POST /api/experiments → ExperimentCreated event
# POST /api/predictions → UserAssigned + PredictionMade events
# POST /api/interactions → UserInteracted event
# POST /api/conversions → ConversionRecorded event
# Query Side: Read Projections
# GET /api/experiments/{id}/results → Aggregated A/B Results
# GET /api/experiments/{id}/metrics → Detailed Metrics per Model
# GET /api/experiments/{id}/timeline → Metric Timeline (Daily)
@dataclass
class ABTestResult:
experiment_id: str
model_version: str
total_users: int
total_predictions: int
total_conversions: int
conversion_rate: float
avg_revenue: float
avg_latency_ms: float
p_value: float
# Projector: Build Read Model from Events
def project_ab_results(events, experiment_id):
models = {}
for e in events:
if e.data.get("experiment_id") != experiment_id:
continue
mv = e.data.get("model_version", "unknown")
if mv not in models:
models[mv] = {"users": set(), "predictions": 0, "conversions": 0, "revenue": 0, "latency": []}
if e.event_type == "UserAssigned":
models[mv]["users"].add(e.data["user_id"])
elif e.event_type == "PredictionMade":
models[mv]["predictions"] += 1
models[mv]["latency"].append(e.data.get("latency_ms", 0))
elif e.event_type == "ConversionRecorded":
models[mv]["conversions"] += 1
models[mv]["revenue"] += e.data.get("amount", 0)
results = []
for mv, data in models.items():
n_users = len(data["users"])
results.append(ABTestResult(
experiment_id, mv, n_users,
data["predictions"], data["conversions"],
data["conversions"] / max(n_users, 1),
data["revenue"] / max(data["conversions"], 1),
sum(data["latency"]) / max(len(data["latency"]), 1),
0.0 # Calculate with scipy.stats
))
return results
print("=== CQRS Projection ===")
results = project_ab_results(event_store, "exp-001")
for r in results:
print(f" [{r.model_version}]")
print(f" Users: {r.total_users} | Predictions: {r.total_predictions}")
print(f" Conversions: {r.total_conversions} | CR: {r.conversion_rate:.2%}")
print(f" Avg Revenue: {r.avg_revenue:,.0f} | Latency: {r.avg_latency_ms:.0f}ms")
Production Monitoring
# === Production Monitoring ===
@dataclass
class MonitorMetric:
metric: str
source: str
target: str
alert: str
metrics = [
MonitorMetric("Conversion Rate per Model",
"Projector → Read DB",
"Treatment CR > Control CR (Significant p<0.05)",
"Treatment CR < Control CR → P2 Review Model"),
MonitorMetric("Prediction Latency per Model",
"Event Store (PredictionMade.latency_ms)",
"P95 < 100ms ทั้ง Control และ Treatment",
"P95 > 200ms → P2 Optimize Model"),
MonitorMetric("Error Rate per Model",
"Application Logs / Events",
"< 0.1% ทั้ง Control และ Treatment",
"> 1% → P1 Stop Experiment Rollback"),
MonitorMetric("Traffic Split Accuracy",
"UserAssigned Events Count",
"50/50 ±2% (หรือตาม Config)",
"Deviation > 5% → P2 Check Router"),
MonitorMetric("Event Store Lag",
"Kafka Consumer Lag",
"< 1000 events lag",
"> 10000 → P2 Scale Consumer"),
MonitorMetric("Sample Size Progress",
"UserAssigned Event Count",
"ถึง Required Sample Size ภายใน Duration",
"< 50% at Halfway → P3 Extend Duration"),
]
print("=== Monitoring ===")
for m in metrics:
print(f" [{m.metric}]")
print(f" Source: {m.source}")
print(f" Target: {m.target}")
print(f" Alert: {m.alert}")
เคล็ดลับ
- Sample Size: คำนวณ Sample Size ก่อนเริ่ม Experiment ไม่ Stop Early
- Guardrail: ตั้ง Guardrail Metrics (Latency Error) หยุดอัตโนมัติถ้าแย่ลง
- Event Replay: ใช้ Event Replay คำนวณ Metric ใหม่ย้อนหลังได้
- Consistent: ใช้ Consistent Hashing ให้ User เจอ Model เดิมตลอด
- Kafka: ใช้ Kafka เป็น Event Store สำหรับ High Throughput
การประยุกต์ใช้ AI ในงานจริง ปี 2026
เทคโนโลยี AI ในปี 2026 ก้าวหน้าไปมากจนสามารถนำไปใช้งานจริงได้หลากหลาย ตั้งแต่ Customer Service ด้วย AI Chatbot ที่เข้าใจบริบทและตอบคำถามได้แม่นยำ Content Generation ที่ช่วยสร้างบทความ รูปภาพ และวิดีโอ ไปจนถึง Predictive Analytics ที่วิเคราะห์ข้อมูลทำนายแนวโน้มธุรกิจ
สำหรับนักพัฒนา การเรียนรู้ AI Framework เป็นสิ่งจำเป็น TensorFlow และ PyTorch ยังคงเป็นตัวเลือกหลัก Hugging Face ทำให้การใช้ Pre-trained Model ง่ายขึ้น LangChain ช่วยสร้าง AI Application ที่ซับซ้อน และ OpenAI API ให้เข้าถึงโมเดลระดับ GPT-4 ได้สะดวก
ข้อควรระวังในการใช้ AI คือ ต้องตรวจสอบผลลัพธ์เสมอเพราะ AI อาจให้ข้อมูลผิดได้ เรื่อง Data Privacy ต้องระวังไม่ส่งข้อมูลลับไปยัง AI Service ภายนอก และเรื่อง Bias ใน AI Model ที่อาจเกิดจากข้อมูลฝึกสอนที่ไม่สมดุล องค์กรควรมี AI Governance Policy กำกับดูแลการใช้งาน
เปรียบเทียบข้อดีและข้อเสีย
จากตารางเปรียบเทียบจะเห็นว่าข้อดีมีมากกว่าข้อเสียอย่างชัดเจน โดยเฉพาะในแง่ของประสิทธิภาพและความสามารถในการ Scale สำหรับข้อเสียส่วนใหญ่สามารถแก้ไขได้ด้วยการเรียนรู้อย่างเป็นระบบและวางแผนทรัพยากรให้เหมาะสม
A/B Testing สำหรับ ML คืออะไร
ทดสอบ ML Model 2+ เวอร์ชัน User จริง Control Treatment Traffic Split Conversion CTR Revenue p-value Sample Size
CQRS คืออะไร
Command Query Responsibility Segregation แยก Read Write Command Event Store Query Read Model Projection Scale แยก Denormalized
Event Sourcing คืออะไร
เก็บ State เป็น Events Immutable Audit Trail Replay Projection ExperimentCreated UserAssigned PredictionMade ConversionRecorded
Production Setup ทำอย่างไร
Gateway Router Model Event Store Kafka Projector Flink ClickHouse Grafana Monitoring Latency Error Traffic Split Sample Size
สรุป
A/B Testing ML CQRS Event Sourcing Command Query Event Store Kafka Projection ClickHouse Monitoring Conversion Latency Production