
TensorRT Optimization Pub Sub Architecture

2026-03-30 · Ajarn Bom — SiamCafe.net · 10,737 words

TensorRT Pub/Sub Architecture

A guide to optimizing GPU inference with NVIDIA TensorRT (FP16, INT8, dynamic batching) and serving it at scale through a Kafka Pub/Sub architecture with Triton on Kubernetes.

| Optimization | Speedup | Accuracy Impact | Use Case |
|---|---|---|---|
| FP16 Quantization | 2x | < 0.1% loss | Every model; recommended always on |
| INT8 Quantization | 4x | 0.5-2% loss | Classification/detection where a small accuracy drop is acceptable |
| Layer Fusion | 1.5-2x | 0% | Every model; TensorRT applies it automatically |
| Dynamic Batching | 2-8x throughput | 0% | High-traffic API serving |
| Response Cache | 100x (cache hit) | 0% | Repeated inputs (search, recommendation) |
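The 100x cache-hit row above can be sketched as a small in-process cache: hash the raw input, skip inference entirely on a repeat. `ResponseCache` and `get_or_infer` are illustrative names, and `fake_infer` stands in for a real TensorRT call:

```python
import hashlib

class ResponseCache:
    """Hypothetical response cache keyed by a hash of the raw input payload."""
    def __init__(self, max_size=10_000):
        self.max_size = max_size
        self._store = {}
        self.hits = 0
        self.misses = 0

    def _key(self, payload: bytes) -> str:
        # Hash so arbitrary-size payloads map to fixed-size keys.
        return hashlib.sha256(payload).hexdigest()

    def get_or_infer(self, payload: bytes, infer_fn):
        key = self._key(payload)
        if key in self._store:
            self.hits += 1
            return self._store[key]
        self.misses += 1
        result = infer_fn(payload)
        if len(self._store) < self.max_size:   # simple bound; real caches evict (LRU/TTL)
            self._store[key] = result
        return result

cache = ResponseCache()
fake_infer = lambda payload: {"label": "cat", "score": 0.98}  # stand-in for TensorRT
r1 = cache.get_or_infer(b"image-bytes", fake_infer)
r2 = cache.get_or_infer(b"image-bytes", fake_infer)  # second call is served from cache
```

In production the same idea usually lives in Redis so all workers share one cache.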

TensorRT Engine Build

# === TensorRT Optimization Pipeline ===

# Convert PyTorch → ONNX → TensorRT
# import torch
# import tensorrt as trt
#
# # Step 1: Export PyTorch to ONNX
# model = MyModel()
# model.load_state_dict(torch.load("model.pth"))
# model.eval()
# dummy_input = torch.randn(1, 3, 224, 224).cuda()
# torch.onnx.export(model, dummy_input, "model.onnx",
#     input_names=["input"], output_names=["output"],
#     dynamic_axes={"input": {0: "batch"}, "output": {0: "batch"}})
#
# # Step 2: Build TensorRT Engine
# # trtexec --onnx=model.onnx --saveEngine=model.engine \
# #   --fp16 --minShapes=input:1x3x224x224 \
# #   --optShapes=input:8x3x224x224 \
# #   --maxShapes=input:32x3x224x224

from dataclasses import dataclass

@dataclass
class OptimizationStep:
    step: str
    command: str
    speedup: str
    note: str

steps = [
    OptimizationStep("Export to ONNX",
        "torch.onnx.export(model, dummy, 'model.onnx')",
        "N/A (preparation)",
        "Use dynamic_axes to support dynamic batch sizes"),
    OptimizationStep("Build FP16 Engine",
        "trtexec --onnx=model.onnx --fp16 --saveEngine=model_fp16.engine",
        "2x faster than FP32",
        "Recommended always on; accuracy barely drops"),
    OptimizationStep("Build INT8 Engine",
        "trtexec --onnx=model.onnx --int8 --calib=calibration_data",
        "4x faster than FP32",
        "Requires calibration with 1,000+ representative samples"),
    OptimizationStep("Dynamic Shape",
        "trtexec --minShapes=input:1x3x224x224 --maxShapes=input:32x3x224x224",
        "Flexible batch size",
        "One engine handles batch sizes 1-32 automatically"),
    OptimizationStep("Benchmark",
        "trtexec --loadEngine=model.engine --shapes=input:8x3x224x224 --iterations=1000",
        "Measures latency and throughput",
        "Test before deploying; check P99 latency"),
]

print("=== TensorRT Optimization Steps ===")
for s in steps:
    print(f"  [{s.step}]")
    print(f"    Command: {s.command}")
    print(f"    Speedup: {s.speedup}")
    print(f"    Note: {s.note}")
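The last step above says to check P99 latency before deploying. A minimal nearest-rank percentile helper shows what that number means; the `percentile` function and the sample timings are ours, not trtexec output:

```python
import math

def percentile(samples, pct):
    """Nearest-rank percentile, e.g. pct=99 for P99 latency."""
    ordered = sorted(samples)
    rank = max(1, math.ceil(pct / 100 * len(ordered)))
    return ordered[rank - 1]

# Made-up per-request timings in milliseconds; one slow outlier dominates P99.
latencies_ms = [12, 12, 13, 13, 13, 14, 14, 15, 16, 90]
p50 = percentile(latencies_ms, 50)
p99 = percentile(latencies_ms, 99)
print(f"P50={p50}ms P99={p99}ms")  # P50=13ms P99=90ms
```

This is why P99 (not the mean) is the deploy gate: a single 90 ms straggler barely moves the average but defines the tail your SLA sees.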

Pub/Sub Integration

# === Pub/Sub Inference Architecture ===

# Kafka Producer (Client)
# import json
# from kafka import KafkaProducer
# producer = KafkaProducer(bootstrap_servers='kafka:9092')
# producer.send('inference-requests', value=json.dumps({
#     "request_id": "uuid-123",
#     "model": "resnet50",
#     "input": base64_image,
#     "callback_topic": "inference-results"
# }).encode())

# Kafka Consumer (TensorRT Worker)
# from kafka import KafkaConsumer
# consumer = KafkaConsumer('inference-requests',
#     group_id='tensorrt-workers', bootstrap_servers='kafka:9092')
# for message in consumer:
#     request = json.loads(message.value)
#     result = tensorrt_infer(request["input"])
#     producer.send(request["callback_topic"], value=json.dumps({
#         "request_id": request["request_id"],
#         "prediction": result
#     }).encode())
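The commented Kafka flow above can be exercised without a broker by swapping the two topics for in-process queues. All names here are hypothetical, and `fake_tensorrt_infer` stands in for the engine; the point is the request → worker → result round trip keyed by `request_id`:

```python
import json
import queue
import threading
import uuid

# In-process stand-ins for the 'inference-requests' and 'inference-results' topics.
request_topic = queue.Queue()
result_topic = queue.Queue()

def fake_tensorrt_infer(payload):
    # Stand-in for a real TensorRT engine call.
    return {"class": "dog", "confidence": 0.97}

def worker():
    # Mirrors the Kafka consumer loop: consume, infer, publish to the result topic.
    while True:
        message = request_topic.get()
        if message is None:          # shutdown sentinel
            break
        request = json.loads(message)
        result = fake_tensorrt_infer(request["input"])
        result_topic.put(json.dumps({
            "request_id": request["request_id"],
            "prediction": result,
        }))

t = threading.Thread(target=worker)
t.start()

# Producer side: publish one request, then await its result.
req_id = str(uuid.uuid4())
request_topic.put(json.dumps({"request_id": req_id, "input": "base64..."}))
reply = json.loads(result_topic.get(timeout=5))

request_topic.put(None)  # stop the worker
t.join()
print(reply["prediction"])
```

The `request_id` correlation is the crucial bit: with many workers consuming in parallel, results arrive out of order and the client matches them back by id.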

@dataclass
class PubSubComponent:
    component: str
    role: str
    technology: str
    scaling: str

architecture = [
    PubSubComponent("API Gateway",
        "Receives client requests and forwards them to Pub/Sub",
        "FastAPI / Kong / AWS API Gateway",
        "HPA on request rate"),
    PubSubComponent("Request Topic",
        "Queue for inference requests",
        "Kafka Topic (partitions=8)",
        "Add partitions as throughput grows"),
    PubSubComponent("TensorRT Workers",
        "Consume request → infer → publish result",
        "Python + TensorRT + Kafka Consumer",
        "Scale on queue lag (KEDA)"),
    PubSubComponent("Result Topic",
        "Queue for inference results",
        "Kafka Topic (partitions=8)",
        "Match the request topic's partition count"),
    PubSubComponent("Result Consumer",
        "Consume results and return them to clients (WebSocket/callback)",
        "FastAPI WebSocket / Webhook",
        "HPA on connection count"),
]

print("=== Pub/Sub Architecture ===")
for c in architecture:
    print(f"  [{c.component}] Role: {c.role}")
    print(f"    Tech: {c.technology}")
    print(f"    Scale: {c.scaling}")
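Scaling workers on queue lag (the KEDA note above) boils down to targeting roughly one replica per lag-threshold's worth of pending messages, clamped between a floor and a ceiling. A sketch of that arithmetic; `desired_replicas` and its defaults are our assumptions, not KEDA's exact algorithm:

```python
import math

def desired_replicas(total_lag: int, lag_threshold: int = 100,
                     min_replicas: int = 1, max_replicas: int = 20) -> int:
    """Roughly one worker per `lag_threshold` pending messages,
    clamped to [min_replicas, max_replicas]."""
    wanted = math.ceil(total_lag / lag_threshold) if total_lag > 0 else 0
    return max(min_replicas, min(max_replicas, wanted))

print(desired_replicas(0))       # 1  (never below the floor)
print(desired_replicas(450))     # 5
print(desired_replicas(10_000))  # 20 (capped at the ceiling)
```

Note the ceiling should not exceed the topic's partition count: Kafka assigns at most one consumer per partition within a group, so extra workers beyond 8 partitions would sit idle.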

Production Deployment

# === Kubernetes + Triton + Kafka ===

# Triton Inference Server Deployment
# apiVersion: apps/v1
# kind: Deployment
# metadata:
#   name: triton-tensorrt
# spec:
#   replicas: 2
#   template:
#     spec:
#       containers:
#         - name: triton
#           image: nvcr.io/nvidia/tritonserver:24.01-py3
#           args: ["tritonserver", "--model-repository=/models"]
#           resources:
#             limits:
#               nvidia.com/gpu: 1
#               memory: "16Gi"
#           ports:
#             - containerPort: 8000  # HTTP
#             - containerPort: 8001  # gRPC
#             - containerPort: 8002  # Metrics

@dataclass
class DeployMetric:
    metric: str
    target: str
    scale_action: str
    monitor: str

metrics = [
    DeployMetric("GPU Utilization",
        "60-80% (optimal)",
        "Scale workers up if > 80%, down if < 40%",
        "nvidia-smi / DCGM Exporter → Prometheus"),
    DeployMetric("Inference Latency P99",
        "< 100ms (real-time), < 500ms (batch)",
        "Optimize the model (INT8) or scale up the GPU",
        "Triton Metrics → Prometheus → Grafana"),
    DeployMetric("Queue Depth (Kafka Lag)",
        "< 100 messages",
        "KEDA scales workers on lag",
        "Kafka Consumer Lag → KEDA"),
    DeployMetric("Throughput (RPS)",
        "Per SLA (e.g. 1000 RPS)",
        "Add workers and Kafka partitions",
        "Triton Metrics + Kafka Metrics"),
    DeployMetric("Error Rate",
        "< 0.1%",
        "Check model version, OOM, and GPU errors",
        "Triton Error Count → Alert"),
]

print("=== Production Metrics ===")
for m in metrics:
    print(f"  [{m.metric}] Target: {m.target}")
    print(f"    Scale: {m.scale_action}")
    print(f"    Monitor: {m.monitor}")
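The GPU-utilization rule above (scale up over 80%, down under 40%, hold in the optimal band) can be expressed as a small decision function. `scale_decision` and its bounds are illustrative, not an HPA or KEDA API:

```python
def scale_decision(gpu_util_pct: float, current: int,
                   min_workers: int = 1, max_workers: int = 10) -> int:
    """Apply the 80% / 40% utilization thresholds from the metrics table."""
    if gpu_util_pct > 80:
        return min(max_workers, current + 1)   # overloaded: add a worker
    if gpu_util_pct < 40:
        return max(min_workers, current - 1)   # underused: remove a worker
    return current                             # 60-80% band: hold steady

print(scale_decision(85, 3))  # 4
print(scale_decision(30, 3))  # 2
print(scale_decision(70, 3))  # 3
```

Real autoscalers add a cooldown between steps so a single noisy sample doesn't cause flapping.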

Tips

What is TensorRT?

TensorRT is NVIDIA's GPU inference optimizer. It applies FP16/INT8 quantization, layer fusion, and dynamic batching, accepts ONNX models, integrates with Triton, and typically runs real-time inference 2-6x faster on T4, A10, A100, and H100 GPUs.

What is a Pub/Sub architecture?

A publish/subscribe messaging pattern (Kafka, Google Pub/Sub) that decouples clients from inference workers: requests are buffered in a topic, workers scale on queue depth, and results come back asynchronously on a result topic.

How do you optimize?

Export the model to ONNX, then build engines with trtexec: FP16 (~2x), INT8 (~4x, needs calibration), layer fusion, and dynamic shapes; benchmark before deploying. At serving time, add dynamic batching, response caching, warmup, and concurrent execution.

How do you deploy to production?

Run Triton on Kubernetes GPU nodes, scale workers with KEDA on Kafka lag and the API tier with HPA, monitor GPU utilization, P99 latency, queue depth, and error rate via Prometheus and Grafana, and roll out with blue-green or canary deployments.

Summary

Combining TensorRT optimization (FP16, INT8, dynamic batching) with a Kafka Pub/Sub architecture, Triton on Kubernetes, KEDA autoscaling, and Prometheus monitoring lets GPU inference scale reliably in production.
