Embedding Models ในระบบ Distributed
Embedding Models แปลงข้อมูลเป็น Vector ตัวเลขที่เก็บความหมาย ใช้สำหรับ Semantic Search, Recommendation และ Similarity Matching เมื่อข้อมูลมีหลายล้าน Records ต้องใช้ Distributed System กระจาย Vectors ข้าม Nodes เพื่อ Performance และ Scalability
ระบบ Distributed สำหรับ Embeddings ประกอบด้วย Embedding Service (สร้าง Vectors), Vector Database (เก็บและค้นหา), API Gateway (รับ Requests) และ Monitoring (ดูแล Performance)
Embedding Service
# embedding_service.py — Distributed Embedding Service
# pip install sentence-transformers fastapi uvicorn numpy
from sentence_transformers import SentenceTransformer
import numpy as np
from typing import List, Dict, Optional
from dataclasses import dataclass
import time
import hashlib
import json
class EmbeddingService:
"""Embedding Service รองรับหลาย Models"""
MODELS = {
"mini": "all-MiniLM-L6-v2", # 384 dims, เร็ว
"base": "all-mpnet-base-v2", # 768 dims, สมดุล
"large": "BAAI/bge-large-en-v1.5", # 1024 dims, แม่นยำ
"multi": "paraphrase-multilingual-MiniLM-L12-v2", # 384 dims, หลายภาษา
}
def __init__(self, model_name="base", device="cpu", cache_size=10000):
model_id = self.MODELS.get(model_name, model_name)
self.model = SentenceTransformer(model_id, device=device)
self.model_name = model_name
self.dims = self.model.get_sentence_embedding_dimension()
self.cache = {}
self.cache_size = cache_size
self.stats = {"requests": 0, "cache_hits": 0, "total_texts": 0}
print(f"Model: {model_id} ({self.dims} dims)")
def embed(self, texts: List[str], normalize=True) -> np.ndarray:
"""สร้าง Embeddings"""
self.stats["requests"] += 1
self.stats["total_texts"] += len(texts)
# Check Cache
uncached_texts = []
uncached_indices = []
results = [None] * len(texts)
for i, text in enumerate(texts):
key = self._cache_key(text)
if key in self.cache:
results[i] = self.cache[key]
self.stats["cache_hits"] += 1
else:
uncached_texts.append(text)
uncached_indices.append(i)
# Embed uncached texts
if uncached_texts:
embeddings = self.model.encode(
uncached_texts,
normalize_embeddings=normalize,
batch_size=32,
show_progress_bar=False,
)
for idx, emb in zip(uncached_indices, embeddings):
results[idx] = emb
key = self._cache_key(uncached_texts[uncached_indices.index(idx)])
if len(self.cache) < self.cache_size:
self.cache[key] = emb
return np.array(results)
def similarity(self, text1: str, text2: str) -> float:
"""คำนวณ Cosine Similarity"""
embs = self.embed([text1, text2])
return float(np.dot(embs[0], embs[1]))
def search(self, query: str, documents: List[str], top_k=5):
"""Semantic Search"""
all_texts = [query] + documents
embs = self.embed(all_texts)
query_emb = embs[0]
doc_embs = embs[1:]
scores = np.dot(doc_embs, query_emb)
top_indices = np.argsort(scores)[::-1][:top_k]
results = []
for idx in top_indices:
results.append({
"document": documents[idx],
"score": float(scores[idx]),
"rank": len(results) + 1,
})
return results
def _cache_key(self, text):
return hashlib.md5(text.encode()).hexdigest()
def print_stats(self):
hit_rate = (self.stats["cache_hits"] / self.stats["total_texts"] * 100
if self.stats["total_texts"] > 0 else 0)
print(f"\nEmbedding Service Stats:")
print(f" Model: {self.model_name} ({self.dims}d)")
print(f" Requests: {self.stats['requests']}")
print(f" Texts: {self.stats['total_texts']}")
print(f" Cache Hits: {self.stats['cache_hits']} ({hit_rate:.1f}%)")
# ตัวอย่าง
svc = EmbeddingService("mini")
docs = [
"Python programming language tutorial",
"Machine learning with TensorFlow",
"Web development with React",
"Database optimization techniques",
"Cloud computing with AWS",
"DevOps automation pipeline",
"Kubernetes container orchestration",
"Data science and analytics",
]
results = svc.search("how to deploy applications", docs, top_k=3)
print("\nSearch: 'how to deploy applications'")
for r in results:
print(f" [{r['rank']}] {r['score']:.3f} — {r['document']}")
svc.print_stats()
Distributed Vector Storage
# distributed_vectors.py — Distributed Vector Storage
# pip install qdrant-client numpy
from qdrant_client import QdrantClient
from qdrant_client.models import (
Distance, VectorParams, PointStruct,
Filter, FieldCondition, MatchValue,
OptimizersConfigDiff, HnswConfigDiff,
)
import numpy as np
from typing import List, Dict, Optional
import uuid
import time
class DistributedVectorStore:
"""Distributed Vector Storage ด้วย Qdrant"""
def __init__(self, hosts=None, collection="embeddings", dims=384):
if hosts is None:
hosts = ["localhost:6333"]
# เชื่อมต่อ Qdrant Cluster
self.client = QdrantClient(host=hosts[0].split(":")[0],
port=int(hosts[0].split(":")[1]))
self.collection = collection
self.dims = dims
self._ensure_collection()
def _ensure_collection(self):
"""สร้าง Collection ถ้ายังไม่มี"""
collections = [c.name for c in self.client.get_collections().collections]
if self.collection not in collections:
self.client.create_collection(
collection_name=self.collection,
vectors_config=VectorParams(
size=self.dims,
distance=Distance.COSINE,
),
optimizers_config=OptimizersConfigDiff(
indexing_threshold=20000,
),
hnsw_config=HnswConfigDiff(
m=16,
ef_construct=100,
),
shard_number=3, # Distribute across 3 shards
replication_factor=2, # 2 replicas for HA
)
def upsert(self, ids, vectors, payloads=None):
"""เพิ่ม/อัพเดท Vectors"""
points = []
for i, (vid, vec) in enumerate(zip(ids, vectors)):
payload = payloads[i] if payloads else {}
points.append(PointStruct(
id=vid, vector=vec.tolist(), payload=payload,
))
self.client.upsert(
collection_name=self.collection,
points=points,
)
def search(self, query_vector, top_k=10, filters=None):
"""ค้นหา Nearest Neighbors"""
query_filter = None
if filters:
conditions = []
for key, value in filters.items():
conditions.append(FieldCondition(
key=key, match=MatchValue(value=value),
))
query_filter = Filter(must=conditions)
results = self.client.search(
collection_name=self.collection,
query_vector=query_vector.tolist(),
limit=top_k,
query_filter=query_filter,
)
return [{"id": r.id, "score": r.score, "payload": r.payload}
for r in results]
def info(self):
"""แสดงข้อมูล Collection"""
info = self.client.get_collection(self.collection)
print(f"\nCollection: {self.collection}")
print(f" Points: {info.points_count}")
print(f" Vectors: {info.vectors_count}")
print(f" Dimensions: {self.dims}")
print(f" Status: {info.status}")
# === Qdrant Cluster Setup (Docker Compose) ===
# version: '3.8'
# services:
# qdrant-node1:
# image: qdrant/qdrant:latest
# ports:
# - "6333:6333"
# environment:
# - QDRANT__CLUSTER__ENABLED=true
# - QDRANT__CLUSTER__P2P__PORT=6335
# volumes:
# - qdrant1-data:/qdrant/storage
#
# qdrant-node2:
# image: qdrant/qdrant:latest
# environment:
# - QDRANT__CLUSTER__ENABLED=true
# - QDRANT__CLUSTER__P2P__PORT=6335
# - QDRANT__CLUSTER__BOOTSTRAP=http://qdrant-node1:6335
# volumes:
# - qdrant2-data:/qdrant/storage
#
# qdrant-node3:
# image: qdrant/qdrant:latest
# environment:
# - QDRANT__CLUSTER__ENABLED=true
# - QDRANT__CLUSTER__P2P__PORT=6335
# - QDRANT__CLUSTER__BOOTSTRAP=http://qdrant-node1:6335
# volumes:
# - qdrant3-data:/qdrant/storage
#
# volumes:
# qdrant1-data:
# qdrant2-data:
# qdrant3-data:
print("Distributed Vector Store configured")
print(" Shards: 3 | Replication: 2")
Scaling Strategy
# scaling_vectors.py — Vector Database Scaling Strategy
import math
def calculate_resources(num_vectors, dims, precision="float32"):
"""คำนวณ Resources ที่ต้องการ"""
bytes_per_dim = {"float32": 4, "float16": 2, "int8": 1}
bpd = bytes_per_dim.get(precision, 4)
# Vector Storage
vector_bytes = num_vectors * dims * bpd
vector_gb = vector_bytes / (1024**3)
# HNSW Index (ประมาณ 1.5x vector size)
index_gb = vector_gb * 1.5
# Payload Storage (ประมาณ 1KB per vector)
payload_gb = num_vectors * 1024 / (1024**3)
total_gb = vector_gb + index_gb + payload_gb
# RAM Requirement (Index + Hot Data)
ram_gb = index_gb + (vector_gb * 0.2) # 20% hot data
# Shard Calculation (max 10M vectors per shard)
shards = max(1, math.ceil(num_vectors / 10_000_000))
return {
"vectors": f"{num_vectors:,}",
"dimensions": dims,
"precision": precision,
"vector_storage_gb": round(vector_gb, 1),
"index_gb": round(index_gb, 1),
"payload_gb": round(payload_gb, 1),
"total_storage_gb": round(total_gb, 1),
"ram_required_gb": round(ram_gb, 1),
"recommended_shards": shards,
"recommended_replicas": min(3, shards),
}
# === ตัวอย่างขนาดต่างๆ ===
scenarios = [
(100_000, 384, "float32", "Small — Startup"),
(1_000_000, 768, "float32", "Medium — Growing Business"),
(10_000_000, 768, "float16", "Large — Enterprise"),
(100_000_000, 384, "int8", "Massive — Big Tech"),
]
print("Vector Database Sizing Guide")
print("=" * 60)
for num, dims, prec, label in scenarios:
res = calculate_resources(num, dims, prec)
print(f"\n {label}")
print(f" Vectors: {res['vectors']} x {dims}d ({prec})")
print(f" Storage: {res['total_storage_gb']} GB")
print(f" RAM: {res['ram_required_gb']} GB")
print(f" Shards: {res['recommended_shards']} | "
f"Replicas: {res['recommended_replicas']}")
# Scaling Tips
print(f"\nScaling Tips:")
print(f" - ใช้ float16 ลด Storage 50% Accuracy ลดน้อยมาก")
print(f" - ใช้ Product Quantization ลด Memory อีก 4-8x")
print(f" - Shard ตาม Region ลด Latency")
print(f" - Separate Read/Write Replicas")
Best Practices
- Model Selection: เลือก Model ที่เหมาะ mini (384d เร็ว) base (768d สมดุล) large (1024d แม่นยำ)
- Quantization: ใช้ float16 หรือ int8 ลด Storage และ RAM โดย Accuracy ลดน้อยมาก
- Sharding: Shard ข้อมูลข้าม Nodes สำหรับ Parallel Search ลด Latency
- Replication: Replicate อย่างน้อย 2x สำหรับ High Availability
- Caching: Cache Embeddings ของ Frequent Queries ลด Computation
- Batch Processing: ส่ง Texts เป็น Batch (32-64) สร้าง Embeddings ได้เร็วกว่าทีละอัน
Embedding Model คืออะไร
ML Model แปลงข้อมูล (Text Image Audio) เป็น Vector ตัวเลขเก็บความหมาย ข้อมูลคล้ายกัน Vector ใกล้กัน ใช้ Semantic Search Recommendation Clustering ตัวอย่าง OpenAI Embeddings Sentence-BERT CLIP
Distributed System คืออะไร
ระบบหลาย Nodes ทำงานร่วมกัน กระจาย Workload ข้าม Machines เพื่อ Scalability Availability Fault Tolerance เช่น Kubernetes Kafka Cassandra Elasticsearch ใช้ Consensus Protocols
ทำไมต้องใช้ Distributed System กับ Embeddings
Vectors ขนาดใหญ่ (768-4096 dims) หลายล้าน Records ใช้ RAM Storage มาก ไม่พอใน Server เดียว ต้องใช้ Distributed Vector Database เช่น Milvus Qdrant Weaviate Shard ข้าม Nodes
Vector Database เลือกอันไหนดี
Milvus Enterprise Scale Billions of Vectors, Qdrant Production ใช้ง่าย API สวย, Weaviate Multi-modal, Pinecone Managed Service, pgvector ทีมที่ใช้ PostgreSQL อยู่แล้ว
สรุป
Embedding Models ในระบบ Distributed ต้องมี Embedding Service สร้าง Vectors, Distributed Vector Database เก็บและค้นหา เลือก Model ที่เหมาะกับงาน ใช้ Quantization ลด Resources Shard ข้าม Nodes Replicate สำหรับ HA Cache Frequent Queries Batch Processing สำหรับ Performance
