ai

ONNX Runtime CQRS Event Sourcing — Inference Engine และ Architecture Pattern

ONNX Runtime CQRS Event Sourcing — Inference Engine และ Architecture Pattern

ONNX Runtime CQRS Event Sourcing

ONNX Runtime CQRS Event Sourcing — Inference Engine และ Architecture Pattern

ONNX Runtime Inference Engine ML Model CQRS Command Query Responsibility Segregation Event Sourcing Event Store Replay Architecture Pattern Production

Componentหน้าที่เทคโนโลยี
ONNX RuntimeML InferencePython C++ JavaScript
Command ServiceWrite OperationsFastAPI + PostgreSQL
Query ServiceRead OperationsFastAPI + Redis/Elasticsearch
Event StoreEvent PersistenceEventStoreDB / Kafka
Event BusEvent DistributionKafka / RabbitMQ

ONNX Runtime Inference

=== ONNX Runtime ML Inference ===

pip install onnxruntime onnx torch

Export PyTorch Model to ONNX

import torch

import torch.nn as nn

class FraudDetector(nn.Module):

def __init__(self):

super().__init__()

self.layers = nn.Sequential(

nn.Linear(20, 64),

nn.ReLU(),

nn.Dropout(0.3),

nn.Linear(64, 32),

nn.ReLU(),

nn.Linear(32, 1),

nn.Sigmoid(),

)

def forward(self, x):

return self.layers(x)

model = FraudDetector()

dummy_input = torch.randn(1, 20)

torch.onnx.export(

model, dummy_input, "fraud_detector.onnx",

input_names=["features"],

output_names=["probability"],

dynamic_axes={"features": {0: "batch"}, "probability": {0: "batch"}},

)

ONNX Runtime Inference

import onnxruntime as ort

import numpy as np

session = ort.InferenceSession(

"fraud_detector.onnx",

providers=["CUDAExecutionProvider", "CPUExecutionProvider"],

)

เนื้อหาเกี่ยวข้อง — อ่านต่อ: การ์ดจอ rtx 5090 ราคา

# Single Prediction

features = np.random.randn(1, 20).astype(np.float32)

result = session.run(None, {"features": features})

probability = result[0][0][0]

# Batch Prediction

batch = np.random.randn(100, 20).astype(np.float32)

results = session.run(None, {"features": batch})

from dataclasses import dataclass

from typing import List

แนะนำเพิ่มเติม — ดูสัญญาณเทรดที่ XM Signal

@dataclass

class InferenceBenchmark:

framework: str

model: str

batch_size: int

latency_ms: float

throughput_rps: int

benchmarks = [

InferenceBenchmark("PyTorch", "FraudDetector", 1, 5.2, 192),

InferenceBenchmark("ONNX Runtime CPU", "FraudDetector", 1, 1.8, 555),

InferenceBenchmark("ONNX Runtime GPU", "FraudDetector", 1, 0.5, 2000),

InferenceBenchmark("ONNX Runtime CPU", "FraudDetector", 32, 8.5, 3764),

InferenceBenchmark("ONNX Runtime GPU", "FraudDetector", 32, 2.1, 15238),

InferenceBenchmark("TensorRT", "FraudDetector", 32, 1.2, 26666),

]

print("=== Inference Benchmarks ===")

for b in benchmarks:

print(f" [{b.framework}] Batch: {b.batch_size}")

print(f" Latency: {b.latency_ms}ms | Throughput: {b.throughput_rps:,} rps")

CQRS Implementation

=== CQRS Architecture ===

Command Service (Write Side)

from fastapi import FastAPI

from pydantic import BaseModel

import uuid

from datetime import datetime

เนื้อหาเกี่ยวข้อง — แนะนำให้อ่าน แจ้งเตือนเงินเข้าออกออมสินผ่านไลน์

app = FastAPI()

class CreateOrderCommand(BaseModel):

customer_id: str

items: list

total: float

@app.post("/commands/orders")

async def create_order(cmd: CreateOrderCommand):

# Validate with ML Model (Fraud Check)

features = extract_features(cmd)

fraud_prob = onnx_session.run(None, {"features": features})[0][0][0]

if fraud_prob > 0.8:

raise HTTPException(400, "Suspected fraud")

# Create Event

event = {

"event_type": "OrderCreated",

"aggregate_id": str(uuid.uuid4()),

"timestamp": datetime.utcnow().isoformat(),

"data": cmd.dict(),

}

# Store Event

แนะนำเพิ่มเติม — แหล่งความรู้ Forex iCafeForex

await event_store.append(event)

# Publish to Event Bus

await kafka_producer.send("order-events", event)

return {"order_id": event["aggregate_id"], "status": "created"}

# Query Service (Read Side)

@app.get("/queries/orders/{order_id}")

async def get_order(order_id: str):

# Read from optimized Read Model

order = await redis.get(f"order:{order_id}")

if not order:

order = await elasticsearch.get("orders", order_id)

return order

@dataclass

class CQRSComponent:

side: str

component: str

technology: str

purpose: str

เนื้อหาเกี่ยวข้อง — Qwik Resumability Event Driven Design

