Vector Database Pinecone Message Queue Design —
Vector Database Message Queue

บทความนี้ครอบคลุม Vector Database (Pinecone), Embeddings, ANN Search, Semantic Search และ RAG รวมถึงการใช้ Message Queue (Kafka, RabbitMQ) สำหรับ Async Batch Processing และ Production Architecture
| Vector DB | Type | Scale | จุดเด่น |
|---|---|---|---|
| Pinecone | Managed Cloud | Billions | Serverless ง่าย Fast |
| Weaviate | Open Source | Millions | GraphQL Modules |
| Qdrant | Open Source | Millions | Rust Performance |
| Milvus | Open Source | Billions | GPU Distributed |
| ChromaDB | Open Source | Thousands | Simple Embedded |
Pinecone Setup และ Embedding
# === Pinecone Vector Database ===
# pip install pinecone-client openai
from pinecone import Pinecone, ServerlessSpec
import openai
# Initialize the Pinecone client.
# NOTE(review): "YOUR_API_KEY" is a placeholder; supply the real key from
# configuration rather than committing it to source control.
pc = Pinecone(api_key="YOUR_API_KEY")

# Create the index only when it does not exist yet. create_index() is rejected
# when the name is already taken, so an unconditional call would break every
# run after the first one.
if "knowledge-base" not in pc.list_indexes().names():
    pc.create_index(
        name="knowledge-base",
        dimension=1536,  # OpenAI text-embedding-3-small
        metric="cosine",
        spec=ServerlessSpec(cloud="aws", region="us-east-1"),
    )

# Handle used by the upsert and query examples.
index = pc.Index("knowledge-base")
# Generate Embeddings
client = openai.OpenAI()


def get_embedding(text):
    """Return the embedding of ``text`` from the text-embedding-3-small model.

    Sends ``text`` as the single input of one embeddings request and returns
    the ``embedding`` attribute of the first item in the response data.
    """
    reply = client.embeddings.create(
        input=text,
        model="text-embedding-3-small",
    )
    first_item = reply.data[0]
    return first_item.embedding
# Upsert Vectors
# Sample documents; "text" is embedded, and "text" and "category" are stored
# as metadata next to the vector.
documents = [
    {"id": "doc1", "text": "Python programming guide", "category": "tech"},
    {"id": "doc2", "text": "Machine learning basics", "category": "ml"},
    {"id": "doc3", "text": "Docker containerization", "category": "devops"},
]

# One vector record per document (one get_embedding() call each).
vectors = [
    {
        "id": doc["id"],
        "values": get_embedding(doc["text"]),
        "metadata": {"text": doc["text"], "category": doc["category"]},
    }
    for doc in documents
]

# Write all records in a single request into the "articles" namespace.
index.upsert(vectors=vectors, namespace="articles")
# Query: embed the question, then ask for the 5 best matches in the
# "articles" namespace whose category metadata equals "tech".
query_embedding = get_embedding("How to learn Python")
results = index.query(
    namespace="articles",
    vector=query_embedding,
    filter={"category": {"$eq": "tech"}},
    top_k=5,
    include_metadata=True,
)

# Print the score (3 decimals) and the stored text of every match.
for hit in results.matches:
    print(f" Score: {hit.score:.3f} | {hit.metadata['text']}")
from dataclasses import dataclass, field
from typing import List, Dict
@dataclass
class VectorSearchResult:
    """One row of the vector-search demo output.

    The demo below fills these with a document id, a score, the document
    text, and its category, and prints them in that order.
    """

    doc_id: str
    score: float
    text: str
    category: str
เนื้อหาเกี่ยวข้อง — ทำความเข้าใจ ตั้งค่า CDN สำหรับ Elixir Nerves IoT อย่างมืออาชีพ
# Hard-coded sample results used to illustrate the output format; no
# vector database is contacted here.
results_demo = [
    VectorSearchResult("doc1", 0.95, "Python programming guide", "tech"),
    VectorSearchResult("doc5", 0.89, "Python data analysis tutorial", "tech"),
    VectorSearchResult("doc12", 0.85, "Introduction to coding", "tech"),
    VectorSearchResult("doc8", 0.78, "Web development with Flask", "tech"),
    VectorSearchResult("doc3", 0.72, "Docker for Python developers", "devops"),
]

