SiamCafe.net Blog
Technology

LocalAI Self-hosted Message Queue Design

localai self hosted message queue design
LocalAI Self-hosted Message Queue Design | SiamCafe Blog
2025-11-12· อ. บอม — SiamCafe.net· 1,626 คำ

LocalAI Self-hosted Message Queue Design คืออะไร

LocalAI เป็น open-source AI inference server ที่รัน LLMs, image generation และ audio models บนเครื่องตัวเอง โดยไม่ต้องส่งข้อมูลออกไปยัง cloud APIs รองรับ models เช่น Llama, Mistral, Phi และอื่นๆ ผ่าน OpenAI-compatible API Message Queue Design คือสถาปัตยกรรมที่ใช้ queue (เช่น Redis, RabbitMQ, Kafka) จัดการ requests แบบ asynchronous เพื่อรองรับ concurrent users และป้องกัน GPU overload การรวม LocalAI กับ message queue ช่วยให้ระบบ AI self-hosted scalable, reliable และจัดการ workload ได้อย่างมีประสิทธิภาพ

LocalAI Architecture

# localai_arch.py — LocalAI architecture overview
import json

class LocalAIArchitecture:
    FEATURES = {
        "openai_compatible": {
            "name": "OpenAI-Compatible API",
            "description": "Drop-in replacement สำหรับ OpenAI API — เปลี่ยน base URL = ใช้ได้ทันที",
            "endpoints": "/v1/chat/completions, /v1/embeddings, /v1/images/generations",
        },
        "multi_model": {
            "name": "Multi-Model Support",
            "description": "รัน LLM, image gen, TTS, STT, embeddings ใน server เดียว",
            "models": "Llama 3, Mistral, Phi-3, Stable Diffusion, Whisper, BERT",
        },
        "gpu_cpu": {
            "name": "GPU + CPU Support",
            "description": "รันบน GPU (CUDA, ROCm) หรือ CPU (llama.cpp, GGUF quantization)",
        },
        "privacy": {
            "name": "Privacy & Security",
            "description": "ข้อมูลไม่ออกจากเครื่อง — เหมาะสำหรับ sensitive data, compliance",
        },
    }

    DOCKER_SETUP = """
# docker-compose.yml — LocalAI setup
version: '3.8'
services:
  localai:
    image: localai/localai:latest-aio-gpu-nvidia-cuda-12
    ports:
      - "8080:8080"
    volumes:
      - ./models:/build/models
    environment:
      - THREADS=4
      - CONTEXT_SIZE=4096
      - GALLERIES=[{"name":"model-gallery","url":"github:mudler/LocalAI/gallery/index.yaml@master"}]
    deploy:
      resources:
        reservations:
          devices:
            - driver: nvidia
              count: 1
              capabilities: [gpu]
"""

    def show_features(self):
        print("=== LocalAI Features ===\n")
        for key, feat in self.FEATURES.items():
            print(f"[{feat['name']}]")
            print(f"  {feat['description']}")
            print()

    def show_setup(self):
        print("=== Docker Setup ===")
        print(self.DOCKER_SETUP[:400])

arch = LocalAIArchitecture()
arch.show_features()
arch.show_setup()

Message Queue Architecture

# mq_design.py — Message queue design for LocalAI
import json

class MQDesign:
    ARCHITECTURE = {
        "producer": {
            "name": "API Gateway / Producer",
            "description": "รับ requests จาก clients → validate → ส่งเข้า queue",
            "tech": "FastAPI, Flask, Express.js",
        },
        "queue": {
            "name": "Message Queue / Broker",
            "description": "เก็บ requests ใน queue → FIFO หรือ priority-based delivery",
            "tech": "Redis Streams, RabbitMQ, BullMQ, Celery",
        },
        "worker": {
            "name": "AI Worker (LocalAI)",
            "description": "ดึง requests จาก queue → ส่งไป LocalAI → return results",
            "tech": "Python worker + LocalAI HTTP API",
        },
        "result_store": {
            "name": "Result Store",
            "description": "เก็บ results สำหรับ client มา poll หรือ webhook callback",
            "tech": "Redis, PostgreSQL, S3 (for images)",
        },
    }

    QUEUE_PATTERNS = {
        "fifo": "FIFO — First In First Out, ง่ายที่สุด",
        "priority": "Priority Queue — urgent requests ก่อน (e.g., paid users)",
        "rate_limit": "Rate Limiting — จำกัด requests per user per minute",
        "batch": "Batch Processing — รวมหลาย requests ส่ง LocalAI ทีเดียว (embeddings)",
        "dead_letter": "Dead Letter Queue — failed requests → retry หรือ investigate",
    }

    def show_architecture(self):
        print("=== MQ Architecture ===\n")
        for key, comp in self.ARCHITECTURE.items():
            print(f"[{comp['name']}]")
            print(f"  {comp['description']}")
            print(f"  Tech: {comp['tech']}")
            print()

    def show_patterns(self):
        print("=== Queue Patterns ===")
        for key, desc in self.QUEUE_PATTERNS.items():
            print(f"  [{key}] {desc}")

