ONNX Runtime CQRS Event Sourcing — Inference Engine และ Architecture Pattern
ONNX Runtime CQRS Event Sourcing

ONNX Runtime Inference Engine ML Model CQRS Command Query Responsibility Segregation Event Sourcing Event Store Replay Architecture Pattern Production
| Component | หน้าที่ | เทคโนโลยี |
|---|---|---|
| ONNX Runtime | ML Inference | Python C++ JavaScript |
| Command Service | Write Operations | FastAPI + PostgreSQL |
| Query Service | Read Operations | FastAPI + Redis/Elasticsearch |
| Event Store | Event Persistence | EventStoreDB / Kafka |
| Event Bus | Event Distribution | Kafka / RabbitMQ |
ONNX Runtime Inference
=== ONNX Runtime ML Inference ===
pip install onnxruntime onnx torch
Export PyTorch Model to ONNX
import torch
import torch.nn as nn
class FraudDetector(nn.Module):
def __init__(self):
super().__init__()
self.layers = nn.Sequential(
nn.Linear(20, 64),
nn.ReLU(),
nn.Dropout(0.3),
nn.Linear(64, 32),
nn.ReLU(),
nn.Linear(32, 1),
nn.Sigmoid(),
)
def forward(self, x):
return self.layers(x)
model = FraudDetector()
dummy_input = torch.randn(1, 20)
torch.onnx.export(
model, dummy_input, "fraud_detector.onnx",
input_names=["features"],
output_names=["probability"],
dynamic_axes={"features": {0: "batch"}, "probability": {0: "batch"}},
)
ONNX Runtime Inference
import onnxruntime as ort
import numpy as np
session = ort.InferenceSession(
"fraud_detector.onnx",
providers=["CUDAExecutionProvider", "CPUExecutionProvider"],
)
เนื้อหาเกี่ยวข้อง — อ่านต่อ: การ์ดจอ rtx 5090 ราคา
# Single Prediction
features = np.random.randn(1, 20).astype(np.float32)
result = session.run(None, {"features": features})
probability = result[0][0][0]
# Batch Prediction
batch = np.random.randn(100, 20).astype(np.float32)
results = session.run(None, {"features": batch})
from dataclasses import dataclass
from typing import List
แนะนำเพิ่มเติม — ดูสัญญาณเทรดที่ XM Signal
@dataclass
class InferenceBenchmark:
framework: str
model: str
batch_size: int
latency_ms: float
throughput_rps: int
benchmarks = [
InferenceBenchmark("PyTorch", "FraudDetector", 1, 5.2, 192),
InferenceBenchmark("ONNX Runtime CPU", "FraudDetector", 1, 1.8, 555),
InferenceBenchmark("ONNX Runtime GPU", "FraudDetector", 1, 0.5, 2000),
InferenceBenchmark("ONNX Runtime CPU", "FraudDetector", 32, 8.5, 3764),
InferenceBenchmark("ONNX Runtime GPU", "FraudDetector", 32, 2.1, 15238),
InferenceBenchmark("TensorRT", "FraudDetector", 32, 1.2, 26666),
]
print("=== Inference Benchmarks ===")
for b in benchmarks:
print(f" [{b.framework}] Batch: {b.batch_size}")
print(f" Latency: {b.latency_ms}ms | Throughput: {b.throughput_rps:,} rps")
CQRS Implementation
=== CQRS Architecture ===
Command Service (Write Side)
from fastapi import FastAPI
from pydantic import BaseModel
import uuid
from datetime import datetime
เนื้อหาเกี่ยวข้อง — แนะนำให้อ่าน แจ้งเตือนเงินเข้าออกออมสินผ่านไลน์
app = FastAPI()
class CreateOrderCommand(BaseModel):
customer_id: str
items: list
total: float
@app.post("/commands/orders")
async def create_order(cmd: CreateOrderCommand):
# Validate with ML Model (Fraud Check)
features = extract_features(cmd)
fraud_prob = onnx_session.run(None, {"features": features})[0][0][0]
if fraud_prob > 0.8:
raise HTTPException(400, "Suspected fraud")
# Create Event
event = {
"event_type": "OrderCreated",
"aggregate_id": str(uuid.uuid4()),
"timestamp": datetime.utcnow().isoformat(),
"data": cmd.dict(),
}
# Store Event
แนะนำเพิ่มเติม — แหล่งความรู้ Forex iCafeForex
await event_store.append(event)
# Publish to Event Bus
await kafka_producer.send("order-events", event)
return {"order_id": event["aggregate_id"], "status": "created"}
# Query Service (Read Side)
@app.get("/queries/orders/{order_id}")
async def get_order(order_id: str):
# Read from optimized Read Model
order = await redis.get(f"order:{order_id}")
if not order:
order = await elasticsearch.get("orders", order_id)
return order
@dataclass
class CQRSComponent:
side: str
component: str
technology: str
purpose: str
เนื้อหาเกี่ยวข้อง — Qwik Resumability Event Driven Design
components = [
CQRSComponent("Command", "API Gateway", "FastAPI", "รับ Commands จาก Client"),
CQRSComponent("Command", "Fraud Detector", "ONNX Runtime", "ตรวจ Fraud ก่อน Process"),
CQRSComponent("Command", "Event Store", "EventStoreDB", "เก็บ Events ทั้งหมด"),
CQRSComponent("Command", "Event Bus", "Kafka", "กระจาย Events ไปทุก Consumer"),
CQRSComponent("Query", "Read Model", "Redis + ES", "Query ที่ Optimize แล้ว"),
CQRSComponent("Query", "Recommendation", "ONNX Runtime", "แนะนำสินค้า"),
CQRSComponent("Query", "API Gateway", "FastAPI", "ส่งผลลัพธ์ Client"),
]
print("\n=== CQRS Architecture ===")
for c in components:
print(f" [{c.side}] {c.component} ({c.technology})")
print(f" Purpose: {c.purpose}")
Event Sourcing Implementation
=== Event Sourcing ===
Event Store
class Event:

