ComfyUI Stream Processing
Stable Diffusion ComfyUI Stream Processing Queue Redis Kafka GPU Worker Real-time Generation Scaling KEDA Kubernetes WebSocket Automation Pipeline
| Architecture | Queue | Latency | Scale | เหมาะกับ |
|---|---|---|---|---|
| Direct API | ไม่มี | ต่ำ | 1 GPU | Development |
| Redis Queue | Redis RQ | ต่ำ | 1-10 GPU | Small-Medium |
| Celery | Redis/RabbitMQ | ปานกลาง | 1-50 GPU | Medium |
| Kafka Pipeline | Kafka | ปานกลาง | 10-100+ GPU | Large Scale |
Queue-based Architecture
# === ComfyUI Queue Architecture ===
# Redis Queue Worker
# from redis import Redis
# from rq import Queue, Worker
# import json
# import requests
# import time
#
# redis_conn = Redis(host='redis', port=6379)
# queue = Queue('image-gen', connection=redis_conn)
#
# COMFYUI_URL = "http://comfyui:8188"
#
# def generate_image(job_data):
# workflow = json.loads(job_data['workflow'])
# workflow["6"]["inputs"]["text"] = job_data['prompt']
# workflow["3"]["inputs"]["seed"] = job_data.get('seed', 42)
#
# # Queue prompt
# response = requests.post(f"{COMFYUI_URL}/prompt",
# json={"prompt": workflow})
# prompt_id = response.json()["prompt_id"]
#
# # Wait for completion
# while True:
# history = requests.get(f"{COMFYUI_URL}/history/{prompt_id}").json()
# if prompt_id in history:
# outputs = history[prompt_id]["outputs"]
# return {"status": "completed", "outputs": outputs}
# time.sleep(0.5)
#
# # API Endpoint
# from fastapi import FastAPI
# app = FastAPI()
#
# @app.post("/generate")
# async def submit_job(request: dict):
# job = queue.enqueue(generate_image, request,
# job_timeout=300, result_ttl=3600)
# return {"job_id": job.id, "status": "queued"}
#
# @app.get("/status/{job_id}")
# async def get_status(job_id: str):
# job = queue.fetch_job(job_id)
# return {"status": job.get_status(), "result": job.result}
from dataclasses import dataclass
from typing import List
@dataclass
class QueueJob:
job_id: str
prompt: str
model: str
status: str
gpu_worker: str
duration_sec: float
jobs = [
QueueJob("j001", "sunset mountains", "SDXL", "completed", "gpu-0", 12.5),
QueueJob("j002", "portrait photo", "SDXL", "processing", "gpu-1", 0.0),
QueueJob("j003", "anime character", "SD1.5", "queued", "—", 0.0),
QueueJob("j004", "product shot", "SDXL", "queued", "—", 0.0),
QueueJob("j005", "landscape painting", "Flux", "completed", "gpu-0", 25.3),
]
print("=== Image Generation Queue ===")
for j in jobs:
print(f" [{j.status}] {j.job_id}: {j.prompt} ({j.model})")
print(f" Worker: {j.gpu_worker} | Duration: {j.duration_sec}s")
GPU Worker Management
# === GPU Worker Scaling ===
# docker-compose.yml
# services:
# api:
# build: ./api
# ports: ["8000:8000"]
# depends_on: [redis]
# redis:
# image: redis:7-alpine
# comfyui-worker-0:
# image: comfyui:latest
# deploy:
# resources:
# reservations:
# devices: [{ capabilities: [gpu], device_ids: ["0"] }]
# environment:
# - CUDA_VISIBLE_DEVICES=0
# comfyui-worker-1:
# image: comfyui:latest
# deploy:
# resources:
# reservations:
# devices: [{ capabilities: [gpu], device_ids: ["1"] }]
# Kubernetes KEDA ScaledObject
# apiVersion: keda.sh/v1alpha1
# kind: ScaledObject
# metadata:
# name: comfyui-worker-scaler
# spec:
# scaleTargetRef:
# name: comfyui-worker
# minReplicaCount: 0
# maxReplicaCount: 10
# triggers:
# - type: redis
# metadata:
# address: redis:6379
# listName: image-gen
# listLength: "5"
# advanced:
# horizontalPodAutoscalerConfig:
# behavior:
# scaleDown:
# stabilizationWindowSeconds: 300
@dataclass
class GPUWorker:
name: str
gpu_type: str
vram_gb: int
utilization: int
jobs_completed: int
status: str
workers = [
GPUWorker("gpu-0", "A100", 80, 85, 150, "Active"),
GPUWorker("gpu-1", "A100", 80, 72, 130, "Active"),
GPUWorker("gpu-2", "A10G", 24, 90, 95, "Active"),
GPUWorker("gpu-3", "A10G", 24, 0, 0, "Idle (scaling down)"),
]
print("\n=== GPU Workers ===")
for w in workers:
print(f" [{w.status}] {w.name} — {w.gpu_type} ({w.vram_gb}GB)")
print(f" Utilization: {w.utilization}% | Jobs: {w.jobs_completed}")
Monitoring และ Cost
# === Monitoring & Cost Optimization ===
# Prometheus Metrics
# from prometheus_client import Counter, Histogram, Gauge
#
# JOBS_TOTAL = Counter('image_gen_jobs_total', 'Total jobs',
# ['model', 'status'])
# JOB_DURATION = Histogram('image_gen_duration_seconds',
# 'Job duration', ['model'])
# QUEUE_SIZE = Gauge('image_gen_queue_size', 'Queue size')
# GPU_UTILIZATION = Gauge('gpu_utilization_percent', 'GPU %',
# ['worker'])
# VRAM_USAGE = Gauge('gpu_vram_usage_bytes', 'VRAM bytes',
# ['worker'])
# Alert Rules
# - alert: QueueBacklog
# expr: image_gen_queue_size > 50
# for: 5m
# labels: { severity: warning }
# - alert: GPUHighUtilization
# expr: gpu_utilization_percent > 95
# for: 10m
# labels: { severity: warning }
# - alert: JobTimeout
# expr: rate(image_gen_jobs_total{status="timeout"}[5m]) > 0.1
# labels: { severity: critical }
monitoring = {
"Queue Size": "5 jobs (healthy < 50)",
"Avg Generation Time": "15.2s (SDXL) / 8.5s (SD1.5)",
"GPU Utilization": "78% avg across workers",
"VRAM Usage": "18.5GB / 24GB avg",
"Jobs/Hour": "240 (peak: 500)",
"Error Rate": "0.5% (timeout + OOM)",
"P99 Latency": "35s (including queue wait)",
}
print("Monitoring Dashboard:")
for k, v in monitoring.items():
print(f" {k}: {v}")
# Cost Comparison
costs = {
"A100 On-demand (2x)": "$6.80/hr = $4,896/mo",
"A100 Spot (2x)": "$2.04/hr = $1,469/mo",
"A10G On-demand (4x)": "$4.08/hr = $2,938/mo",
"A10G Spot (4x)": "$1.22/hr = $878/mo",
"KEDA 0-scale (A10G)": "~$500/mo (auto-scale)",
}
print(f"\n\nCost Comparison:")
for config, cost in costs.items():
print(f" {config}: {cost}")
เคล็ดลับ
- Queue: ใช้ Redis Queue สำหรับ < 1000 jobs/day ง่ายและเร็ว
- KEDA: Scale to 0 เมื่อไม่มีงาน ประหยัด GPU Cost
- Spot: ใช้ Spot Instance ลดค่า GPU 60-90%
- Cache: Cache ผลลัพธ์ที่ Prompt + Seed เหมือนกัน
- Timeout: ตั้ง Job Timeout 5 นาที ป้องกัน Stuck Job
การนำไปใช้งานจริงในองค์กร
สำหรับองค์กรขนาดกลางถึงใหญ่ แนะนำให้ใช้หลัก Three-Tier Architecture คือ Core Layer ที่เป็นแกนกลางของระบบ Distribution Layer ที่ทำหน้าที่กระจาย Traffic และ Access Layer ที่เชื่อมต่อกับผู้ใช้โดยตรง การแบ่ง Layer ชัดเจนช่วยให้การ Troubleshoot ง่ายขึ้นและสามารถ Scale ระบบได้ตามความต้องการ
เรื่อง Network Security ก็สำคัญไม่แพ้กัน ควรติดตั้ง Next-Generation Firewall ที่สามารถ Deep Packet Inspection ได้ ใช้ Network Segmentation แยก VLAN สำหรับแต่ละแผนก ติดตั้ง IDS/IPS เพื่อตรวจจับการโจมตี และทำ Regular Security Audit อย่างน้อยปีละ 2 ครั้ง
เปรียบเทียบข้อดีและข้อเสีย
จากตารางเปรียบเทียบจะเห็นว่าข้อดีมีมากกว่าข้อเสียอย่างชัดเจน โดยเฉพาะในแง่ของประสิทธิภาพและความสามารถในการ Scale สำหรับข้อเสียส่วนใหญ่สามารถแก้ไขได้ด้วยการเรียนรู้อย่างเป็นระบบและวางแผนทรัพยากรให้เหมาะสม
ComfyUI Stream Processing คืออะไร
สร้างภาพ AI ต่อเนื่อง Queue Redis Kafka GPU Worker Real-time WebSocket Progress API อัตโนมัติ Scale ตาม Load
ทำไมต้องใช้ Stream Processing กับ AI Image
GPU Overload Buffer Queue Priority กระจายงาน Worker Retry ไม่สูญหาย Monitor Scale Demand ไม่ต้อง Block User
Redis Queue กับ Kafka เลือกอะไรดี
Redis Queue ง่าย Low Latency < 1000 jobs/day Kafka High Throughput Durable Replay > 10000 jobs/day Event Pipeline
Scale GPU Worker อย่างไร
Kubernetes KEDA Auto-scale Queue Size 0-N ประหยัด GPU Spot Instance ลด 60-90% Health Check VRAM Preemption
สรุป
Stable Diffusion ComfyUI Stream Processing Redis Queue Kafka GPU Worker KEDA Kubernetes Auto-scale WebSocket Real-time Monitoring Prometheus Spot Instance Cost Optimization
