SiamCafe.net Blog
Technology

Vector Database Pinecone Message Queue Design

vector database pinecone message queue design
Vector Database Pinecone Message Queue Design | SiamCafe Blog
2026-03-04· อ. บอม — SiamCafe.net· 8,224 คำ

Vector Database Message Queue

Vector Database Pinecone Embeddings ANN Search Semantic Search RAG Message Queue Async Batch Processing Kafka RabbitMQ Production Architecture

| Vector DB | Type | Scale | จุดเด่น |
|-----------|------|-------|---------|
| Pinecone | Managed Cloud | Billions | Serverless ง่าย Fast |
| Weaviate | Open Source | Millions | GraphQL Modules |
| Qdrant | Open Source | Millions | Rust Performance |
| Milvus | Open Source | Billions | GPU Distributed |
| ChromaDB | Open Source | Thousands | Simple Embedded |

Pinecone Setup และ Embedding

# === Pinecone Vector Database ===

# pip install pinecone-client openai

# from pinecone import Pinecone, ServerlessSpec
# import openai
#
# # Initialize Pinecone
# pc = Pinecone(api_key="YOUR_API_KEY")
#
# # Create Index
# pc.create_index(
#     name="knowledge-base",
#     dimension=1536,  # OpenAI text-embedding-3-small
#     metric="cosine",
#     spec=ServerlessSpec(cloud="aws", region="us-east-1"),
# )
#
# index = pc.Index("knowledge-base")
#
# # Generate Embeddings
# client = openai.OpenAI()
#
# def get_embedding(text):
#     response = client.embeddings.create(
#         model="text-embedding-3-small",
#         input=text,
#     )
#     return response.data[0].embedding
#
# # Upsert Vectors
# documents = [
#     {"id": "doc1", "text": "Python programming guide", "category": "tech"},
#     {"id": "doc2", "text": "Machine learning basics", "category": "ml"},
#     {"id": "doc3", "text": "Docker containerization", "category": "devops"},
# ]
#
# vectors = []
# for doc in documents:
#     embedding = get_embedding(doc["text"])
#     vectors.append({
#         "id": doc["id"],
#         "values": embedding,
#         "metadata": {"text": doc["text"], "category": doc["category"]},
#     })
#
# index.upsert(vectors=vectors, namespace="articles")
#
# # Query
# query_embedding = get_embedding("How to learn Python")
# results = index.query(
#     vector=query_embedding,
#     top_k=5,
#     include_metadata=True,
#     namespace="articles",
#     filter={"category": {"$eq": "tech"}},
# )
#
# for match in results.matches:
#     print(f"  Score: {match.score:.3f} | {match.metadata['text']}")

from dataclasses import dataclass, field
from typing import List, Dict

@dataclass
class VectorSearchResult:
    """One match from a vector similarity search.

    Demo stand-in for a single Pinecone query match (id + score + metadata),
    mirroring the fields used in the commented example above.
    """
    doc_id: str    # document identifier, e.g. "doc1"
    score: float   # similarity score; cosine metric per the index spec above, higher = closer
    text: str      # original document text (stored as metadata in the example)
    category: str  # metadata tag used for filtering, e.g. "tech", "devops"

# Demo: hard-coded results mimicking the Pinecone query for "How to learn Python".
_demo_rows = [
    ("doc1", 0.95, "Python programming guide", "tech"),
    ("doc5", 0.89, "Python data analysis tutorial", "tech"),
    ("doc12", 0.85, "Introduction to coding", "tech"),
    ("doc8", 0.78, "Web development with Flask", "tech"),
    ("doc3", 0.72, "Docker for Python developers", "devops"),
]
results_demo = [VectorSearchResult(*row) for row in _demo_rows]

print("=== Vector Search: 'How to learn Python' ===")
for hit in results_demo:
    # One line per match: score, id, text, category.
    print(f"  [{hit.score:.2f}] {hit.doc_id} | {hit.text} ({hit.category})")

Message Queue Architecture

# === Message Queue + Vector DB Architecture ===

# Kafka Producer (API Server)
# from confluent_kafka import Producer
# import json
#
# producer = Producer({'bootstrap.servers': 'localhost:9092'})
#
# def submit_document(doc_id, text, metadata):
#     """Submit document for embedding generation"""
#     message = {
#         "doc_id": doc_id,
#         "text": text,
#         "metadata": metadata,
#         "action": "upsert",
#     }
#     producer.produce(
#         topic="embedding-requests",
#         key=doc_id.encode(),
#         value=json.dumps(message).encode(),
#     )
#     producer.flush()
#
# # Kafka Consumer (Embedding Worker)
# from confluent_kafka import Consumer
#
# consumer = Consumer({
#     'bootstrap.servers': 'localhost:9092',
#     'group.id': 'embedding-workers',
#     'auto.offset.reset': 'latest',
# })
# consumer.subscribe(['embedding-requests'])
#
# BATCH = []
# BATCH_SIZE = 50
#
# while True:
#     msg = consumer.poll(0.5)
#     if msg and not msg.error():
#         data = json.loads(msg.value())
#         BATCH.append(data)
#
#     if len(BATCH) >= BATCH_SIZE:
#         # Batch embed
#         texts = [d['text'] for d in BATCH]
#         embeddings = client.embeddings.create(
#             model="text-embedding-3-small",
#             input=texts,
#         )
#         # Batch upsert to Pinecone
#         vectors = [
#             {"id": d['doc_id'],
#              "values": emb.embedding,
#              "metadata": d['metadata']}
#             for d, emb in zip(BATCH, embeddings.data)
#         ]
#         index.upsert(vectors=vectors)
#         BATCH.clear()