print("=== Vector Search: 'How to learn Python' ===")
for result in results_demo:
    print(f" [{result.score:.2f}] {result.doc_id} | {result.text} ({result.category})")
Message Queue Architecture
# === Message Queue + Vector DB Architecture ===
# Kafka Producer (API Server)
from confluent_kafka import Producer
import json
producer = Producer({'bootstrap.servers': 'localhost:9092'})


def _report_delivery(err, msg):
    """Delivery callback: report messages that could not be delivered."""
    if err is not None:
        print(f"Delivery failed for key {msg.key()!r}: {err}")


def submit_document(doc_id, text, metadata):
    """Submit a document for embedding generation.

    Publishes one JSON message with an "upsert" action to the
    ``embedding-requests`` topic, keyed by ``doc_id``.

    Args:
        doc_id: Document identifier (str); also used as the message key.
        text: Text to embed.
        metadata: JSON-serializable metadata carried along with the text.
    """
    message = {
        "doc_id": doc_id,
        "text": text,
        "metadata": metadata,
        "action": "upsert",
    }
    producer.produce(
        topic="embedding-requests",
        key=doc_id.encode(),
        value=json.dumps(message).encode(),
        # Without a callback a failed delivery would go unnoticed.
        on_delivery=_report_delivery,
    )
    # Wait for outstanding deliveries (and their callbacks) before returning.
    producer.flush()
# Kafka Consumer (Embedding Worker)
from confluent_kafka import Consumer

consumer = Consumer({
    'bootstrap.servers': 'localhost:9092',
    'group.id': 'embedding-workers',
    'auto.offset.reset': 'latest',
    # Offsets are committed manually after a successful upsert, so messages
    # that are buffered but not yet stored are re-read after a crash.
    'enable.auto.commit': False,
})
consumer.subscribe(['embedding-requests'])

BATCH = []
BATCH_SIZE = 50


def _flush_batch():
    """Embed and upsert everything in BATCH, then commit the Kafka offsets."""
    if not BATCH:
        return
    # Batch embed: one API request for all buffered texts.
    texts = [d['text'] for d in BATCH]
    embeddings = client.embeddings.create(
        model="text-embedding-3-small",
        input=texts,
    )
    # Batch upsert to Pinecone. The text is stored in the metadata because
    # the query examples read match.metadata['text']; a "text" key supplied
    # by the caller's metadata takes precedence.
    vectors = [
        {
            "id": d['doc_id'],
            "values": emb.embedding,
            "metadata": {"text": d['text'], **(d.get('metadata') or {})},
        }
        for d, emb in zip(BATCH, embeddings.data)
    ]
    # NOTE(review): the earlier upsert/query example uses namespace="articles"
    # while this worker writes to the default namespace -- confirm intent.
    index.upsert(vectors=vectors)
    consumer.commit(asynchronous=False)
    BATCH.clear()


try:
    while True:
        msg = consumer.poll(0.5)
        if msg is None:
            # Queue is idle: do not leave a partial batch waiting for more
            # messages that may never arrive.
            _flush_batch()
            continue
        if msg.error():
            print(f"Consumer error: {msg.error()}")
            continue
        try:
            BATCH.append(json.loads(msg.value()))
        except ValueError as exc:
            # Skip undecodable payloads instead of crashing on them again
            # after every restart.
            print(f"Skipping malformed message: {exc}")
            continue
        if len(BATCH) >= BATCH_SIZE:
            _flush_batch()
finally:
    # Leave the consumer group cleanly when the worker stops.
    consumer.close()
@dataclass
class QueueMetrics:
    """Counters for one queue as shown on the message queue dashboard."""

    queue: str
    pending: int
    processing: int
    completed: int
    failed: int
    avg_latency_ms: float

    @property
    def error_rate(self) -> float:
        """Failed share of finished (completed + failed) items, in percent.

        Returns 0.0 when nothing has finished yet, avoiding division by zero.
        """
        total = self.completed + self.failed
        return (self.failed / total * 100) if total > 0 else 0.0


