ONNX Runtime Event-driven
ONNX Runtime Inference Engine ML Models PyTorch TensorFlow CPU GPU Event-driven Architecture Kafka Message Queue Producer Consumer Async Batch Inference
| Component | Role | Tools |
|---|---|---|
| ONNX Runtime | ML Inference | onnxruntime, GPU, TensorRT |
| Event Broker | Message Distribution | Kafka, RabbitMQ, Redis |
| Producer | Generate Events | API, IoT, Streaming |
| Consumer | Process Events | Python Worker, Go Service |
| Model Store | Model Versioning | MLflow, S3, GCS |
ONNX Model Export และ Inference
# === ONNX Runtime Setup ===
# pip install onnxruntime onnxruntime-gpu onnx torch
# PyTorch Model Export to ONNX
# import torch
# import torch.nn as nn
#
# class ImageClassifier(nn.Module):
# def __init__(self, num_classes=10):
# super().__init__()
# self.features = nn.Sequential(
# nn.Conv2d(3, 32, 3, padding=1),
# nn.ReLU(),
# nn.MaxPool2d(2),
# nn.Conv2d(32, 64, 3, padding=1),
# nn.ReLU(),
# nn.MaxPool2d(2),
# )
# self.classifier = nn.Sequential(
# nn.Linear(64 * 8 * 8, 256),
# nn.ReLU(),
# nn.Linear(256, num_classes),
# )
#
# def forward(self, x):
# x = self.features(x)
# x = x.view(x.size(0), -1)
# return self.classifier(x)
#
# model = ImageClassifier()
# model.eval()
# dummy_input = torch.randn(1, 3, 32, 32)
#
# torch.onnx.export(
# model, dummy_input, "classifier.onnx",
# input_names=["image"],
# output_names=["prediction"],
# dynamic_axes={
# "image": {0: "batch_size"},
# "prediction": {0: "batch_size"},
# },
# opset_version=17,
# )
# ONNX Runtime Inference
# import onnxruntime as ort
# import numpy as np
#
# session = ort.InferenceSession(
# "classifier.onnx",
# providers=['CUDAExecutionProvider', 'CPUExecutionProvider']
# )
#
# input_data = np.random.randn(1, 3, 32, 32).astype(np.float32)
# result = session.run(None, {"image": input_data})
# predictions = result[0]
# class_id = np.argmax(predictions, axis=1)
from dataclasses import dataclass, field
from typing import List, Dict
@dataclass
class ONNXModel:
    """Metadata record for one exported ONNX model and its measured runtime profile."""

    name: str            # model identifier, e.g. "ResNet50"
    framework: str       # framework the model was exported from, e.g. "PyTorch"
    input_shape: str     # input tensor shape as a display string, e.g. "(1,3,224,224)"
    output_shape: str    # output tensor shape as a display string
    size_mb: float       # model file size in megabytes
    inference_ms: float  # single-inference latency in milliseconds
    provider: str        # ONNX Runtime execution provider label ("CUDA" or "CPU")
# Sample catalogue of exported models with their measured size/latency figures.
models = [
    ONNXModel("ResNet50", "PyTorch", "(1,3,224,224)", "(1,1000)", 97.8, 5.2, "CUDA"),
    ONNXModel("BERT-base", "PyTorch", "(1,128)", "(1,128,768)", 438.0, 8.5, "CUDA"),
    ONNXModel("YOLOv8n", "PyTorch", "(1,3,640,640)", "(1,84,8400)", 12.2, 3.1, "CUDA"),
    ONNXModel("XGBoost", "scikit-learn", "(1,50)", "(1,)", 0.5, 0.1, "CPU"),
    ONNXModel("Whisper-small", "PyTorch", "(1,80,3000)", "(1,448)", 967.0, 45.0, "CUDA"),
]
print("=== ONNX Models ===")
for model in models:
    # One summary line per model: name, origin framework, size, latency, provider.
    summary = (f" {model.name} ({model.framework}) | {model.size_mb}MB | "
               f"{model.inference_ms}ms | {model.provider}")
    print(summary)
