SiamCafe.net Blog
Technology

ONNX Runtime Event Driven Design

onnx runtime event driven design
ONNX Runtime Event Driven Design | SiamCafe Blog
2025-11-23· อ. บอม — SiamCafe.net· 11,453 คำ

ONNX Runtime Event-driven

ONNX Runtime Inference Engine ML Models PyTorch TensorFlow CPU GPU Event-driven Architecture Kafka Message Queue Producer Consumer Async Batch Inference

Component | Role | Tools
ONNX Runtime | ML Inference | onnxruntime, GPU, TensorRT
Event Broker | Message Distribution | Kafka, RabbitMQ, Redis
Producer | Generate Events | API, IoT, Streaming
Consumer | Process Events | Python Worker, Go Service
Model Store | Model Versioning | MLflow, S3, GCS

ONNX Model Export และ Inference

# === ONNX Runtime Setup ===

# pip install onnxruntime onnxruntime-gpu onnx torch

# PyTorch Model Export to ONNX
# import torch
# import torch.nn as nn
#
# class ImageClassifier(nn.Module):
#     def __init__(self, num_classes=10):
#         super().__init__()
#         self.features = nn.Sequential(
#             nn.Conv2d(3, 32, 3, padding=1),
#             nn.ReLU(),
#             nn.MaxPool2d(2),
#             nn.Conv2d(32, 64, 3, padding=1),
#             nn.ReLU(),
#             nn.MaxPool2d(2),
#         )
#         self.classifier = nn.Sequential(
#             nn.Linear(64 * 8 * 8, 256),
#             nn.ReLU(),
#             nn.Linear(256, num_classes),
#         )
#
#     def forward(self, x):
#         x = self.features(x)
#         x = x.view(x.size(0), -1)
#         return self.classifier(x)
#
# model = ImageClassifier()
# model.eval()
# dummy_input = torch.randn(1, 3, 32, 32)
#
# torch.onnx.export(
#     model, dummy_input, "classifier.onnx",
#     input_names=["image"],
#     output_names=["prediction"],
#     dynamic_axes={
#         "image": {0: "batch_size"},
#         "prediction": {0: "batch_size"},
#     },
#     opset_version=17,
# )

# ONNX Runtime Inference
# import onnxruntime as ort
# import numpy as np
#
# session = ort.InferenceSession(
#     "classifier.onnx",
#     providers=['CUDAExecutionProvider', 'CPUExecutionProvider']
# )
#
# input_data = np.random.randn(1, 3, 32, 32).astype(np.float32)
# result = session.run(None, {"image": input_data})
# predictions = result[0]
# class_id = np.argmax(predictions, axis=1)

from dataclasses import dataclass, field
from typing import List, Dict

@dataclass
class ONNXModel:
    """Metadata record for an exported ONNX model."""
    name: str            # model identifier, e.g. "ResNet50"
    framework: str       # framework the model was trained/exported from
    input_shape: str     # input tensor shape, kept as a display string
    output_shape: str    # output tensor shape, kept as a display string
    size_mb: float       # on-disk model size in megabytes
    inference_ms: float  # single-inference latency in milliseconds
    provider: str        # ONNX Runtime execution provider used

# Sample catalogue of models with their benchmark figures.
models = [
    ONNXModel("ResNet50", "PyTorch", "(1,3,224,224)", "(1,1000)", 97.8, 5.2, "CUDA"),
    ONNXModel("BERT-base", "PyTorch", "(1,128)", "(1,128,768)", 438.0, 8.5, "CUDA"),
    ONNXModel("YOLOv8n", "PyTorch", "(1,3,640,640)", "(1,84,8400)", 12.2, 3.1, "CUDA"),
    ONNXModel("XGBoost", "scikit-learn", "(1,50)", "(1,)", 0.5, 0.1, "CPU"),
    ONNXModel("Whisper-small", "PyTorch", "(1,80,3000)", "(1,448)", 967.0, 45.0, "CUDA"),
]

# Print a one-line summary per model.
print("=== ONNX Models ===")
for entry in models:
    summary = (
        f"  {entry.name} ({entry.framework}) | {entry.size_mb}MB | "
        f"{entry.inference_ms}ms | {entry.provider}"
    )
    print(summary)

Event-driven Inference Pipeline

# === Event-driven ML Inference ===

# Kafka Consumer + ONNX Runtime
# from confluent_kafka import Consumer, Producer
# import onnxruntime as ort
# import json
# import numpy as np
#
# # Initialize ONNX Session
# session = ort.InferenceSession("classifier.onnx",
#     providers=['CUDAExecutionProvider'])
#
# # Kafka Consumer
# consumer = Consumer({
#     'bootstrap.servers': 'localhost:9092',
#     'group.id': 'ml-inference-group',
#     'auto.offset.reset': 'latest',
# })
# consumer.subscribe(['inference-requests'])
#
# # Kafka Producer for results
# producer = Producer({'bootstrap.servers': 'localhost:9092'})
#
# # Batch buffer
# batch = []
# BATCH_SIZE = 32
# BATCH_TIMEOUT = 0.1  # seconds
#
# while True:
#     msg = consumer.poll(BATCH_TIMEOUT)
#     if msg and not msg.error():
#         data = json.loads(msg.value())
#         batch.append(data)
#
#     if len(batch) >= BATCH_SIZE or (batch and msg is None):
#         # Batch inference
#         inputs = np.array([d['features'] for d in batch], dtype=np.float32)
#         results = session.run(None, {"image": inputs})
#         predictions = results[0]
#
#         # Send results
#         for i, data in enumerate(batch):
#             result = {
#                 "request_id": data['request_id'],
#                 "prediction": int(np.argmax(predictions[i])),
#                 "confidence": float(np.max(predictions[i])),
#             }
#             producer.produce('inference-results',
#                              json.dumps(result).encode())
#         producer.flush()
#         batch.clear()

