SiamCafe · Blog
Stable Diffusion ComfyUI Stream Processing — AI
บทความ

Stable Diffusion ComfyUI Stream Processing — AI

เผยแพร่ 28 พฤษภาคม 2569

ComfyUI Stream Processing

Stable Diffusion ComfyUI Stream Processing Queue Redis Kafka GPU Worker Real-time Generation Scaling KEDA Kubernetes WebSocket Automation Pipeline

ArchitectureQueueLatencyScaleเหมาะกับ
Direct APIไม่มีต่ำ1 GPUDevelopment
Redis QueueRedis RQต่ำ1-10 GPUSmall-Medium
CeleryRedis/RabbitMQปานกลาง1-50 GPUMedium
Kafka PipelineKafkaปานกลาง10-100+ GPULarge Scale

Queue-based Architecture

=== ComfyUI Queue Architecture ===

Redis Queue Worker

from redis import Redis

from rq import Queue, Worker

import json

import requests

import time

redis_conn = Redis(host='redis', port=6379)

queue = Queue('image-gen', connection=redis_conn)

COMFYUI_URL = "http://comfyui:8188"

def generate_image(job_data):

workflow = json.loads(job_data['workflow'])

workflow["6"]["inputs"]["text"] = job_data['prompt']

workflow["3"]["inputs"]["seed"] = job_data.get('seed', 42)

# Queue prompt

response = requests.post(f"{COMFYUI_URL}/prompt",

json={"prompt": workflow})

prompt_id = response.json()["prompt_id"]

# Wait for completion

while True:

history = requests.get(f"{COMFYUI_URL}/history/{prompt_id}").json()

if prompt_id in history:

outputs = history[prompt_id]["outputs"]

return {"status": "completed", "outputs": outputs}

time.sleep(0.5)

# API Endpoint

from fastapi import FastAPI

app = FastAPI()

@app.post("/generate")

async def submit_job(request: dict):

job = queue.enqueue(generate_image, request,

job_timeout=300, result_ttl=3600)

return {"job_id": job.id, "status": "queued"}

@app.get("/status/{job_id}")

async def get_status(job_id: str):

job = queue.fetch_job(job_id)

return {"status": job.get_status(), "result": job.result}

from dataclasses import dataclass

from typing import List

@dataclass

class QueueJob:

job_id: str

prompt: str

model: str

status: str

gpu_worker: str

duration_sec: float

jobs = [

QueueJob("j001", "sunset mountains", "SDXL", "completed", "gpu-0", 12.5),

QueueJob("j002", "portrait photo", "SDXL", "processing", "gpu-1", 0.0),

QueueJob("j003", "anime character", "SD1.5", "queued", "—", 0.0),

QueueJob("j004", "product shot", "SDXL", "queued", "—", 0.0),

QueueJob("j005", "landscape painting", "Flux", "completed", "gpu-0", 25.3),

]

print("=== Image Generation Queue ===")

for j in jobs:

print(f" [{j.status}] {j.job_id}: {j.prompt} ({j.model})")

print(f" Worker: {j.gpu_worker} | Duration: {j.duration_sec}s")

GPU Worker Management

=== GPU Worker Scaling ===

docker-compose.yml

services:

api:

build: ./api

ports: ["8000:8000"]

depends_on: [redis]

redis:

image: redis:7-alpine

comfyui-worker-0:

image: comfyui:latest

deploy:

resources:

reservations:

devices: [{ capabilities: [gpu], device_ids: ["0"] }]

environment:

  • CUDA_VISIBLE_DEVICES=0

comfyui-worker-1:

image: comfyui:latest

deploy:

resources:

reservations:

devices: [{ capabilities: [gpu], device_ids: ["1"] }]

Kubernetes KEDA ScaledObject

apiVersion: keda.sh/v1alpha1

kind: ScaledObject

metadata:

name: comfyui-worker-scaler

spec:

scaleTargetRef:

name: comfyui-worker

minReplicaCount: 0

maxReplicaCount: 10

triggers:

  • type: redis

metadata:

address: redis:6379

listName: image-gen

listLength: "5"

advanced:

horizontalPodAutoscalerConfig:

behavior:

scaleDown:

stabilizationWindowSeconds: 300

@dataclass

class GPUWorker:

name: str

gpu_type: str

vram_gb: int

utilization: int

jobs_completed: int

status: str

workers = [

GPUWorker("gpu-0", "A100", 80, 85, 150, "Active"),

GPUWorker("gpu-1", "A100", 80, 72, 130, "Active"),

GPUWorker("gpu-2", "A10G", 24, 90, 95, "Active"),

GPUWorker("gpu-3", "A10G", 24, 0, 0, "Idle (scaling down)"),

]

print("\n=== GPU Workers ===")

for w in workers:

print(f" [{w.status}] {w.name} — {w.gpu_type} ({w.vram_gb}GB)")

print(f" Utilization: {w.utilization}% | Jobs: {w.jobs_completed}")

Monitoring และ Cost

# === Monitoring & Cost Optimization ===

# Prometheus Metrics
# from prometheus_client import Counter, Histogram, Gauge
#
# JOBS_TOTAL = Counter('image_gen_jobs_total', 'Total jobs',
#                      ['model', 'status'])
# JOB_DURATION = Histogram('image_gen_duration_seconds',
#                          'Job duration', ['model'])
# QUEUE_SIZE = Gauge('image_gen_queue_size', 'Queue size')
# GPU_UTILIZATION = Gauge('gpu_utilization_percent', 'GPU %',
#                         ['worker'])
# VRAM_USAGE = Gauge('gpu_vram_usage_bytes', 'VRAM bytes',
#                    ['worker'])

