ONNX Runtime CQRS Event Sourcing — Inference Engine และ Architecture Pattern | SiamCafe Blog

ONNX Runtime CQRS Event Sourcing — Inference Engine และ Architecture Pattern | SiamCafe Blog
ONNX Runtime CQRS Event Sourcing — Inference Engine และ Architecture Pattern | SiamCafe Blog

ONNX Runtime CQRS Event Sourcing

บทความนี้อธิบายการใช้ ONNX Runtime เป็น Inference Engine สำหรับรัน ML Model ร่วมกับรูปแบบสถาปัตยกรรม CQRS (Command Query Responsibility Segregation) และ Event Sourcing ครอบคลุม Event Store การ Replay Events และแนวทางนำไปใช้ใน Production

| Component | หน้าที่ | เทคโนโลยี |
|---|---|---|
| ONNX Runtime | ML Inference | Python, C++, JavaScript |
| Command Service | Write Operations | FastAPI + PostgreSQL |
| Query Service | Read Operations | FastAPI + Redis/Elasticsearch |
| Event Store | Event Persistence | EventStoreDB / Kafka |
| Event Bus | Event Distribution | Kafka / RabbitMQ |

ONNX Runtime Inference

# === ONNX Runtime ML Inference ===

# pip install onnxruntime onnx torch

# Export PyTorch Model to ONNX
# import torch
# import torch.nn as nn
#
# class FraudDetector(nn.Module):
#     def __init__(self):
#         super().__init__()
#         self.layers = nn.Sequential(
#             nn.Linear(20, 64),
#             nn.ReLU(),
#             nn.Dropout(0.3),
#             nn.Linear(64, 32),
#             nn.ReLU(),
#             nn.Linear(32, 1),
#             nn.Sigmoid(),
#         )
#
#     def forward(self, x):
#         return self.layers(x)
#
# model = FraudDetector()
# dummy_input = torch.randn(1, 20)
# torch.onnx.export(
#     model, dummy_input, "fraud_detector.onnx",
#     input_names=["features"],
#     output_names=["probability"],
#     dynamic_axes={"features": {0: "batch"}, "probability": {0: "batch"}},
# )

# ONNX Runtime Inference
# import onnxruntime as ort
# import numpy as np
#
# session = ort.InferenceSession(
# "fraud_detector.onnx",
# providers=["CUDAExecutionProvider", "CPUExecutionProvider"],
# )
#
# # Single Prediction
# features = np.random.randn(1, 20).astype(np.float32)
# result = session.run(None, {"features": features})
# probability = result[0][0][0]
#
# # Batch Prediction
# batch = np.random.randn(100, 20).astype(np.float32)
# results = session.run(None, {"features": batch})

from dataclasses import dataclass
from typing import List

@dataclass
class InferenceBenchmark:
    """A single inference measurement for one runtime/model/batch-size combination.

    In the table below, ``throughput_rps`` equals
    ``batch_size * 1000 / latency_ms`` truncated to an integer, i.e. it counts
    samples per second rather than batched calls per second.
    """

    framework: str  # runtime/backend label, e.g. "ONNX Runtime GPU"
    model: str  # name of the benchmarked model
    batch_size: int  # rows fed to the model per inference call
    latency_ms: float  # latency of one inference call, in milliseconds
    throughput_rps: int  # samples processed per second


# Every row benchmarks the same model, so only the varying columns are listed.
benchmarks = [
    InferenceBenchmark(framework, "FraudDetector", batch, latency, rps)
    for framework, batch, latency, rps in (
        ("PyTorch", 1, 5.2, 192),
        ("ONNX Runtime CPU", 1, 1.8, 555),
        ("ONNX Runtime GPU", 1, 0.5, 2000),
        ("ONNX Runtime CPU", 32, 8.5, 3764),
        ("ONNX Runtime GPU", 32, 2.1, 15238),
        ("TensorRT", 32, 1.2, 26666),
    )
]

print("=== Inference Benchmarks ===")
for bench in benchmarks:
    summary = f" [{bench.framework}] Batch: {bench.batch_size}"
    timing = f" Latency: {bench.latency_ms}ms | Throughput: {bench.throughput_rps:,} rps"
    print(summary)
    print(timing)

CQRS Implementation

# === CQRS Architecture ===

# Command Service (Write Side)
# from fastapi import FastAPI, HTTPException
# from pydantic import BaseModel
# import uuid
# from datetime import datetime
#
# app = FastAPI()
#
# class CreateOrderCommand(BaseModel):
#     customer_id: str
#     items: list
#     total: float
#
# @app.post("/commands/orders")
# async def create_order(cmd: CreateOrderCommand):
#     # Validate with ML Model (Fraud Check)
#     features = extract_features(cmd)
#     fraud_prob = onnx_session.run(None, {"features": features})[0][0][0]
#
#     if fraud_prob > 0.8:
#         raise HTTPException(400, "Suspected fraud")
#
#     # Create Event
#     event = {
#         "event_type": "OrderCreated",
#         "aggregate_id": str(uuid.uuid4()),
#         "timestamp": datetime.utcnow().isoformat(),
#         "data": cmd.dict(),
#     }
#
#     # Store Event
#     await event_store.append(event)
#
#     # Publish to Event Bus
#     await kafka_producer.send("order-events", event)
#
#     return {"order_id": event["aggregate_id"], "status": "created"}
#
# # Query Service (Read Side)
# @app.get("/queries/orders/{order_id}")
# async def get_order(order_id: str):
#     # Read from optimized Read Model
#     order = await redis.get(f"order:{order_id}")
#     if not order:
#         order = await elasticsearch.get("orders", order_id)
#     return order

@dataclass
class CQRSComponent:
    """One row of the CQRS architecture overview.

    In the table below ``side`` is either "Command" (write path) or
    "Query" (read path).
    """

    side: str
    component: str  # logical role, e.g. "Event Store"
    technology: str  # product/stack implementing the role
    purpose: str  # short description of the role (Thai in this article)


components = [
    CQRSComponent(side, name, tech, why)
    for side, name, tech, why in (
        ("Command", "API Gateway", "FastAPI", "รับ Commands จาก Client"),
        ("Command", "Fraud Detector", "ONNX Runtime", "ตรวจ Fraud ก่อน Process"),
        ("Command", "Event Store", "EventStoreDB", "เก็บ Events ทั้งหมด"),
        ("Command", "Event Bus", "Kafka", "กระจาย Events ไปทุก Consumer"),
        ("Query", "Read Model", "Redis + ES", "Query ที่ Optimize แล้ว"),
        ("Query", "Recommendation", "ONNX Runtime", "แนะนำสินค้า"),
        ("Query", "API Gateway", "FastAPI", "ส่งผลลัพธ์ Client"),
    )
]

print("\n=== CQRS Architecture ===")
for part in components:
    print(f" [{part.side}] {part.component} ({part.technology})")
    print(" Purpose: " + part.purpose)

Event Sourcing Implementation

# === Event Sourcing ===

# Event Store
# class Event:
#     def __init__(self, event_type, aggregate_id, data):
#         self.event_id = str(uuid.uuid4())
#         self.event_type = event_type
#         self.aggregate_id = aggregate_id
#         self.timestamp = datetime.utcnow()
#         self.data = data
#         self.version = 0
#
# class EventStore:
#     def __init__(self, db_url):
#         # async engine: append/get_events use `async with` + `await`
#         self.engine = create_async_engine(db_url)
#
#     async def append(self, stream_id, events):
#         async with self.engine.begin() as conn:
#             for event in events:
#                 await conn.execute(
#                     insert(events_table).values(
#                         event_id=event.event_id,
#                         stream_id=stream_id,
#                         event_type=event.event_type,
#                         version=event.version,  # read side filters/orders on this
#                         data=json.dumps(event.data),
#                         timestamp=event.timestamp,
#                     )
#                 )
#
#     async def get_events(self, stream_id, from_version=0):
#         async with self.engine.begin() as conn:
#             result = await conn.execute(
#                 select(events_table)
#                 .where(events_table.c.stream_id == stream_id)
#                 .where(events_table.c.version >= from_version)
#                 .order_by(events_table.c.version)
#             )
#             return result.fetchall()

@dataclass
class OrderEvent:
    """One fact in an order's event stream.

    ``data`` is a human-readable ``key=value, key=value`` payload; list values
    are written in square brackets (e.g. ``items=[A1, B2, C3]``).
    """

    event_type: str  # event name, e.g. "OrderCreated"
    timestamp: str  # wall-clock time as "HH:MM:SS" in this demo
    data: str


# Status an order moves to when each event type is applied. Event types not
# listed (e.g. "FraudChecked") are kept in the stream but leave status as-is.
ORDER_STATUS_BY_EVENT = {
    "OrderCreated": "Created",
    "PaymentProcessed": "Paid",
    "InventoryReserved": "Reserved",
    "OrderConfirmed": "Confirmed",
    "OrderShipped": "Shipped",
    "OrderDelivered": "Delivered",
}


def parse_event_data(data):
    """Parse an ``OrderEvent.data`` payload into a ``{key: value}`` dict of strings.

    Bracketed values such as ``[A1, B2, C3]`` are kept whole even though they
    contain commas.

    >>> parse_event_data("customer=C001, items=3, total=2500")
    {'customer': 'C001', 'items': '3', 'total': '2500'}
    """
    import re  # local import: the file's top-level imports do not include re

    # A value is either a whole [...] list or everything up to the next comma.
    return {
        key: value.strip()
        for key, value in re.findall(r"(\w+)=(\[[^\]]*\]|[^,]*)", data)
    }


def replay_order(stream):
    """Rebuild an order's current state by applying its events in order.

    Returns a dict with ``status`` (``None`` for an empty stream), ``items``
    and ``total``; both stay ``0`` until an ``OrderCreated`` event is applied.
    Assumes ``items`` and ``total`` in the OrderCreated payload are integers.
    """
    state = {"status": None, "items": 0, "total": 0}
    for event in stream:
        state["status"] = ORDER_STATUS_BY_EVENT.get(event.event_type, state["status"])
        if event.event_type == "OrderCreated":
            payload = parse_event_data(event.data)
            state["items"] = int(payload["items"])
            state["total"] = int(payload["total"])
    return state


# Order lifecycle events for order ORD-001, in the order they were appended.
events = [
    OrderEvent("OrderCreated", "10:00:00", "customer=C001, items=3, total=2500"),
    OrderEvent("FraudChecked", "10:00:01", "score=0.12, result=PASS"),
    OrderEvent("PaymentProcessed", "10:00:15", "method=credit_card, amount=2500"),
    OrderEvent("InventoryReserved", "10:00:16", "items=[A1, B2, C3], status=reserved"),
    OrderEvent("OrderConfirmed", "10:00:17", "confirmation=ORD-2024-001"),
    OrderEvent("OrderShipped", "10:02:00", "tracking=TH123456789"),
    OrderEvent("OrderDelivered", "11:30:00", "signed_by=John"),
]

print("Event Stream (Order ORD-001):")
for i, e in enumerate(events):
    print(f" [{i}] {e.timestamp} | {e.event_type}")
    print(f" {e.data}")

# Replay the stream to derive the current state instead of hard-coding it.
current = replay_order(events)
print(f"\n Current State (after replay {len(events)} events):")
print(f" Status: {current['status']} | Items: {current['items']} | Total: {current['total']:,} THB")

เคล็ดลับ

  • ONNX: Export Model เป็น ONNX ลด Latency 2-5x เทียบ Native
  • CQRS: แยก Read/Write Scale อิสระ ใช้ Database ที่เหมาะแต่ละด้าน
  • Events: Event ต้อง Immutable ไม่แก้ไข เพิ่มอย่างเดียว
  • Snapshots: สร้าง Snapshot ทุก N Events ไม่ต้อง Replay ทั้งหมด
  • Idempotent: Event Handler ต้อง Idempotent รับ Event ซ้ำได้

ONNX Runtime คืออะไร

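ONNX Runtime คือ Inference Engine ประสิทธิภาพสูงสำหรับรันโมเดลในรูปแบบ ONNX ที่ Export มาจาก PyTorch หรือ TensorFlow โดยเร็วกว่าการรันบน Framework เดิมได้ราว 2-5 เท่า รองรับทั้ง CPU, GPU และ TensorRT ใช้งานได้จาก Python, C++ และ JavaScript รวมถึงอุปกรณ์ Edge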

CQRS คืออะไร

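CQRS (Command Query Responsibility Segregation) คือการแยกฝั่งเขียน (Command) และฝั่งอ่าน (Query) ออกจากกัน โดยแต่ละฝั่งใช้ Database ที่เหมาะสมและ Scale ได้อิสระ เช่น ฝั่ง Write ใช้ Relational Database ส่วนฝั่ง Read ใช้ NoSQL หรือ Cache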

Event Sourcing คืออะไร

Event Sourcing คือการเก็บ State ของระบบเป็นลำดับของ Events แทนการเก็บเฉพาะ Current State ทำให้ Replay ประวัติย้อนหลังได้และมี Audit Trail ครบถ้วน มักใช้คู่กับ CQRS โดยสร้าง Read Model จาก Events

ONNX Runtime กับ CQRS ใช้ร่วมกันอย่างไร

ใช้ ONNX Runtime ฝั่ง Query สำหรับ Recommendation และฝั่ง Command สำหรับ Fraud Detection ก่อนบันทึก Event ส่วนการอัปเดต Model และการทำ Personalization สามารถอ้างอิงข้อมูลจาก Events ใน Event Store ได้

สรุป

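บทความนี้ครอบคลุม ONNX Runtime สำหรับ Inference, CQRS ที่แยก Command และ Query, Event Sourcing ด้วย Event Store และ Kafka, การสร้าง API ด้วย FastAPI พร้อม Read Model บน Redis/Elasticsearch, การใช้ ML ทำ Fraud Detection และ Recommendation รวมถึงแนวปฏิบัติเรื่อง Replay, Snapshot และ Idempotent Event Handler สำหรับสถาปัตยกรรมระดับ Production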