ONNX Runtime CQRS Event Sourcing

2026-02-27 · อ. บอม, SiamCafe.net · 9,504 words

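This article combines ONNX Runtime, an inference engine for ML models, with two architecture patterns for production systems: CQRS (Command Query Responsibility Segregation) and Event Sourcing, where every change is kept in an event store and state is rebuilt by replay.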

Component        | Role               | Technology
ONNX Runtime     | ML Inference       | Python, C++, JavaScript
Command Service  | Write Operations   | FastAPI + PostgreSQL
Query Service    | Read Operations    | FastAPI + Redis/Elasticsearch
Event Store      | Event Persistence  | EventStoreDB / Kafka
Event Bus        | Event Distribution | Kafka / RabbitMQ

ONNX Runtime Inference

# === ONNX Runtime ML Inference ===

# pip install onnxruntime onnx torch
# (install onnxruntime-gpu instead of onnxruntime to use CUDAExecutionProvider)

# Export PyTorch Model to ONNX
# import torch
# import torch.nn as nn
#
# class FraudDetector(nn.Module):
#     def __init__(self):
#         super().__init__()
#         self.layers = nn.Sequential(
#             nn.Linear(20, 64),
#             nn.ReLU(),
#             nn.Dropout(0.3),
#             nn.Linear(64, 32),
#             nn.ReLU(),
#             nn.Linear(32, 1),
#             nn.Sigmoid(),
#         )
#     def forward(self, x):
#         return self.layers(x)
#
# model = FraudDetector()
# model.eval()  # inference mode: disables Dropout before export
# dummy_input = torch.randn(1, 20)
# torch.onnx.export(
#     model, dummy_input, "fraud_detector.onnx",
#     input_names=["features"],
#     output_names=["probability"],
#     dynamic_axes={"features": {0: "batch"}, "probability": {0: "batch"}},
# )

# ONNX Runtime Inference
# import onnxruntime as ort
# import numpy as np
#
# session = ort.InferenceSession(
#     "fraud_detector.onnx",
#     providers=["CUDAExecutionProvider", "CPUExecutionProvider"],
# )
#
# # Single Prediction
# features = np.random.randn(1, 20).astype(np.float32)
# result = session.run(None, {"features": features})
# probability = result[0][0][0]
#
# # Batch Prediction
# batch = np.random.randn(100, 20).astype(np.float32)
# results = session.run(None, {"features": batch})

from dataclasses import dataclass

@dataclass
class InferenceBenchmark:
    framework: str
    model: str
    batch_size: int
    latency_ms: float
    throughput_rps: int

# throughput_rps = batch_size / (latency_ms / 1000), truncated to an integer
benchmarks = [
    InferenceBenchmark("PyTorch", "FraudDetector", 1, 5.2, 192),
    InferenceBenchmark("ONNX Runtime CPU", "FraudDetector", 1, 1.8, 555),
    InferenceBenchmark("ONNX Runtime GPU", "FraudDetector", 1, 0.5, 2000),
    InferenceBenchmark("ONNX Runtime CPU", "FraudDetector", 32, 8.5, 3764),
    InferenceBenchmark("ONNX Runtime GPU", "FraudDetector", 32, 2.1, 15238),
    InferenceBenchmark("TensorRT", "FraudDetector", 32, 1.2, 26666),
]

print("=== Inference Benchmarks ===")
for b in benchmarks:
    print(f"  [{b.framework}] Batch: {b.batch_size}")
    print(f"    Latency: {b.latency_ms}ms | Throughput: {b.throughput_rps:,} rps")
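
The table above lists latency and throughput but not how such figures are obtained. As a minimal sketch, assuming a warm-up phase and the median of repeated runs are acceptable, a timing helper could look like the following. The names `measure` and `fake_predict` are hypothetical, and `fake_predict` is only a pure-Python stand-in for a call such as `session.run(None, {"features": batch})`.

```python
import statistics
import time
from typing import Callable, List, Sequence


def measure(predict: Callable[[Sequence[List[float]]], object],
            batch: Sequence[List[float]],
            warmup: int = 5,
            runs: int = 50) -> dict:
    """Time predict(batch) and derive latency and throughput from it."""
    for _ in range(warmup):
        predict(batch)  # warm caches before timing starts

    samples_ms = []
    for _ in range(runs):
        start = time.perf_counter()
        predict(batch)
        samples_ms.append((time.perf_counter() - start) * 1000.0)

    # Median is less sensitive to outliers than the mean; guard against zero
    latency_ms = max(statistics.median(samples_ms), 1e-9)
    return {
        "batch_size": len(batch),
        "latency_ms": latency_ms,
        # Same relation as the table: items per second at this batch size
        "throughput_rps": len(batch) / (latency_ms / 1000.0),
    }


def fake_predict(batch):
    # Hypothetical stand-in for the ONNX Runtime session call
    return [sum(row) / len(row) for row in batch]


batch = [[0.1] * 20 for _ in range(32)]
stats = measure(fake_predict, batch)
print(f"Batch: {stats['batch_size']} | Latency: {stats['latency_ms']:.4f}ms | "
      f"Throughput: {stats['throughput_rps']:,.0f} rps")
```

Swapping `fake_predict` for a function that wraps a real inference session would give numbers comparable in form to the benchmark entries, though the values depend entirely on the model and hardware.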

CQRS Implementation

# === CQRS Architecture ===

# Command Service (Write Side)
# Assumes extract_features, onnx_session, event_store, kafka_producer,
# redis and elasticsearch are initialized elsewhere.
# from fastapi import FastAPI, HTTPException
# from pydantic import BaseModel
# import uuid
# from datetime import datetime, timezone
#
# app = FastAPI()
#
# class CreateOrderCommand(BaseModel):
#     customer_id: str
#     items: list
#     total: float
#
# @app.post("/commands/orders")
# async def create_order(cmd: CreateOrderCommand):
#     # Validate with ML Model (Fraud Check)
#     features = extract_features(cmd)  # float32 array of shape (1, 20)
#     fraud_prob = float(onnx_session.run(None, {"features": features})[0][0][0])
#
#     if fraud_prob > 0.8:
#         raise HTTPException(status_code=400, detail="Suspected fraud")
#
#     # Create Event
#     event = {
#         "event_type": "OrderCreated",
#         "aggregate_id": str(uuid.uuid4()),
#         "timestamp": datetime.now(timezone.utc).isoformat(),
#         "data": cmd.model_dump(),  # Pydantic v2; use cmd.dict() on v1
#     }
#
#     # Store Event
#     await event_store.append(event)
#
#     # Publish to Event Bus
#     await kafka_producer.send("order-events", event)
#
#     return {"order_id": event["aggregate_id"], "status": "created"}
#
# # Query Service (Read Side)
# @app.get("/queries/orders/{order_id}")
# async def get_order(order_id: str):
#     # Read from optimized Read Model: cache first, then the search index
#     order = await redis.get(f"order:{order_id}")
#     if not order:
#         order = await elasticsearch.get(index="orders", id=order_id)
#     return order

@dataclass
class CQRSComponent:
    side: str
    component: str
    technology: str
    purpose: str

components = [
    CQRSComponent("Command", "API Gateway", "FastAPI", "Receives commands from clients"),
    CQRSComponent("Command", "Fraud Detector", "ONNX Runtime", "Checks for fraud before processing"),
    CQRSComponent("Command", "Event Store", "EventStoreDB", "Stores all events"),
    CQRSComponent("Command", "Event Bus", "Kafka", "Distributes events to all consumers"),
    CQRSComponent("Query", "Read Model", "Redis + ES", "Serves optimized queries"),
    CQRSComponent("Query", "Recommendation", "ONNX Runtime", "Recommends products"),
    CQRSComponent("Query", "API Gateway", "FastAPI", "Returns results to clients"),
]

print("\n=== CQRS Architecture ===")
for c in components:
    print(f"  [{c.side}] {c.component} ({c.technology})")
    print(f"    Purpose: {c.purpose}")
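
To make the command/query split concrete without FastAPI, Kafka, or Redis, here is a minimal in-memory sketch. Everything in it is an assumption for illustration: `InMemoryEventLog`, `OrderReadModel`, `handle_create_order`, and `fraud_score` are hypothetical names, and `fraud_score` is a fixed rule standing in for the ONNX Runtime call.

```python
import uuid
from dataclasses import dataclass, field
from typing import Dict, List, Optional

FRAUD_THRESHOLD = 0.8  # same cut-off as the commented command handler


@dataclass
class OrderCreated:
    order_id: str
    customer_id: str
    total: float


@dataclass
class InMemoryEventLog:
    """Write side: an append-only list standing in for the event store."""
    events: List[OrderCreated] = field(default_factory=list)

    def append(self, event: OrderCreated) -> None:
        self.events.append(event)


class OrderReadModel:
    """Query side: a view that changes only when events are projected."""

    def __init__(self) -> None:
        self._orders: Dict[str, dict] = {}

    def project(self, event: OrderCreated) -> None:
        self._orders[event.order_id] = {
            "customer_id": event.customer_id,
            "total": event.total,
            "status": "created",
        }

    def get(self, order_id: str) -> Optional[dict]:
        return self._orders.get(order_id)


def fraud_score(total: float) -> float:
    # Hypothetical stand-in for the ONNX Runtime inference call
    return 0.95 if total > 100_000 else 0.12


def handle_create_order(log: InMemoryEventLog, read_model: OrderReadModel,
                        customer_id: str, total: float) -> str:
    """Command handler: validate, record an event, return only the new id."""
    if fraud_score(total) > FRAUD_THRESHOLD:
        raise ValueError("Suspected fraud")
    event = OrderCreated(str(uuid.uuid4()), customer_id, total)
    log.append(event)
    # Direct call for brevity; in the architecture above this hop
    # would go through the event bus to a separate query service.
    read_model.project(event)
    return event.order_id


log = InMemoryEventLog()
read_model = OrderReadModel()
order_id = handle_create_order(log, read_model, "C001", 2500.0)
print(read_model.get(order_id))
```

The command handler never reads from the read model and the query path never writes to the log, which is the separation CQRS asks for. With a real bus between them the read model becomes eventually consistent.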

Event Sourcing Implementation

# === Event Sourcing ===

# Event Store
# Assumes events_table is a SQLAlchemy Table with the columns used below.
# import json
# import uuid
# from datetime import datetime
# from sqlalchemy import insert, select
# from sqlalchemy.ext.asyncio import create_async_engine
#
# class Event:
#     def __init__(self, event_type, aggregate_id, data, version=0):
#         self.event_id = str(uuid.uuid4())
#         self.event_type = event_type
#         self.aggregate_id = aggregate_id
#         self.timestamp = datetime.utcnow()
#         self.data = data
#         self.version = version  # position of the event within its stream
#
# class EventStore:
#     def __init__(self, db_url):
#         # An async engine is required for the "async with" / "await" calls below
#         self.engine = create_async_engine(db_url)
#
#     async def append(self, stream_id, events):
#         async with self.engine.begin() as conn:
#             for event in events:
#                 await conn.execute(
#                     insert(events_table).values(
#                         event_id=event.event_id,
#                         stream_id=stream_id,
#                         event_type=event.event_type,
#                         data=json.dumps(event.data),
#                         timestamp=event.timestamp,
#                         version=event.version,  # needed by get_events
#                     )
#                 )
#
#     async def get_events(self, stream_id, from_version=0):
#         async with self.engine.begin() as conn:
#             result = await conn.execute(
#                 select(events_table)
#                 .where(events_table.c.stream_id == stream_id)
#                 .where(events_table.c.version >= from_version)
#                 .order_by(events_table.c.version)
#             )
#             return result.fetchall()

@dataclass
class OrderEvent:
    event_type: str
    timestamp: str
    data: str

# Order Lifecycle Events
events = [
    OrderEvent("OrderCreated", "10:00:00", "customer=C001, items=3, total=2500"),
    OrderEvent("FraudChecked", "10:00:01", "score=0.12, result=PASS"),
    OrderEvent("PaymentProcessed", "10:00:15", "method=credit_card, amount=2500"),
    OrderEvent("InventoryReserved", "10:00:16", "items=[A1, B2, C3], status=reserved"),
    OrderEvent("OrderConfirmed", "10:00:17", "confirmation=ORD-2024-001"),
    OrderEvent("OrderShipped", "10:02:00", "tracking=TH123456789"),
    OrderEvent("OrderDelivered", "11:30:00", "signed_by=John"),
]

print("Event Stream (Order ORD-001):")
for i, e in enumerate(events):
    print(f"  [{i}] {e.timestamp} | {e.event_type}")
    print(f"      {e.data}")

# State after replaying the stream (hard-coded here for illustration)
print(f"\n  Current State (after replaying {len(events)} events):")
print("    Status: Delivered | Items: 3 | Total: 2,500 THB")
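
The listing above prints the final state directly rather than computing it. As a sketch of what replay actually does, the state can be derived by folding each event into an accumulator. The `StoredEvent`, `OrderState`, `apply`, and `replay` names and the status labels are assumptions for illustration; the event data mirrors the order lifecycle above, with the data field as a dict instead of a string.

```python
from dataclasses import dataclass
from typing import Any, Dict, List, Optional


@dataclass
class StoredEvent:
    event_type: str
    data: Dict[str, Any]


@dataclass
class OrderState:
    status: str = "New"
    items: int = 0
    total: int = 0
    tracking: Optional[str] = None


# Status the order has after each event type (hypothetical labels)
STATUS_AFTER = {
    "OrderCreated": "Created",
    "PaymentProcessed": "Paid",
    "InventoryReserved": "Reserved",
    "OrderConfirmed": "Confirmed",
    "OrderShipped": "Shipped",
    "OrderDelivered": "Delivered",
}


def apply(state: OrderState, event: StoredEvent) -> OrderState:
    """Fold one event into the state."""
    if event.event_type == "OrderCreated":
        state.items = event.data["items"]
        state.total = event.data["total"]
    elif event.event_type == "OrderShipped":
        state.tracking = event.data["tracking"]
    # Events without a mapping (e.g. FraudChecked) leave the status unchanged
    state.status = STATUS_AFTER.get(event.event_type, state.status)
    return state


def replay(stream: List[StoredEvent]) -> OrderState:
    """Rebuild state from scratch by applying events in order."""
    state = OrderState()
    for event in stream:
        state = apply(state, event)
    return state


stream = [
    StoredEvent("OrderCreated", {"customer": "C001", "items": 3, "total": 2500}),
    StoredEvent("FraudChecked", {"score": 0.12, "result": "PASS"}),
    StoredEvent("PaymentProcessed", {"method": "credit_card", "amount": 2500}),
    StoredEvent("InventoryReserved", {"items": ["A1", "B2", "C3"]}),
    StoredEvent("OrderConfirmed", {"confirmation": "ORD-2024-001"}),
    StoredEvent("OrderShipped", {"tracking": "TH123456789"}),
    StoredEvent("OrderDelivered", {"signed_by": "John"}),
]

state = replay(stream)
print(f"Status: {state.status} | Items: {state.items} | Total: {state.total:,} THB")
# prints: Status: Delivered | Items: 3 | Total: 2,500 THB
```

Replaying a prefix of the stream, for example `replay(stream[:3])`, yields the state as it was at that point in time, which is what makes the event log usable as an audit trail.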

Tips

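What is ONNX Runtime?

ONNX Runtime is a high-performance inference engine for models in the ONNX format, which PyTorch and TensorFlow models can be exported to. Inference is typically cited as 2-5x faster than in the original framework, though the gain depends on the model and hardware. It runs on CPU and GPU, can use TensorRT, offers Python, C++, and JavaScript APIs, and is suitable for edge deployment.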

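What is CQRS?

CQRS stands for Command Query Responsibility Segregation. It separates the write path from the read path so that each can use a different database and scale independently: a relational database for writes, for example, and a NoSQL store or cache for reads.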

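What is Event Sourcing?

Event Sourcing stores state as a sequence of events rather than as the current state. Replaying the history rebuilds the state and provides a complete audit trail. It pairs well with CQRS, where the read model is built from the events.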
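
As an illustration of the append-only idea, here is a small in-memory event store. It adds an expected-version check on append (optimistic concurrency), which is a common companion technique and an assumption here, not something the article specifies. `InMemoryEventStore`, `Recorded`, and `ConcurrencyError` are hypothetical names.

```python
from collections import defaultdict
from dataclasses import dataclass
from typing import Any, Dict, List, Tuple


class ConcurrencyError(Exception):
    """Raised when a writer's expected version is stale."""


@dataclass(frozen=True)
class Recorded:
    stream_id: str
    version: int  # 1-based position within the stream
    event_type: str
    data: Dict[str, Any]


class InMemoryEventStore:
    def __init__(self) -> None:
        self._streams: Dict[str, List[Recorded]] = defaultdict(list)

    def append(self, stream_id: str, expected_version: int,
               new_events: List[Tuple[str, Dict[str, Any]]]) -> int:
        """Append events only if the stream is still at expected_version."""
        stream = self._streams[stream_id]
        if len(stream) != expected_version:
            raise ConcurrencyError(
                f"{stream_id}: expected version {expected_version}, "
                f"found {len(stream)}"
            )
        for event_type, data in new_events:
            stream.append(Recorded(stream_id, len(stream) + 1, event_type, data))
        return len(stream)  # new version of the stream

    def get_events(self, stream_id: str, from_version: int = 1) -> List[Recorded]:
        """Return events in order, starting at from_version (inclusive)."""
        return [e for e in self._streams.get(stream_id, [])
                if e.version >= from_version]


store = InMemoryEventStore()
v = store.append("order-1", 0, [("OrderCreated", {"total": 2500})])
v = store.append("order-1", v, [("PaymentProcessed", {"amount": 2500})])
print([(e.version, e.event_type) for e in store.get_events("order-1")])
```

Existing events are never updated or deleted; a second writer that still believes the stream is at version 0 gets a `ConcurrencyError` instead of silently interleaving its events.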

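How do ONNX Runtime and CQRS work together?

ONNX Runtime acts as the inference engine on both sides. On the query side it serves recommendations; on the command side it runs fraud detection before a command is accepted. The history in the event store can then be used for model updates and personalization.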

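Summary

ONNX Runtime handles inference, CQRS separates commands from queries, and Event Sourcing keeps every change in an event store. In this design Kafka distributes events, FastAPI exposes the APIs, and Redis and Elasticsearch hold the read models, with fraud detection and recommendation as the ML workloads. For production, plan for replay, snapshots, and idempotent event handling.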
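
Snapshots and idempotent handling are named here but not shown in the listings. The sketch below illustrates both under simple assumptions: a projection that skips events it has already seen, and a rebuild function that starts from a snapshot instead of the beginning of the stream. `OrderTotalProjection`, `Snapshot`, and `rebuild_total` are hypothetical names.

```python
from dataclasses import dataclass, field
from typing import List, Optional, Set


@dataclass(frozen=True)
class Event:
    event_id: str
    version: int
    amount: int


@dataclass
class OrderTotalProjection:
    """Read model that tolerates redelivered events (idempotent handler)."""
    total: int = 0
    seen: Set[str] = field(default_factory=set)

    def handle(self, event: Event) -> bool:
        if event.event_id in self.seen:
            return False  # duplicate delivery: ignore
        self.seen.add(event.event_id)
        self.total += event.amount
        return True


@dataclass(frozen=True)
class Snapshot:
    version: int  # version of the last event folded into `total`
    total: int


def rebuild_total(events: List[Event],
                  snapshot: Optional[Snapshot] = None) -> int:
    """Replay only the events newer than the snapshot, if one is given."""
    start_version = snapshot.version if snapshot else 0
    total = snapshot.total if snapshot else 0
    for event in events:
        if event.version > start_version:
            total += event.amount
    return total


events = [Event("e1", 1, 2500), Event("e2", 2, 1200), Event("e3", 3, 800)]

projection = OrderTotalProjection()
for delivered in [events[0], events[1], events[1], events[2]]:  # e2 arrives twice
    projection.handle(delivered)

snapshot = Snapshot(version=2, total=3700)
print(projection.total, rebuild_total(events), rebuild_total(events, snapshot))
# prints: 4500 4500 4500
```

An in-memory `seen` set is enough to show the idea; a production consumer would more likely persist the processed event ids or the last processed version alongside the read model.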