mq = MQDesign()
mq.show_architecture()
mq.show_patterns()

Python Queue Worker

# worker.py — Python queue worker for LocalAI
import json

class QueueWorker:
    CODE = """
# localai_worker.py — Queue worker for LocalAI inference
import redis
import requests
import json
import time
import logging
from dataclasses import dataclass
from typing import Optional

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

@dataclass
class InferenceRequest:
    request_id: str
    model: str
    messages: list
    max_tokens: int = 512
    temperature: float = 0.7
    priority: int = 0
    user_id: str = ""

class LocalAIWorker:
    def __init__(self, localai_url="http://localhost:8080",
                 redis_url="redis://localhost:6379"):
        self.localai_url = localai_url
        self.redis = redis.from_url(redis_url)
        self.queue_name = "ai:requests"
        self.result_prefix = "ai:result:"
        self.running = True
    
    def submit_request(self, request: InferenceRequest):
        '''Submit request to queue'''
        data = {
            'request_id': request.request_id,
            'model': request.model,
            'messages': request.messages,
            'max_tokens': request.max_tokens,
            'temperature': request.temperature,
            'user_id': request.user_id,
            'submitted_at': time.time(),
        }
        
        # Priority queue using sorted set
        self.redis.zadd(self.queue_name, {json.dumps(data): request.priority})
        self.redis.set(f"{self.result_prefix}{request.request_id}", 
                      json.dumps({'status': 'queued'}), ex=3600)
        
        return request.request_id
    
    def process_next(self):
        '''Process next request from queue'''
        # Get highest priority request
        items = self.redis.zpopmax(self.queue_name, count=1)
        if not items:
            return None
        
        data = json.loads(items[0][0])
        request_id = data['request_id']
        
        # Update status
        self.redis.set(f"{self.result_prefix}{request_id}",
                      json.dumps({'status': 'processing'}), ex=3600)
        
        try:
            # Call LocalAI
            start = time.time()
            response = requests.post(
                f"{self.localai_url}/v1/chat/completions",
                json={
                    'model': data['model'],
                    'messages': data['messages'],
                    'max_tokens': data['max_tokens'],
                    'temperature': data['temperature'],
                },
                timeout=120,
            )
            
            elapsed = time.time() - start
            result = response.json()
            
            # Store result
            output = {
                'status': 'completed',
                'result': result,
                'processing_time_ms': round(elapsed * 1000),
                'completed_at': time.time(),
            }
            self.redis.set(f"{self.result_prefix}{request_id}",
                          json.dumps(output), ex=3600)
            
            logger.info(f"Completed {request_id} in {elapsed:.1f}s")
            return output
            
        except Exception as e:
            error_output = {
                'status': 'error',
                'error': str(e),
                'completed_at': time.time(),
            }
            self.redis.set(f"{self.result_prefix}{request_id}",
                          json.dumps(error_output), ex=3600)
            logger.error(f"Error {request_id}: {e}")
            return error_output
    
    def get_result(self, request_id):
        '''Get result for a request'''
        data = self.redis.get(f"{self.result_prefix}{request_id}")
        return json.loads(data) if data else None
    
    def run(self, poll_interval=0.5):
        '''Main worker loop'''
        logger.info("Worker started")
        while self.running:
            result = self.process_next()
            if result is None:
                time.sleep(poll_interval)

# worker = LocalAIWorker("http://localai:8080")
# worker.run()
"""

    def show_code(self):
        print("=== Queue Worker ===")
        print(self.CODE[:600])

worker = QueueWorker()
worker.show_code()

FastAPI Gateway

# gateway.py — FastAPI gateway for LocalAI queue
import json

class APIGateway:
    CODE = """
# api_gateway.py — FastAPI gateway with queue
from fastapi import FastAPI, HTTPException, BackgroundTasks
from pydantic import BaseModel
from typing import List, Optional
import uuid
import redis
import json

app = FastAPI(title="LocalAI Queue Gateway")
r = redis.from_url("redis://localhost:6379")

class ChatRequest(BaseModel):
    model: str = "llama3"
    messages: List[dict]
    max_tokens: int = 512
    temperature: float = 0.7

class QueueResponse(BaseModel):
    request_id: str
    status: str
    queue_position: int

@app.post("/v1/chat/completions/async", response_model=QueueResponse)
async def submit_chat(request: ChatRequest):
    '''Submit chat request to queue'''
    request_id = str(uuid.uuid4())
    
    data = {
        'request_id': request_id,
        'model': request.model,
        'messages': request.messages,
        'max_tokens': request.max_tokens,
        'temperature': request.temperature,
    }
    
    r.zadd("ai:requests", {json.dumps(data): 0})
    r.set(f"ai:result:{request_id}", json.dumps({'status': 'queued'}), ex=3600)
    
    queue_size = r.zcard("ai:requests")
    
    return QueueResponse(
        request_id=request_id,
        status="queued",
        queue_position=queue_size,
    )

@app.get("/v1/results/{request_id}")
async def get_result(request_id: str):
    '''Poll for result'''
    data = r.get(f"ai:result:{request_id}")
    if not data:
        raise HTTPException(404, "Request not found")
    return json.loads(data)

@app.get("/v1/queue/status")
async def queue_status():
    '''Get queue status'''
    return {
        "pending": r.zcard("ai:requests"),
        "processing": r.scard("ai:processing"),
    }

# uvicorn api_gateway:app --host 0.0.0.0 --port 9000
"""

    def show_code(self):
        print("=== API Gateway ===")
        print(self.CODE[:600])