from dataclasses import dataclass
from typing import Optional
from enum import Enum

class EventType(Enum):
    """Kinds of messages flowing through the inference event stream."""
    INFERENCE_REQUEST = "inference.request"
    INFERENCE_RESULT = "inference.result"
    MODEL_UPDATED = "model.updated"
    BATCH_READY = "batch.ready"

@dataclass
class InferenceEvent:
    """A single event in the ML inference pipeline."""
    event_type: EventType
    request_id: str
    model_name: str
    latency_ms: Optional[float] = None   # populated on result events only
    prediction: Optional[int] = None     # predicted class id (result events)
    confidence: Optional[float] = None   # confidence score, shown as a percentage

# Sample request/result/system events as they would appear on the broker.
events = [
    InferenceEvent(EventType.INFERENCE_REQUEST, "req-001", "ResNet50"),
    InferenceEvent(EventType.INFERENCE_RESULT, "req-001", "ResNet50", 5.2, 7, 0.95),
    InferenceEvent(EventType.INFERENCE_REQUEST, "req-002", "BERT-base"),
    InferenceEvent(EventType.INFERENCE_RESULT, "req-002", "BERT-base", 8.5, 1, 0.88),
    InferenceEvent(EventType.MODEL_UPDATED, "sys-001", "ResNet50"),
]

# Result events carry extra fields, so they get a richer log line.
print("\n=== Event Stream ===")
for ev in events:
    if ev.event_type is EventType.INFERENCE_RESULT:
        print(f"  [{ev.event_type.value}] {ev.request_id} | "
              f"Model: {ev.model_name} | Pred: {ev.prediction} "
              f"({ev.confidence:.0%}) | {ev.latency_ms}ms")
    else:
        print(f"  [{ev.event_type.value}] {ev.request_id} | Model: {ev.model_name}")

Production Architecture

# === Production Architecture ===

# Layers of a production event-driven inference stack, each with the
# tooling commonly used at that layer and a short description.
architecture = {
    "API Gateway": {
        "tools": "FastAPI, Kong, Nginx",
        "desc": "รับ Request แปลงเป็น Event ส่งไป Kafka",
    },
    "Event Broker": {
        "tools": "Apache Kafka, Redis Streams",
        "desc": "Buffer และ Route Events ไป Consumer Groups",
    },
    "Inference Workers": {
        "tools": "ONNX Runtime + Python/C++",
        "desc": "Batch Inference GPU Accelerated Auto-scale",
    },
    "Model Registry": {
        "tools": "MLflow, S3, DVC",
        "desc": "Model Versioning A/B Testing Rollback",
    },
    "Monitoring": {
        "tools": "Prometheus, Grafana",
        "desc": "Latency P99 Throughput Error Rate Model Drift",
    },
}

# Render each layer as an indented section.
print("Production ML Architecture:")
for layer_name, layer_info in architecture.items():
    print(f"\n  [{layer_name}]")
    print(f"    Tools: {layer_info['tools']}")
    print(f"    {layer_info['desc']}")

# Checklist of performance optimizations for ONNX Runtime serving.
optimizations = [
    "Batch Inference — รวมหลาย Requests ประมวลผลพร้อมกัน",
    "GPU Acceleration — ใช้ CUDAExecutionProvider",
    "Model Quantization — INT8 ลดขนาด เร็วขึ้น 2x",
    "Graph Optimization — onnxruntime.transformers optimize",
    "IO Binding — ลด CPU-GPU Data Transfer",
    "Session Options — Thread Pool, Memory Pattern",
    "Model Caching — Cache Session ไม่ Load ซ้ำ",
]

print("\n\nPerformance Optimizations:")
for rank, tip in enumerate(optimizations, start=1):
    print(f"  {rank}. {tip}")

เคล็ดลับ

ONNX Runtime คืออะไร

ONNX Runtime คือ High-performance Inference Engine สำหรับโมเดลในรูปแบบ ONNX รองรับโมเดลจากทุก Framework รันได้ทั้งบน CPU, GPU และ TensorRT โดยทั่วไปเร็วขึ้น 2-5x มี API สำหรับ Python, C++ และ Java เหมาะสำหรับงาน Production

Event-driven Design คืออะไร

Event-driven Design คือ Architecture Pattern ที่ระบบตอบสนองต่อ Events ประกอบด้วย Producer, Broker และ Consumer ช่วย Decouple ส่วนประกอบและ Scale ได้อิสระ เครื่องมือที่นิยม ได้แก่ Kafka, EventBridge, RabbitMQ และ Redis Streams

ONNX Export ทำอย่างไร

PyTorch torch.onnx.export TensorFlow tf2onnx scikit-learn skl2onnx Input Shape Dynamic Axes checker Optimize

ใช้ ONNX Runtime กับ Event-driven อย่างไร

Kafka Event Consumer Preprocess ONNX Inference Result Topic Async Batch Inference หลาย Events พร้อมกัน ลด Latency

สรุป

ONNX Runtime Inference Engine Event-driven Kafka Consumer Batch Inference GPU CUDA Quantization Model Export PyTorch TensorFlow Production Architecture Monitoring Latency Throughput

📖 บทความที่เกี่ยวข้อง

HTTP/3 QUIC Event Driven Design — อ่านบทความ →
Python Pydantic Event Driven Design — อ่านบทความ →
Healthchecks.io Domain Driven Design DDD — อ่านบทความ →
ONNX Runtime DNS Management — อ่านบทความ →
Data Lakehouse Event Driven Design — อ่านบทความ →

📚 ดูบทความทั้งหมด →