# Sample figures for the dashboard demo.
queues = [
    QueueMetrics("embedding-requests", 125, 50, 45000, 12, 85.5),
    QueueMetrics("search-requests", 8, 10, 120000, 3, 12.3),
    QueueMetrics("delete-requests", 0, 2, 5000, 0, 5.1),
    QueueMetrics("reindex-requests", 500, 100, 8000, 45, 250.0),
]

print("\n=== Message Queue Dashboard ===")
for q in queues:
    print(f" [{q.queue}]")
    print(f" Pending: {q.pending} | Processing: {q.processing}")
    print(f" Completed: {q.completed:,} | Failed: {q.failed} "
          f"({q.error_rate:.2f}%) | Latency: {q.avg_latency_ms}ms")
RAG Pipeline
# === RAG (Retrieval Augmented Generation) ===
# def rag_query(question, top_k=5):
# # 1. Generate query embedding
# query_emb = get_embedding(question)
#
# # 2. Search Pinecone
# results = index.query(
# vector=query_emb,
# top_k=top_k,
# include_metadata=True,
# )
#
# # 3. Build context
# context = "\n\n".join([
# match.metadata['text']
# for match in results.matches
# ])
#
# # 4. Generate answer with LLM
# response = client.chat.completions.create(
# model="gpt-4o-mini",
# messages=[
# {"role": "system", "content": f"Answer based on context:\n{context}"},
# {"role": "user", "content": question},
# ],
# temperature=0.3,
# )
# return response.choices[0].message.content
# Production Architecture
# Maps each layer name to the tools used there ("tools") and a short
# description ("desc"); printed in insertion order below.
architecture = {
    "API Layer": {
        "tools": "FastAPI, Load Balancer",
        "desc": "รับ Query ส่งไป Queue หรือ Direct Search",
    },
    "Message Queue": {
        "tools": "Kafka, RabbitMQ, Redis Streams",
        "desc": "Buffer Requests Async Embedding Generation",
    },
    "Embedding Service": {
        "tools": "OpenAI API, Sentence Transformers",
        "desc": "Generate Vector Embeddings Batch Processing",
    },
    "Vector Database": {
        "tools": "Pinecone, Qdrant, Weaviate",
        "desc": "Store and Search Vectors ANN Algorithm",
    },
    "LLM Service": {
        "tools": "OpenAI, Anthropic, Local LLM",
        "desc": "Generate Answers from Context RAG",
    },
    "Cache": {
        "tools": "Redis, Memcached",
        "desc": "Cache Frequent Queries ลด Latency Cost",
    },
}

print("RAG Production Architecture:")
for layer, info in architecture.items():
    print(f"\n [{layer}]")
    print(f" Tools: {info['tools']}")
    print(f" {info['desc']}")
# Cost Optimization
# Tips printed as a numbered list starting at 1.
tips = [
    "Batch Embedding — รวมหลาย Texts ส่ง API ครั้งเดียว",
    "Cache Results — Cache Query ที่ซ้ำบ่อย",
    "Smaller Model — ใช้ text-embedding-3-small แทน large",
    "Namespace — แยก Namespace ลด Search Scope",
    "Metadata Filter — Filter ก่อน Vector Search ลด Compute",
    "Serverless — ใช้ Pinecone Serverless จ่ายตามใช้งาน",
]

# Plain string: the heading has no placeholders, so no f-string is needed.
print("\n\nCost Optimization:")
for i, tip in enumerate(tips, 1):
    print(f" {i}. {tip}")
เคล็ดลับ
- Batch: Batch Embedding และ Upsert ลด API Calls และ Cost
- Cache: Cache Embedding ของ Query ที่ซ้ำบ่อย ลด Latency
- Namespace: ใช้ Namespace แยกข้อมูล ค้นหาเร็วขึ้น
- Hybrid: ใช้ Hybrid Search (Vector + Keyword) ผลลัพธ์ดีกว่า
- Queue: ใช้ Message Queue สำหรับ Heavy Workload ไม่ Block API
Vector Database คืออะไร
Vector Database คือฐานข้อมูลที่เก็บ Vector Embeddings และค้นหาด้วย ANN Search ใช้กับงาน Semantic Search, Recommendation และ RAG ตัวอย่างเช่น Pinecone, Weaviate, Qdrant, Milvus และ ChromaDB




