LocalAI Self-hosted Message Queue Design คืออะไร
LocalAI เป็น open-source AI inference server ที่รัน LLMs, image generation และ audio models บนเครื่องตัวเอง โดยไม่ต้องส่งข้อมูลออกไปยัง cloud APIs รองรับ models เช่น Llama, Mistral, Phi และอื่นๆ ผ่าน OpenAI-compatible API Message Queue Design คือสถาปัตยกรรมที่ใช้ queue (เช่น Redis, RabbitMQ, Kafka) จัดการ requests แบบ asynchronous เพื่อรองรับ concurrent users และป้องกัน GPU overload การรวม LocalAI กับ message queue ช่วยให้ระบบ AI self-hosted scalable, reliable และจัดการ workload ได้อย่างมีประสิทธิภาพ
LocalAI Architecture
# localai_arch.py — LocalAI architecture overview
import json
class LocalAIArchitecture:
FEATURES = {
"openai_compatible": {
"name": "OpenAI-Compatible API",
"description": "Drop-in replacement สำหรับ OpenAI API — เปลี่ยน base URL = ใช้ได้ทันที",
"endpoints": "/v1/chat/completions, /v1/embeddings, /v1/images/generations",
},
"multi_model": {
"name": "Multi-Model Support",
"description": "รัน LLM, image gen, TTS, STT, embeddings ใน server เดียว",
"models": "Llama 3, Mistral, Phi-3, Stable Diffusion, Whisper, BERT",
},
"gpu_cpu": {
"name": "GPU + CPU Support",
"description": "รันบน GPU (CUDA, ROCm) หรือ CPU (llama.cpp, GGUF quantization)",
},
"privacy": {
"name": "Privacy & Security",
"description": "ข้อมูลไม่ออกจากเครื่อง — เหมาะสำหรับ sensitive data, compliance",
},
}
DOCKER_SETUP = """
# docker-compose.yml — LocalAI setup
version: '3.8'
services:
localai:
image: localai/localai:latest-aio-gpu-nvidia-cuda-12
ports:
- "8080:8080"
volumes:
- ./models:/build/models
environment:
- THREADS=4
- CONTEXT_SIZE=4096
- GALLERIES=[{"name":"model-gallery","url":"github:mudler/LocalAI/gallery/index.yaml@master"}]
deploy:
resources:
reservations:
devices:
- driver: nvidia
count: 1
capabilities: [gpu]
"""
def show_features(self):
print("=== LocalAI Features ===\n")
for key, feat in self.FEATURES.items():
print(f"[{feat['name']}]")
print(f" {feat['description']}")
print()
def show_setup(self):
print("=== Docker Setup ===")
print(self.DOCKER_SETUP[:400])
arch = LocalAIArchitecture()
arch.show_features()
arch.show_setup()
Message Queue Architecture
# mq_design.py — Message queue design for LocalAI
import json
class MQDesign:
ARCHITECTURE = {
"producer": {
"name": "API Gateway / Producer",
"description": "รับ requests จาก clients → validate → ส่งเข้า queue",
"tech": "FastAPI, Flask, Express.js",
},
"queue": {
"name": "Message Queue / Broker",
"description": "เก็บ requests ใน queue → FIFO หรือ priority-based delivery",
"tech": "Redis Streams, RabbitMQ, BullMQ, Celery",
},
"worker": {
"name": "AI Worker (LocalAI)",
"description": "ดึง requests จาก queue → ส่งไป LocalAI → return results",
"tech": "Python worker + LocalAI HTTP API",
},
"result_store": {
"name": "Result Store",
"description": "เก็บ results สำหรับ client มา poll หรือ webhook callback",
"tech": "Redis, PostgreSQL, S3 (for images)",
},
}
QUEUE_PATTERNS = {
"fifo": "FIFO — First In First Out, ง่ายที่สุด",
"priority": "Priority Queue — urgent requests ก่อน (e.g., paid users)",
"rate_limit": "Rate Limiting — จำกัด requests per user per minute",
"batch": "Batch Processing — รวมหลาย requests ส่ง LocalAI ทีเดียว (embeddings)",
"dead_letter": "Dead Letter Queue — failed requests → retry หรือ investigate",
}
def show_architecture(self):
print("=== MQ Architecture ===\n")
for key, comp in self.ARCHITECTURE.items():
print(f"[{comp['name']}]")
print(f" {comp['description']}")
print(f" Tech: {comp['tech']}")
print()
def show_patterns(self):
print("=== Queue Patterns ===")
for key, desc in self.QUEUE_PATTERNS.items():
print(f" [{key}] {desc}")
mq = MQDesign()
mq.show_architecture()
mq.show_patterns()
Python Queue Worker
# worker.py — Python queue worker for LocalAI
import json
class QueueWorker:
CODE = """
# localai_worker.py — Queue worker for LocalAI inference
import redis
import requests
import json
import time
import logging
from dataclasses import dataclass
from typing import Optional
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
@dataclass
class InferenceRequest:
request_id: str
model: str
messages: list
max_tokens: int = 512
temperature: float = 0.7
priority: int = 0
user_id: str = ""
class LocalAIWorker:
def __init__(self, localai_url="http://localhost:8080",
redis_url="redis://localhost:6379"):
self.localai_url = localai_url
self.redis = redis.from_url(redis_url)
self.queue_name = "ai:requests"
self.result_prefix = "ai:result:"
self.running = True
def submit_request(self, request: InferenceRequest):
'''Submit request to queue'''
data = {
'request_id': request.request_id,
'model': request.model,
'messages': request.messages,
'max_tokens': request.max_tokens,
'temperature': request.temperature,
'user_id': request.user_id,
'submitted_at': time.time(),
}
# Priority queue using sorted set
self.redis.zadd(self.queue_name, {json.dumps(data): request.priority})
self.redis.set(f"{self.result_prefix}{request.request_id}",
json.dumps({'status': 'queued'}), ex=3600)
return request.request_id
def process_next(self):
'''Process next request from queue'''
# Get highest priority request
items = self.redis.zpopmax(self.queue_name, count=1)
if not items:
return None
data = json.loads(items[0][0])
request_id = data['request_id']
# Update status
self.redis.set(f"{self.result_prefix}{request_id}",
json.dumps({'status': 'processing'}), ex=3600)
try:
# Call LocalAI
start = time.time()
response = requests.post(
f"{self.localai_url}/v1/chat/completions",
json={
'model': data['model'],
'messages': data['messages'],
'max_tokens': data['max_tokens'],
'temperature': data['temperature'],
},
timeout=120,
)
elapsed = time.time() - start
result = response.json()
# Store result
output = {
'status': 'completed',
'result': result,
'processing_time_ms': round(elapsed * 1000),
'completed_at': time.time(),
}
self.redis.set(f"{self.result_prefix}{request_id}",
json.dumps(output), ex=3600)
logger.info(f"Completed {request_id} in {elapsed:.1f}s")
return output
except Exception as e:
error_output = {
'status': 'error',
'error': str(e),
'completed_at': time.time(),
}
self.redis.set(f"{self.result_prefix}{request_id}",
json.dumps(error_output), ex=3600)
logger.error(f"Error {request_id}: {e}")
return error_output
def get_result(self, request_id):
'''Get result for a request'''
data = self.redis.get(f"{self.result_prefix}{request_id}")
return json.loads(data) if data else None
def run(self, poll_interval=0.5):
'''Main worker loop'''
logger.info("Worker started")
while self.running:
result = self.process_next()
if result is None:
time.sleep(poll_interval)
# worker = LocalAIWorker("http://localai:8080")
# worker.run()
"""
def show_code(self):
print("=== Queue Worker ===")
print(self.CODE[:600])
worker = QueueWorker()
worker.show_code()
FastAPI Gateway
# gateway.py — FastAPI gateway for LocalAI queue
import json
class APIGateway:
CODE = """
# api_gateway.py — FastAPI gateway with queue
from fastapi import FastAPI, HTTPException, BackgroundTasks
from pydantic import BaseModel
from typing import List, Optional
import uuid
import redis
import json
app = FastAPI(title="LocalAI Queue Gateway")
r = redis.from_url("redis://localhost:6379")
class ChatRequest(BaseModel):
model: str = "llama3"
messages: List[dict]
max_tokens: int = 512
temperature: float = 0.7
class QueueResponse(BaseModel):
request_id: str
status: str
queue_position: int
@app.post("/v1/chat/completions/async", response_model=QueueResponse)
async def submit_chat(request: ChatRequest):
'''Submit chat request to queue'''
request_id = str(uuid.uuid4())
data = {
'request_id': request_id,
'model': request.model,
'messages': request.messages,
'max_tokens': request.max_tokens,
'temperature': request.temperature,
}
r.zadd("ai:requests", {json.dumps(data): 0})
r.set(f"ai:result:{request_id}", json.dumps({'status': 'queued'}), ex=3600)
queue_size = r.zcard("ai:requests")
return QueueResponse(
request_id=request_id,
status="queued",
queue_position=queue_size,
)
@app.get("/v1/results/{request_id}")
async def get_result(request_id: str):
'''Poll for result'''
data = r.get(f"ai:result:{request_id}")
if not data:
raise HTTPException(404, "Request not found")
return json.loads(data)
@app.get("/v1/queue/status")
async def queue_status():
'''Get queue status'''
return {
"pending": r.zcard("ai:requests"),
"processing": r.scard("ai:processing"),
}
# uvicorn api_gateway:app --host 0.0.0.0 --port 9000
"""
def show_code(self):
print("=== API Gateway ===")
print(self.CODE[:600])
gateway = APIGateway()
gateway.show_code()
Docker Compose Full Stack
# fullstack.py — Full stack deployment
import json
class FullStack:
COMPOSE = """
# docker-compose.yml — LocalAI + Queue + Gateway
version: '3.8'
services:
localai:
image: localai/localai:latest-aio-gpu-nvidia-cuda-12
volumes:
- ./models:/build/models
environment:
- THREADS=4
- CONTEXT_SIZE=4096
deploy:
resources:
reservations:
devices:
- driver: nvidia
count: 1
capabilities: [gpu]
redis:
image: redis:7-alpine
ports:
- "6379:6379"
volumes:
- redis_data:/data
gateway:
build: ./gateway
ports:
- "9000:9000"
environment:
- REDIS_URL=redis://redis:6379
- LOCALAI_URL=http://localai:8080
depends_on:
- redis
worker:
build: ./worker
environment:
- REDIS_URL=redis://redis:6379
- LOCALAI_URL=http://localai:8080
depends_on:
- redis
- localai
deploy:
replicas: 2 # Scale workers
prometheus:
image: prom/prometheus
ports:
- "9090:9090"
volumes:
- ./prometheus.yml:/etc/prometheus/prometheus.yml
volumes:
redis_data:
"""
SCALING = {
"vertical": "เพิ่ม GPU ที่แรงกว่า — RTX 3090 → RTX 4090 → A100",
"horizontal": "เพิ่ม worker replicas — 2, 4, 8 workers กระจาย load",
"model_sharding": "แยก model ไป GPU คนละตัว — LLM บน GPU 0, Image Gen บน GPU 1",
"queue_priority": "Priority queue — paid users ก่อน, batch jobs ต่ำสุด",
}
def show_compose(self):
print("=== Full Stack ===")
print(self.COMPOSE[:500])
def show_scaling(self):
print(f"\n=== Scaling Strategies ===")
for key, desc in self.SCALING.items():
print(f" [{key}] {desc}")
stack = FullStack()
stack.show_compose()
stack.show_scaling()
FAQ - คำถามที่พบบ่อย
Q: LocalAI กับ Ollama อันไหนดีกว่า?
A: LocalAI: OpenAI-compatible API, multi-model (LLM + image + audio), GPU + CPU, production-ready Ollama: ง่ายกว่า, model management ดี (ollama pull), เหมาะ development เลือก LocalAI: production deployment, ต้องการ OpenAI drop-in replacement เลือก Ollama: local development, ทดลอง models, ง่ายที่สุด รวมกัน: Ollama สำหรับ dev → LocalAI สำหรับ production
Q: ทำไมต้องใช้ Message Queue?
A: GPU inference ช้า (1-30 วินาทีต่อ request) — ถ้าหลาย users ส่ง request พร้อมกัน: ไม่มี queue: GPU overload, OOM, requests timeout มี queue: requests เข้าคิว → process ทีละ batch → ไม่ overload เพิ่มเติม: priority queue, rate limiting, retry, monitoring จำเป็นเมื่อ: concurrent users > 2-3 คน หรือ production deployment
Q: Redis กับ RabbitMQ อันไหนเหมาะกว่า?
A: Redis (Streams/Sorted Set): ง่าย, เร็ว, เหมาะ queue ขนาดเล็ก-กลาง, ทำ result store ได้ด้วย RabbitMQ: feature เยอะกว่า (routing, exchanges, DLQ built-in), เหมาะ complex workflows เลือก Redis: ถ้าใช้ Redis อยู่แล้ว + queue ไม่ซับซ้อน เลือก RabbitMQ: ถ้าต้องการ advanced routing + guaranteed delivery
Q: GPU ไหนเหมาะกับ LocalAI?
A: ขึ้นกับ model: 7B params (Llama 3 7B): RTX 3060 12GB พอ 13B params: RTX 3090/4090 24GB 70B params: A100 80GB หรือ 2x RTX 4090 CPU only: ใช้ GGUF quantized models (Q4_K_M) — ช้าแต่ไม่ต้อง GPU คุ้มที่สุด: RTX 3090 มือสอง — 24GB VRAM ราคาดี
