Vector Database และ Pinecone
Vector Database เป็นฐานข้อมูลเฉพาะทางที่ออกแบบมาสำหรับเก็บและค้นหา Vector Embeddings ซึ่งเป็นตัวแทนข้อมูล (Text, Image, Audio) ในรูปแบบตัวเลขหลายมิติ การค้นหาใช้ Approximate Nearest Neighbor (ANN) Algorithm เช่น HNSW ทำให้ค้นหาใน Milliseconds แม้มีข้อมูลหลายล้าน Vectors
Pinecone เป็น Managed Vector Database ที่นิยมที่สุด ให้บริการแบบ Serverless ไม่ต้องจัดการ Infrastructure รองรับ Real-time Indexing, Metadata Filtering, Namespace Isolation และ Scale อัตโนมัติ เหมาะสำหรับ Production AI Applications
ใช้งาน Pinecone
# === Pinecone Setup และ CRUD Operations ===
# pip install pinecone-client openai
from pinecone import Pinecone, ServerlessSpec
import openai
import json
import time
# Initialize Pinecone
pc = Pinecone(api_key="YOUR_PINECONE_API_KEY")
# สร้าง Index
index_name = "knowledge-base"
if index_name not in pc.list_indexes().names():
pc.create_index(
name=index_name,
dimension=1536, # OpenAI text-embedding-3-small
metric="cosine", # cosine, euclidean, dotproduct
spec=ServerlessSpec(
cloud="aws",
region="us-east-1",
),
)
print(f"Index '{index_name}' created")
# เชื่อมต่อ Index
index = pc.Index(index_name)
# === Embedding Function ===
client = openai.OpenAI(api_key="YOUR_OPENAI_KEY")
def get_embedding(text, model="text-embedding-3-small"):
"""สร้าง Embedding จากข้อความ"""
response = client.embeddings.create(input=text, model=model)
return response.data[0].embedding
# === Upsert Vectors ===
documents = [
{"id": "doc-001", "text": "Kubernetes is a container orchestration platform",
"metadata": {"category": "devops", "source": "docs"}},
{"id": "doc-002", "text": "Docker containers package applications with dependencies",
"metadata": {"category": "devops", "source": "blog"}},
{"id": "doc-003", "text": "PostgreSQL is a relational database management system",
"metadata": {"category": "database", "source": "docs"}},
{"id": "doc-004", "text": "Redis is an in-memory data structure store",
"metadata": {"category": "database", "source": "docs"}},
{"id": "doc-005", "text": "Prometheus monitors metrics from containerized applications",
"metadata": {"category": "monitoring", "source": "blog"}},
]
# Batch Upsert
vectors = []
for doc in documents:
embedding = get_embedding(doc["text"])
vectors.append({
"id": doc["id"],
"values": embedding,
"metadata": {**doc["metadata"], "text": doc["text"]},
})
index.upsert(vectors=vectors, namespace="production")
print(f"Upserted {len(vectors)} vectors")
# === Query (Similarity Search) ===
query_text = "How to monitor containers?"
query_embedding = get_embedding(query_text)
results = index.query(
vector=query_embedding,
top_k=3,
include_metadata=True,
namespace="production",
filter={"category": {"$in": ["monitoring", "devops"]}},
)
print(f"\nQuery: '{query_text}'")
for match in results["matches"]:
print(f" Score: {match['score']:.4f} | {match['metadata']['text']}")
# === Index Stats ===
stats = index.describe_index_stats()
print(f"\nIndex Stats:")
print(f" Total Vectors: {stats['total_vector_count']}")
print(f" Namespaces: {stats['namespaces']}")
# === Delete ===
# index.delete(ids=["doc-001"], namespace="production")
# index.delete(delete_all=True, namespace="staging")
RAG Pipeline ด้วย Pinecone
# rag_pipeline.py — Retrieval-Augmented Generation
from pinecone import Pinecone
import openai
from dataclasses import dataclass
from typing import Optional
import hashlib
@dataclass
class RAGResponse:
answer: str
sources: list
confidence: float
class RAGPipeline:
"""RAG Pipeline ด้วย Pinecone + OpenAI"""
def __init__(self, pinecone_key, openai_key, index_name):
self.pc = Pinecone(api_key=pinecone_key)
self.index = self.pc.Index(index_name)
self.openai = openai.OpenAI(api_key=openai_key)
def embed(self, text):
res = self.openai.embeddings.create(
input=text, model="text-embedding-3-small"
)
return res.data[0].embedding
def retrieve(self, query, top_k=5, namespace="production",
filter_dict=None):
"""ค้นหาเอกสารที่เกี่ยวข้อง"""
embedding = self.embed(query)
results = self.index.query(
vector=embedding,
top_k=top_k,
include_metadata=True,
namespace=namespace,
filter=filter_dict,
)
return results["matches"]
def generate(self, query, context_docs, model="gpt-4o-mini"):
"""สร้างคำตอบจาก Context"""
context = "\n\n".join([
f"[Source: {d['metadata'].get('source', 'unknown')}]\n"
f"{d['metadata'].get('text', '')}"
for d in context_docs
])
response = self.openai.chat.completions.create(
model=model,
messages=[
{"role": "system", "content":
"Answer based on the provided context. "
"If the context doesn't contain the answer, say so."},
{"role": "user", "content":
f"Context:\n{context}\n\nQuestion: {query}"},
],
temperature=0.3,
)
return response.choices[0].message.content
def ask(self, query, top_k=5, namespace="production") -> RAGResponse:
"""End-to-end RAG Query"""
# Retrieve
docs = self.retrieve(query, top_k=top_k, namespace=namespace)
if not docs:
return RAGResponse(
answer="No relevant documents found.",
sources=[], confidence=0.0,
)
# Generate
answer = self.generate(query, docs)
# Calculate confidence from similarity scores
avg_score = sum(d["score"] for d in docs) / len(docs)
sources = [
{"id": d["id"], "score": d["score"],
"text": d["metadata"].get("text", "")[:100]}
for d in docs
]
return RAGResponse(
answer=answer,
sources=sources,
confidence=avg_score,
)
# ตัวอย่าง
# rag = RAGPipeline("pinecone_key", "openai_key", "knowledge-base")
# result = rag.ask("How to set up monitoring for Kubernetes?")
# print(f"Answer: {result.answer}")
# print(f"Confidence: {result.confidence:.2f}")
# print(f"Sources: {len(result.sources)}")
Chaos Engineering สำหรับ Vector Search
# chaos_vector_db.py — Chaos Engineering สำหรับ Vector Database
import time
import random
import threading
import statistics
from dataclasses import dataclass
from typing import Callable
import requests
@dataclass
class ChaosResult:
experiment: str
duration: float
success_rate: float
avg_latency: float
p99_latency: float
errors: list
class VectorDBChaosEngine:
"""Chaos Engineering สำหรับ Vector Database"""
def __init__(self, rag_pipeline, steady_state_latency=0.5):
self.rag = rag_pipeline
self.steady_state = steady_state_latency
self.results = []
def run_experiment(self, name, chaos_func, duration=60,
queries_per_sec=10):
"""รัน Chaos Experiment"""
print(f"\n{'='*50}")
print(f"Experiment: {name}")
print(f"Duration: {duration}s | QPS: {queries_per_sec}")
print(f"{'='*50}")
latencies = []
errors = []
success = 0
total = 0
stop_event = threading.Event()
# เริ่ม Chaos
chaos_thread = threading.Thread(target=chaos_func,
args=(stop_event,))
chaos_thread.start()
# ส่ง Queries
test_queries = [
"How to deploy containers?",
"What is database replication?",
"How to set up monitoring?",
"Explain load balancing",
"What is service mesh?",
]
start_time = time.time()
while time.time() - start_time < duration:
query = random.choice(test_queries)
total += 1
try:
q_start = time.time()
result = self.rag.ask(query, top_k=3)
latency = time.time() - q_start
latencies.append(latency)
if result.confidence > 0.3:
success += 1
else:
errors.append(f"Low confidence: {result.confidence:.2f}")
except Exception as e:
errors.append(str(e))
latencies.append(float("inf"))
time.sleep(1.0 / queries_per_sec)
# หยุด Chaos
stop_event.set()
chaos_thread.join()
# วิเคราะห์ผล
valid_latencies = [l for l in latencies if l != float("inf")]
avg_lat = statistics.mean(valid_latencies) if valid_latencies else 0
p99_lat = (sorted(valid_latencies)[int(len(valid_latencies) * 0.99)]
if valid_latencies else 0)
result = ChaosResult(
experiment=name,
duration=duration,
success_rate=success / total * 100 if total > 0 else 0,
avg_latency=avg_lat,
p99_latency=p99_lat,
errors=errors[:10],
)
self.results.append(result)
# แสดงผล
print(f"\nResults:")
print(f" Success Rate: {result.success_rate:.1f}%")
print(f" Avg Latency: {result.avg_latency:.3f}s")
print(f" P99 Latency: {result.p99_latency:.3f}s")
print(f" Errors: {len(errors)}")
print(f" Steady State: {'PASS' if avg_lat < self.steady_state * 2 else 'FAIL'}")
return result
# === Chaos Experiments ===
def network_latency(self, stop_event, delay_ms=500):
"""จำลอง Network Latency"""
import socket
original_getaddrinfo = socket.getaddrinfo
def slow_getaddrinfo(*args, **kwargs):
time.sleep(delay_ms / 1000)
return original_getaddrinfo(*args, **kwargs)
socket.getaddrinfo = slow_getaddrinfo
stop_event.wait()
socket.getaddrinfo = original_getaddrinfo
def connection_drop(self, stop_event, drop_rate=0.3):
"""จำลอง Connection Drop"""
# Monkey-patch requests
original_send = requests.Session.send
def flaky_send(self_session, *args, **kwargs):
if random.random() < drop_rate:
raise requests.ConnectionError("Chaos: Connection dropped")
return original_send(self_session, *args, **kwargs)
requests.Session.send = flaky_send
stop_event.wait()
requests.Session.send = original_send
def run_all_experiments(self):
"""รันทุก Experiment"""
experiments = [
("Network Latency 500ms",
lambda e: self.network_latency(e, 500), 30),
("Connection Drop 30%",
lambda e: self.connection_drop(e, 0.3), 30),
]
for name, func, duration in experiments:
self.run_experiment(name, func, duration)
# Summary
print(f"\n{'='*60}")
print("Chaos Engineering Summary")
print(f"{'='*60}")
for r in self.results:
status = "PASS" if r.success_rate > 90 else "FAIL"
print(f" [{status}] {r.experiment}: "
f"SR={r.success_rate:.0f}% "
f"Lat={r.avg_latency:.3f}s")
# chaos = VectorDBChaosEngine(rag_pipeline)
# chaos.run_all_experiments()
Best Practices
- Namespace Isolation: ใช้ Namespace แยก Production, Staging, Testing ไม่ให้ข้อมูลปนกัน
- Batch Operations: Upsert เป็น Batch (100-1000 vectors) แทนทีละตัว ลด API Calls
- Metadata Filtering: เก็บ Metadata ที่จำเป็นสำหรับ Filter ลดจำนวน Vectors ที่ต้องค้น
- Fallback Strategy: มี Cache Layer (Redis) สำหรับ Queries ที่ถามบ่อย ถ้า Pinecone ล่มใช้ Cache
- Rate Limiting: ตั้ง Rate Limit สำหรับ Client ป้องกัน Overload
- Circuit Breaker: ใช้ Circuit Breaker Pattern ตัดการเรียก Pinecone เมื่อ Error Rate สูง
- Monitoring: ติดตาม Query Latency, Error Rate, Index Size ตั้ง Alert เมื่อผิดปกติ
Vector Database คืออะไร
ฐานข้อมูลเฉพาะทางสำหรับเก็บและค้นหา Vector Embeddings ใช้ ANN Algorithm เช่น HNSW ค้นหาใน Milliseconds สำหรับ Similarity Search, Semantic Search, Recommendation และ RAG สำหรับ LLM
Pinecone คืออะไร
Managed Vector Database แบบ Cloud Native ไม่ต้องจัดการ Infrastructure รองรับ Real-time Indexing, Metadata Filtering, Namespace Isolation, Hybrid Search ใช้งานผ่าน REST API หรือ SDK
Chaos Engineering คืออะไร
ทดสอบความทนทานของระบบโดยจำลองสถานการณ์ล้มเหลว เช่น Network Latency, Service Down ค้นหาจุดอ่อนก่อนเกิดจริง ใช้เครื่องมือ เช่น Chaos Monkey, Litmus, Gremlin
ทำไมต้องทำ Chaos Engineering กับ Vector Database
Vector Search เป็น Critical Path ของ AI Applications ถ้า Vector DB ล่มหรือช้า AI ตอบผิดหรือไม่ตอบ Chaos Engineering ทดสอบว่าระบบ Handle ได้เมื่อเกิดปัญหา Network, Rate Limit หรือ Index Corrupt
สรุป
Pinecone เป็น Vector Database ที่เหมาะสำหรับ Production AI Applications ใช้ง่าย Scale อัตโนมัติ รองรับ Metadata Filtering และ Namespace Isolation เมื่อรวมกับ Chaos Engineering ทำให้มั่นใจว่าระบบ Vector Search ทนทานต่อ Failure สิ่งสำคัญคือ Fallback Strategy, Circuit Breaker, Monitoring และทดสอบ Chaos อย่างสม่ำเสมอ
