it

Vector Database Pinecone กับ Chaos Engineering —

Vector Database Pinecone กับ Chaos Engineering —

Vector Database และ Pinecone

Vector Database Pinecone กับ Chaos Engineering —

Vector Database เป็นฐานข้อมูลเฉพาะทางที่ออกแบบมาสำหรับเก็บและค้นหา Vector Embeddings ซึ่งเป็นตัวแทนข้อมูล (Text, Image, Audio) ในรูปแบบตัวเลขหลายมิติ การค้นหาใช้ Approximate Nearest Neighbor (ANN) Algorithm เช่น HNSW ทำให้ค้นหาใน Milliseconds แม้มีข้อมูลหลายล้าน Vectors

เนื้อหาเกี่ยวข้อง — บทความที่เกี่ยวข้อง: why is it important to match supply and demand

Pinecone เป็น Managed Vector Database ที่นิยมที่สุด ให้บริการแบบ Serverless ไม่ต้องจัดการ Infrastructure รองรับ Real-time Indexing, Metadata Filtering, Namespace Isolation และ Scale อัตโนมัติ เหมาะสำหรับ Production AI Applications

เนื้อหาเกี่ยวข้อง — ดูเพิ่มเติมเรื่อง คอมมชชนคืออะไร — ทุกสิ่งที่ต้องรู้ในปี 2026

ใช้งาน Pinecone

# === Pinecone Setup และ CRUD Operations ===
# pip install pinecone-client openai

from pinecone import Pinecone, ServerlessSpec
import openai
import json
import time

# Initialize Pinecone
pc = Pinecone(api_key="YOUR_PINECONE_API_KEY")

# สร้าง Index
index_name = "knowledge-base"

if index_name not in pc.list_indexes().names():
    pc.create_index(
        name=index_name,
        dimension=1536,          # OpenAI text-embedding-3-small
        metric="cosine",         # cosine, euclidean, dotproduct
        spec=ServerlessSpec(
            cloud="aws",
            region="us-east-1",
        ),
    )
    print(f"Index '{index_name}' created")

# เชื่อมต่อ Index
index = pc.Index(index_name)

# === Embedding Function ===
client = openai.OpenAI(api_key="YOUR_OPENAI_KEY")

def get_embedding(text, model="text-embedding-3-small"):
    """สร้าง Embedding จากข้อความ"""
    response = client.embeddings.create(input=text, model=model)
    return response.data[0].embedding

# === Upsert Vectors ===
documents = [
    {"id": "doc-001", "text": "Kubernetes is a container orchestration platform",
     "metadata": {"category": "devops", "source": "docs"}},
    {"id": "doc-002", "text": "Docker containers package applications with dependencies",
     "metadata": {"category": "devops", "source": "blog"}},
    {"id": "doc-003", "text": "PostgreSQL is a relational database management system",
     "metadata": {"category": "database", "source": "docs"}},
    {"id": "doc-004", "text": "Redis is an in-memory data structure store",
     "metadata": {"category": "database", "source": "docs"}},
    {"id": "doc-005", "text": "Prometheus monitors metrics from containerized applications",
     "metadata": {"category": "monitoring", "source": "blog"}},
]

# Batch Upsert
vectors = []
for doc in documents:
    embedding = get_embedding(doc["text"])
    vectors.append({
        "id": doc["id"],
        "values": embedding,
        "metadata": {**doc["metadata"], "text": doc["text"]},
    })

index.upsert(vectors=vectors, namespace="production")
print(f"Upserted {len(vectors)} vectors")

# === Query (Similarity Search) ===
query_text = "How to monitor containers?"
query_embedding = get_embedding(query_text)

results = index.query(
    vector=query_embedding,
    top_k=3,
    include_metadata=True,
    namespace="production",
    filter={"category": {"$in": ["monitoring", "devops"]}},
)

print(f"\nQuery: '{query_text}'")
for match in results["matches"]:
    print(f"  Score: {match['score']:.4f} | {match['metadata']['text']}")

# === Index Stats ===
stats = index.describe_index_stats()
print(f"\nIndex Stats:")
print(f"  Total Vectors: {stats['total_vector_count']}")
print(f"  Namespaces: {stats['namespaces']}")

# === Delete ===
# index.delete(ids=["doc-001"], namespace="production")
# index.delete(delete_all=True, namespace="staging")

RAG Pipeline ด้วย Pinecone

Vector Database Pinecone กับ Chaos Engineering —
# rag_pipeline.py — Retrieval-Augmented Generation
from pinecone import Pinecone
import openai
from dataclasses import dataclass
from typing import Optional
import hashlib

@dataclass
class RAGResponse:
    answer: str
    sources: list
    confidence: float

