Vector Database และ Pinecone

Vector Database เป็นฐานข้อมูลเฉพาะทางที่ออกแบบมาสำหรับเก็บและค้นหา Vector Embeddings ซึ่งเป็นตัวแทนข้อมูล (Text, Image, Audio) ในรูปแบบตัวเลขหลายมิติ การค้นหาใช้ Approximate Nearest Neighbor (ANN) Algorithm เช่น HNSW ทำให้ค้นหาใน Milliseconds แม้มีข้อมูลหลายล้าน Vectors

Pinecone เป็น Managed Vector Database ที่นิยมที่สุด ให้บริการแบบ Serverless ไม่ต้องจัดการ Infrastructure รองรับ Real-time Indexing, Metadata Filtering, Namespace Isolation และ Scale อัตโนมัติ เหมาะสำหรับ Production AI Applications

ใช้งาน Pinecone

# === Pinecone Setup และ CRUD Operations ===
# pip install pinecone openai   (the Pinecone SDK was previously published on PyPI as pinecone-client)

from pinecone import Pinecone, ServerlessSpec
import openai
import json
import time

# Create the Pinecone client.
pc = Pinecone(api_key="YOUR_PINECONE_API_KEY")

# Name of the index this script creates (when missing) and then connects to.
index_name = "knowledge-base"

# Create the index only when the account does not already list it.
index_already_exists = index_name in pc.list_indexes().names()
if not index_already_exists:
    # Serverless deployment target for the new index.
    serverless_spec = ServerlessSpec(cloud="aws", region="us-east-1")
    pc.create_index(
        name=index_name,
        dimension=1536,   # OpenAI text-embedding-3-small
        metric="cosine",  # cosine, euclidean, dotproduct
        spec=serverless_spec,
    )
    print(f"Index '{index_name}' created")

# Open a handle to the index for the upsert/query calls.
index = pc.Index(index_name)

# === Embedding Function ===
# OpenAI client that get_embedding() reads from module scope.
client = openai.OpenAI(api_key="YOUR_OPENAI_KEY")

def get_embedding(text, model="text-embedding-3-small"):
    """Return the embedding for ``text`` from the OpenAI embeddings endpoint.

    Args:
        text: Value sent as the ``input`` of the embeddings request.
        model: Name of the embedding model to request.

    Returns:
        The ``embedding`` attribute of the first item in the response data.
    """
    embeddings_response = client.embeddings.create(model=model, input=text)
    first_item = embeddings_response.data[0]
    return first_item.embedding

# === Upsert Vectors ===
# Sample documents: an id, the raw text, and metadata used later for filtering.
documents = [
    {"id": "doc-001", "text": "Kubernetes is a container orchestration platform",
     "metadata": {"category": "devops", "source": "docs"}},
    {"id": "doc-002", "text": "Docker containers package applications with dependencies",
     "metadata": {"category": "devops", "source": "blog"}},
    {"id": "doc-003", "text": "PostgreSQL is a relational database management system",
     "metadata": {"category": "database", "source": "docs"}},
    {"id": "doc-004", "text": "Redis is an in-memory data structure store",
     "metadata": {"category": "database", "source": "docs"}},
    {"id": "doc-005", "text": "Prometheus monitors metrics from containerized applications",
     "metadata": {"category": "monitoring", "source": "blog"}},
]

# Batch upsert: embed every document, then send all vectors in a single call.
# The raw text is copied into the metadata so query results can display it.
vectors = [
    {
        "id": entry["id"],
        "values": get_embedding(entry["text"]),
        "metadata": {**entry["metadata"], "text": entry["text"]},
    }
    for entry in documents
]

index.upsert(vectors=vectors, namespace="production")
print(f"Upserted {len(vectors)} vectors")

# === Query (Similarity Search) ===
query_text = "How to monitor containers?"
query_embedding = get_embedding(query_text)

# Metadata filter: only consider vectors whose category is one of these values.
category_filter = {"category": {"$in": ["monitoring", "devops"]}}

results = index.query(
    namespace="production",
    vector=query_embedding,
    filter=category_filter,
    top_k=3,
    include_metadata=True,
)

# Print each match's score next to the text stored in its metadata.
print(f"\nQuery: '{query_text}'")
for match in results["matches"]:
    print(f"  Score: {match['score']:.4f} | {match['metadata']['text']}")

# === Index Stats ===
stats = index.describe_index_stats()
print(f"\nIndex Stats:")
print(f"  Total Vectors: {stats['total_vector_count']}")
print(f"  Namespaces: {stats['namespaces']}")

# === Delete ===
# Examples only; left commented out so running the script removes nothing.
# index.delete(ids=["doc-001"], namespace="production")
# index.delete(delete_all=True, namespace="staging")

