SiamCafe.net Blog
Technology

Stable Diffusion ComfyUI Stream Processing

stable diffusion comfyui stream processing
Stable Diffusion ComfyUI Stream Processing | SiamCafe Blog
2026-04-26· อ. บอม — SiamCafe.net· 10,059 คำ

ComfyUI Stream Processing

Stable Diffusion ComfyUI Stream Processing Queue Redis Kafka GPU Worker Real-time Generation Scaling KEDA Kubernetes WebSocket Automation Pipeline

ArchitectureQueueLatencyScaleเหมาะกับ
Direct APIไม่มีต่ำ1 GPUDevelopment
Redis QueueRedis RQต่ำ1-10 GPUSmall-Medium
CeleryRedis/RabbitMQปานกลาง1-50 GPUMedium
Kafka PipelineKafkaปานกลาง10-100+ GPULarge Scale

Queue-based Architecture

# === ComfyUI Queue Architecture ===

# Redis Queue Worker
# from redis import Redis
# from rq import Queue, Worker
# import json
# import requests
# import time
#
# redis_conn = Redis(host='redis', port=6379)
# queue = Queue('image-gen', connection=redis_conn)
#
# COMFYUI_URL = "http://comfyui:8188"
#
# def generate_image(job_data):
#     workflow = json.loads(job_data['workflow'])
#     workflow["6"]["inputs"]["text"] = job_data['prompt']
#     workflow["3"]["inputs"]["seed"] = job_data.get('seed', 42)
#
#     # Queue prompt
#     response = requests.post(f"{COMFYUI_URL}/prompt",
#                              json={"prompt": workflow})
#     prompt_id = response.json()["prompt_id"]
#
#     # Wait for completion
#     while True:
#         history = requests.get(f"{COMFYUI_URL}/history/{prompt_id}").json()
#         if prompt_id in history:
#             outputs = history[prompt_id]["outputs"]
#             return {"status": "completed", "outputs": outputs}
#         time.sleep(0.5)
#
# # API Endpoint
# from fastapi import FastAPI
# app = FastAPI()
#
# @app.post("/generate")
# async def submit_job(request: dict):
#     job = queue.enqueue(generate_image, request,
#                         job_timeout=300, result_ttl=3600)
#     return {"job_id": job.id, "status": "queued"}
#
# @app.get("/status/{job_id}")
# async def get_status(job_id: str):
#     job = queue.fetch_job(job_id)
#     return {"status": job.get_status(), "result": job.result}

from dataclasses import dataclass
from typing import List

@dataclass
class QueueJob:
    job_id: str
    prompt: str
    model: str
    status: str
    gpu_worker: str
    duration_sec: float

jobs = [
    QueueJob("j001", "sunset mountains", "SDXL", "completed", "gpu-0", 12.5),
    QueueJob("j002", "portrait photo", "SDXL", "processing", "gpu-1", 0.0),
    QueueJob("j003", "anime character", "SD1.5", "queued", "—", 0.0),
    QueueJob("j004", "product shot", "SDXL", "queued", "—", 0.0),
    QueueJob("j005", "landscape painting", "Flux", "completed", "gpu-0", 25.3),
]

print("=== Image Generation Queue ===")
for j in jobs:
    print(f"  [{j.status}] {j.job_id}: {j.prompt} ({j.model})")
    print(f"    Worker: {j.gpu_worker} | Duration: {j.duration_sec}s")

GPU Worker Management

# === GPU Worker Scaling ===

# docker-compose.yml
# services:
#   api:
#     build: ./api
#     ports: ["8000:8000"]
#     depends_on: [redis]
#   redis:
#     image: redis:7-alpine
#   comfyui-worker-0:
#     image: comfyui:latest
#     deploy:
#       resources:
#         reservations:
#           devices: [{ capabilities: [gpu], device_ids: ["0"] }]
#     environment:
#       - CUDA_VISIBLE_DEVICES=0
#   comfyui-worker-1:
#     image: comfyui:latest
#     deploy:
#       resources:
#         reservations:
#           devices: [{ capabilities: [gpu], device_ids: ["1"] }]

