RAG Architecture กับ Site Reliability
RAG Architecture คืออะไร

RAG (Retrieval-Augmented Generation) เป็นสถาปัตยกรรมที่รวม 2 ส่วนหลักเข้าด้วยกัน คือ Retrieval Component ที่ค้นหาข้อมูลที่เกี่ยวข้องจาก Knowledge Base (มักเป็น Vector Database) และ Generation Component ที่ใช้ LLM สร้างคำตอบจาก Context ที่ค้นพบ ทำให้ LLM ตอบคำถามได้ถูกต้องและอ้างอิงข้อมูลจริง ลดปัญหา Hallucination ที่เป็นจุดอ่อนหลักของ LLM
เนื้อหาเกี่ยวข้อง — แนะนำให้อ่าน LocalAI Self-hosted AR VR Development —
เมื่อ RAG System ถูกใช้ใน Production ที่ User จำนวนมากเข้าถึง ความเสถียร (Reliability) กลายเป็นสิ่งสำคัญมาก Site Reliability Engineering (SRE) Principles จะช่วยให้ออกแบบ RAG System ที่ทนทานต่อ Failure, Monitor ได้ครอบคลุม และ Scale ได้ตาม Traffic
เนื้อหาเกี่ยวข้อง — อ่านต่อ: Shadcn UI RBAC ABAC Policy
RAG System Architecture สำหรับ Production
- API Gateway: Rate Limiting, Authentication, Load Balancing
- Query Preprocessor: Query Rewriting, Intent Classification, Cache Lookup
- Embedding Service: แปลง Query เป็น Vector (Sentence Transformers / OpenAI)
- Vector Database: Qdrant / Milvus / Pinecone สำหรับ Similarity Search
- Reranker: จัดอันดับผลลัพธ์ใหม่ด้วย Cross-encoder
- LLM Service: Generate คำตอบจาก Retrieved Context
- Response Validator: ตรวจสอบคุณภาพคำตอบ, Hallucination Detection
- Cache Layer: Redis สำหรับ Semantic Cache ลด Latency และ Cost
SLI/SLO สำหรับ RAG System
# Python — SLI/SLO Definition และ Monitoring สำหรับ RAG
from dataclasses import dataclass
from datetime import datetime, timedelta
import time
from prometheus_client import Histogram, Counter, Gauge, start_http_server
# === SLI Metrics ===
rag_latency = Histogram(
"rag_request_duration_seconds",
"RAG end-to-end latency",
buckets=[0.5, 1.0, 1.5, 2.0, 3.0, 5.0, 10.0],
labelnames=["component"], # embedding, retrieval, generation, total
)
rag_errors = Counter(
"rag_errors_total",
"RAG error count",
labelnames=["error_type"], # timeout, llm_error, retrieval_empty, validation_failed
)
rag_requests = Counter("rag_requests_total", "Total RAG requests")
rag_relevance = Histogram(
"rag_retrieval_relevance_score",
"Retrieval relevance score",
buckets=[0.1, 0.2, 0.3, 0.4, 0.5, 0.6, 0.7, 0.8, 0.9, 1.0],
)
rag_cache_hits = Counter("rag_cache_hits_total", "Cache hit count")
rag_token_usage = Counter(
"rag_token_usage_total", "Token usage",
labelnames=["type"], # prompt, completion
)
# === SLO Definitions ===
@dataclass
class SLO:
name: str
target: float
window_days: int = 30
SLOS = [
SLO("availability", 0.999, 30), # 99.9% Availability
SLO("latency_p95", 3.0, 30), # P95 < 3 seconds
SLO("latency_p99", 5.0, 30), # P99 < 5 seconds
SLO("retrieval_relevance_avg", 0.7, 30), # Avg relevance > 0.7
SLO("error_rate", 0.001, 30), # Error rate < 0.1%
]
# === Error Budget Calculator ===
class ErrorBudget:
def __init__(self, slo_target, window_minutes):
self.slo_target = slo_target
self.window_minutes = window_minutes
self.total_requests = 0
self.failed_requests = 0
def record(self, success: bool):
self.total_requests += 1
if not success:
self.failed_requests += 1
@property
def error_budget_total(self):
return self.total_requests * (1 - self.slo_target)
@property
def error_budget_remaining(self):
return max(0, self.error_budget_total - self.failed_requests)
@property
def error_budget_pct(self):
if self.error_budget_total == 0:
return 100.0
return (self.error_budget_remaining / self.error_budget_total) * 100
@property
def current_reliability(self):
if self.total_requests == 0:
return 1.0
return 1 - (self.failed_requests / self.total_requests)
# ตัวอย่างการใช้งาน
budget = ErrorBudget(slo_target=0.999, window_minutes=43200) # 30 days
# จำลอง 100,000 requests
import random
for _ in range(100000):
success = random.random() > 0.0005 # 0.05% error rate
budget.record(success)
print(f"Total Requests: {budget.total_requests:,}")
print(f"Failed Requests: {budget.failed_requests}")
print(f"Current Reliability: {budget.current_reliability:.4%}")
print(f"Error Budget Total: {budget.error_budget_total:.0f}")
print(f"Error Budget Remaining: {budget.error_budget_remaining:.0f}")
print(f"Error Budget Used: {100 - budget.error_budget_pct:.1f}%")
RAG Pipeline กับ Reliability Patterns

