SiamCafe.net Blog
Technology

Vector Database Pinecone Chaos Engineering

vector database pinecone chaos engineering
Vector Database Pinecone Chaos Engineering | SiamCafe Blog
2025-08-31· อ. บอม — SiamCafe.net· 11,052 คำ

Vector Database และ Pinecone

Vector Database เป็นฐานข้อมูลเฉพาะทางที่ออกแบบมาสำหรับเก็บและค้นหา Vector Embeddings ซึ่งเป็นตัวแทนข้อมูล (Text, Image, Audio) ในรูปแบบตัวเลขหลายมิติ การค้นหาใช้ Approximate Nearest Neighbor (ANN) Algorithm เช่น HNSW ทำให้ค้นหาใน Milliseconds แม้มีข้อมูลหลายล้าน Vectors

Pinecone เป็น Managed Vector Database ที่นิยมที่สุด ให้บริการแบบ Serverless ไม่ต้องจัดการ Infrastructure รองรับ Real-time Indexing, Metadata Filtering, Namespace Isolation และ Scale อัตโนมัติ เหมาะสำหรับ Production AI Applications

ใช้งาน Pinecone

# === Pinecone Setup และ CRUD Operations ===
# pip install pinecone-client openai

from pinecone import Pinecone, ServerlessSpec
import openai
import json
import time

# Initialize Pinecone
pc = Pinecone(api_key="YOUR_PINECONE_API_KEY")

# สร้าง Index
index_name = "knowledge-base"

if index_name not in pc.list_indexes().names():
    pc.create_index(
        name=index_name,
        dimension=1536,          # OpenAI text-embedding-3-small
        metric="cosine",         # cosine, euclidean, dotproduct
        spec=ServerlessSpec(
            cloud="aws",
            region="us-east-1",
        ),
    )
    print(f"Index '{index_name}' created")

# เชื่อมต่อ Index
index = pc.Index(index_name)

# === Embedding Function ===
client = openai.OpenAI(api_key="YOUR_OPENAI_KEY")

def get_embedding(text, model="text-embedding-3-small"):
    """สร้าง Embedding จากข้อความ"""
    response = client.embeddings.create(input=text, model=model)
    return response.data[0].embedding

# === Upsert Vectors ===
documents = [
    {"id": "doc-001", "text": "Kubernetes is a container orchestration platform",
     "metadata": {"category": "devops", "source": "docs"}},
    {"id": "doc-002", "text": "Docker containers package applications with dependencies",
     "metadata": {"category": "devops", "source": "blog"}},
    {"id": "doc-003", "text": "PostgreSQL is a relational database management system",
     "metadata": {"category": "database", "source": "docs"}},
    {"id": "doc-004", "text": "Redis is an in-memory data structure store",
     "metadata": {"category": "database", "source": "docs"}},
    {"id": "doc-005", "text": "Prometheus monitors metrics from containerized applications",
     "metadata": {"category": "monitoring", "source": "blog"}},
]

# Batch Upsert
vectors = []
for doc in documents:
    embedding = get_embedding(doc["text"])
    vectors.append({
        "id": doc["id"],
        "values": embedding,
        "metadata": {**doc["metadata"], "text": doc["text"]},
    })

index.upsert(vectors=vectors, namespace="production")
print(f"Upserted {len(vectors)} vectors")

# === Query (Similarity Search) ===
query_text = "How to monitor containers?"
query_embedding = get_embedding(query_text)

results = index.query(
    vector=query_embedding,
    top_k=3,
    include_metadata=True,
    namespace="production",
    filter={"category": {"$in": ["monitoring", "devops"]}},
)

print(f"\nQuery: '{query_text}'")
for match in results["matches"]:
    print(f"  Score: {match['score']:.4f} | {match['metadata']['text']}")

# === Index Stats ===
stats = index.describe_index_stats()
print(f"\nIndex Stats:")
print(f"  Total Vectors: {stats['total_vector_count']}")
print(f"  Namespaces: {stats['namespaces']}")

# === Delete ===
# index.delete(ids=["doc-001"], namespace="production")
# index.delete(delete_all=True, namespace="staging")

RAG Pipeline ด้วย Pinecone

# rag_pipeline.py — Retrieval-Augmented Generation
from pinecone import Pinecone
import openai
from dataclasses import dataclass
from typing import Optional
import hashlib

@dataclass
class RAGResponse:
    answer: str
    sources: list
    confidence: float

