ai

RAG Architecture กับ Site Reliability

RAG Architecture กับ Site Reliability

RAG Architecture คืออะไร

RAG Architecture กับ Site Reliability

RAG (Retrieval-Augmented Generation) เป็นสถาปัตยกรรมที่รวม 2 ส่วนหลักเข้าด้วยกัน คือ Retrieval Component ที่ค้นหาข้อมูลที่เกี่ยวข้องจาก Knowledge Base (มักเป็น Vector Database) และ Generation Component ที่ใช้ LLM สร้างคำตอบจาก Context ที่ค้นพบ ทำให้ LLM ตอบคำถามได้ถูกต้องและอ้างอิงข้อมูลจริง ลดปัญหา Hallucination ที่เป็นจุดอ่อนหลักของ LLM

เนื้อหาเกี่ยวข้อง — แนะนำให้อ่าน LocalAI Self-hosted AR VR Development —

เมื่อ RAG System ถูกใช้ใน Production ที่ User จำนวนมากเข้าถึง ความเสถียร (Reliability) กลายเป็นสิ่งสำคัญมาก Site Reliability Engineering (SRE) Principles จะช่วยให้ออกแบบ RAG System ที่ทนทานต่อ Failure, Monitor ได้ครอบคลุม และ Scale ได้ตาม Traffic

เนื้อหาเกี่ยวข้อง — อ่านต่อ: Shadcn UI RBAC ABAC Policy

RAG System Architecture สำหรับ Production

  • API Gateway: Rate Limiting, Authentication, Load Balancing
  • Query Preprocessor: Query Rewriting, Intent Classification, Cache Lookup
  • Embedding Service: แปลง Query เป็น Vector (Sentence Transformers / OpenAI)
  • Vector Database: Qdrant / Milvus / Pinecone สำหรับ Similarity Search
  • Reranker: จัดอันดับผลลัพธ์ใหม่ด้วย Cross-encoder
  • LLM Service: Generate คำตอบจาก Retrieved Context
  • Response Validator: ตรวจสอบคุณภาพคำตอบ, Hallucination Detection
  • Cache Layer: Redis สำหรับ Semantic Cache ลด Latency และ Cost

SLI/SLO สำหรับ RAG System

# Python — SLI/SLO Definition และ Monitoring สำหรับ RAG
from dataclasses import dataclass
from datetime import datetime, timedelta
import time
from prometheus_client import Histogram, Counter, Gauge, start_http_server

# === SLI Metrics ===
rag_latency = Histogram(
    "rag_request_duration_seconds",
    "RAG end-to-end latency",
    buckets=[0.5, 1.0, 1.5, 2.0, 3.0, 5.0, 10.0],
    labelnames=["component"],  # embedding, retrieval, generation, total
)

rag_errors = Counter(
    "rag_errors_total",
    "RAG error count",
    labelnames=["error_type"],  # timeout, llm_error, retrieval_empty, validation_failed
)

rag_requests = Counter("rag_requests_total", "Total RAG requests")
rag_relevance = Histogram(
    "rag_retrieval_relevance_score",
    "Retrieval relevance score",
    buckets=[0.1, 0.2, 0.3, 0.4, 0.5, 0.6, 0.7, 0.8, 0.9, 1.0],
)

rag_cache_hits = Counter("rag_cache_hits_total", "Cache hit count")
rag_token_usage = Counter(
    "rag_token_usage_total", "Token usage",
    labelnames=["type"],  # prompt, completion
)

# === SLO Definitions ===
@dataclass
class SLO:
    name: str
    target: float
    window_days: int = 30

SLOS = [
    SLO("availability", 0.999, 30),           # 99.9% Availability
    SLO("latency_p95", 3.0, 30),              # P95 < 3 seconds
    SLO("latency_p99", 5.0, 30),              # P99 < 5 seconds
    SLO("retrieval_relevance_avg", 0.7, 30),  # Avg relevance > 0.7
    SLO("error_rate", 0.001, 30),             # Error rate < 0.1%
]