components = [

CQRSComponent("Command", "API Gateway", "FastAPI", "รับ Commands จาก Client"),

CQRSComponent("Command", "Fraud Detector", "ONNX Runtime", "ตรวจ Fraud ก่อน Process"),

CQRSComponent("Command", "Event Store", "EventStoreDB", "เก็บ Events ทั้งหมด"),

CQRSComponent("Command", "Event Bus", "Kafka", "กระจาย Events ไปทุก Consumer"),

CQRSComponent("Query", "Read Model", "Redis + ES", "Query ที่ Optimize แล้ว"),

CQRSComponent("Query", "Recommendation", "ONNX Runtime", "แนะนำสินค้า"),

CQRSComponent("Query", "API Gateway", "FastAPI", "ส่งผลลัพธ์ Client"),

]

print("\n=== CQRS Architecture ===")

for c in components:

print(f" [{c.side}] {c.component} ({c.technology})")

print(f" Purpose: {c.purpose}")

Event Sourcing Implementation

=== Event Sourcing ===

Event Store

class Event:

ONNX Runtime CQRS Event Sourcing — Inference Engine และ Architecture Pattern

def __init__(self, event_type, aggregate_id, data):

self.event_id = str(uuid.uuid4())

self.event_type = event_type

self.aggregate_id = aggregate_id

self.timestamp = datetime.utcnow()

self.data = data

self.version = 0

class EventStore:

def __init__(self, db_url):

self.engine = create_engine(db_url)

async def append(self, stream_id, events):

async with self.engine.begin() as conn:

for event in events:

await conn.execute(

insert(events_table).values(

event_id=event.event_id,

stream_id=stream_id,

event_type=event.event_type,

data=json.dumps(event.data),

timestamp=event.timestamp,

)

)

async def get_events(self, stream_id, from_version=0):

async with self.engine.begin() as conn:

result = await conn.execute(

select(events_table)

เนื้อหาเกี่ยวข้อง — ทำความเข้าใจ Betteruptime Zero Downtime Deployment — Deploy

.where(events_table.c.stream_id == stream_id)

.where(events_table.c.version >= from_version)

.order_by(events_table.c.version)

)

return result.fetchall()

@dataclass

class OrderEvent:

event_type: str

timestamp: str

data: str

Order Lifecycle Events

events = [

OrderEvent("OrderCreated", "10:00:00", "customer=C001, items=3, total=2500"),

OrderEvent("FraudChecked", "10:00:01", "score=0.12, result=PASS"),

OrderEvent("PaymentProcessed", "10:00:15", "method=credit_card, amount=2500"),

OrderEvent("InventoryReserved", "10:00:16", "items=[A1, B2, C3], status=reserved"),

OrderEvent("OrderConfirmed", "10:00:17", "confirmation=ORD-2024-001"),

OrderEvent("OrderShipped", "10:02:00", "tracking=TH123456789"),

OrderEvent("OrderDelivered", "11:30:00", "signed_by=John"),

]

print("Event Stream (Order ORD-001):")

for i, e in enumerate(events):

print(f" [{i}] {e.timestamp} | {e.event_type}")

print(f" {e.data}")

Replay to get state

print(f"\n Current State (after replay {len(events)} events):")

print(f" Status: Delivered | Items: 3 | Total: 2,500 THB")

เคล็ดลับ

  • ONNX: Export Model เป็น ONNX ลด Latency 2-5x เทียบ Native
  • CQRS: แยก Read/Write Scale อิสระ ใช้ Database ที่เหมาะแต่ละด้าน
  • Events: Event ต้อง Immutable ไม่แก้ไข เพิ่มอย่างเดียว
  • Snapshots: สร้าง Snapshot ทุก N Events ไม่ต้อง Replay ทั้งหมด
  • Idempotent: Event Handler ต้อง Idempotent รับ Event ซ้ำได้

ONNX Runtime คืออะไร

High-performance Inference Engine ONNX Format PyTorch TensorFlow เร็วกว่า 2-5x CPU GPU TensorRT Python C++ JavaScript Edge

CQRS คืออะไร

Command Query Responsibility Segregation แยก Read Write Database ต่างกัน Scale อิสระ Write Relational Read NoSQL Cache

Event Sourcing คืออะไร

เก็บ State เป็น Events ไม่ใช่ Current State Replay History Audit Trail Complete ใช้คู่ CQRS Read Model จาก Events

ONNX Runtime กับ CQRS ใช้ร่วมกันอย่างไร

ONNX Query Side Recommendation Fraud Detection Command Side Model Update Event Store Personalization Inference Engine

สรุป

ONNX Runtime Inference CQRS Command Query Event Sourcing Event Store Kafka FastAPI Redis Elasticsearch Fraud Detection Recommendation Replay Snapshot Idempotent Production Architecture

XM Legend · เทรดเดอร์ & ผู้สอน Forex 13 ปี

ผู้ก่อตั้ง SiamCafe ตั้งแต่ปี 1997 · เทรดเดอร์สาย Forex มากกว่า 13 ปี ได้รับการยกย่องเป็น XM Legend · แบ่งปันความรู้ Forex, ไอที, AI และการเทรด จากประสบการณ์จริงในตลาดจริง