ONNX Runtime CQRS Event Sourcing
ONNX Runtime Inference Engine ML Model CQRS Command Query Responsibility Segregation Event Sourcing Event Store Replay Architecture Pattern Production
| Component | หน้าที่ | เทคโนโลยี |
|---|---|---|
| ONNX Runtime | ML Inference | Python C++ JavaScript |
| Command Service | Write Operations | FastAPI + PostgreSQL |
| Query Service | Read Operations | FastAPI + Redis/Elasticsearch |
| Event Store | Event Persistence | EventStoreDB / Kafka |
| Event Bus | Event Distribution | Kafka / RabbitMQ |
ONNX Runtime Inference
# === ONNX Runtime ML Inference ===
# pip install onnxruntime onnx torch
# Export PyTorch Model to ONNX
# import torch
# import torch.nn as nn
#
# class FraudDetector(nn.Module):
# def __init__(self):
# super().__init__()
# self.layers = nn.Sequential(
# nn.Linear(20, 64),
# nn.ReLU(),
# nn.Dropout(0.3),
# nn.Linear(64, 32),
# nn.ReLU(),
# nn.Linear(32, 1),
# nn.Sigmoid(),
# )
# def forward(self, x):
# return self.layers(x)
#
# model = FraudDetector()
# dummy_input = torch.randn(1, 20)
# torch.onnx.export(
# model, dummy_input, "fraud_detector.onnx",
# input_names=["features"],
# output_names=["probability"],
# dynamic_axes={"features": {0: "batch"}, "probability": {0: "batch"}},
# )
# ONNX Runtime Inference
# import onnxruntime as ort
# import numpy as np
#
# session = ort.InferenceSession(
# "fraud_detector.onnx",
# providers=["CUDAExecutionProvider", "CPUExecutionProvider"],
# )
#
# # Single Prediction
# features = np.random.randn(1, 20).astype(np.float32)
# result = session.run(None, {"features": features})
# probability = result[0][0][0]
#
# # Batch Prediction
# batch = np.random.randn(100, 20).astype(np.float32)
# results = session.run(None, {"features": batch})
from dataclasses import dataclass
from typing import List
@dataclass
class InferenceBenchmark:
    """One measured benchmark row: a framework/model pairing at a given batch size."""
    framework: str       # inference runtime, e.g. "PyTorch", "ONNX Runtime GPU", "TensorRT"
    model: str           # name of the model being benchmarked
    batch_size: int      # samples per inference call
    latency_ms: float    # latency of one call, in milliseconds
    throughput_rps: int  # sustained throughput, requests per second
# Benchmark rows kept as plain tuples, then lifted into dataclass instances.
_benchmark_rows = [
    ("PyTorch", "FraudDetector", 1, 5.2, 192),
    ("ONNX Runtime CPU", "FraudDetector", 1, 1.8, 555),
    ("ONNX Runtime GPU", "FraudDetector", 1, 0.5, 2000),
    ("ONNX Runtime CPU", "FraudDetector", 32, 8.5, 3764),
    ("ONNX Runtime GPU", "FraudDetector", 32, 2.1, 15238),
    ("TensorRT", "FraudDetector", 32, 1.2, 26666),
]
benchmarks = [InferenceBenchmark(*row) for row in _benchmark_rows]

# Print one two-line summary per benchmark row.
print("=== Inference Benchmarks ===")
for bench in benchmarks:
    print(f" [{bench.framework}] Batch: {bench.batch_size}")
    print(f" Latency: {bench.latency_ms}ms | Throughput: {bench.throughput_rps:,} rps")
CQRS Implementation
# === CQRS Architecture ===
# Command Service (Write Side)
# from fastapi import FastAPI, HTTPException
# from pydantic import BaseModel
# import uuid
# from datetime import datetime
#
# app = FastAPI()
#
# class CreateOrderCommand(BaseModel):
# customer_id: str
# items: list
# total: float
#
# @app.post("/commands/orders")
# async def create_order(cmd: CreateOrderCommand):
# # Validate with ML Model (Fraud Check)
# features = extract_features(cmd)
# fraud_prob = onnx_session.run(None, {"features": features})[0][0][0]
#
# if fraud_prob > 0.8:
# raise HTTPException(400, "Suspected fraud")
#
# # Create Event
# event = {
# "event_type": "OrderCreated",
# "aggregate_id": str(uuid.uuid4()),
# "timestamp": datetime.utcnow().isoformat(),  # NOTE(review): utcnow() is deprecated — prefer datetime.now(timezone.utc)
# "data": cmd.dict(),
# }
#
# # Store Event
# await event_store.append(event)
#
# # Publish to Event Bus
# await kafka_producer.send("order-events", event)
#
# return {"order_id": event["aggregate_id"], "status": "created"}
#
# # Query Service (Read Side)
# @app.get("/queries/orders/{order_id}")
# async def get_order(order_id: str):
# # Read from optimized Read Model
# order = await redis.get(f"order:{order_id}")
# if not order:
# order = await elasticsearch.get("orders", order_id)
# return order
@dataclass
class CQRSComponent:
    """One component in the CQRS architecture table (command side or query side)."""
    side: str        # "Command" (write path) or "Query" (read path)
    component: str   # component name, e.g. "API Gateway", "Event Store"
    technology: str  # implementing technology, e.g. "FastAPI", "Kafka"
    purpose: str     # short description of the component's role (Thai text)