@dataclass
class QueueMetrics:
    """Snapshot of one message queue's health for the dashboard printout below."""
    queue: str            # topic/queue name, e.g. "embedding-requests"
    pending: int          # messages waiting to be consumed
    processing: int       # messages currently being worked on
    completed: int        # total successfully processed messages
    failed: int           # total failed messages (used for error-rate calc)
    avg_latency_ms: float # average end-to-end processing latency in milliseconds

# Demo metrics: one row per worker queue in the embedding/search pipeline.
queues = [
    QueueMetrics("embedding-requests", 125, 50, 45000, 12, 85.5),
    QueueMetrics("search-requests", 8, 10, 120000, 3, 12.3),
    QueueMetrics("delete-requests", 0, 2, 5000, 0, 5.1),
    QueueMetrics("reindex-requests", 500, 100, 8000, 45, 250.0),
]

print("\n=== Message Queue Dashboard ===")
for metrics in queues:
    # Error rate over all finished messages; guard against division by zero.
    finished = metrics.completed + metrics.failed
    if finished > 0:
        error_rate = metrics.failed / finished * 100
    else:
        error_rate = 0
    print(f"  [{metrics.queue}]")
    print(f"    Pending: {metrics.pending} | Processing: {metrics.processing}")
    print(f"    Completed: {metrics.completed:,} | Failed: {metrics.failed} "
          f"({error_rate:.2f}%) | Latency: {metrics.avg_latency_ms}ms")

RAG Pipeline

# === RAG (Retrieval Augmented Generation) ===

# def rag_query(question, top_k=5):
#     # 1. Generate query embedding
#     query_emb = get_embedding(question)
#
#     # 2. Search Pinecone
#     results = index.query(
#         vector=query_emb,
#         top_k=top_k,
#         include_metadata=True,
#     )
#
#     # 3. Build context
#     context = "\n\n".join([
#         match.metadata['text']
#         for match in results.matches
#     ])
#
#     # 4. Generate answer with LLM
#     response = client.chat.completions.create(
#         model="gpt-4o-mini",
#         messages=[
#             {"role": "system", "content": f"Answer based on context:\n{context}"},
#             {"role": "user", "content": question},
#         ],
#         temperature=0.3,
#     )
#     return response.choices[0].message.content

# Production architecture for a RAG search stack, described layer by layer.
architecture = {
    "API Layer": {
        "tools": "FastAPI, Load Balancer",
        "desc": "รับ Query ส่งไป Queue หรือ Direct Search",
    },
    "Message Queue": {
        "tools": "Kafka, RabbitMQ, Redis Streams",
        "desc": "Buffer Requests Async Embedding Generation",
    },
    "Embedding Service": {
        "tools": "OpenAI API, Sentence Transformers",
        "desc": "Generate Vector Embeddings Batch Processing",
    },
    "Vector Database": {
        "tools": "Pinecone, Qdrant, Weaviate",
        "desc": "Store and Search Vectors ANN Algorithm",
    },
    "LLM Service": {
        "tools": "OpenAI, Anthropic, Local LLM",
        "desc": "Generate Answers from Context RAG",
    },
    "Cache": {
        "tools": "Redis, Memcached",
        "desc": "Cache Frequent Queries ลด Latency Cost",
    },
}

print("RAG Production Architecture:")
for layer_name, layer_info in architecture.items():
    # Three output lines per layer, emitted as one print call.
    print(f"\n  [{layer_name}]\n    Tools: {layer_info['tools']}\n    {layer_info['desc']}")

# Cost-saving checklist for running Pinecone + OpenAI embeddings in production.
tips = [
    "Batch Embedding — รวมหลาย Texts ส่ง API ครั้งเดียว",
    "Cache Results — Cache Query ที่ซ้ำบ่อย",
    "Smaller Model — ใช้ text-embedding-3-small แทน large",
    "Namespace — แยก Namespace ลด Search Scope",
    "Metadata Filter — Filter ก่อน Vector Search ลด Compute",
    "Serverless — ใช้ Pinecone Serverless จ่ายตามใช้งาน",
]

print("\n\nCost Optimization:")
for position, tip in enumerate(tips, start=1):
    print(f"  {position}. {tip}")

เคล็ดลับ

Vector Database คืออะไร

ฐานข้อมูลเก็บ Vector Embeddings ANN Search Semantic Search Recommendation RAG Pinecone Weaviate Qdrant Milvus ChromaDB

Pinecone คืออะไร

Managed Vector Database Cloud Serverless Billion-scale Metadata Filtering Namespace Real-time Upsert Query Python SDK Hybrid Search

Message Queue ใช้กับ Vector Database อย่างไร

Async ไม่ Block Batch Processing Embedding Generation Upsert ลด Latency Buffer Spike Traffic Kafka RabbitMQ Redis

RAG คืออะไร

Retrieval Augmented Generation เสริม LLM ด้วย Vector DB Query Context ส่ง LLM คำตอบถูกต้อง Up-to-date

สรุป

Vector Database Pinecone Embeddings ANN Semantic Search Message Queue Kafka Async Batch RAG LLM Retrieval Augmented Generation Production Architecture Cache Cost Optimization Hybrid Search

📖 บทความที่เกี่ยวข้อง

- Vector Database Pinecone Micro-segmentation — อ่านบทความ →
- Vector Database Pinecone Tech Conference 2026 — อ่านบทความ →
- Vector Database Pinecone Developer Experience DX — อ่านบทความ →
- ZFS on Linux Message Queue Design — อ่านบทความ →
- Vector Database Pinecone Cloud Migration Strategy — อ่านบทความ →

📚 ดูบทความทั้งหมด →