RAG Pipeline ด้วย Pinecone

# rag_pipeline.py — Retrieval-Augmented Generation
from pinecone import Pinecone
import openai
from dataclasses import dataclass
from typing import Optional
import hashlib

@dataclass
class RAGResponse:
    """Result of one end-to-end RAG query."""

    # Generated answer text.
    answer: str
    # Retrieved documents that backed the answer (one entry per document).
    sources: list
    # Confidence score for the answer; 0.0 when nothing was retrieved.
    confidence: float

class RAGPipeline:
    """Retrieval-augmented generation pipeline built on Pinecone and OpenAI.

    A query is embedded, the nearest vectors are fetched from a Pinecone
    index, and their stored text is given to a chat model as context.
    """

    def __init__(self, pinecone_key, openai_key, index_name):
        """Create the Pinecone and OpenAI clients and open the index.

        Args:
            pinecone_key: API key passed to ``Pinecone``.
            openai_key: API key passed to ``openai.OpenAI``.
            index_name: Name of the Pinecone index to query.
        """
        self.pc = Pinecone(api_key=pinecone_key)
        self.index = self.pc.Index(index_name)
        self.openai = openai.OpenAI(api_key=openai_key)

    def embed(self, text):
        """Return the ``text-embedding-3-small`` embedding for ``text``."""
        res = self.openai.embeddings.create(
            input=text, model="text-embedding-3-small"
        )
        return res.data[0].embedding

    def retrieve(self, query, top_k=5, namespace="production",
                 filter_dict=None):
        """Find the documents most similar to ``query``.

        Args:
            query: Text to embed and search for.
            top_k: Number of matches requested from the index.
            namespace: Pinecone namespace to search.
            filter_dict: Optional metadata filter forwarded to the index
                query as ``filter``; ``None`` applies no filter.

        Returns:
            The ``"matches"`` entry of the index query result.
        """
        embedding = self.embed(query)
        results = self.index.query(
            vector=embedding,
            top_k=top_k,
            include_metadata=True,
            namespace=namespace,
            filter=filter_dict,
        )
        return results["matches"]

    def generate(self, query, context_docs, model="gpt-4o-mini"):
        """Generate an answer to ``query`` from the retrieved documents.

        Args:
            query: The user's question.
            context_docs: Matches whose ``metadata`` holds the ``source`` and
                ``text`` used to build the context.
            model: Chat model name.

        Returns:
            The text of the first completion choice, or ``""`` when the
            message carries no content.
        """
        # One "[Source: ...]" header plus text per document, blank-line separated.
        context = "\n\n".join(
            f"[Source: {d['metadata'].get('source', 'unknown')}]\n"
            f"{d['metadata'].get('text', '')}"
            for d in context_docs
        )

        response = self.openai.chat.completions.create(
            model=model,
            messages=[
                {"role": "system", "content":
                 "Answer based on the provided context. "
                 "If the context doesn't contain the answer, say so."},
                {"role": "user", "content":
                 f"Context:\n{context}\n\nQuestion: {query}"},
            ],
            temperature=0.3,
        )

        # The message content can be None; keep the return value a string so
        # RAGResponse.answer always matches its declared type.
        return response.choices[0].message.content or ""

    def ask(self, query, top_k=5, namespace="production",
            filter_dict: Optional[dict] = None) -> "RAGResponse":
        """Run an end-to-end RAG query: retrieve, generate, then score.

        Args:
            query: The user's question.
            top_k: Number of documents to retrieve.
            namespace: Pinecone namespace to search.
            filter_dict: Optional metadata filter passed through to
                ``retrieve``; ``None`` (the default) applies no filter.

        Returns:
            A ``RAGResponse``. When nothing is retrieved the answer is a
            fixed "no documents" message with confidence ``0.0``.
        """
        docs = self.retrieve(query, top_k=top_k, namespace=namespace,
                             filter_dict=filter_dict)

        if not docs:
            return RAGResponse(
                answer="No relevant documents found.",
                sources=[], confidence=0.0,
            )

        answer = self.generate(query, docs)

        # Confidence is the mean similarity score of the retrieved documents.
        avg_score = sum(d["score"] for d in docs) / len(docs)

        # Keep only a 100-character preview of each source text.
        sources = [
            {"id": d["id"], "score": d["score"],
             "text": d["metadata"].get("text", "")[:100]}
            for d in docs
        ]

        return RAGResponse(
            answer=answer,
            sources=sources,
            confidence=avg_score,
        )

