SiamCafe.net Blog
Technology

RAG Architecture Site Reliability SRE

rag architecture site reliability sre
RAG Architecture Site Reliability SRE | SiamCafe Blog
2025-08-18· อ. บอม — SiamCafe.net· 9,962 คำ

RAG Architecture คืออะไร

RAG (Retrieval-Augmented Generation) เป็นสถาปัตยกรรมที่รวม 2 ส่วนหลักเข้าด้วยกัน คือ Retrieval Component ที่ค้นหาข้อมูลที่เกี่ยวข้องจาก Knowledge Base (มักเป็น Vector Database) และ Generation Component ที่ใช้ LLM สร้างคำตอบจาก Context ที่ค้นพบ ทำให้ LLM ตอบคำถามได้ถูกต้องและอ้างอิงข้อมูลจริง ลดปัญหา Hallucination ที่เป็นจุดอ่อนหลักของ LLM

เมื่อ RAG System ถูกใช้ใน Production ที่ User จำนวนมากเข้าถึง ความเสถียร (Reliability) กลายเป็นสิ่งสำคัญมาก Site Reliability Engineering (SRE) Principles จะช่วยให้ออกแบบ RAG System ที่ทนทานต่อ Failure, Monitor ได้ครอบคลุม และ Scale ได้ตาม Traffic

RAG System Architecture สำหรับ Production

SLI/SLO สำหรับ RAG System

# Python — SLI/SLO Definition และ Monitoring สำหรับ RAG
from dataclasses import dataclass
from datetime import datetime, timedelta
import time
from prometheus_client import Histogram, Counter, Gauge, start_http_server

# === SLI Metrics ===
rag_latency = Histogram(
    "rag_request_duration_seconds",
    "RAG end-to-end latency",
    buckets=[0.5, 1.0, 1.5, 2.0, 3.0, 5.0, 10.0],
    labelnames=["component"],  # embedding, retrieval, generation, total
)

rag_errors = Counter(
    "rag_errors_total",
    "RAG error count",
    labelnames=["error_type"],  # timeout, llm_error, retrieval_empty, validation_failed
)

rag_requests = Counter("rag_requests_total", "Total RAG requests")
rag_relevance = Histogram(
    "rag_retrieval_relevance_score",
    "Retrieval relevance score",
    buckets=[0.1, 0.2, 0.3, 0.4, 0.5, 0.6, 0.7, 0.8, 0.9, 1.0],
)

rag_cache_hits = Counter("rag_cache_hits_total", "Cache hit count")
rag_token_usage = Counter(
    "rag_token_usage_total", "Token usage",
    labelnames=["type"],  # prompt, completion
)

# === SLO Definitions ===
@dataclass
class SLO:
    name: str
    target: float
    window_days: int = 30

SLOS = [
    SLO("availability", 0.999, 30),           # 99.9% Availability
    SLO("latency_p95", 3.0, 30),              # P95 < 3 seconds
    SLO("latency_p99", 5.0, 30),              # P99 < 5 seconds
    SLO("retrieval_relevance_avg", 0.7, 30),  # Avg relevance > 0.7
    SLO("error_rate", 0.001, 30),             # Error rate < 0.1%
]

# === Error Budget Calculator ===
class ErrorBudget:
    def __init__(self, slo_target, window_minutes):
        self.slo_target = slo_target
        self.window_minutes = window_minutes
        self.total_requests = 0
        self.failed_requests = 0

    def record(self, success: bool):
        self.total_requests += 1
        if not success:
            self.failed_requests += 1

    @property
    def error_budget_total(self):
        return self.total_requests * (1 - self.slo_target)

    @property
    def error_budget_remaining(self):
        return max(0, self.error_budget_total - self.failed_requests)

    @property
    def error_budget_pct(self):
        if self.error_budget_total == 0:
            return 100.0
        return (self.error_budget_remaining / self.error_budget_total) * 100

    @property
    def current_reliability(self):
        if self.total_requests == 0:
            return 1.0
        return 1 - (self.failed_requests / self.total_requests)