def __init__(self, event_type, aggregate_id, data):
self.event_id = str(uuid.uuid4())
self.event_type = event_type
self.aggregate_id = aggregate_id
self.timestamp = datetime.utcnow()
self.data = data
self.version = 0
class EventStore:
def __init__(self, db_url):
self.engine = create_engine(db_url)
async def append(self, stream_id, events):
async with self.engine.begin() as conn:
for event in events:
await conn.execute(
insert(events_table).values(
event_id=event.event_id,
stream_id=stream_id,
event_type=event.event_type,
data=json.dumps(event.data),
timestamp=event.timestamp,
)
)
async def get_events(self, stream_id, from_version=0):
async with self.engine.begin() as conn:
result = await conn.execute(
select(events_table)
เนื้อหาเกี่ยวข้อง — ทำความเข้าใจ Betteruptime Zero Downtime Deployment — Deploy
.where(events_table.c.stream_id == stream_id)
.where(events_table.c.version >= from_version)
.order_by(events_table.c.version)
)
return result.fetchall()
@dataclass
class OrderEvent:
event_type: str
timestamp: str
data: str
Order Lifecycle Events
events = [
OrderEvent("OrderCreated", "10:00:00", "customer=C001, items=3, total=2500"),
OrderEvent("FraudChecked", "10:00:01", "score=0.12, result=PASS"),
OrderEvent("PaymentProcessed", "10:00:15", "method=credit_card, amount=2500"),
OrderEvent("InventoryReserved", "10:00:16", "items=[A1, B2, C3], status=reserved"),
OrderEvent("OrderConfirmed", "10:00:17", "confirmation=ORD-2024-001"),
OrderEvent("OrderShipped", "10:02:00", "tracking=TH123456789"),
OrderEvent("OrderDelivered", "11:30:00", "signed_by=John"),
]
print("Event Stream (Order ORD-001):")
for i, e in enumerate(events):
print(f" [{i}] {e.timestamp} | {e.event_type}")
print(f" {e.data}")
Replay to get state
print(f"\n Current State (after replay {len(events)} events):")
print(f" Status: Delivered | Items: 3 | Total: 2,500 THB")
เคล็ดลับ
- ONNX: Export Model เป็น ONNX ลด Latency 2-5x เทียบ Native
- CQRS: แยก Read/Write Scale อิสระ ใช้ Database ที่เหมาะแต่ละด้าน
- Events: Event ต้อง Immutable ไม่แก้ไข เพิ่มอย่างเดียว
- Snapshots: สร้าง Snapshot ทุก N Events ไม่ต้อง Replay ทั้งหมด
- Idempotent: Event Handler ต้อง Idempotent รับ Event ซ้ำได้
ONNX Runtime คืออะไร
High-performance Inference Engine ONNX Format PyTorch TensorFlow เร็วกว่า 2-5x CPU GPU TensorRT Python C++ JavaScript Edge
CQRS คืออะไร
Command Query Responsibility Segregation แยก Read Write Database ต่างกัน Scale อิสระ Write Relational Read NoSQL Cache
Event Sourcing คืออะไร
เก็บ State เป็น Events ไม่ใช่ Current State Replay History Audit Trail Complete ใช้คู่ CQRS Read Model จาก Events
ONNX Runtime กับ CQRS ใช้ร่วมกันอย่างไร
ONNX Query Side Recommendation Fraud Detection Command Side Model Update Event Store Personalization Inference Engine
สรุป
ONNX Runtime Inference CQRS Command Query Event Sourcing Event Store Kafka FastAPI Redis Elasticsearch Fraud Detection Recommendation Replay Snapshot Idempotent Production Architecture