# ตัวอย่าง
# rag = RAGPipeline("pinecone_key", "openai_key", "knowledge-base")
# result = rag.ask("How to set up monitoring for Kubernetes?")
# print(f"Answer: {result.answer}")
# print(f"Confidence: {result.confidence:.2f}")
# print(f"Sources: {len(result.sources)}")

Chaos Engineering สำหรับ Vector Search

# chaos_vector_db.py — Chaos Engineering สำหรับ Vector Database
import time
import random
import threading
import statistics
from dataclasses import dataclass
from typing import Callable
import requests

@dataclass
class ChaosResult:
    """Outcome of a single chaos experiment."""

    # Experiment name.
    experiment: str
    # Experiment duration in seconds.
    duration: float
    # Percentage (0-100) of queries counted as successful.
    success_rate: float
    # Mean latency in seconds.
    avg_latency: float
    # 99th-percentile latency in seconds.
    p99_latency: float
    # Error messages collected during the run.
    errors: list

class VectorDBChaosEngine:
    """Chaos-engineering harness for a vector-database-backed RAG pipeline.

    Each experiment runs a fault-injection callable in a background thread,
    sends queries through ``rag_pipeline.ask`` for a fixed duration, and
    records success rate and latency figures as a ``ChaosResult``.
    """

    def __init__(self, rag_pipeline, steady_state_latency=0.5):
        """Store the pipeline under test and the steady-state baseline.

        Args:
            rag_pipeline: Object exposing ``ask(query, top_k=...)`` whose
                result has a ``confidence`` attribute.
            steady_state_latency: Baseline latency in seconds. The
                steady-state check passes while the average latency stays
                below twice this value.
        """
        self.rag = rag_pipeline
        self.steady_state = steady_state_latency
        self.results = []

    def run_experiment(self, name, chaos_func, duration=60,
                       queries_per_sec=10):
        """Run one chaos experiment and return its ``ChaosResult``.

        Args:
            name: Label used in the printed report and the result.
            chaos_func: Callable taking a ``threading.Event``; it is run in a
                background thread and is expected to return once the event
                is set.
            duration: How long to keep sending queries, in seconds.
            queries_per_sec: Target query rate; must be greater than 0.

        Returns:
            The ``ChaosResult``, which is also appended to ``self.results``.

        Raises:
            ValueError: If ``queries_per_sec`` is not greater than 0.
        """
        # Validate before starting the chaos thread so a bad rate cannot
        # leave a fault injected and a thread waiting forever.
        if queries_per_sec <= 0:
            raise ValueError("queries_per_sec must be greater than 0")

        print(f"\n{'='*50}")
        print(f"Experiment: {name}")
        print(f"Duration: {duration}s | QPS: {queries_per_sec}")
        print(f"{'='*50}")

        latencies = []  # latencies of queries that returned a result
        errors = []
        success = 0
        total = 0
        stop_event = threading.Event()

        test_queries = [
            "How to deploy containers?",
            "What is database replication?",
            "How to set up monitoring?",
            "Explain load balancing",
            "What is service mesh?",
        ]
        pause = 1.0 / queries_per_sec

        # Start injecting the fault.
        chaos_thread = threading.Thread(target=chaos_func,
                                         args=(stop_event,))
        chaos_thread.start()

        try:
            # Monotonic clocks: wall-clock adjustments must not change the
            # experiment length or the measured latencies.
            deadline = time.monotonic() + duration
            while time.monotonic() < deadline:
                query = random.choice(test_queries)
                total += 1

                try:
                    q_start = time.perf_counter()
                    answer = self.rag.ask(query, top_k=3)
                    latencies.append(time.perf_counter() - q_start)

                    if answer.confidence > 0.3:
                        success += 1
                    else:
                        errors.append(
                            f"Low confidence: {answer.confidence:.2f}")
                except Exception as exc:
                    # Deliberately broad: any failure under chaos is recorded
                    # as a data point instead of aborting the experiment.
                    errors.append(str(exc))

                time.sleep(pause)
        finally:
            # Always stop the fault injection, even if the loop is
            # interrupted, so patches are undone and the thread exits.
            stop_event.set()
            chaos_thread.join()

        # Analyse the results; failed queries contribute no latency sample.
        avg_lat = statistics.mean(latencies) if latencies else 0
        p99_lat = (sorted(latencies)[int(len(latencies) * 0.99)]
                   if latencies else 0)

        # With no successful sample the 0 average is a placeholder, not a
        # measurement, so it must not count as meeting the steady state.
        steady_ok = bool(latencies) and avg_lat < self.steady_state * 2

        result = ChaosResult(
            experiment=name,
            duration=duration,
            success_rate=success / total * 100 if total > 0 else 0,
            avg_latency=avg_lat,
            p99_latency=p99_lat,
            errors=errors[:10],  # keep only the first 10 messages
        )
        self.results.append(result)

        # Report.
        print(f"\nResults:")
        print(f"  Success Rate: {result.success_rate:.1f}%")
        print(f"  Avg Latency:  {result.avg_latency:.3f}s")
        print(f"  P99 Latency:  {result.p99_latency:.3f}s")
        print(f"  Errors:       {len(errors)}")
        print(f"  Steady State: {'PASS' if steady_ok else 'FAIL'}")

        return result

    # === Chaos Experiments ===

    def network_latency(self, stop_event, delay_ms=500):
        """Simulate network latency until ``stop_event`` is set.

        Replaces ``socket.getaddrinfo`` process-wide with a wrapper that
        sleeps ``delay_ms`` milliseconds before each address lookup, and
        restores the original when the event is set.
        """
        import socket
        original_getaddrinfo = socket.getaddrinfo

        def slow_getaddrinfo(*args, **kwargs):
            time.sleep(delay_ms / 1000)
            return original_getaddrinfo(*args, **kwargs)

        socket.getaddrinfo = slow_getaddrinfo
        try:
            stop_event.wait()
        finally:
            # Undo the patch no matter how the wait ends.
            socket.getaddrinfo = original_getaddrinfo

    def connection_drop(self, stop_event, drop_rate=0.3):
        """Simulate dropped connections until ``stop_event`` is set.

        Replaces ``requests.Session.send`` with a wrapper that raises
        ``requests.ConnectionError`` with probability ``drop_rate``.
        """
        # NOTE(review): only traffic sent through ``requests`` is affected;
        # confirm which HTTP transport the Pinecone and OpenAI clients use.
        original_send = requests.Session.send

        def flaky_send(self_session, *args, **kwargs):
            if random.random() < drop_rate:
                raise requests.ConnectionError("Chaos: Connection dropped")
            return original_send(self_session, *args, **kwargs)

        requests.Session.send = flaky_send
        try:
            stop_event.wait()
        finally:
            # Undo the patch no matter how the wait ends.
            requests.Session.send = original_send

    def run_all_experiments(self):
        """Run every built-in experiment, then print a summary.

        The summary covers everything in ``self.results`` and marks an
        experiment PASS when its success rate is above 90%.
        """
        experiments = [
            ("Network Latency 500ms",
             lambda e: self.network_latency(e, 500), 30),
            ("Connection Drop 30%",
             lambda e: self.connection_drop(e, 0.3), 30),
        ]

        for name, func, duration in experiments:
            self.run_experiment(name, func, duration)

        # Summary
        print(f"\n{'='*60}")
        print("Chaos Engineering Summary")
        print(f"{'='*60}")
        for r in self.results:
            status = "PASS" if r.success_rate > 90 else "FAIL"
            print(f"  [{status}] {r.experiment}: "
                  f"SR={r.success_rate:.0f}% "
                  f"Lat={r.avg_latency:.3f}s")

