Vector Database and Message Queue
Keywords: Vector Database, Pinecone, Embeddings, ANN Search, Semantic Search, RAG, Message Queue, Async Batch Processing, Kafka, RabbitMQ, Production Architecture
| Vector DB | Type | Scale | Strengths |
|---|---|---|---|
| Pinecone | Managed Cloud | Billions | Serverless, easy to use, fast |
| Weaviate | Open Source | Millions | GraphQL, modules |
| Qdrant | Open Source | Millions | Rust, high performance |
| Milvus | Open Source | Billions | GPU support, distributed |
| ChromaDB | Open Source | Thousands | Simple, embedded |
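The table compares deployment model and scale, but the search itself works the same way everywhere: a query embedding is compared against stored embeddings with a similarity metric such as cosine (the metric used for the Pinecone index below). A minimal brute-force sketch with made-up 3-dimensional vectors, no ANN index involved:

# === Cosine similarity: the metric behind vector search (toy sketch) ===
# Real vector DBs use ANN indexes (e.g. HNSW, IVF) so they do not scan every vector.
import math

def cosine_similarity(a, b):
    dot = sum(x * y for x, y in zip(a, b))
    norm_a = math.sqrt(sum(x * x for x in a))
    norm_b = math.sqrt(sum(x * x for x in b))
    return dot / (norm_a * norm_b)

store = {
    "doc1": [0.9, 0.1, 0.0],   # pretend embedding for "Python programming guide"
    "doc2": [0.2, 0.8, 0.1],   # pretend embedding for "Machine learning basics"
    "doc3": [0.1, 0.2, 0.9],   # pretend embedding for "Docker containerization"
}
query = [0.85, 0.15, 0.05]     # pretend embedding for "How to learn Python"

ranked = sorted(store.items(), key=lambda kv: cosine_similarity(query, kv[1]), reverse=True)
for doc_id, vec in ranked:
    print(f"  {doc_id}: {cosine_similarity(query, vec):.3f}")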
Pinecone Setup and Embeddings
# === Pinecone Vector Database ===
# pip install pinecone-client openai
# from pinecone import Pinecone, ServerlessSpec
# import openai
#
# # Initialize Pinecone
# pc = Pinecone(api_key="YOUR_API_KEY")
#
# # Create Index
# pc.create_index(
# name="knowledge-base",
# dimension=1536, # OpenAI text-embedding-3-small
# metric="cosine",
# spec=ServerlessSpec(cloud="aws", region="us-east-1"),
# )
#
# index = pc.Index("knowledge-base")
#
# # Generate Embeddings
# client = openai.OpenAI()
#
# def get_embedding(text):
# response = client.embeddings.create(
# model="text-embedding-3-small",
# input=text,
# )
# return response.data[0].embedding
#
# # Upsert Vectors
# documents = [
# {"id": "doc1", "text": "Python programming guide", "category": "tech"},
# {"id": "doc2", "text": "Machine learning basics", "category": "ml"},
# {"id": "doc3", "text": "Docker containerization", "category": "devops"},
# ]
#
# vectors = []
# for doc in documents:
# embedding = get_embedding(doc["text"])
# vectors.append({
# "id": doc["id"],
# "values": embedding,
# "metadata": {"text": doc["text"], "category": doc["category"]},
# })
#
# index.upsert(vectors=vectors, namespace="articles")
#
# # Query
# query_embedding = get_embedding("How to learn Python")
# results = index.query(
# vector=query_embedding,
# top_k=5,
# include_metadata=True,
# namespace="articles",
# filter={"category": {"$eq": "tech"}},
# )
#
# for match in results.matches:
# print(f" Score: {match.score:.3f} | {match.metadata['text']}")
from dataclasses import dataclass
@dataclass
class VectorSearchResult:
doc_id: str
score: float
text: str
category: str
results_demo = [
VectorSearchResult("doc1", 0.95, "Python programming guide", "tech"),
VectorSearchResult("doc5", 0.89, "Python data analysis tutorial", "tech"),
VectorSearchResult("doc12", 0.85, "Introduction to coding", "tech"),
VectorSearchResult("doc8", 0.78, "Web development with Flask", "tech"),
VectorSearchResult("doc3", 0.72, "Docker for Python developers", "devops"),
]
print("=== Vector Search: 'How to learn Python' ===")
for r in results_demo:
print(f" [{r.score:.2f}] {r.doc_id} | {r.text} ({r.category})")
Message Queue Architecture
# === Message Queue + Vector DB Architecture ===
# Kafka Producer (API Server)
# from confluent_kafka import Producer
# import json
#
# producer = Producer({'bootstrap.servers': 'localhost:9092'})
#
# def submit_document(doc_id, text, metadata):
# """Submit document for embedding generation"""
# message = {
# "doc_id": doc_id,
# "text": text,
# "metadata": metadata,
# "action": "upsert",
# }
# producer.produce(
# topic="embedding-requests",
# key=doc_id.encode(),
# value=json.dumps(message).encode(),
# )
# producer.flush()
#
# # Kafka Consumer (Embedding Worker)
# from confluent_kafka import Consumer
#
# consumer = Consumer({
# 'bootstrap.servers': 'localhost:9092',
# 'group.id': 'embedding-workers',
# 'auto.offset.reset': 'latest',
# })
# consumer.subscribe(['embedding-requests'])
#
# BATCH = []
# BATCH_SIZE = 50
#
# while True:
# msg = consumer.poll(0.5)
# if msg and not msg.error():
# data = json.loads(msg.value())
# BATCH.append(data)
#
# if len(BATCH) >= BATCH_SIZE:
# # Batch embed
# texts = [d['text'] for d in BATCH]
# embeddings = client.embeddings.create(
# model="text-embedding-3-small",
# input=texts,
# )
# # Batch upsert to Pinecone
# vectors = [
# {"id": d['doc_id'],
# "values": emb.embedding,
# "metadata": d['metadata']}
# for d, emb in zip(BATCH, embeddings.data)
# ]
# index.upsert(vectors=vectors)
# BATCH.clear()
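The consumer above reuses client and index from the Pinecone setup block and needs a running Kafka broker, so it stays commented out. The batching logic itself can be run with in-process stand-ins: a plain list in place of the Kafka topic and fake_embed in place of the OpenAI call (both names are illustrative). Note the final flush for a partial batch, which the endless loop above never reaches.

# === Runnable stand-in for the batch embedding worker ===
# fake_queue replaces the Kafka topic; fake_embed replaces client.embeddings.create.
fake_queue = [
    {"doc_id": f"doc{i}", "text": f"document number {i}", "metadata": {"i": i}}
    for i in range(7)
]
BATCH_LIMIT = 3

def fake_embed(texts):
    return [[float(len(t)), 0.0, 1.0] for t in texts]   # one fake vector per text

batch = []
for message in fake_queue:
    batch.append(message)
    if len(batch) >= BATCH_LIMIT:
        embeddings = fake_embed([d["text"] for d in batch])
        vectors = [
            {"id": d["doc_id"], "values": emb, "metadata": d["metadata"]}
            for d, emb in zip(batch, embeddings)
        ]
        print(f"  upsert batch: {[v['id'] for v in vectors]}")
        batch.clear()
if batch:
    print(f"  upsert final partial batch: {[d['doc_id'] for d in batch]}")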
@dataclass
class QueueMetrics:
queue: str
pending: int
processing: int
completed: int
failed: int
avg_latency_ms: float
queues = [
QueueMetrics("embedding-requests", 125, 50, 45000, 12, 85.5),
QueueMetrics("search-requests", 8, 10, 120000, 3, 12.3),
QueueMetrics("delete-requests", 0, 2, 5000, 0, 5.1),
QueueMetrics("reindex-requests", 500, 100, 8000, 45, 250.0),
]
print("\n=== Message Queue Dashboard ===")
for q in queues:
total = q.completed + q.failed
error_rate = (q.failed / total * 100) if total > 0 else 0
print(f" [{q.queue}]")
print(f" Pending: {q.pending} | Processing: {q.processing}")
print(f" Completed: {q.completed:,} | Failed: {q.failed} "
f"({error_rate:.2f}%) | Latency: {q.avg_latency_ms}ms")
RAG Pipeline
# === RAG (Retrieval Augmented Generation) ===
# def rag_query(question, top_k=5):
# # 1. Generate query embedding
# query_emb = get_embedding(question)
#
# # 2. Search Pinecone
# results = index.query(
# vector=query_emb,
# top_k=top_k,
# include_metadata=True,
# )
#
# # 3. Build context
# context = "\n\n".join([
# match.metadata['text']
# for match in results.matches
# ])
#
# # 4. Generate answer with LLM
# response = client.chat.completions.create(
# model="gpt-4o-mini",
# messages=[
# {"role": "system", "content": f"Answer based on context:\n{context}"},
# {"role": "user", "content": question},
# ],
# temperature=0.3,
# )
# return response.choices[0].message.content
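The full pipeline needs Pinecone and an LLM, so it stays commented. The context-building step (step 3) is runnable on its own; the retrieved list below is pretend query output standing in for results.matches.

# === Runnable sketch of the context-building step (steps 1-2 stubbed out) ===
retrieved = [  # pretend output of index.query(...).matches
    {"score": 0.91, "text": "Python is a high-level programming language."},
    {"score": 0.87, "text": "Install Python with your OS package manager."},
]
question = "How to learn Python"

context = "\n\n".join(m["text"] for m in retrieved)
messages = [
    {"role": "system", "content": f"Answer based on context:\n{context}"},
    {"role": "user", "content": question},
]
for m in messages:
    print(f"  [{m['role']}] {m['content'][:60]}...")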
# === Production Architecture ===
architecture = {
    "API Layer": {
        "tools": "FastAPI, Load Balancer",
        "desc": "Receive queries, route to the queue or to direct search",
    },
    "Message Queue": {
        "tools": "Kafka, RabbitMQ, Redis Streams",
        "desc": "Buffer requests for async embedding generation",
    },
    "Embedding Service": {
        "tools": "OpenAI API, Sentence Transformers",
        "desc": "Generate vector embeddings with batch processing",
    },
    "Vector Database": {
        "tools": "Pinecone, Qdrant, Weaviate",
        "desc": "Store and search vectors with ANN algorithms",
    },
    "LLM Service": {
        "tools": "OpenAI, Anthropic, Local LLM",
        "desc": "Generate answers from retrieved context (RAG)",
    },
    "Cache": {
        "tools": "Redis, Memcached",
        "desc": "Cache frequent queries to reduce latency and cost",
    },
}
print("RAG Production Architecture:")
for layer, info in architecture.items():
print(f"\n [{layer}]")
print(f" Tools: {info['tools']}")
print(f" {info['desc']}")
# === Cost Optimization ===
tips = [
    "Batch Embedding: send many texts in a single API call (see the batching sketch below)",
    "Cache Results: cache frequently repeated queries",
    "Smaller Model: use text-embedding-3-small instead of large",
    "Namespace: split data into namespaces to narrow the search scope",
    "Metadata Filter: filter before vector search to reduce compute",
    "Serverless: use Pinecone Serverless and pay per use",
]
print(f"\n\nCost Optimization:")
for i, tip in enumerate(tips, 1):
print(f" {i}. {tip}")
Tips
- Batch: batch embeddings and upserts to reduce API calls and cost
- Cache: cache embeddings of frequently repeated queries to reduce latency
- Namespace: use namespaces to partition data and speed up search
- Hybrid: use hybrid search (vector + keyword) for better results (see the sketch after this list)
- Queue: use a message queue for heavy workloads so the API is not blocked
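One common way to implement the hybrid tip is reciprocal rank fusion (RRF), which merges a keyword ranking and a vector ranking by summing 1 / (k + rank) per document. The two ranked lists below are made up, and k = 60 is just the conventional default.

# === Hybrid search sketch: merge keyword + vector rankings with RRF ===
def rrf(rankings, k=60):
    scores = {}
    for ranked_ids in rankings:
        for rank, doc_id in enumerate(ranked_ids, start=1):
            scores[doc_id] = scores.get(doc_id, 0.0) + 1.0 / (k + rank)
    return sorted(scores.items(), key=lambda kv: kv[1], reverse=True)

keyword_ranking = ["doc1", "doc8", "doc5"]     # e.g. BM25 / keyword results
vector_ranking = ["doc5", "doc1", "doc12"]     # e.g. vector search results

print("=== Hybrid (RRF) ranking ===")
for doc_id, score in rrf([keyword_ranking, vector_ranking]):
    print(f"  {doc_id}: {score:.4f}")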
What is a Vector Database?
A database that stores vector embeddings and answers similarity queries with ANN (approximate nearest neighbor) search. It powers semantic search, recommendations, and RAG. Examples: Pinecone, Weaviate, Qdrant, Milvus, ChromaDB.
What is Pinecone?
A managed, serverless cloud vector database that scales to billions of vectors, with metadata filtering, namespaces, real-time upsert and query, a Python SDK, and hybrid search.
How is a Message Queue used with a Vector Database?
It decouples the API from embedding generation and upserts: requests are buffered and processed asynchronously in batches, which reduces latency and absorbs traffic spikes. Common choices are Kafka, RabbitMQ, and Redis.
What is RAG?
Retrieval Augmented Generation: augment an LLM by querying a vector DB for relevant context and passing it to the LLM, so answers are more accurate and up to date.
Summary
Vector databases such as Pinecone store embeddings and serve ANN-based semantic search; a message queue such as Kafka enables async batch embedding and upserts; RAG combines retrieval from the vector DB with an LLM. In production, add caching, cost optimization, and hybrid search.