Event-driven Inference Pipeline
# === Event-driven ML Inference ===
# Kafka Consumer + ONNX Runtime
# from confluent_kafka import Consumer, Producer
# import onnxruntime as ort
# import json
# import numpy as np
#
# # Initialize ONNX Session
# session = ort.InferenceSession("classifier.onnx",
# providers=['CUDAExecutionProvider'])
#
# # Kafka Consumer
# consumer = Consumer({
# 'bootstrap.servers': 'localhost:9092',
# 'group.id': 'ml-inference-group',
# 'auto.offset.reset': 'latest',
# })
# consumer.subscribe(['inference-requests'])
#
# # Kafka Producer for results
# producer = Producer({'bootstrap.servers': 'localhost:9092'})
#
# # Batch buffer
# batch = []
# BATCH_SIZE = 32
# BATCH_TIMEOUT = 0.1 # seconds
#
# while True:
# msg = consumer.poll(BATCH_TIMEOUT)
# if msg and not msg.error():
# data = json.loads(msg.value())
# batch.append(data)
#
# if len(batch) >= BATCH_SIZE or (batch and msg is None):
# # Batch inference
# inputs = np.array([d['features'] for d in batch], dtype=np.float32)
# results = session.run(None, {"image": inputs})
# predictions = results[0]
#
# # Send results
# for i, data in enumerate(batch):
# result = {
# "request_id": data['request_id'],
# "prediction": int(np.argmax(predictions[i])),
# "confidence": float(np.max(predictions[i])),
# }
# producer.produce('inference-results',
# json.dumps(result).encode())
# producer.flush()
# batch.clear()
from dataclasses import dataclass
from typing import Optional
from enum import Enum
class EventType(Enum):
    """Kinds of events flowing through the inference pipeline topics."""

    INFERENCE_REQUEST = "inference.request"  # client submitted a prediction request
    INFERENCE_RESULT = "inference.result"    # worker produced a prediction
    MODEL_UPDATED = "model.updated"          # a new model version was published
    BATCH_READY = "batch.ready"              # batch buffer ready to flush (unused in the demo stream below)
@dataclass
class InferenceEvent:
    """One event in the inference stream; result-only fields stay None on requests."""

    event_type: EventType
    request_id: str                       # correlates a request with its matching result event
    model_name: str
    latency_ms: Optional[float] = None    # populated on INFERENCE_RESULT events only
    prediction: Optional[int] = None      # predicted class id (results only)
    confidence: Optional[float] = None    # prediction confidence in [0, 1] (results only)
# Demo event stream: two request/result pairs plus a model-update notification.
events = [
    InferenceEvent(EventType.INFERENCE_REQUEST, "req-001", "ResNet50"),
    InferenceEvent(EventType.INFERENCE_RESULT, "req-001", "ResNet50", 5.2, 7, 0.95),
    InferenceEvent(EventType.INFERENCE_REQUEST, "req-002", "BERT-base"),
    InferenceEvent(EventType.INFERENCE_RESULT, "req-002", "BERT-base", 8.5, 1, 0.88),
    InferenceEvent(EventType.MODEL_UPDATED, "sys-001", "ResNet50"),
]
print("\n=== Event Stream ===")
for event in events:
    # Result events carry extra fields (prediction/confidence/latency), so they
    # get the long format; every other event type gets the short one.
    is_result = event.event_type == EventType.INFERENCE_RESULT
    if is_result:
        print(f" [{event.event_type.value}] {event.request_id} | "
              f"Model: {event.model_name} | Pred: {event.prediction} "
              f"({event.confidence:.0%}) | {event.latency_ms}ms")
    else:
        print(f" [{event.event_type.value}] {event.request_id} | Model: {event.model_name}")