# chaos = VectorDBChaosEngine(rag_pipeline)
# chaos.run_all_experiments()

Best Practices

  • Namespace Isolation: ใช้ Namespace แยก Production, Staging, Testing ไม่ให้ข้อมูลปนกัน
  • Batch Operations: Upsert เป็น Batch (ประมาณ 100-1000 vectors ต่อครั้ง โดยปรับขนาด Batch ให้ไม่เกินขีดจำกัดขนาด Request ของ Pinecone) แทนการส่งทีละตัว เพื่อลดจำนวน API Calls
  • Metadata Filtering: เก็บ Metadata ที่จำเป็นสำหรับ Filter ลดจำนวน Vectors ที่ต้องค้น
  • Fallback Strategy: มี Cache Layer (เช่น Redis) สำหรับ Queries ที่ถูกถามบ่อย เมื่อ Pinecone ล่มให้ตอบจาก Cache แทน
  • Rate Limiting: ตั้ง Rate Limit สำหรับ Client ป้องกัน Overload
  • Circuit Breaker: ใช้ Circuit Breaker Pattern ตัดการเรียก Pinecone เมื่อ Error Rate สูง
  • Monitoring: ติดตาม Query Latency, Error Rate, Index Size ตั้ง Alert เมื่อผิดปกติ

Vector Database คืออะไร

Vector Database คือฐานข้อมูลเฉพาะทางสำหรับเก็บและค้นหา Vector Embeddings โดยใช้ ANN Algorithm เช่น HNSW ทำให้ค้นหาได้ในระดับ Milliseconds เหมาะสำหรับ Similarity Search, Semantic Search, Recommendation และ RAG สำหรับ LLM