# Kubernetes KEDA ScaledObject
# apiVersion: keda.sh/v1alpha1
# kind: ScaledObject
# metadata:
#   name: comfyui-worker-scaler
# spec:
#   scaleTargetRef:
#     name: comfyui-worker
#   minReplicaCount: 0
#   maxReplicaCount: 10
#   triggers:
#     - type: redis
#       metadata:
#         address: redis:6379
#         listName: image-gen
#         listLength: "5"
#   advanced:
#     horizontalPodAutoscalerConfig:
#       behavior:
#         scaleDown:
#           stabilizationWindowSeconds: 300

@dataclass
class GPUWorker:
    name: str
    gpu_type: str
    vram_gb: int
    utilization: int
    jobs_completed: int
    status: str

workers = [
    GPUWorker("gpu-0", "A100", 80, 85, 150, "Active"),
    GPUWorker("gpu-1", "A100", 80, 72, 130, "Active"),
    GPUWorker("gpu-2", "A10G", 24, 90, 95, "Active"),
    GPUWorker("gpu-3", "A10G", 24, 0, 0, "Idle (scaling down)"),
]

print("\n=== GPU Workers ===")
for w in workers:
    print(f"  [{w.status}] {w.name} — {w.gpu_type} ({w.vram_gb}GB)")
    print(f"    Utilization: {w.utilization}% | Jobs: {w.jobs_completed}")

Monitoring และ Cost

# === Monitoring & Cost Optimization ===

# Prometheus Metrics
# from prometheus_client import Counter, Histogram, Gauge
#
# JOBS_TOTAL = Counter('image_gen_jobs_total', 'Total jobs',
#                      ['model', 'status'])
# JOB_DURATION = Histogram('image_gen_duration_seconds',
#                          'Job duration', ['model'])
# QUEUE_SIZE = Gauge('image_gen_queue_size', 'Queue size')
# GPU_UTILIZATION = Gauge('gpu_utilization_percent', 'GPU %',
#                         ['worker'])
# VRAM_USAGE = Gauge('gpu_vram_usage_bytes', 'VRAM bytes',
#                    ['worker'])

# Alert Rules
# - alert: QueueBacklog
#   expr: image_gen_queue_size > 50
#   for: 5m
#   labels: { severity: warning }
# - alert: GPUHighUtilization
#   expr: gpu_utilization_percent > 95
#   for: 10m
#   labels: { severity: warning }
# - alert: JobTimeout
#   expr: rate(image_gen_jobs_total{status="timeout"}[5m]) > 0.1
#   labels: { severity: critical }

monitoring = {
    "Queue Size": "5 jobs (healthy < 50)",
    "Avg Generation Time": "15.2s (SDXL) / 8.5s (SD1.5)",
    "GPU Utilization": "78% avg across workers",
    "VRAM Usage": "18.5GB / 24GB avg",
    "Jobs/Hour": "240 (peak: 500)",
    "Error Rate": "0.5% (timeout + OOM)",
    "P99 Latency": "35s (including queue wait)",
}

print("Monitoring Dashboard:")
for k, v in monitoring.items():
    print(f"  {k}: {v}")

# Cost Comparison
costs = {
    "A100 On-demand (2x)": "$6.80/hr = $4,896/mo",
    "A100 Spot (2x)": "$2.04/hr = $1,469/mo",
    "A10G On-demand (4x)": "$4.08/hr = $2,938/mo",
    "A10G Spot (4x)": "$1.22/hr = $878/mo",
    "KEDA 0-scale (A10G)": "~$500/mo (auto-scale)",
}

print(f"\n\nCost Comparison:")
for config, cost in costs.items():
    print(f"  {config}: {cost}")

เคล็ดลับ