Production Architecture
# === Production Architecture ===
# Maps each layer of the serving stack to its typical tooling and role
# (descriptions kept in Thai from the source material).
architecture = {
    "API Gateway": {
        "tools": "FastAPI, Kong, Nginx",
        "desc": "รับ Request แปลงเป็น Event ส่งไป Kafka",
    },
    "Event Broker": {
        "tools": "Apache Kafka, Redis Streams",
        "desc": "Buffer และ Route Events ไป Consumer Groups",
    },
    "Inference Workers": {
        "tools": "ONNX Runtime + Python/C++",
        "desc": "Batch Inference GPU Accelerated Auto-scale",
    },
    "Model Registry": {
        "tools": "MLflow, S3, DVC",
        "desc": "Model Versioning A/B Testing Rollback",
    },
    "Monitoring": {
        "tools": "Prometheus, Grafana",
        "desc": "Latency P99 Throughput Error Rate Model Drift",
    },
}
print("Production ML Architecture:")
for layer_name, details in architecture.items():
    # Three output lines per layer, emitted as one joined write.
    block = "\n".join([
        f"\n [{layer_name}]",
        f" Tools: {details['tools']}",
        f" {details['desc']}",
    ])
    print(block)
# Performance Optimization
# Checklist of standard ONNX Runtime serving optimizations
# (explanations kept in Thai from the source material).
optimizations = [
    "Batch Inference — รวมหลาย Requests ประมวลผลพร้อมกัน",
    "GPU Acceleration — ใช้ CUDAExecutionProvider",
    "Model Quantization — INT8 ลดขนาด เร็วขึ้น 2x",
    "Graph Optimization — onnxruntime.transformers optimize",
    "IO Binding — ลด CPU-GPU Data Transfer",
    "Session Options — Thread Pool, Memory Pattern",
    "Model Caching — Cache Session ไม่ Load ซ้ำ",
]
# Plain string literal: the original used an f-string with no placeholders (ruff F541).
print("\n\nPerformance Optimizations:")
for rank, tip in enumerate(optimizations, 1):
    print(f" {rank}. {tip}")
เคล็ดลับ
- Batch: รวม Requests เป็น Batch ประมวลผลพร้อมกัน เร็วกว่ามาก
- Quantize: ใช้ INT8 Quantization ลดขนาด Model เร็วขึ้น 2x
- GPU: ใช้ CUDAExecutionProvider สำหรับ Deep Learning Models
- Dynamic Axes: Export ONNX ด้วย Dynamic Batch Size
- Monitor: ติดตาม Latency P99, Throughput, Error Rate
ONNX Runtime คืออะไร
ONNX Runtime คือ High-performance Inference Engine สำหรับรัน Model ในรูปแบบ ONNX ที่ Export มาจากทุก Framework รองรับทั้ง CPU, GPU และ TensorRT เร็วขึ้น 2-5x ใช้งานได้จาก Python, C++ และ Java เหมาะสำหรับ Production
Event-driven Design คืออะไร
Architecture Pattern ที่ระบบตอบสนองต่อ Events ผ่านสายงาน Producer → Broker → Consumer ช่วย Decouple และ Scale แต่ละส่วนแยกกันได้ เครื่องมือยอดนิยม ได้แก่ Kafka, EventBridge, RabbitMQ และ Redis Streams
ONNX Export ทำอย่างไร
PyTorch ใช้ torch.onnx.export, TensorFlow ใช้ tf2onnx, scikit-learn ใช้ skl2onnx โดยต้องกำหนด Input Shape และ Dynamic Axes แล้วตรวจสอบด้วย onnx checker ก่อนนำไป Optimize
ใช้ ONNX Runtime กับ Event-driven อย่างไร
Consumer รับ Event จาก Kafka แล้ว Preprocess ข้อมูล รัน ONNX Inference และส่งผลลัพธ์ไปยัง Result Topic สามารถทำ Async Batch Inference รวมหลาย Events ประมวลผลพร้อมกันเพื่อลด Latency
สรุป
ONNX Runtime Inference Engine Event-driven Kafka Consumer Batch Inference GPU CUDA Quantization Model Export PyTorch TensorFlow Production Architecture Monitoring Latency Throughput