class RAGPipeline:
    """RAG Pipeline ด้วย Pinecone + OpenAI"""

    def __init__(self, pinecone_key, openai_key, index_name):
        self.pc = Pinecone(api_key=pinecone_key)
        self.index = self.pc.Index(index_name)
        self.openai = openai.OpenAI(api_key=openai_key)

    def embed(self, text):
        res = self.openai.embeddings.create(
            input=text, model="text-embedding-3-small"
        )
        return res.data[0].embedding

    def retrieve(self, query, top_k=5, namespace="production",
                 filter_dict=None):
        """ค้นหาเอกสารที่เกี่ยวข้อง"""
        embedding = self.embed(query)
        results = self.index.query(
            vector=embedding,
            top_k=top_k,
            include_metadata=True,
            namespace=namespace,
            filter=filter_dict,
        )
        return results["matches"]

    def generate(self, query, context_docs, model="gpt-4o-mini"):
        """สร้างคำตอบจาก Context"""
        context = "\n\n".join([
            f"[Source: {d['metadata'].get('source', 'unknown')}]\n"
            f"{d['metadata'].get('text', '')}"
            for d in context_docs
        ])

        response = self.openai.chat.completions.create(
            model=model,
            messages=[
                {"role": "system", "content":
                 "Answer based on the provided context. "
                 "If the context doesn't contain the answer, say so."},
                {"role": "user", "content":
                 f"Context:\n{context}\n\nQuestion: {query}"},
            ],
            temperature=0.3,
        )

        return response.choices[0].message.content

    def ask(self, query, top_k=5, namespace="production") -> RAGResponse:
        """End-to-end RAG Query"""
        # Retrieve
        docs = self.retrieve(query, top_k=top_k, namespace=namespace)

        if not docs:
            return RAGResponse(
                answer="No relevant documents found.",
                sources=[], confidence=0.0,
            )

        # Generate
        answer = self.generate(query, docs)

        # Calculate confidence from similarity scores
        avg_score = sum(d["score"] for d in docs) / len(docs)

        sources = [
            {"id": d["id"], "score": d["score"],
             "text": d["metadata"].get("text", "")[:100]}
            for d in docs
        ]

        return RAGResponse(
            answer=answer,
            sources=sources,
            confidence=avg_score,
        )

# ตัวอย่าง
# rag = RAGPipeline("pinecone_key", "openai_key", "knowledge-base")
# result = rag.ask("How to set up monitoring for Kubernetes?")
# print(f"Answer: {result.answer}")
# print(f"Confidence: {result.confidence:.2f}")
# print(f"Sources: {len(result.sources)}")

Chaos Engineering สำหรับ Vector Search

# chaos_vector_db.py — Chaos Engineering สำหรับ Vector Database
import time
import random
import threading
import statistics
from dataclasses import dataclass
from typing import Callable
import requests

@dataclass
class ChaosResult:
    experiment: str
    duration: float
    success_rate: float
    avg_latency: float
    p99_latency: float
    errors: list