# === Error Budget Calculator ===
class ErrorBudget:
    def __init__(self, slo_target, window_minutes):
        self.slo_target = slo_target
        self.window_minutes = window_minutes
        self.total_requests = 0
        self.failed_requests = 0

    def record(self, success: bool):
        self.total_requests += 1
        if not success:
            self.failed_requests += 1

    @property
    def error_budget_total(self):
        return self.total_requests * (1 - self.slo_target)

    @property
    def error_budget_remaining(self):
        return max(0, self.error_budget_total - self.failed_requests)

    @property
    def error_budget_pct(self):
        if self.error_budget_total == 0:
            return 100.0
        return (self.error_budget_remaining / self.error_budget_total) * 100

    @property
    def current_reliability(self):
        if self.total_requests == 0:
            return 1.0
        return 1 - (self.failed_requests / self.total_requests)

# ตัวอย่างการใช้งาน
budget = ErrorBudget(slo_target=0.999, window_minutes=43200)  # 30 days

# จำลอง 100,000 requests
import random
for _ in range(100000):
    success = random.random() > 0.0005  # 0.05% error rate
    budget.record(success)

print(f"Total Requests: {budget.total_requests:,}")
print(f"Failed Requests: {budget.failed_requests}")
print(f"Current Reliability: {budget.current_reliability:.4%}")
print(f"Error Budget Total: {budget.error_budget_total:.0f}")
print(f"Error Budget Remaining: {budget.error_budget_remaining:.0f}")
print(f"Error Budget Used: {100 - budget.error_budget_pct:.1f}%")

RAG Pipeline กับ Reliability Patterns

RAG Architecture กับ Site Reliability
# RAG Pipeline พร้อม Retry, Fallback, Circuit Breaker
import asyncio
import hashlib
import json
import time
from typing import Optional
import redis.asyncio as redis
import httpx

class CircuitBreaker:
    """Circuit Breaker Pattern สำหรับ External Service"""
    def __init__(self, failure_threshold=5, recovery_timeout=30):
        self.failure_threshold = failure_threshold
        self.recovery_timeout = recovery_timeout
        self.failure_count = 0
        self.last_failure_time = 0
        self.state = "closed"  # closed, open, half-open

    def can_execute(self):
        if self.state == "closed":
            return True
        if self.state == "open":
            if time.time() - self.last_failure_time > self.recovery_timeout:
                self.state = "half-open"
                return True
            return False
        return True  # half-open

    def record_success(self):
        self.failure_count = 0
        self.state = "closed"

    def record_failure(self):
        self.failure_count += 1
        self.last_failure_time = time.time()
        if self.failure_count >= self.failure_threshold:
            self.state = "open"