# ตัวอย่างการใช้งาน
budget = ErrorBudget(slo_target=0.999, window_minutes=43200)  # 30 days

# จำลอง 100,000 requests
import random
for _ in range(100000):
    success = random.random() > 0.0005  # 0.05% error rate
    budget.record(success)

print(f"Total Requests: {budget.total_requests:,}")
print(f"Failed Requests: {budget.failed_requests}")
print(f"Current Reliability: {budget.current_reliability:.4%}")
print(f"Error Budget Total: {budget.error_budget_total:.0f}")
print(f"Error Budget Remaining: {budget.error_budget_remaining:.0f}")
print(f"Error Budget Used: {100 - budget.error_budget_pct:.1f}%")

RAG Pipeline กับ Reliability Patterns

# RAG Pipeline พร้อม Retry, Fallback, Circuit Breaker
import asyncio
import hashlib
import json
import time
from typing import Optional
import redis.asyncio as redis
import httpx

class CircuitBreaker:
    """Circuit Breaker Pattern สำหรับ External Service"""
    def __init__(self, failure_threshold=5, recovery_timeout=30):
        self.failure_threshold = failure_threshold
        self.recovery_timeout = recovery_timeout
        self.failure_count = 0
        self.last_failure_time = 0
        self.state = "closed"  # closed, open, half-open

    def can_execute(self):
        if self.state == "closed":
            return True
        if self.state == "open":
            if time.time() - self.last_failure_time > self.recovery_timeout:
                self.state = "half-open"
                return True
            return False
        return True  # half-open

    def record_success(self):
        self.failure_count = 0
        self.state = "closed"

    def record_failure(self):
        self.failure_count += 1
        self.last_failure_time = time.time()
        if self.failure_count >= self.failure_threshold:
            self.state = "open"