# RAG Pipeline พร้อม Retry, Fallback, Circuit Breaker
import asyncio
import hashlib
import json
import time
from typing import Optional
import redis.asyncio as redis
import httpx
class CircuitBreaker:
"""Circuit Breaker Pattern สำหรับ External Service"""
def __init__(self, failure_threshold=5, recovery_timeout=30):
self.failure_threshold = failure_threshold
self.recovery_timeout = recovery_timeout
self.failure_count = 0
self.last_failure_time = 0
self.state = "closed" # closed, open, half-open
def can_execute(self):
if self.state == "closed":
return True
if self.state == "open":
if time.time() - self.last_failure_time > self.recovery_timeout:
self.state = "half-open"
return True
return False
return True # half-open
def record_success(self):
self.failure_count = 0
self.state = "closed"
def record_failure(self):
self.failure_count += 1
self.last_failure_time = time.time()
if self.failure_count >= self.failure_threshold:
self.state = "open"
class ReliableRAGPipeline:
def __init__(self):
self.redis = redis.from_url("redis://localhost:6379")
self.llm_breaker = CircuitBreaker(failure_threshold=5, recovery_timeout=30)
self.embedding_breaker = CircuitBreaker(failure_threshold=3, recovery_timeout=15)
async def semantic_cache_lookup(self, query: str) -> Optional[str]:
"""Semantic Cache — ลด Latency และ Cost"""
cache_key = f"rag:cache:{hashlib.md5(query.encode()).hexdigest()}"
cached = await self.redis.get(cache_key)
if cached:
rag_cache_hits.inc()
return json.loads(cached)
return None
async def semantic_cache_store(self, query: str, response: dict, ttl=3600):
cache_key = f"rag:cache:{hashlib.md5(query.encode()).hexdigest()}"
await self.redis.setex(cache_key, ttl, json.dumps(response, ensure_ascii=False))
async def embed_query(self, query: str, retries=3) -> list[float]:
"""Embedding กับ Retry Logic"""
if not self.embedding_breaker.can_execute():
raise Exception("Embedding service circuit breaker is open")
for attempt in range(retries):
try:
start = time.time()
async with httpx.AsyncClient(timeout=5.0) as client:
resp = await client.post(
"http://embedding-service:8080/embed",
json={"text": query},
)
resp.raise_for_status()
rag_latency.labels(component="embedding").observe(time.time() - start)
self.embedding_breaker.record_success()
return resp.json()["embedding"]
except Exception as e:
self.embedding_breaker.record_failure()
if attempt < retries - 1:
await asyncio.sleep(0.5 * (2 ** attempt)) # Exponential backoff
else:
rag_errors.labels(error_type="embedding_error").inc()
raise
async def retrieve(self, embedding: list[float], top_k=5) -> list[dict]:
"""Vector Search กับ Fallback"""
start = time.time()
try:
# Primary: Qdrant
async with httpx.AsyncClient(timeout=3.0) as client:
resp = await client.post(
"http://qdrant:6333/collections/docs/points/search",
json={"vector": embedding, "limit": top_k, "with_payload": True},
)
results = resp.json()["result"]
except Exception:
# Fallback: Elasticsearch (Keyword Search)
rag_errors.labels(error_type="retrieval_fallback").inc()
results = await self._fallback_keyword_search(embedding)
rag_latency.labels(component="retrieval").observe(time.time() - start)
# Track Relevance
if results:
avg_score = sum(r.get("score", 0) for r in results) / len(results)
rag_relevance.observe(avg_score)
return results
async def generate(self, query: str, context: list[dict], retries=2) -> str:
"""LLM Generation กับ Circuit Breaker"""
if not self.llm_breaker.can_execute():
return self._fallback_response(query, context)
context_text = "\n".join([doc["payload"]["text"] for doc in context])
prompt = f"Context:\n{context_text}\n\nQuestion: {query}\nAnswer:"
for attempt in range(retries):
try:
start = time.time()
async with httpx.AsyncClient(timeout=30.0) as client:
resp = await client.post(
"http://vllm:8080/v1/chat/completions",
json={
"model": "llama-3.1-8b",
"messages": [{"role": "user", "content": prompt}],
"max_tokens": 1024,
"temperature": 0.3,
},
)
result = resp.json()
rag_latency.labels(component="generation").observe(time.time() - start)
usage = result.get("usage", {})
rag_token_usage.labels(type="prompt").inc(usage.get("prompt_tokens", 0))
rag_token_usage.labels(type="completion").inc(usage.get("completion_tokens", 0))
self.llm_breaker.record_success()
return result["choices"][0]["message"]["content"]
except Exception:
self.llm_breaker.record_failure()
if attempt < retries - 1:
await asyncio.sleep(1)
else:
rag_errors.labels(error_type="llm_error").inc()
return self._fallback_response(query, context)
def _fallback_response(self, query, context):
"""Graceful Degradation — ส่ง Context กลับเมื่อ LLM ไม่พร้อม"""
if context:
return f"ขณะนี้ระบบกำลังประมวลผล กรุณาดูข้อมูลที่เกี่ยวข้อง:\n" + \
"\n".join([doc["payload"]["text"][:200] for doc in context[:3]])
return "ขออภัย ระบบไม่สามารถประมวลผลได้ กรุณาลองใหม่อีกครั้ง"
async def query(self, user_query: str) -> dict:
"""Main RAG Pipeline"""
total_start = time.time()
rag_requests.inc()
# 1. Cache Lookup
cached = await self.semantic_cache_lookup(user_query)
if cached:
return cached
# 2. Embed → Retrieve → Generate
embedding = await self.embed_query(user_query)
context = await self.retrieve(embedding)
answer = await self.generate(user_query, context)
result = {
"answer": answer,
"sources": [doc["payload"].get("source", "") for doc in context],
"latency_ms": round((time.time() - total_start) * 1000),
}
rag_latency.labels(component="total").observe(time.time() - total_start)
# 3. Cache Store
await self.semantic_cache_store(user_query, result)
return result
Load Testing สำหรับ RAG System
# locustfile.py — Load Test สำหรับ RAG API
from locust import HttpUser, task, between, events
import random
import time
SAMPLE_QUERIES = [
"วิธีตั้งค่า Kubernetes Cluster",
"Docker Compose คืออะไร",
"การใช้ Terraform กับ AWS",
"CI/CD Pipeline Best Practices",
"วิธีแก้ปัญหา Memory Leak",
"Microservices Architecture Design",
"Database Indexing Strategy",
"API Rate Limiting วิธีทำ",
"OAuth2 Flow อธิบาย",
"Redis Caching Pattern",
]
class RAGUser(HttpUser):
wait_time = between(1, 3)
host = "http://rag-api:8000"
@task(10)
def query_rag(self):
query = random.choice(SAMPLE_QUERIES)
start = time.time()
with self.client.post(
"/v1/query",
json={"query": query, "top_k": 5},
headers={"Authorization": "Bearer test-key"},
catch_response=True,
) as response:
latency = time.time() - start
if response.status_code == 200:
data = response.json()
if latency > 5.0:
response.failure(f"Too slow: {latency:.1f}s")
elif not data.get("answer"):
response.failure("Empty answer")
else:
response.success()
elif response.status_code == 429:
response.failure("Rate limited")
else:
response.failure(f"Status {response.status_code}")
@task(2)
def health_check(self):
self.client.get("/health")
# รัน: locust -f locustfile.py --users 100 --spawn-rate 10 --run-time 10m
Incident Response สำหรับ RAG System
- LLM Service Down: Circuit Breaker เปิด → Fallback ส่ง Retrieved Context กลับแทน → Alert On-call → Scale/Restart LLM Service
- Vector DB Slow: Switch ไป Keyword Search Fallback → Alert → ตรวจสอบ Index Health และ Resource Usage
- High Latency: เพิ่ม Cache TTL → Scale Embedding/LLM Workers → ตรวจสอบ GPU Utilization
- Hallucination Spike: ตรวจสอบ Retrieval Relevance → อาจมี Data Quality Issue → Update Knowledge Base
- Error Budget Exhausted: Freeze Deployments → Focus on Reliability → ทำ Post-mortem
RAG Architecture คืออะไร
RAG (Retrieval-Augmented Generation) รวม Information Retrieval กับ LLM Generation ดึงข้อมูลจาก Vector Database มาเป็น Context ให้ LLM ตอบคำถามได้ถูกต้องและอ้างอิงข้อมูลจริง ลด Hallucination ใช้ใน Chatbot, Knowledge Base, Customer Support
แนะนำเพิ่มเติม — ดูสัญญาณเทรดที่ XM Signal
เนื้อหาเกี่ยวข้อง — อ่านต่อ: บัญชีโอนเงินไม่ได้ — ข้อมูลครบถ้วน 2026