class ReliableRAGPipeline:
    def __init__(self):
        self.redis = redis.from_url("redis://localhost:6379")
        self.llm_breaker = CircuitBreaker(failure_threshold=5, recovery_timeout=30)
        self.embedding_breaker = CircuitBreaker(failure_threshold=3, recovery_timeout=15)

    async def semantic_cache_lookup(self, query: str) -> Optional[str]:
        """Semantic Cache — ลด Latency และ Cost"""
        cache_key = f"rag:cache:{hashlib.md5(query.encode()).hexdigest()}"
        cached = await self.redis.get(cache_key)
        if cached:
            rag_cache_hits.inc()
            return json.loads(cached)
        return None

    async def semantic_cache_store(self, query: str, response: dict, ttl=3600):
        cache_key = f"rag:cache:{hashlib.md5(query.encode()).hexdigest()}"
        await self.redis.setex(cache_key, ttl, json.dumps(response, ensure_ascii=False))

    async def embed_query(self, query: str, retries=3) -> list[float]:
        """Embedding กับ Retry Logic"""
        if not self.embedding_breaker.can_execute():
            raise Exception("Embedding service circuit breaker is open")

        for attempt in range(retries):
            try:
                start = time.time()
                async with httpx.AsyncClient(timeout=5.0) as client:
                    resp = await client.post(
                        "http://embedding-service:8080/embed",
                        json={"text": query},
                    )
                    resp.raise_for_status()
                rag_latency.labels(component="embedding").observe(time.time() - start)
                self.embedding_breaker.record_success()
                return resp.json()["embedding"]
            except Exception as e:
                self.embedding_breaker.record_failure()
                if attempt < retries - 1:
                    await asyncio.sleep(0.5 * (2 ** attempt))  # Exponential backoff
                else:
                    rag_errors.labels(error_type="embedding_error").inc()
                    raise

    async def retrieve(self, embedding: list[float], top_k=5) -> list[dict]:
        """Vector Search กับ Fallback"""
        start = time.time()
        try:
            # Primary: Qdrant
            async with httpx.AsyncClient(timeout=3.0) as client:
                resp = await client.post(
                    "http://qdrant:6333/collections/docs/points/search",
                    json={"vector": embedding, "limit": top_k, "with_payload": True},
                )
                results = resp.json()["result"]
        except Exception:
            # Fallback: Elasticsearch (Keyword Search)
            rag_errors.labels(error_type="retrieval_fallback").inc()
            results = await self._fallback_keyword_search(embedding)

        rag_latency.labels(component="retrieval").observe(time.time() - start)

        # Track Relevance
        if results:
            avg_score = sum(r.get("score", 0) for r in results) / len(results)
            rag_relevance.observe(avg_score)

        return results

    async def generate(self, query: str, context: list[dict], retries=2) -> str:
        """LLM Generation กับ Circuit Breaker"""
        if not self.llm_breaker.can_execute():
            return self._fallback_response(query, context)

        context_text = "\n".join([doc["payload"]["text"] for doc in context])
        prompt = f"Context:\n{context_text}\n\nQuestion: {query}\nAnswer:"

        for attempt in range(retries):
            try:
                start = time.time()
                async with httpx.AsyncClient(timeout=30.0) as client:
                    resp = await client.post(
                        "http://vllm:8080/v1/chat/completions",
                        json={
                            "model": "llama-3.1-8b",
                            "messages": [{"role": "user", "content": prompt}],
                            "max_tokens": 1024,
                            "temperature": 0.3,
                        },
                    )
                    result = resp.json()
                rag_latency.labels(component="generation").observe(time.time() - start)
                usage = result.get("usage", {})
                rag_token_usage.labels(type="prompt").inc(usage.get("prompt_tokens", 0))
                rag_token_usage.labels(type="completion").inc(usage.get("completion_tokens", 0))
                self.llm_breaker.record_success()
                return result["choices"][0]["message"]["content"]
            except Exception:
                self.llm_breaker.record_failure()
                if attempt < retries - 1:
                    await asyncio.sleep(1)
                else:
                    rag_errors.labels(error_type="llm_error").inc()
                    return self._fallback_response(query, context)

    def _fallback_response(self, query, context):
        """Graceful Degradation — ส่ง Context กลับเมื่อ LLM ไม่พร้อม"""
        if context:
            return f"ขณะนี้ระบบกำลังประมวลผล กรุณาดูข้อมูลที่เกี่ยวข้อง:\n" + \
                   "\n".join([doc["payload"]["text"][:200] for doc in context[:3]])
        return "ขออภัย ระบบไม่สามารถประมวลผลได้ กรุณาลองใหม่อีกครั้ง"

    async def query(self, user_query: str) -> dict:
        """Main RAG Pipeline"""
        total_start = time.time()
        rag_requests.inc()

        # 1. Cache Lookup
        cached = await self.semantic_cache_lookup(user_query)
        if cached:
            return cached

        # 2. Embed → Retrieve → Generate
        embedding = await self.embed_query(user_query)
        context = await self.retrieve(embedding)
        answer = await self.generate(user_query, context)

        result = {
            "answer": answer,
            "sources": [doc["payload"].get("source", "") for doc in context],
            "latency_ms": round((time.time() - total_start) * 1000),
        }

        rag_latency.labels(component="total").observe(time.time() - total_start)

        # 3. Cache Store
        await self.semantic_cache_store(user_query, result)
        return result

