forex

ONNX Runtime Event Driven Design — ออกแบบระบบ

ONNX Runtime Event Driven Design — ออกแบบระบบ

ONNX Runtime Event-driven

ONNX Runtime Event Driven Design — ออกแบบระบบ

ONNX Runtime Inference Engine ML Models PyTorch TensorFlow CPU GPU Event-driven Architecture Kafka Message Queue Producer Consumer Async Batch Inference

ComponentRoleTools
ONNX RuntimeML Inferenceonnxruntime, GPU, TensorRT
Event BrokerMessage DistributionKafka, RabbitMQ, Redis
ProducerGenerate EventsAPI, IoT, Streaming
ConsumerProcess EventsPython Worker, Go Service
Model StoreModel VersioningMLflow, S3, GCS

ONNX Model Export และ Inference

=== ONNX Runtime Setup ===

pip install onnxruntime onnxruntime-gpu onnx torch

PyTorch Model Export to ONNX

import torch

import torch.nn as nn

class ImageClassifier(nn.Module):

def __init__(self, num_classes=10):

super().__init__()

self.features = nn.Sequential(

nn.Conv2d(3, 32, 3, padding=1),

nn.ReLU(),

nn.MaxPool2d(2),

nn.Conv2d(32, 64, 3, padding=1),

nn.ReLU(),

nn.MaxPool2d(2),

)

self.classifier = nn.Sequential(

nn.Linear(64 * 8 * 8, 256),

nn.ReLU(),

nn.Linear(256, num_classes),

)

def forward(self, x):

x = self.features(x)

x = x.view(x.size(0), -1)

return self.classifier(x)

model = ImageClassifier()

model.eval()

dummy_input = torch.randn(1, 3, 32, 32)

torch.onnx.export(

เนื้อหาเกี่ยวข้อง — ดูเพิ่มเติมเรื่อง SQLite Litestream Disaster Recovery Plan

model, dummy_input, "classifier.onnx",

input_names=["image"],

output_names=["prediction"],

dynamic_axes={

"image": {0: "batch_size"},

"prediction": {0: "batch_size"},

},

opset_version=17,

แนะนำเพิ่มเติม — อ่านเพิ่มเติมที่ SiamCafeBook

)

ONNX Runtime Inference

import onnxruntime as ort

import numpy as np

session = ort.InferenceSession(

"classifier.onnx",

providers=['CUDAExecutionProvider', 'CPUExecutionProvider']

)

input_data = np.random.randn(1, 3, 32, 32).astype(np.float32)

result = session.run(None, {"image": input_data})

predictions = result[0]

class_id = np.argmax(predictions, axis=1)

from dataclasses import dataclass, field

from typing import List, Dict

@dataclass

class ONNXModel:

name: str

framework: str

input_shape: str

output_shape: str

เนื้อหาเกี่ยวข้อง — Ethusd คืออะไร — อัตราแลกเปลี่ยนล่าสุด 2026

size_mb: float

inference_ms: float

provider: str

models = [

ONNXModel("ResNet50", "PyTorch", "(1,3,224,224)", "(1,1000)", 97.8, 5.2, "CUDA"),

ONNXModel("BERT-base", "PyTorch", "(1,128)", "(1,128,768)", 438.0, 8.5, "CUDA"),

ONNXModel("YOLOv8n", "PyTorch", "(1,3,640,640)", "(1,84,8400)", 12.2, 3.1, "CUDA"),

ONNXModel("XGBoost", "scikit-learn", "(1,50)", "(1,)", 0.5, 0.1, "CPU"),

ONNXModel("Whisper-small", "PyTorch", "(1,80,3000)", "(1,448)", 967.0, 45.0, "CUDA"),

]

print("=== ONNX Models ===")

for m in models:

print(f" {m.name} ({m.framework}) | {m.size_mb}MB | "

f"{m.inference_ms}ms | {m.provider}")

Event-driven Inference Pipeline

=== Event-driven ML Inference ===

Kafka Consumer + ONNX Runtime

แนะนำเพิ่มเติม — สัญญาณเทรดรายวัน XM Signal

from confluent_kafka import Consumer, Producer

import onnxruntime as ort

import json

import numpy as np

# Initialize ONNX Session

session = ort.InferenceSession("classifier.onnx",

providers=['CUDAExecutionProvider'])

# Kafka Consumer

consumer = Consumer({

'bootstrap.servers': 'localhost:9092',

'group.id': 'ml-inference-group',

'auto.offset.reset': 'latest',

เนื้อหาเกี่ยวข้อง — บทความที่เกี่ยวข้อง: OWASP ZAP Clean Architecture

})

consumer.subscribe(['inference-requests'])

# Kafka Producer for results

producer = Producer({'bootstrap.servers': 'localhost:9092'})

# Batch buffer

batch = []

BATCH_SIZE = 32

BATCH_TIMEOUT = 0.1 # seconds

while True:

msg = consumer.poll(BATCH_TIMEOUT)

if msg and not msg.error():

ONNX Runtime Event Driven Design — ออกแบบระบบ

data = json.loads(msg.value())