class ReliableRAGPipeline:
    def __init__(self):
        self.redis = redis.from_url("redis://localhost:6379")
        self.llm_breaker = CircuitBreaker(failure_threshold=5, recovery_timeout=30)
        self.embedding_breaker = CircuitBreaker(failure_threshold=3, recovery_timeout=15)

    async def semantic_cache_lookup(self, query: str) -> Optional[str]:
        """Semantic Cache — ลด Latency และ Cost"""
        cache_key = f"rag:cache:{hashlib.md5(query.encode()).hexdigest()}"
        cached = await self.redis.get(cache_key)
        if cached:
            rag_cache_hits.inc()
            return json.loads(cached)
        return None

    async def semantic_cache_store(self, query: str, response: dict, ttl=3600):
        cache_key = f"rag:cache:{hashlib.md5(query.encode()).hexdigest()}"
        await self.redis.setex(cache_key, ttl, json.dumps(response, ensure_ascii=False))

    async def embed_query(self, query: str, retries=3) -> list[float]:
        """Embedding กับ Retry Logic"""
        if not self.embedding_breaker.can_execute():
            raise Exception("Embedding service circuit breaker is open")

        for attempt in range(retries):
            try:
                start = time.time()
                async with httpx.AsyncClient(timeout=5.0) as client:
                    resp = await client.post(
                        "http://embedding-service:8080/embed",
                        json={"text": query},
                    )
                    resp.raise_for_status()
                rag_latency.labels(component="embedding").observe(time.time() - start)
                self.embedding_breaker.record_success()
                return resp.json()["embedding"]
            except Exception as e:
                self.embedding_breaker.record_failure()
                if attempt < retries - 1:
                    await asyncio.sleep(0.5 * (2 ** attempt))  # Exponential backoff
                else:
                    rag_errors.labels(error_type="embedding_error").inc()
                    raise

    async def retrieve(self, embedding: list[float], top_k=5) -> list[dict]:
        """Vector Search กับ Fallback"""
        start = time.time()
        try:
            # Primary: Qdrant
            async with httpx.AsyncClient(timeout=3.0) as client:
                resp = await client.post(
                    "http://qdrant:6333/collections/docs/points/search",
                    json={"vector": embedding, "limit": top_k, "with_payload": True},
                )
                results = resp.json()["result"]
        except Exception:
            # Fallback: Elasticsearch (Keyword Search)
            rag_errors.labels(error_type="retrieval_fallback").inc()
            results = await self._fallback_keyword_search(embedding)

        rag_latency.labels(component="retrieval").observe(time.time() - start)

        # Track Relevance
        if results:
            avg_score = sum(r.get("score", 0) for r in results) / len(results)
            rag_relevance.observe(avg_score)

        return results

    async def generate(self, query: str, context: list[dict], retries=2) -> str:
        """LLM Generation กับ Circuit Breaker"""
        if not self.llm_breaker.can_execute():
            return self._fallback_response(query, context)

        context_text = "\n".join([doc["payload"]["text"] for doc in context])
        prompt = f"Context:\n{context_text}\n\nQuestion: {query}\nAnswer:"

        for attempt in range(retries):
            try:
                start = time.time()
                async with httpx.AsyncClient(timeout=30.0) as client:
                    resp = await client.post(
                        "http://vllm:8080/v1/chat/completions",
                        json={
                            "model": "llama-3.1-8b",
                            "messages": [{"role": "user", "content": prompt}],
                            "max_tokens": 1024,
                            "temperature": 0.3,
                        },
                    )
                    result = resp.json()
                rag_latency.labels(component="generation").observe(time.time() - start)
                usage = result.get("usage", {})
                rag_token_usage.labels(type="prompt").inc(usage.get("prompt_tokens", 0))
                rag_token_usage.labels(type="completion").inc(usage.get("completion_tokens", 0))
                self.llm_breaker.record_success()
                return result["choices"][0]["message"]["content"]
            except Exception:
                self.llm_breaker.record_failure()
                if attempt < retries - 1:
                    await asyncio.sleep(1)
                else:
                    rag_errors.labels(error_type="llm_error").inc()
                    return self._fallback_response(query, context)

    def _fallback_response(self, query, context):
        """Graceful Degradation — ส่ง Context กลับเมื่อ LLM ไม่พร้อม"""
        if context:
            return f"ขณะนี้ระบบกำลังประมวลผล กรุณาดูข้อมูลที่เกี่ยวข้อง:\n" + \
                   "\n".join([doc["payload"]["text"][:200] for doc in context[:3]])
        return "ขออภัย ระบบไม่สามารถประมวลผลได้ กรุณาลองใหม่อีกครั้ง"

    async def query(self, user_query: str) -> dict:
        """Main RAG Pipeline"""
        total_start = time.time()
        rag_requests.inc()

        # 1. Cache Lookup
        cached = await self.semantic_cache_lookup(user_query)
        if cached:
            return cached

        # 2. Embed → Retrieve → Generate
        embedding = await self.embed_query(user_query)
        context = await self.retrieve(embedding)
        answer = await self.generate(user_query, context)

        result = {
            "answer": answer,
            "sources": [doc["payload"].get("source", "") for doc in context],
            "latency_ms": round((time.time() - total_start) * 1000),
        }

        rag_latency.labels(component="total").observe(time.time() - total_start)

        # 3. Cache Store
        await self.semantic_cache_store(user_query, result)
        return result

Load Testing สำหรับ RAG System

# locustfile.py — Load Test สำหรับ RAG API
from locust import HttpUser, task, between, events
import random
import time

SAMPLE_QUERIES = [
    "วิธีตั้งค่า Kubernetes Cluster",
    "Docker Compose คืออะไร",
    "การใช้ Terraform กับ AWS",
    "CI/CD Pipeline Best Practices",
    "วิธีแก้ปัญหา Memory Leak",
    "Microservices Architecture Design",
    "Database Indexing Strategy",
    "API Rate Limiting วิธีทำ",
    "OAuth2 Flow อธิบาย",
    "Redis Caching Pattern",
]