class VectorDBChaosEngine:
    """Chaos Engineering สำหรับ Vector Database"""

    def __init__(self, rag_pipeline, steady_state_latency=0.5):
        self.rag = rag_pipeline
        self.steady_state = steady_state_latency
        self.results = []

    def run_experiment(self, name, chaos_func, duration=60,
                       queries_per_sec=10):
        """รัน Chaos Experiment"""
        print(f"\n{'='*50}")
        print(f"Experiment: {name}")
        print(f"Duration: {duration}s | QPS: {queries_per_sec}")
        print(f"{'='*50}")

        latencies = []
        errors = []
        success = 0
        total = 0
        stop_event = threading.Event()

        # เริ่ม Chaos
        chaos_thread = threading.Thread(target=chaos_func,
                                         args=(stop_event,))
        chaos_thread.start()

        # ส่ง Queries
        test_queries = [
            "How to deploy containers?",
            "What is database replication?",
            "How to set up monitoring?",
            "Explain load balancing",
            "What is service mesh?",
        ]

        start_time = time.time()
        while time.time() - start_time < duration:
            query = random.choice(test_queries)
            total += 1

            try:
                q_start = time.time()
                result = self.rag.ask(query, top_k=3)
                latency = time.time() - q_start
                latencies.append(latency)

                if result.confidence > 0.3:
                    success += 1
                else:
                    errors.append(f"Low confidence: {result.confidence:.2f}")
            except Exception as e:
                errors.append(str(e))
                latencies.append(float("inf"))

            time.sleep(1.0 / queries_per_sec)

        # หยุด Chaos
        stop_event.set()
        chaos_thread.join()

        # วิเคราะห์ผล
        valid_latencies = [l for l in latencies if l != float("inf")]
        avg_lat = statistics.mean(valid_latencies) if valid_latencies else 0
        p99_lat = (sorted(valid_latencies)[int(len(valid_latencies) * 0.99)]
                   if valid_latencies else 0)

        result = ChaosResult(
            experiment=name,
            duration=duration,
            success_rate=success / total * 100 if total > 0 else 0,
            avg_latency=avg_lat,
            p99_latency=p99_lat,
            errors=errors[:10],
        )
        self.results.append(result)

        # แสดงผล
        print(f"\nResults:")
        print(f"  Success Rate: {result.success_rate:.1f}%")
        print(f"  Avg Latency:  {result.avg_latency:.3f}s")
        print(f"  P99 Latency:  {result.p99_latency:.3f}s")
        print(f"  Errors:       {len(errors)}")
        print(f"  Steady State: {'PASS' if avg_lat < self.steady_state * 2 else 'FAIL'}")

        return result

    # === Chaos Experiments ===

    def network_latency(self, stop_event, delay_ms=500):
        """จำลอง Network Latency"""
        import socket
        original_getaddrinfo = socket.getaddrinfo

        def slow_getaddrinfo(*args, **kwargs):
            time.sleep(delay_ms / 1000)
            return original_getaddrinfo(*args, **kwargs)

        socket.getaddrinfo = slow_getaddrinfo
        stop_event.wait()
        socket.getaddrinfo = original_getaddrinfo

    def connection_drop(self, stop_event, drop_rate=0.3):
        """จำลอง Connection Drop"""
        # Monkey-patch requests
        original_send = requests.Session.send

        def flaky_send(self_session, *args, **kwargs):
            if random.random() < drop_rate:
                raise requests.ConnectionError("Chaos: Connection dropped")
            return original_send(self_session, *args, **kwargs)

        requests.Session.send = flaky_send
        stop_event.wait()
        requests.Session.send = original_send

    def run_all_experiments(self):
        """รันทุก Experiment"""
        experiments = [
            ("Network Latency 500ms",
             lambda e: self.network_latency(e, 500), 30),
            ("Connection Drop 30%",
             lambda e: self.connection_drop(e, 0.3), 30),
        ]

        for name, func, duration in experiments:
            self.run_experiment(name, func, duration)

        # Summary
        print(f"\n{'='*60}")
        print("Chaos Engineering Summary")
        print(f"{'='*60}")
        for r in self.results:
            status = "PASS" if r.success_rate > 90 else "FAIL"
            print(f"  [{status}] {r.experiment}: "
                  f"SR={r.success_rate:.0f}% "
                  f"Lat={r.avg_latency:.3f}s")

# chaos = VectorDBChaosEngine(rag_pipeline)
# chaos.run_all_experiments()

Best Practices

Vector Database คืออะไร

ฐานข้อมูลเฉพาะทางสำหรับเก็บและค้นหา Vector Embeddings ใช้ ANN Algorithm เช่น HNSW ค้นหาใน Milliseconds สำหรับ Similarity Search, Semantic Search, Recommendation และ RAG สำหรับ LLM

Pinecone คืออะไร

Managed Vector Database แบบ Cloud Native ไม่ต้องจัดการ Infrastructure รองรับ Real-time Indexing, Metadata Filtering, Namespace Isolation, Hybrid Search ใช้งานผ่าน REST API หรือ SDK

Chaos Engineering คืออะไร

ทดสอบความทนทานของระบบโดยจำลองสถานการณ์ล้มเหลว เช่น Network Latency, Service Down ค้นหาจุดอ่อนก่อนเกิดจริง ใช้เครื่องมือ เช่น Chaos Monkey, Litmus, Gremlin

ทำไมต้องทำ Chaos Engineering กับ Vector Database

Vector Search เป็น Critical Path ของ AI Applications ถ้า Vector DB ล่มหรือช้า AI ตอบผิดหรือไม่ตอบ Chaos Engineering ทดสอบว่าระบบ Handle ได้เมื่อเกิดปัญหา Network, Rate Limit หรือ Index Corrupt

สรุป

Pinecone เป็น Vector Database ที่เหมาะสำหรับ Production AI Applications ใช้ง่าย Scale อัตโนมัติ รองรับ Metadata Filtering และ Namespace Isolation เมื่อรวมกับ Chaos Engineering ทำให้มั่นใจว่าระบบ Vector Search ทนทานต่อ Failure สิ่งสำคัญคือ Fallback Strategy, Circuit Breaker, Monitoring และทดสอบ Chaos อย่างสม่ำเสมอ

📖 บทความที่เกี่ยวข้อง

Vector Database Pinecone Tech Conference 2026อ่านบทความ → Vector Database Pinecone Micro-segmentationอ่านบทความ → Ansible AWX Tower Chaos Engineeringอ่านบทความ → Vector Database Pinecone SSL TLS Certificateอ่านบทความ →

📚 ดูบทความทั้งหมด →