RAG Architecture คืออะไร
RAG (Retrieval-Augmented Generation) เป็นสถาปัตยกรรมที่รวม 2 ส่วนหลักเข้าด้วยกัน คือ Retrieval Component ที่ค้นหาข้อมูลที่เกี่ยวข้องจาก Knowledge Base (มักเป็น Vector Database) และ Generation Component ที่ใช้ LLM สร้างคำตอบจาก Context ที่ค้นพบ ทำให้ LLM ตอบคำถามได้ถูกต้องและอ้างอิงข้อมูลจริง ลดปัญหา Hallucination ที่เป็นจุดอ่อนหลักของ LLM
เมื่อ RAG System ถูกใช้ใน Production ที่ User จำนวนมากเข้าถึง ความเสถียร (Reliability) กลายเป็นสิ่งสำคัญมาก Site Reliability Engineering (SRE) Principles จะช่วยให้ออกแบบ RAG System ที่ทนทานต่อ Failure, Monitor ได้ครอบคลุม และ Scale ได้ตาม Traffic
RAG System Architecture สำหรับ Production
- API Gateway: Rate Limiting, Authentication, Load Balancing
- Query Preprocessor: Query Rewriting, Intent Classification, Cache Lookup
- Embedding Service: แปลง Query เป็น Vector (Sentence Transformers / OpenAI)
- Vector Database: Qdrant / Milvus / Pinecone สำหรับ Similarity Search
- Reranker: จัดอันดับผลลัพธ์ใหม่ด้วย Cross-encoder
- LLM Service: Generate คำตอบจาก Retrieved Context
- Response Validator: ตรวจสอบคุณภาพคำตอบ, Hallucination Detection
- Cache Layer: Redis สำหรับ Semantic Cache ลด Latency และ Cost
SLI/SLO สำหรับ RAG System
# Python — SLI/SLO Definition และ Monitoring สำหรับ RAG
from dataclasses import dataclass
from datetime import datetime, timedelta
import time
from prometheus_client import Histogram, Counter, Gauge, start_http_server
# === SLI Metrics ===
rag_latency = Histogram(
"rag_request_duration_seconds",
"RAG end-to-end latency",
buckets=[0.5, 1.0, 1.5, 2.0, 3.0, 5.0, 10.0],
labelnames=["component"], # embedding, retrieval, generation, total
)
rag_errors = Counter(
"rag_errors_total",
"RAG error count",
labelnames=["error_type"], # timeout, llm_error, retrieval_empty, validation_failed
)
rag_requests = Counter("rag_requests_total", "Total RAG requests")
rag_relevance = Histogram(
"rag_retrieval_relevance_score",
"Retrieval relevance score",
buckets=[0.1, 0.2, 0.3, 0.4, 0.5, 0.6, 0.7, 0.8, 0.9, 1.0],
)
rag_cache_hits = Counter("rag_cache_hits_total", "Cache hit count")
rag_token_usage = Counter(
"rag_token_usage_total", "Token usage",
labelnames=["type"], # prompt, completion
)
# === SLO Definitions ===
@dataclass
class SLO:
name: str
target: float
window_days: int = 30
SLOS = [
SLO("availability", 0.999, 30), # 99.9% Availability
SLO("latency_p95", 3.0, 30), # P95 < 3 seconds
SLO("latency_p99", 5.0, 30), # P99 < 5 seconds
SLO("retrieval_relevance_avg", 0.7, 30), # Avg relevance > 0.7
SLO("error_rate", 0.001, 30), # Error rate < 0.1%
]
# === Error Budget Calculator ===
class ErrorBudget:
def __init__(self, slo_target, window_minutes):
self.slo_target = slo_target
self.window_minutes = window_minutes
self.total_requests = 0
self.failed_requests = 0
def record(self, success: bool):
self.total_requests += 1
if not success:
self.failed_requests += 1
@property
def error_budget_total(self):
return self.total_requests * (1 - self.slo_target)
@property
def error_budget_remaining(self):
return max(0, self.error_budget_total - self.failed_requests)
@property
def error_budget_pct(self):
if self.error_budget_total == 0:
return 100.0
return (self.error_budget_remaining / self.error_budget_total) * 100
@property
def current_reliability(self):
if self.total_requests == 0:
return 1.0
return 1 - (self.failed_requests / self.total_requests)
# ตัวอย่างการใช้งาน
budget = ErrorBudget(slo_target=0.999, window_minutes=43200) # 30 days
# จำลอง 100,000 requests
import random
for _ in range(100000):
success = random.random() > 0.0005 # 0.05% error rate
budget.record(success)
print(f"Total Requests: {budget.total_requests:,}")
print(f"Failed Requests: {budget.failed_requests}")
print(f"Current Reliability: {budget.current_reliability:.4%}")
print(f"Error Budget Total: {budget.error_budget_total:.0f}")
print(f"Error Budget Remaining: {budget.error_budget_remaining:.0f}")
print(f"Error Budget Used: {100 - budget.error_budget_pct:.1f}%")
RAG Pipeline กับ Reliability Patterns
# RAG Pipeline พร้อม Retry, Fallback, Circuit Breaker
import asyncio
import hashlib
import json
import time
from typing import Optional
import redis.asyncio as redis
import httpx
class CircuitBreaker:
"""Circuit Breaker Pattern สำหรับ External Service"""
def __init__(self, failure_threshold=5, recovery_timeout=30):
self.failure_threshold = failure_threshold
self.recovery_timeout = recovery_timeout
self.failure_count = 0
self.last_failure_time = 0
self.state = "closed" # closed, open, half-open
def can_execute(self):
if self.state == "closed":
return True
if self.state == "open":
if time.time() - self.last_failure_time > self.recovery_timeout:
self.state = "half-open"
return True
return False
return True # half-open
def record_success(self):
self.failure_count = 0
self.state = "closed"
def record_failure(self):
self.failure_count += 1
self.last_failure_time = time.time()
if self.failure_count >= self.failure_threshold:
self.state = "open"
class ReliableRAGPipeline:
def __init__(self):
self.redis = redis.from_url("redis://localhost:6379")
self.llm_breaker = CircuitBreaker(failure_threshold=5, recovery_timeout=30)
self.embedding_breaker = CircuitBreaker(failure_threshold=3, recovery_timeout=15)
async def semantic_cache_lookup(self, query: str) -> Optional[str]:
"""Semantic Cache — ลด Latency และ Cost"""
cache_key = f"rag:cache:{hashlib.md5(query.encode()).hexdigest()}"
cached = await self.redis.get(cache_key)
if cached:
rag_cache_hits.inc()
return json.loads(cached)
return None
async def semantic_cache_store(self, query: str, response: dict, ttl=3600):
cache_key = f"rag:cache:{hashlib.md5(query.encode()).hexdigest()}"
await self.redis.setex(cache_key, ttl, json.dumps(response, ensure_ascii=False))
async def embed_query(self, query: str, retries=3) -> list[float]:
"""Embedding กับ Retry Logic"""
if not self.embedding_breaker.can_execute():
raise Exception("Embedding service circuit breaker is open")
for attempt in range(retries):
try:
start = time.time()
async with httpx.AsyncClient(timeout=5.0) as client:
resp = await client.post(
"http://embedding-service:8080/embed",
json={"text": query},
)
resp.raise_for_status()
rag_latency.labels(component="embedding").observe(time.time() - start)
self.embedding_breaker.record_success()
return resp.json()["embedding"]
except Exception as e:
self.embedding_breaker.record_failure()
if attempt < retries - 1:
await asyncio.sleep(0.5 * (2 ** attempt)) # Exponential backoff
else:
rag_errors.labels(error_type="embedding_error").inc()
raise
async def retrieve(self, embedding: list[float], top_k=5) -> list[dict]:
"""Vector Search กับ Fallback"""
start = time.time()
try:
# Primary: Qdrant
async with httpx.AsyncClient(timeout=3.0) as client:
resp = await client.post(
"http://qdrant:6333/collections/docs/points/search",
json={"vector": embedding, "limit": top_k, "with_payload": True},
)
results = resp.json()["result"]
except Exception:
# Fallback: Elasticsearch (Keyword Search)
rag_errors.labels(error_type="retrieval_fallback").inc()
results = await self._fallback_keyword_search(embedding)
rag_latency.labels(component="retrieval").observe(time.time() - start)
# Track Relevance
if results:
avg_score = sum(r.get("score", 0) for r in results) / len(results)
rag_relevance.observe(avg_score)
return results
async def generate(self, query: str, context: list[dict], retries=2) -> str:
"""LLM Generation กับ Circuit Breaker"""
if not self.llm_breaker.can_execute():
return self._fallback_response(query, context)
context_text = "\n".join([doc["payload"]["text"] for doc in context])
prompt = f"Context:\n{context_text}\n\nQuestion: {query}\nAnswer:"
for attempt in range(retries):
try:
start = time.time()
async with httpx.AsyncClient(timeout=30.0) as client:
resp = await client.post(
"http://vllm:8080/v1/chat/completions",
json={
"model": "llama-3.1-8b",
"messages": [{"role": "user", "content": prompt}],
"max_tokens": 1024,
"temperature": 0.3,
},
)
result = resp.json()
rag_latency.labels(component="generation").observe(time.time() - start)
usage = result.get("usage", {})
rag_token_usage.labels(type="prompt").inc(usage.get("prompt_tokens", 0))
rag_token_usage.labels(type="completion").inc(usage.get("completion_tokens", 0))
self.llm_breaker.record_success()
return result["choices"][0]["message"]["content"]
except Exception:
self.llm_breaker.record_failure()
if attempt < retries - 1:
await asyncio.sleep(1)
else:
rag_errors.labels(error_type="llm_error").inc()
return self._fallback_response(query, context)
def _fallback_response(self, query, context):
"""Graceful Degradation — ส่ง Context กลับเมื่อ LLM ไม่พร้อม"""
if context:
return f"ขณะนี้ระบบกำลังประมวลผล กรุณาดูข้อมูลที่เกี่ยวข้อง:\n" + \
"\n".join([doc["payload"]["text"][:200] for doc in context[:3]])
return "ขออภัย ระบบไม่สามารถประมวลผลได้ กรุณาลองใหม่อีกครั้ง"
async def query(self, user_query: str) -> dict:
"""Main RAG Pipeline"""
total_start = time.time()
rag_requests.inc()
# 1. Cache Lookup
cached = await self.semantic_cache_lookup(user_query)
if cached:
return cached
# 2. Embed → Retrieve → Generate
embedding = await self.embed_query(user_query)
context = await self.retrieve(embedding)
answer = await self.generate(user_query, context)
result = {
"answer": answer,
"sources": [doc["payload"].get("source", "") for doc in context],
"latency_ms": round((time.time() - total_start) * 1000),
}
rag_latency.labels(component="total").observe(time.time() - total_start)
# 3. Cache Store
await self.semantic_cache_store(user_query, result)
return result
Load Testing สำหรับ RAG System
# locustfile.py — Load Test สำหรับ RAG API
from locust import HttpUser, task, between, events
import random
import time
SAMPLE_QUERIES = [
"วิธีตั้งค่า Kubernetes Cluster",
"Docker Compose คืออะไร",
"การใช้ Terraform กับ AWS",
"CI/CD Pipeline Best Practices",
"วิธีแก้ปัญหา Memory Leak",
"Microservices Architecture Design",
"Database Indexing Strategy",
"API Rate Limiting วิธีทำ",
"OAuth2 Flow อธิบาย",
"Redis Caching Pattern",
]
class RAGUser(HttpUser):
wait_time = between(1, 3)
host = "http://rag-api:8000"
@task(10)
def query_rag(self):
query = random.choice(SAMPLE_QUERIES)
start = time.time()
with self.client.post(
"/v1/query",
json={"query": query, "top_k": 5},
headers={"Authorization": "Bearer test-key"},
catch_response=True,
) as response:
latency = time.time() - start
if response.status_code == 200:
data = response.json()
if latency > 5.0:
response.failure(f"Too slow: {latency:.1f}s")
elif not data.get("answer"):
response.failure("Empty answer")
else:
response.success()
elif response.status_code == 429:
response.failure("Rate limited")
else:
response.failure(f"Status {response.status_code}")
@task(2)
def health_check(self):
self.client.get("/health")
# รัน: locust -f locustfile.py --users 100 --spawn-rate 10 --run-time 10m
Incident Response สำหรับ RAG System
- LLM Service Down: Circuit Breaker เปิด → Fallback ส่ง Retrieved Context กลับแทน → Alert On-call → Scale/Restart LLM Service
- Vector DB Slow: Switch ไป Keyword Search Fallback → Alert → ตรวจสอบ Index Health และ Resource Usage
- High Latency: เพิ่ม Cache TTL → Scale Embedding/LLM Workers → ตรวจสอบ GPU Utilization
- Hallucination Spike: ตรวจสอบ Retrieval Relevance → อาจมี Data Quality Issue → Update Knowledge Base
- Error Budget Exhausted: Freeze Deployments → Focus on Reliability → ทำ Post-mortem
RAG Architecture คืออะไร
RAG (Retrieval-Augmented Generation) รวม Information Retrieval กับ LLM Generation ดึงข้อมูลจาก Vector Database มาเป็น Context ให้ LLM ตอบคำถามได้ถูกต้องและอ้างอิงข้อมูลจริง ลด Hallucination ใช้ใน Chatbot, Knowledge Base, Customer Support
SRE Principles ที่สำคัญสำหรับ RAG System มีอะไรบ้าง
SLI/SLO สำหรับวัด Latency, Quality และ Availability, Error Budget จัดการ Risk ระหว่าง Feature กับ Reliability, Monitoring ครอบคลุมทุก Component, Circuit Breaker และ Fallback สำหรับ Graceful Degradation และ Capacity Planning สำหรับ GPU
วิธี Monitor RAG System ทำอย่างไร
ติดตาม Retrieval Latency, Generation Latency, End-to-end Latency (P50/P95/P99), Retrieval Relevance Score, Token Usage, Error Rate, Cache Hit Rate, GPU Utilization ใช้ Prometheus + Grafana สำหรับ Metrics และ OpenTelemetry สำหรับ Distributed Tracing
RAG System ควรมี SLO อะไรบ้าง
End-to-end Latency P95 ไม่เกิน 3 วินาที, Availability 99.9%, Retrieval Relevance เฉลี่ยมากกว่า 0.7, Error Rate ต่ำกว่า 0.1%, Hallucination Rate ต่ำกว่า 5% วัดด้วย Automated Evaluation Pipeline ปรับ SLO ตาม Business Requirement
สรุปและแนวทางปฏิบัติ
การสร้าง RAG System ที่มีความเสถียรระดับ Production ต้องใช้ SRE Principles ตั้งแต่กำหนด SLI/SLO ที่ชัดเจน, ใช้ Error Budget จัดการ Risk, ออกแบบ Pipeline ด้วย Circuit Breaker และ Fallback Pattern, Monitor ทุก Component ด้วย Prometheus/Grafana, ใช้ Semantic Cache ลด Latency และ Cost, ทำ Load Testing เป็นประจำ และมี Incident Response Plan ที่พร้อมใช้งาน สิ่งสำคัญคือ Reliability ไม่ได้มาจาก Component เดียว แต่มาจากการออกแบบทั้งระบบให้ทนทานต่อ Failure ของทุกส่วน