การนำไปใช้งานจริงในองค์กร

สำหรับองค์กรขนาดกลางถึงใหญ่ แนะนำให้ใช้หลัก Three-Tier Architecture คือ Core Layer ที่เป็นแกนกลางของระบบ Distribution Layer ที่ทำหน้าที่กระจาย Traffic และ Access Layer ที่เชื่อมต่อกับผู้ใช้โดยตรง การแบ่ง Layer ชัดเจนช่วยให้การ Troubleshoot ง่ายขึ้นและสามารถ Scale ระบบได้ตามความต้องการ

เรื่อง Network Security ก็สำคัญไม่แพ้กัน ควรติดตั้ง Next-Generation Firewall ที่สามารถ Deep Packet Inspection ได้ ใช้ Network Segmentation แยก VLAN สำหรับแต่ละแผนก ติดตั้ง IDS/IPS เพื่อตรวจจับการโจมตี และทำ Regular Security Audit อย่างน้อยปีละ 2 ครั้ง

เปรียบเทียบข้อดีและข้อเสีย

ข้อดีข้อเสีย
ประสิทธิภาพสูง ทำงานได้เร็วและแม่นยำ ลดเวลาทำงานซ้ำซ้อนต้องใช้เวลาเรียนรู้เบื้องต้นพอสมควร มี Learning Curve สูง
มี Community ขนาดใหญ่ มีคนช่วยเหลือและแหล่งเรียนรู้มากมายบางฟีเจอร์อาจยังไม่เสถียร หรือมีการเปลี่ยนแปลงบ่อยในเวอร์ชันใหม่
รองรับ Integration กับเครื่องมือและบริการอื่นได้หลากหลายต้นทุนอาจสูงสำหรับ Enterprise License หรือ Cloud Service
เป็น Open Source หรือมีเวอร์ชันฟรีให้เริ่มต้นใช้งานต้องการ Hardware หรือ Infrastructure ที่เพียงพอ

จากตารางเปรียบเทียบจะเห็นว่าข้อดีมีมากกว่าข้อเสียอย่างชัดเจน โดยเฉพาะในแง่ของประสิทธิภาพและความสามารถในการ Scale สำหรับข้อเสียส่วนใหญ่สามารถแก้ไขได้ด้วยการเรียนรู้อย่างเป็นระบบและวางแผนทรัพยากรให้เหมาะสม

ComfyUI Stream Processing คืออะไร

สร้างภาพ AI ต่อเนื่อง Queue Redis Kafka GPU Worker Real-time WebSocket Progress API อัตโนมัติ Scale ตาม Load

ทำไมต้องใช้ Stream Processing กับ AI Image

GPU Overload Buffer Queue Priority กระจายงาน Worker Retry ไม่สูญหาย Monitor Scale Demand ไม่ต้อง Block User

Redis Queue กับ Kafka เลือกอะไรดี

Redis Queue ง่าย Low Latency < 1000 jobs/day Kafka High Throughput Durable Replay > 10000 jobs/day Event Pipeline

Scale GPU Worker อย่างไร

Kubernetes KEDA Auto-scale Queue Size 0-N ประหยัด GPU Spot Instance ลด 60-90% Health Check VRAM Preemption

สรุป

Stable Diffusion ComfyUI Stream Processing Redis Queue Kafka GPU Worker KEDA Kubernetes Auto-scale WebSocket Real-time Monitoring Prometheus Spot Instance Cost Optimization

📖 บทความที่เกี่ยวข้อง

Stable Diffusion ComfyUI Certification Pathอ่านบทความ → Stable Diffusion ComfyUI Troubleshooting แก้ปัญหาอ่านบทความ → Stable Diffusion ComfyUI Multi-cloud Strategyอ่านบทความ → Stable Diffusion ComfyUI Observability Stackอ่านบทความ → Stable Diffusion ComfyUI Message Queue Designอ่านบทความ →

📚 ดูบทความทั้งหมด →