Load Testing สำหรับ RAG System

# locustfile.py — Load Test สำหรับ RAG API
from locust import HttpUser, task, between, events
import random
import time

SAMPLE_QUERIES = [
    "วิธีตั้งค่า Kubernetes Cluster",
    "Docker Compose คืออะไร",
    "การใช้ Terraform กับ AWS",
    "CI/CD Pipeline Best Practices",
    "วิธีแก้ปัญหา Memory Leak",
    "Microservices Architecture Design",
    "Database Indexing Strategy",
    "API Rate Limiting วิธีทำ",
    "OAuth2 Flow อธิบาย",
    "Redis Caching Pattern",
]

class RAGUser(HttpUser):
    wait_time = between(1, 3)
    host = "http://rag-api:8000"

    @task(10)
    def query_rag(self):
        query = random.choice(SAMPLE_QUERIES)
        start = time.time()
        with self.client.post(
            "/v1/query",
            json={"query": query, "top_k": 5},
            headers={"Authorization": "Bearer test-key"},
            catch_response=True,
        ) as response:
            latency = time.time() - start
            if response.status_code == 200:
                data = response.json()
                if latency > 5.0:
                    response.failure(f"Too slow: {latency:.1f}s")
                elif not data.get("answer"):
                    response.failure("Empty answer")
                else:
                    response.success()
            elif response.status_code == 429:
                response.failure("Rate limited")
            else:
                response.failure(f"Status {response.status_code}")

    @task(2)
    def health_check(self):
        self.client.get("/health")

# รัน: locust -f locustfile.py --users 100 --spawn-rate 10 --run-time 10m

Incident Response สำหรับ RAG System

  • LLM Service Down: Circuit Breaker เปิด → Fallback ส่ง Retrieved Context กลับแทน → Alert On-call → Scale/Restart LLM Service
  • Vector DB Slow: Switch ไป Keyword Search Fallback → Alert → ตรวจสอบ Index Health และ Resource Usage
  • High Latency: เพิ่ม Cache TTL → Scale Embedding/LLM Workers → ตรวจสอบ GPU Utilization
  • Hallucination Spike: ตรวจสอบ Retrieval Relevance → อาจมี Data Quality Issue → Update Knowledge Base
  • Error Budget Exhausted: Freeze Deployments → Focus on Reliability → ทำ Post-mortem

RAG Architecture คืออะไร

RAG (Retrieval-Augmented Generation) รวม Information Retrieval กับ LLM Generation ดึงข้อมูลจาก Vector Database มาเป็น Context ให้ LLM ตอบคำถามได้ถูกต้องและอ้างอิงข้อมูลจริง ลด Hallucination ใช้ใน Chatbot, Knowledge Base, Customer Support

แนะนำเพิ่มเติม — ดูสัญญาณเทรดที่ XM Signal

เนื้อหาเกี่ยวข้อง — อ่านต่อ: บัญชีโอนเงินไม่ได้ — ข้อมูลครบถ้วน 2026

XM Legend · เทรดเดอร์ & ผู้สอน Forex 13 ปี

ผู้ก่อตั้ง SiamCafe ตั้งแต่ปี 1997 · เทรดเดอร์สาย Forex มากกว่า 13 ปี ได้รับการยกย่องเป็น XM Legend · แบ่งปันความรู้ Forex, ไอที, AI และการเทรด จากประสบการณ์จริงในตลาดจริง