class RAGUser(HttpUser):
    wait_time = between(1, 3)
    host = "http://rag-api:8000"

    @task(10)
    def query_rag(self):
        query = random.choice(SAMPLE_QUERIES)
        start = time.time()
        with self.client.post(
            "/v1/query",
            json={"query": query, "top_k": 5},
            headers={"Authorization": "Bearer test-key"},
            catch_response=True,
        ) as response:
            latency = time.time() - start
            if response.status_code == 200:
                data = response.json()
                if latency > 5.0:
                    response.failure(f"Too slow: {latency:.1f}s")
                elif not data.get("answer"):
                    response.failure("Empty answer")
                else:
                    response.success()
            elif response.status_code == 429:
                response.failure("Rate limited")
            else:
                response.failure(f"Status {response.status_code}")

    @task(2)
    def health_check(self):
        self.client.get("/health")

# รัน: locust -f locustfile.py --users 100 --spawn-rate 10 --run-time 10m

Incident Response สำหรับ RAG System

RAG Architecture คืออะไร

RAG (Retrieval-Augmented Generation) รวม Information Retrieval กับ LLM Generation ดึงข้อมูลจาก Vector Database มาเป็น Context ให้ LLM ตอบคำถามได้ถูกต้องและอ้างอิงข้อมูลจริง ลด Hallucination ใช้ใน Chatbot, Knowledge Base, Customer Support

SRE Principles ที่สำคัญสำหรับ RAG System มีอะไรบ้าง

SLI/SLO สำหรับวัด Latency, Quality และ Availability, Error Budget จัดการ Risk ระหว่าง Feature กับ Reliability, Monitoring ครอบคลุมทุก Component, Circuit Breaker และ Fallback สำหรับ Graceful Degradation และ Capacity Planning สำหรับ GPU

วิธี Monitor RAG System ทำอย่างไร

ติดตาม Retrieval Latency, Generation Latency, End-to-end Latency (P50/P95/P99), Retrieval Relevance Score, Token Usage, Error Rate, Cache Hit Rate, GPU Utilization ใช้ Prometheus + Grafana สำหรับ Metrics และ OpenTelemetry สำหรับ Distributed Tracing

RAG System ควรมี SLO อะไรบ้าง

End-to-end Latency P95 ไม่เกิน 3 วินาที, Availability 99.9%, Retrieval Relevance เฉลี่ยมากกว่า 0.7, Error Rate ต่ำกว่า 0.1%, Hallucination Rate ต่ำกว่า 5% วัดด้วย Automated Evaluation Pipeline ปรับ SLO ตาม Business Requirement

สรุปและแนวทางปฏิบัติ

การสร้าง RAG System ที่มีความเสถียรระดับ Production ต้องใช้ SRE Principles ตั้งแต่กำหนด SLI/SLO ที่ชัดเจน, ใช้ Error Budget จัดการ Risk, ออกแบบ Pipeline ด้วย Circuit Breaker และ Fallback Pattern, Monitor ทุก Component ด้วย Prometheus/Grafana, ใช้ Semantic Cache ลด Latency และ Cost, ทำ Load Testing เป็นประจำ และมี Incident Response Plan ที่พร้อมใช้งาน สิ่งสำคัญคือ Reliability ไม่ได้มาจาก Component เดียว แต่มาจากการออกแบบทั้งระบบให้ทนทานต่อ Failure ของทุกส่วน

📖 บทความที่เกี่ยวข้อง

Kubernetes Network Policy Site Reliability SREอ่านบทความ → Kubernetes Admission Webhook Site Reliability SREอ่านบทความ → Shopify Hydrogen Site Reliability SREอ่านบทความ → Directus CMS Site Reliability SREอ่านบทความ → TensorRT Optimization Site Reliability SREอ่านบทความ →

📚 ดูบทความทั้งหมด →