# Alert Rules
# - alert: QueueBacklog
#   expr: image_gen_queue_size > 50
#   for: 5m
#   labels: { severity: warning }
# - alert: GPUHighUtilization
#   expr: gpu_utilization_percent > 95
#   for: 10m
#   labels: { severity: warning }
# - alert: JobTimeout
#   expr: rate(image_gen_jobs_total{status="timeout"}[5m]) > 0.1
#   labels: { severity: critical }

monitoring = {
    "Queue Size": "5 jobs (healthy < 50)",
    "Avg Generation Time": "15.2s (SDXL) / 8.5s (SD1.5)",
    "GPU Utilization": "78% avg across workers",
    "VRAM Usage": "18.5GB / 24GB avg",
    "Jobs/Hour": "240 (peak: 500)",
    "Error Rate": "0.5% (timeout + OOM)",
    "P99 Latency": "35s (including queue wait)",
}

print("Monitoring Dashboard:")
for k, v in monitoring.items():
    print(f"  {k}: {v}")

# Cost Comparison
costs = {
    "A100 On-demand (2x)": "$6.80/hr = $4,896/mo",
    "A100 Spot (2x)": "$2.04/hr = $1,469/mo",
    "A10G On-demand (4x)": "$4.08/hr = $2,938/mo",
    "A10G Spot (4x)": "$1.22/hr = $878/mo",
    "KEDA 0-scale (A10G)": "~$500/mo (auto-scale)",
}

print(f"\n\nCost Comparison:")
for config, cost in costs.items():
    print(f"  {config}: {cost}")

เคล็ดลับ

  • Queue: ใช้ Redis Queue สำหรับ < 1000 jobs/day ง่ายและเร็ว
  • KEDA: Scale to 0 เมื่อไม่มีงาน ประหยัด GPU Cost
  • Spot: ใช้ Spot Instance ลดค่า GPU 60-90%
  • Cache: Cache ผลลัพธ์ที่ Prompt + Seed เหมือนกัน
  • Timeout: ตั้ง Job Timeout 5 นาที ป้องกัน Stuck Job

การนำไปใช้งานจริงในองค์กร

สำหรับองค์กรขนาดกลางถึงใหญ่ แนะนำให้ใช้หลัก Three-Tier Architecture คือ Core Layer ที่เป็นแกนกลางของระบบ Distribution Layer ที่ทำหน้าที่กระจาย Traffic และ Access Layer ที่เชื่อมต่อกับผู้ใช้โดยตรง การแบ่ง Layer ชัดเจนช่วยให้การ Troubleshoot ง่ายขึ้นและสามารถ Scale ระบบได้ตามความต้องการ

เรื่อง Network Security ก็สำคัญไม่แพ้กัน ควรติดตั้ง Next-Generation Firewall ที่สามารถ Deep Packet Inspection ได้ ใช้ Network Segmentation แยก VLAN สำหรับแต่ละแผนก ติดตั้ง IDS/IPS เพื่อตรวจจับการโจมตี และทำ Regular Security Audit อย่างน้อยปีละ 2 ครั้ง

เปรียบเทียบข้อดีและข้อเสีย

ข้อดีข้อเสีย
ประสิทธิภาพสูง ทำงานได้เร็วและแม่นยำ ลดเวลาทำงานซ้ำซ้อนต้องใช้เวลาเรียนรู้เบื้องต้นพอสมควร มี Learning Curve สูง
มี Community ขนาดใหญ่ มีคนช่วยเหลือและแหล่งเรียนรู้มากมายบางฟีเจอร์อาจยังไม่เสถียร หรือมีการเปลี่ยนแปลงบ่อยในเวอร์ชันใหม่
รองรับ Integration กับเครื่องมือและบริการอื่นได้หลากหลายต้นทุนอาจสูงสำหรับ Enterprise License หรือ Cloud Service
เป็น Open Source หรือมีเวอร์ชันฟรีให้เริ่มต้นใช้งานต้องการ Hardware หรือ Infrastructure ที่เพียงพอ

จากตารางเปรียบเทียบจะเห็นว่าข้อดีมีมากกว่าข้อเสียอย่างชัดเจน โดยเฉพาะในแง่ของประสิทธิภาพและความสามารถในการ Scale สำหรับข้อเสียส่วนใหญ่สามารถแก้ไขได้ด้วยการเรียนรู้อย่างเป็นระบบและวางแผนทรัพยากรให้เหมาะสม

ComfyUI Stream Processing คืออะไร

สร้างภาพ AI ต่อเนื่อง Queue Redis Kafka GPU Worker Real-time WebSocket Progress API อัตโนมัติ Scale ตาม Load

ทำไมต้องใช้ Stream Processing กับ AI Image

GPU Overload Buffer Queue Priority กระจายงาน Worker Retry ไม่สูญหาย Monitor Scale Demand ไม่ต้อง Block User

Redis Queue กับ Kafka เลือกอะไรดี

Redis Queue ง่าย Low Latency < 1000 jobs/day Kafka High Throughput Durable Replay > 10000 jobs/day Event Pipeline

Scale GPU Worker อย่างไร

Kubernetes KEDA Auto-scale Queue Size 0-N ประหยัด GPU Spot Instance ลด 60-90% Health Check VRAM Preemption

สรุป

Stable Diffusion ComfyUI Stream Processing Redis Queue Kafka GPU Worker KEDA Kubernetes Auto-scale WebSocket Real-time Monitoring Prometheus Spot Instance Cost Optimization