# CQRS architecture table built from (side, component, technology, purpose) tuples.
components = [
    CQRSComponent(side, comp, tech, why)
    for side, comp, tech, why in [
        ("Command", "API Gateway", "FastAPI", "รับ Commands จาก Client"),
        ("Command", "Fraud Detector", "ONNX Runtime", "ตรวจ Fraud ก่อน Process"),
        ("Command", "Event Store", "EventStoreDB", "เก็บ Events ทั้งหมด"),
        ("Command", "Event Bus", "Kafka", "กระจาย Events ไปทุก Consumer"),
        ("Query", "Read Model", "Redis + ES", "Query ที่ Optimize แล้ว"),
        ("Query", "Recommendation", "ONNX Runtime", "แนะนำสินค้า"),
        ("Query", "API Gateway", "FastAPI", "ส่งผลลัพธ์ Client"),
    ]
]

# Two-line summary per component: header line, then its purpose.
print("\n=== CQRS Architecture ===")
for item in components:
    print(f" [{item.side}] {item.component} ({item.technology})")
    print(f" Purpose: {item.purpose}")
Event Sourcing Implementation
# === Event Sourcing ===
# Event Store
# class Event:
# def __init__(self, event_type, aggregate_id, data):
# self.event_id = str(uuid.uuid4())
# self.event_type = event_type
# self.aggregate_id = aggregate_id
# self.timestamp = datetime.utcnow()
# self.data = data
# self.version = 0
#
# class EventStore:
# def __init__(self, db_url):
# self.engine = create_engine(db_url)
#
# async def append(self, stream_id, events):
# async with self.engine.begin() as conn:
# for event in events:
# await conn.execute(
# insert(events_table).values(
# event_id=event.event_id,
# stream_id=stream_id,
# event_type=event.event_type,
# data=json.dumps(event.data),
# timestamp=event.timestamp,
# )
# )
#
# async def get_events(self, stream_id, from_version=0):
# async with self.engine.begin() as conn:
# result = await conn.execute(
# select(events_table)
# .where(events_table.c.stream_id == stream_id)
# .where(events_table.c.version >= from_version)
# .order_by(events_table.c.version)
# )
# return result.fetchall()
@dataclass
class OrderEvent:
    """One immutable event in an order's event-sourced lifecycle stream."""
    event_type: str  # event name, e.g. "OrderCreated", "OrderShipped"
    timestamp: str   # wall-clock time as "HH:MM:SS" (string, not datetime)
    data: str        # event payload rendered as a "key=value, ..." string
# Order lifecycle events: the full event-sourced history of one order.
events = [
    OrderEvent(kind, ts, payload)
    for kind, ts, payload in [
        ("OrderCreated", "10:00:00", "customer=C001, items=3, total=2500"),
        ("FraudChecked", "10:00:01", "score=0.12, result=PASS"),
        ("PaymentProcessed", "10:00:15", "method=credit_card, amount=2500"),
        ("InventoryReserved", "10:00:16", "items=[A1, B2, C3], status=reserved"),
        ("OrderConfirmed", "10:00:17", "confirmation=ORD-2024-001"),
        ("OrderShipped", "10:02:00", "tracking=TH123456789"),
        ("OrderDelivered", "11:30:00", "signed_by=John"),
    ]
]

# Dump the stream in order: index, time, type, then the payload line.
print("Event Stream (Order ORD-001):")
for idx, evt in enumerate(events):
    print(f" [{idx}] {evt.timestamp} | {evt.event_type}")
    print(f" {evt.data}")

# Replaying every event yields the order's current state.
print(f"\n Current State (after replay {len(events)} events):")
print(" Status: Delivered | Items: 3 | Total: 2,500 THB")
เคล็ดลับ
- ONNX: Export Model เป็น ONNX ลด Latency 2-5x เทียบ Native
- CQRS: แยก Read/Write Scale อิสระ ใช้ Database ที่เหมาะแต่ละด้าน
- Events: Event ต้อง Immutable ไม่แก้ไข เพิ่มอย่างเดียว
- Snapshots: สร้าง Snapshot ทุก N Events ไม่ต้อง Replay ทั้งหมด
- Idempotent: Event Handler ต้อง Idempotent รับ Event ซ้ำได้
ONNX Runtime คืออะไร
High-performance Inference Engine ONNX Format PyTorch TensorFlow เร็วกว่า 2-5x CPU GPU TensorRT Python C++ JavaScript Edge
CQRS คืออะไร
Command Query Responsibility Segregation แยก Read Write Database ต่างกัน Scale อิสระ Write Relational Read NoSQL Cache
Event Sourcing คืออะไร
เก็บ State เป็น Events ไม่ใช่ Current State Replay History Audit Trail Complete ใช้คู่ CQRS Read Model จาก Events
ONNX Runtime กับ CQRS ใช้ร่วมกันอย่างไร
ONNX Query Side Recommendation Fraud Detection Command Side Model Update Event Store Personalization Inference Engine
สรุป
ONNX Runtime Inference CQRS Command Query Event Sourcing Event Store Kafka FastAPI Redis Elasticsearch Fraud Detection Recommendation Replay Snapshot Idempotent Production Architecture