class RAGPipeline:
    """RAG Pipeline ด้วย Pinecone + OpenAI"""

    def __init__(self, pinecone_key, openai_key, index_name):
        self.pc = Pinecone(api_key=pinecone_key)
        self.index = self.pc.Index(index_name)
        self.openai = openai.OpenAI(api_key=openai_key)

    def embed(self, text):
        res = self.openai.embeddings.create(
            input=text, model="text-embedding-3-small"
        )
        return res.data[0].embedding

    def retrieve(self, query, top_k=5, namespace="production",
                 filter_dict=None):
        """ค้นหาเอกสารที่เกี่ยวข้อง"""
        embedding = self.embed(query)
        results = self.index.query(
            vector=embedding,
            top_k=top_k,
            include_metadata=True,
            namespace=namespace,
            filter=filter_dict,
        )
        return results["matches"]

    def generate(self, query, context_docs, model="gpt-4o-mini"):
        """สร้างคำตอบจาก Context"""
        context = "\n\n".join([
            f"[Source: {d['metadata'].get('source', 'unknown')}]\n"
            f"{d['metadata'].get('text', '')}"
            for d in context_docs
        ])

        response = self.openai.chat.completions.create(
            model=model,
            messages=[
                {"role": "system", "content":
                 "Answer based on the provided context. "
                 "If the context doesn't contain the answer, say so."},
                {"role": "user", "content":
                 f"Context:\n{context}\n\nQuestion: {query}"},
            ],
            temperature=0.3,
        )

        return response.choices[0].message.content

    def ask(self, query, top_k=5, namespace="production") -> RAGResponse:
        """End-to-end RAG Query"""
        # Retrieve
        docs = self.retrieve(query, top_k=top_k, namespace=namespace)

        if not docs:
            return RAGResponse(
                answer="No relevant documents found.",
                sources=[], confidence=0.0,
            )

        # Generate
        answer = self.generate(query, docs)

        # Calculate confidence from similarity scores
        avg_score = sum(d["score"] for d in docs) / len(docs)

        sources = [
            {"id": d["id"], "score": d["score"],
             "text": d["metadata"].get("text", "")[:100]}
            for d in docs
        ]

        return RAGResponse(
            answer=answer,
            sources=sources,
            confidence=avg_score,
        )

# ตัวอย่าง
# rag = RAGPipeline("pinecone_key", "openai_key", "knowledge-base")
# result = rag.ask("How to set up monitoring for Kubernetes?")
# print(f"Answer: {result.answer}")
# print(f"Confidence: {result.confidence:.2f}")
# print(f"Sources: {len(result.sources)}")

Chaos Engineering สำหรับ Vector Search

# chaos_vector_db.py — Chaos Engineering สำหรับ Vector Database
import time
import random
import threading
import statistics
from dataclasses import dataclass
from typing import Callable
import requests

@dataclass
class ChaosResult:
    experiment: str
    duration: float
    success_rate: float
    avg_latency: float
    p99_latency: float
    errors: list