batch.append(data)

if len(batch) >= BATCH_SIZE or (batch and msg is None):

# Batch inference

inputs = np.array([d['features'] for d in batch], dtype=np.float32)

results = session.run(None, {"image": inputs})

predictions = results[0]

# Send results

for i, data in enumerate(batch):

result = {

"request_id": data['request_id'],

"prediction": int(np.argmax(predictions[i])),

"confidence": float(np.max(predictions[i])),

}

producer.produce('inference-results',

json.dumps(result).encode())

producer.flush()

batch.clear()

from dataclasses import dataclass

from typing import Optional

เนื้อหาเกี่ยวข้อง — แนะนำให้อ่าน Uptime Kuma Monitoring Clean Architecture

from enum import Enum

class EventType(Enum):

INFERENCE_REQUEST = "inference.request"

INFERENCE_RESULT = "inference.result"

MODEL_UPDATED = "model.updated"

BATCH_READY = "batch.ready"

@dataclass

class InferenceEvent:

event_type: EventType

request_id: str

model_name: str

latency_ms: Optional[float] = None

prediction: Optional[int] = None

confidence: Optional[float] = None

events = [

InferenceEvent(EventType.INFERENCE_REQUEST, "req-001", "ResNet50"),

InferenceEvent(EventType.INFERENCE_RESULT, "req-001", "ResNet50", 5.2, 7, 0.95),

InferenceEvent(EventType.INFERENCE_REQUEST, "req-002", "BERT-base"),

InferenceEvent(EventType.INFERENCE_RESULT, "req-002", "BERT-base", 8.5, 1, 0.88),

InferenceEvent(EventType.MODEL_UPDATED, "sys-001", "ResNet50"),

]

print("\n=== Event Stream ===")

for e in events:

if e.event_type == EventType.INFERENCE_RESULT:

print(f" [{e.event_type.value}] {e.request_id} | "

f"Model: {e.model_name} | Pred: {e.prediction} "

f"({e.confidence:.0%}) | {e.latency_ms}ms")

else:

print(f" [{e.event_type.value}] {e.request_id} | Model: {e.model_name}")

Production Architecture

# === Production Architecture ===

architecture = {
    "API Gateway": {
        "tools": "FastAPI, Kong, Nginx",
        "desc": "รับ Request แปลงเป็น Event ส่งไป Kafka",
    },
    "Event Broker": {
        "tools": "Apache Kafka, Redis Streams",
        "desc": "Buffer และ Route Events ไป Consumer Groups",
    },
    "Inference Workers": {
        "tools": "ONNX Runtime + Python/C++",
        "desc": "Batch Inference GPU Accelerated Auto-scale",
    },
    "Model Registry": {
        "tools": "MLflow, S3, DVC",
        "desc": "Model Versioning A/B Testing Rollback",
    },
    "Monitoring": {
        "tools": "Prometheus, Grafana",
        "desc": "Latency P99 Throughput Error Rate Model Drift",
    },
}

print("Production ML Architecture:")
for layer, info in architecture.items():
    print(f"\n  [{layer}]")
    print(f"    Tools: {info['tools']}")
    print(f"    {info['desc']}")

# Performance Optimization
optimizations = [
    "Batch Inference — รวมหลาย Requests ประมวลผลพร้อมกัน",
    "GPU Acceleration — ใช้ CUDAExecutionProvider",
    "Model Quantization — INT8 ลดขนาด เร็วขึ้น 2x",
    "Graph Optimization — onnxruntime.transformers optimize",
    "IO Binding — ลด CPU-GPU Data Transfer",
    "Session Options — Thread Pool, Memory Pattern",
    "Model Caching — Cache Session ไม่ Load ซ้ำ",
]

print(f"\n\nPerformance Optimizations:")
for i, opt in enumerate(optimizations, 1):
    print(f"  {i}. {opt}")

เคล็ดลับ

  • Batch: รวม Requests เป็น Batch ประมวลผลพร้อมกัน เร็วกว่ามาก
  • Quantize: ใช้ INT8 Quantization ลดขนาด Model เร็วขึ้น 2x
  • GPU: ใช้ CUDAExecutionProvider สำหรับ Deep Learning Models
  • Dynamic Axes: Export ONNX ด้วย Dynamic Batch Size
  • Monitor: ติดตาม Latency P99, Throughput, Error Rate

ONNX Runtime คืออะไร

High-performance Inference Engine ONNX Format ทุก Framework CPU GPU TensorRT เร็ว 2-5x Python C++ Java Production

เปิดบัญชีเทรดกับ XM — โบรกที่ อ.บอม ใช้เทรดจริง (พาร์ทเนอร์ XM)

XM Legend · เทรดเดอร์ & ผู้สอน Forex 13 ปี

ผู้ก่อตั้ง SiamCafe ตั้งแต่ปี 1997 · เทรดเดอร์สาย Forex มากกว่า 13 ปี ได้รับการยกย่องเป็น XM Legend · แบ่งปันความรู้ Forex, ไอที, AI และการเทรด จากประสบการณ์จริงในตลาดจริง