ONNX Runtime Event Driven Design — ออกแบบระบบ
ONNX Runtime Event-driven

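ONNX Runtime เป็น Inference Engine สำหรับรัน ML Models ที่ Export มาจาก PyTorch หรือ TensorFlow ได้ทั้งบน CPU และ GPU เมื่อนำมาใช้ร่วมกับ Event-driven Architecture จะใช้ Kafka เป็น Message Queue เชื่อมระหว่าง Producer กับ Consumer เพื่อทำ Async Batch Inference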
| Component | Role | Tools |
|---|---|---|
| ONNX Runtime | ML Inference | onnxruntime, GPU, TensorRT |
| Event Broker | Message Distribution | Kafka, RabbitMQ, Redis |
| Producer | Generate Events | API, IoT, Streaming |
| Consumer | Process Events | Python Worker, Go Service |
| Model Store | Model Versioning | MLflow, S3, GCS |
ONNX Model Export และ Inference
=== ONNX Runtime Setup ===
pip install onnxruntime-gpu onnx torch  # ไม่มี GPU ให้ติดตั้ง onnxruntime แทน onnxruntime-gpu (ติดตั้งเพียงแพ็กเกจเดียวต่อหนึ่ง environment)
PyTorch Model Export to ONNX
import torch
import torch.nn as nn
class ImageClassifier(nn.Module):
    """Small CNN that classifies 3-channel 32x32 images.

    Two conv/ReLU/max-pool stages halve the spatial size twice (32 -> 16 -> 8),
    which is why the classifier head expects 64 * 8 * 8 flattened features.

    Args:
        num_classes: Number of scores produced per image.
    """

    def __init__(self, num_classes=10):
        super().__init__()
        self.features = nn.Sequential(
            nn.Conv2d(3, 32, 3, padding=1),
            nn.ReLU(),
            nn.MaxPool2d(2),
            nn.Conv2d(32, 64, 3, padding=1),
            nn.ReLU(),
            nn.MaxPool2d(2),
        )
        self.classifier = nn.Sequential(
            nn.Linear(64 * 8 * 8, 256),
            nn.ReLU(),
            nn.Linear(256, num_classes),
        )

    def forward(self, x):
        """Return the raw (un-normalised) class scores, one row per image."""
        x = self.features(x)
        # flatten() copies when needed, so it also works for non-contiguous
        # activations (e.g. channels_last inputs) where view() would raise.
        x = torch.flatten(x, 1)
        return self.classifier(x)
model = ImageClassifier()
model.eval()  # switch to evaluation mode before exporting

# Example input handed to the exporter; only the batch dimension is marked
# dynamic below, the remaining dimensions stay fixed at 3x32x32.
dummy_input = torch.randn(1, 3, 32, 32)

torch.onnx.export(
    model, dummy_input, "classifier.onnx",
    input_names=["image"],
    output_names=["prediction"],
    dynamic_axes={
        "image": {0: "batch_size"},
        "prediction": {0: "batch_size"},
    },
    opset_version=17,
)
ONNX Runtime Inference
import onnxruntime as ort
import numpy as np
# Open the exported model, listing CUDA first and CPU second.
session = ort.InferenceSession(
    "classifier.onnx",
    providers=['CUDAExecutionProvider', 'CPUExecutionProvider'],
)

# One random float32 image with the same layout the model was exported with.
input_data = np.random.randn(1, 3, 32, 32).astype(np.float32)

# The feed dict is keyed by the input name chosen at export time ("image").
result = session.run(None, {"image": input_data})
predictions = result[0]

# Index of the highest score in each row of the batch.
class_id = predictions.argmax(axis=1)
from dataclasses import dataclass, field
from typing import List, Dict
@dataclass
class ONNXModel:
    """Catalogue entry describing one exported ONNX model."""

    name: str            # model name shown in the listing
    framework: str       # framework the model originated from
    input_shape: str     # input shape, stored as display text
    output_shape: str    # output shape, stored as display text
    size_mb: float       # model size, printed with an "MB" suffix
    inference_ms: float  # inference latency, printed with an "ms" suffix
    provider: str        # execution provider label, e.g. "CUDA" or "CPU"
# Sample catalogue of exported models with their size and latency figures.
models = [
    ONNXModel("ResNet50", "PyTorch", "(1,3,224,224)", "(1,1000)", 97.8, 5.2, "CUDA"),
    ONNXModel("BERT-base", "PyTorch", "(1,128)", "(1,128,768)", 438.0, 8.5, "CUDA"),
    ONNXModel("YOLOv8n", "PyTorch", "(1,3,640,640)", "(1,84,8400)", 12.2, 3.1, "CUDA"),
    ONNXModel("XGBoost", "scikit-learn", "(1,50)", "(1,)", 0.5, 0.1, "CPU"),
    ONNXModel("Whisper-small", "PyTorch", "(1,80,3000)", "(1,448)", 967.0, 45.0, "CUDA"),
]

# Print one summary line per model.
print("=== ONNX Models ===")
for m in models:
    print(f" {m.name} ({m.framework}) | {m.size_mb}MB | "
          f"{m.inference_ms}ms | {m.provider}")
Event-driven Inference Pipeline
=== Event-driven ML Inference ===
Kafka Consumer + ONNX Runtime
แนะนำเพิ่มเติม — สัญญาณเทรดรายวัน XM Signal
from confluent_kafka import Consumer, Producer
import onnxruntime as ort
import json
import numpy as np
import time  # monotonic clock for the batch deadline used below

# Initialize the ONNX session once and reuse it for every batch.
# CPU is listed after CUDA so the worker can still start on a host without a GPU.
session = ort.InferenceSession(
    "classifier.onnx",
    providers=['CUDAExecutionProvider', 'CPUExecutionProvider'],
)

# Kafka consumer for incoming inference requests
consumer = Consumer({
    'bootstrap.servers': 'localhost:9092',
    'group.id': 'ml-inference-group',
    'auto.offset.reset': 'latest',
})
consumer.subscribe(['inference-requests'])

# Kafka producer for results
producer = Producer({'bootstrap.servers': 'localhost:9092'})

# Batch buffer
batch = []
BATCH_SIZE = 32
BATCH_TIMEOUT = 0.1  # seconds


def _softmax(scores):
    """Turn each row of raw model scores into probabilities that sum to 1."""
    # Subtracting the row maximum keeps exp() from overflowing.
    shifted = scores - np.max(scores, axis=-1, keepdims=True)
    exp_scores = np.exp(shifted)
    return exp_scores / np.sum(exp_scores, axis=-1, keepdims=True)


def _decode_request(msg):
    """Parse one Kafka message into a request dict, or return None if unusable."""
    try:
        data = json.loads(msg.value())
    except (TypeError, ValueError) as exc:
        # TypeError: empty (None) payload; ValueError: invalid JSON / bad encoding.
        print(f"Skipping undecodable message: {exc}")
        return None
    if not isinstance(data, dict) or 'request_id' not in data or 'features' not in data:
        print("Skipping message without 'request_id' and 'features'")
        return None
    return data


def _process_batch(requests):
    """Run one batched inference and publish one result per request."""
    inputs = np.array([d['features'] for d in requests], dtype=np.float32)
    results = session.run(None, {"image": inputs})
    # The exported model ends in a linear layer, so its output is raw scores;
    # normalise them so "confidence" is a probability rather than a logit.
    probabilities = _softmax(results[0])
    class_ids = np.argmax(probabilities, axis=1)
    confidences = np.max(probabilities, axis=1)
    for data, class_id, confidence in zip(requests, class_ids, confidences):
        result = {
            "request_id": data['request_id'],
            "prediction": int(class_id),
            "confidence": float(confidence),
        }
        producer.produce('inference-results',
                         json.dumps(result).encode())
        producer.poll(0)  # serve delivery callbacks so the local queue drains
    producer.flush()


# Monotonic time by which the batch currently being collected must be sent.
batch_deadline = None

try:
    while True:
        msg = consumer.poll(BATCH_TIMEOUT)
        # Compare against None explicitly: a message with an empty payload
        # is falsy because len(msg) is the payload size.
        if msg is not None:
            if msg.error():
                print(f"Consumer error: {msg.error()}")
            else:
                data = _decode_request(msg)
                if data is not None:
                    if not batch:
                        batch_deadline = time.monotonic() + BATCH_TIMEOUT
                    batch.append(data)

        # Flush on an idle poll OR once the oldest request has waited
        # BATCH_TIMEOUT, so a steady trickle of messages cannot hold a
        # partial batch back until it reaches BATCH_SIZE.
        timed_out = msg is None or (
            batch_deadline is not None and time.monotonic() >= batch_deadline
        )
        if len(batch) >= BATCH_SIZE or (batch and timed_out):
            _process_batch(batch)
            batch.clear()
            batch_deadline = None
finally:
    # Deliver anything still queued and leave the consumer group cleanly,
    # whatever ended the loop.
    producer.flush()
    consumer.close()
from dataclasses import dataclass
from typing import Optional
เนื้อหาเกี่ยวข้อง — แนะนำให้อ่าน Uptime Kuma Monitoring Clean Architecture
from enum import Enum
class EventType(Enum):
    """Kinds of events in the inference pipeline; values are dotted event names."""

    INFERENCE_REQUEST = "inference.request"
    INFERENCE_RESULT = "inference.result"
    MODEL_UPDATED = "model.updated"
    BATCH_READY = "batch.ready"
@dataclass
class InferenceEvent:
    """One event in the inference stream.

    The three optional fields default to None and are only meaningful for
    events that carry a result.
    """

    event_type: EventType
    request_id: str
    model_name: str
    latency_ms: Optional[float] = None
    prediction: Optional[int] = None
    confidence: Optional[float] = None
# Sample stream: two request/result pairs followed by a model-update event.
events = [
    InferenceEvent(EventType.INFERENCE_REQUEST, "req-001", "ResNet50"),
    InferenceEvent(EventType.INFERENCE_RESULT, "req-001", "ResNet50", 5.2, 7, 0.95),
    InferenceEvent(EventType.INFERENCE_REQUEST, "req-002", "BERT-base"),
    InferenceEvent(EventType.INFERENCE_RESULT, "req-002", "BERT-base", 8.5, 1, 0.88),
    InferenceEvent(EventType.MODEL_UPDATED, "sys-001", "ResNet50"),
]

print("\n=== Event Stream ===")
for e in events:
    if e.event_type == EventType.INFERENCE_RESULT:
        # Only result events carry prediction/confidence/latency; the other
        # events leave them as None, which the percent format cannot render.
        print(f" [{e.event_type.value}] {e.request_id} | "
              f"Model: {e.model_name} | Pred: {e.prediction} "
              f"({e.confidence:.0%}) | {e.latency_ms}ms")
    else:
        print(f" [{e.event_type.value}] {e.request_id} | Model: {e.model_name}")
Production Architecture
# === Production Architecture ===
# Each layer maps to the tools that implement it and a short description.
architecture = {
    "API Gateway": {
        "tools": "FastAPI, Kong, Nginx",
        "desc": "รับ Request แปลงเป็น Event ส่งไป Kafka",
    },
    "Event Broker": {
        "tools": "Apache Kafka, Redis Streams",
        "desc": "Buffer และ Route Events ไป Consumer Groups",
    },
    "Inference Workers": {
        "tools": "ONNX Runtime + Python/C++",
        "desc": "Batch Inference GPU Accelerated Auto-scale",
    },
    "Model Registry": {
        "tools": "MLflow, S3, DVC",
        "desc": "Model Versioning A/B Testing Rollback",
    },
    "Monitoring": {
        "tools": "Prometheus, Grafana",
        "desc": "Latency P99 Throughput Error Rate Model Drift",
    },
}

# Print every layer in declaration order.
print("Production ML Architecture:")
for layer, info in architecture.items():
    print(f"\n [{layer}]")
    print(f" Tools: {info['tools']}")
    print(f" {info['desc']}")
# Performance Optimization
# Checklist of inference speed-ups, printed as a numbered list.
optimizations = [
    "Batch Inference — รวมหลาย Requests ประมวลผลพร้อมกัน",
    "GPU Acceleration — ใช้ CUDAExecutionProvider",
    "Model Quantization — INT8 ลดขนาด เร็วขึ้น 2x",
    "Graph Optimization — onnxruntime.transformers optimize",
    "IO Binding — ลด CPU-GPU Data Transfer",
    "Session Options — Thread Pool, Memory Pattern",
    "Model Caching — Cache Session ไม่ Load ซ้ำ",
]

# Plain string: the header has no placeholders, so no f-prefix is needed.
print("\n\nPerformance Optimizations:")
for i, opt in enumerate(optimizations, 1):
    print(f" {i}. {opt}")
เคล็ดลับ
- Batch: รวม Requests เป็น Batch ประมวลผลพร้อมกัน เร็วกว่ามาก
- Quantize: ใช้ INT8 Quantization ลดขนาด Model เร็วขึ้น 2x
- GPU: ใช้ CUDAExecutionProvider สำหรับ Deep Learning Models
- Dynamic Axes: Export ONNX ด้วย Dynamic Batch Size
- Monitor: ติดตาม Latency P99, Throughput, Error Rate
ONNX Runtime คืออะไร
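ONNX Runtime คือ Inference Engine ประสิทธิภาพสูงสำหรับ Model ในรูปแบบ ONNX รองรับ Model ที่ Export มาจากทุก Framework รันได้ทั้งบน CPU, GPU และ TensorRT เร็วขึ้นราว 2-5x มี API สำหรับ Python, C++ และ Java พร้อมใช้งานใน Production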
เปิดบัญชีเทรดกับ XM — โบรกที่ อ.บอม ใช้เทรดจริง (พาร์ทเนอร์ XM)