gateway = APIGateway()
gateway.show_code()

Docker Compose Full Stack

# fullstack.py — Full stack deployment
import json

class FullStack:
    COMPOSE = """
# docker-compose.yml — LocalAI + Queue + Gateway
version: '3.8'
services:
  localai:
    image: localai/localai:latest-aio-gpu-nvidia-cuda-12
    volumes:
      - ./models:/build/models
    environment:
      - THREADS=4
      - CONTEXT_SIZE=4096
    deploy:
      resources:
        reservations:
          devices:
            - driver: nvidia
              count: 1
              capabilities: [gpu]

  redis:
    image: redis:7-alpine
    ports:
      - "6379:6379"
    volumes:
      - redis_data:/data

  gateway:
    build: ./gateway
    ports:
      - "9000:9000"
    environment:
      - REDIS_URL=redis://redis:6379
      - LOCALAI_URL=http://localai:8080
    depends_on:
      - redis

  worker:
    build: ./worker
    environment:
      - REDIS_URL=redis://redis:6379
      - LOCALAI_URL=http://localai:8080
    depends_on:
      - redis
      - localai
    deploy:
      replicas: 2  # Scale workers

  prometheus:
    image: prom/prometheus
    ports:
      - "9090:9090"
    volumes:
      - ./prometheus.yml:/etc/prometheus/prometheus.yml

volumes:
  redis_data:
"""

    SCALING = {
        "vertical": "เพิ่ม GPU ที่แรงกว่า — RTX 3090 → RTX 4090 → A100",
        "horizontal": "เพิ่ม worker replicas — 2, 4, 8 workers กระจาย load",
        "model_sharding": "แยก model ไป GPU คนละตัว — LLM บน GPU 0, Image Gen บน GPU 1",
        "queue_priority": "Priority queue — paid users ก่อน, batch jobs ต่ำสุด",
    }

    def show_compose(self):
        print("=== Full Stack ===")
        print(self.COMPOSE[:500])

    def show_scaling(self):
        print(f"\n=== Scaling Strategies ===")
        for key, desc in self.SCALING.items():
            print(f"  [{key}] {desc}")

stack = FullStack()
stack.show_compose()
stack.show_scaling()

FAQ - คำถามที่พบบ่อย

Q: LocalAI กับ Ollama อันไหนดีกว่า?

A: LocalAI: OpenAI-compatible API, multi-model (LLM + image + audio), GPU + CPU, production-ready Ollama: ง่ายกว่า, model management ดี (ollama pull), เหมาะ development เลือก LocalAI: production deployment, ต้องการ OpenAI drop-in replacement เลือก Ollama: local development, ทดลอง models, ง่ายที่สุด รวมกัน: Ollama สำหรับ dev → LocalAI สำหรับ production

Q: ทำไมต้องใช้ Message Queue?

A: GPU inference ช้า (1-30 วินาทีต่อ request) — ถ้าหลาย users ส่ง request พร้อมกัน: ไม่มี queue: GPU overload, OOM, requests timeout มี queue: requests เข้าคิว → process ทีละ batch → ไม่ overload เพิ่มเติม: priority queue, rate limiting, retry, monitoring จำเป็นเมื่อ: concurrent users > 2-3 คน หรือ production deployment

Q: Redis กับ RabbitMQ อันไหนเหมาะกว่า?

A: Redis (Streams/Sorted Set): ง่าย, เร็ว, เหมาะ queue ขนาดเล็ก-กลาง, ทำ result store ได้ด้วย RabbitMQ: feature เยอะกว่า (routing, exchanges, DLQ built-in), เหมาะ complex workflows เลือก Redis: ถ้าใช้ Redis อยู่แล้ว + queue ไม่ซับซ้อน เลือก RabbitMQ: ถ้าต้องการ advanced routing + guaranteed delivery

Q: GPU ไหนเหมาะกับ LocalAI?

A: ขึ้นกับ model: 7B params (Llama 3 7B): RTX 3060 12GB พอ 13B params: RTX 3090/4090 24GB 70B params: A100 80GB หรือ 2x RTX 4090 CPU only: ใช้ GGUF quantized models (Q4_K_M) — ช้าแต่ไม่ต้อง GPU คุ้มที่สุด: RTX 3090 มือสอง — 24GB VRAM ราคาดี

📖 บทความที่เกี่ยวข้อง

LocalAI Self-hosted Automation Scriptอ่านบทความ → LocalAI Self-hosted Blue Green Canary Deployอ่านบทความ → LocalAI Self-hosted Audit Trail Loggingอ่านบทความ → LocalAI Self-hosted DevOps Cultureอ่านบทความ → LocalAI Self-hosted Testing Strategy QAอ่านบทความ →

📚 ดูบทความทั้งหมด →