class VectorDBChaosEngine:
    """Chaos Engineering สำหรับ Vector Database"""

    def __init__(self, rag_pipeline, steady_state_latency=0.5):
        self.rag = rag_pipeline
        self.steady_state = steady_state_latency
        self.results = []

    def run_experiment(self, name, chaos_func, duration=60,
                       queries_per_sec=10):
        """รัน Chaos Experiment"""
        print(f"\n{'='*50}")
        print(f"Experiment: {name}")
        print(f"Duration: {duration}s | QPS: {queries_per_sec}")
        print(f"{'='*50}")

        latencies = []
        errors = []
        success = 0
        total = 0
        stop_event = threading.Event()

        # เริ่ม Chaos
        chaos_thread = threading.Thread(target=chaos_func,
                                         args=(stop_event,))
        chaos_thread.start()

        # ส่ง Queries
        test_queries = [
            "How to deploy containers?",
            "What is database replication?",
            "How to set up monitoring?",
            "Explain load balancing",
            "What is service mesh?",
        ]

        start_time = time.time()
        while time.time() - start_time < duration:
            query = random.choice(test_queries)
            total += 1

            try:
                q_start = time.time()
                result = self.rag.ask(query, top_k=3)
                latency = time.time() - q_start
                latencies.append(latency)

                if result.confidence > 0.3:
                    success += 1
                else:
                    errors.append(f"Low confidence: {result.confidence:.2f}")
            except Exception as e:
                errors.append(str(e))
                latencies.append(float("inf"))

            time.sleep(1.0 / queries_per_sec)

        # หยุด Chaos
        stop_event.set()
        chaos_thread.join()

        # วิเคราะห์ผล
        valid_latencies = [l for l in latencies if l != float("inf")]
        avg_lat = statistics.mean(valid_latencies) if valid_latencies else 0
        p99_lat = (sorted(valid_latencies)[int(len(valid_latencies) * 0.99)]
                   if valid_latencies else 0)

        result = ChaosResult(
            experiment=name,
            duration=duration,
            success_rate=success / total * 100 if total > 0 else 0,
            avg_latency=avg_lat,
            p99_latency=p99_lat,
            errors=errors[:10],
        )
        self.results.append(result)

        # แสดงผล
        print(f"\nResults:")
        print(f"  Success Rate: {result.success_rate:.1f}%")
        print(f"  Avg Latency:  {result.avg_latency:.3f}s")
        print(f"  P99 Latency:  {result.p99_latency:.3f}s")
        print(f"  Errors:       {len(errors)}")
        print(f"  Steady State: {'PASS' if avg_lat < self.steady_state * 2 else 'FAIL'}")

        return result

    # === Chaos Experiments ===

    def network_latency(self, stop_event, delay_ms=500):
        """จำลอง Network Latency"""
        import socket
        original_getaddrinfo = socket.getaddrinfo

        def slow_getaddrinfo(*args, **kwargs):
            time.sleep(delay_ms / 1000)
            return original_getaddrinfo(*args, **kwargs)

        socket.getaddrinfo = slow_getaddrinfo
        stop_event.wait()
        socket.getaddrinfo = original_getaddrinfo

    def connection_drop(self, stop_event, drop_rate=0.3):
        """จำลอง Connection Drop"""
        # Monkey-patch requests
        original_send = requests.Session.send

        def flaky_send(self_session, *args, **kwargs):
            if random.random() < drop_rate:
                raise requests.ConnectionError("Chaos: Connection dropped")
            return original_send(self_session, *args, **kwargs)

        requests.Session.send = flaky_send
        stop_event.wait()
        requests.Session.send = original_send

    def run_all_experiments(self):
        """รันทุก Experiment"""
        experiments = [
            ("Network Latency 500ms",
             lambda e: self.network_latency(e, 500), 30),
            ("Connection Drop 30%",
             lambda e: self.connection_drop(e, 0.3), 30),
        ]

        for name, func, duration in experiments:
            self.run_experiment(name, func, duration)

        # Summary
        print(f"\n{'='*60}")
        print("Chaos Engineering Summary")
        print(f"{'='*60}")
        for r in self.results:
            status = "PASS" if r.success_rate > 90 else "FAIL"
            print(f"  [{status}] {r.experiment}: "
                  f"SR={r.success_rate:.0f}% "
                  f"Lat={r.avg_latency:.3f}s")

# chaos = VectorDBChaosEngine(rag_pipeline)
# chaos.run_all_experiments()

Best Practices

  • Namespace Isolation: ใช้ Namespace แยก Production, Staging, Testing ไม่ให้ข้อมูลปนกัน
  • Batch Operations: Upsert เป็น Batch (100-1000 vectors) แทนทีละตัว ลด API Calls
  • Metadata Filtering: เก็บ Metadata ที่จำเป็นสำหรับ Filter ลดจำนวน Vectors ที่ต้องค้น
  • Fallback Strategy: มี Cache Layer (Redis) สำหรับ Queries ที่ถามบ่อย ถ้า Pinecone ล่มใช้ Cache
  • Rate Limiting: ตั้ง Rate Limit สำหรับ Client ป้องกัน Overload
  • Circuit Breaker: ใช้ Circuit Breaker Pattern ตัดการเรียก Pinecone เมื่อ Error Rate สูง
  • Monitoring: ติดตาม Query Latency, Error Rate, Index Size ตั้ง Alert เมื่อผิดปกติ

Vector Database คืออะไร

ฐานข้อมูลเฉพาะทางสำหรับเก็บและค้นหา Vector Embeddings ใช้ ANN Algorithm เช่น HNSW ค้นหาใน Milliseconds สำหรับ Similarity Search, Semantic Search, Recommendation และ RAG สำหรับ LLM

แนะนำเพิ่มเติม — SiamCafeBook

เนื้อหาเกี่ยวข้อง — ทำความเข้าใจ Container Security Trivy GitOps Workflow

XM Legend · เทรดเดอร์ & ผู้สอน Forex 13 ปี

ผู้ก่อตั้ง SiamCafe ตั้งแต่ปี 1997 · เทรดเดอร์สาย Forex มากกว่า 13 ปี ได้รับการยกย่องเป็น XM Legend · แบ่งปันความรู้ Forex, ไอที, AI และการเทรด จากประสบการณ์จริงในตลาดจริง