Technology

Embedding Model Distributed System

embedding model distributed system
Embedding Model Distributed System | SiamCafe Blog
2026-01-04 · อ. บอม — SiamCafe.net · 9,351 คำ

Embedding Models ในระบบ Distributed

Embedding Models แปลงข้อมูลเป็น Vector ตัวเลขที่เก็บความหมาย ใช้สำหรับ Semantic Search, Recommendation และ Similarity Matching เมื่อข้อมูลมีหลายล้าน Records ต้องใช้ Distributed System กระจาย Vectors ข้าม Nodes เพื่อ Performance และ Scalability

ระบบ Distributed สำหรับ Embeddings ประกอบด้วย Embedding Service (สร้าง Vectors), Vector Database (เก็บและค้นหา), API Gateway (รับ Requests) และ Monitoring (ดูแล Performance)

Embedding Service

# embedding_service.py — Distributed Embedding Service
# pip install sentence-transformers fastapi uvicorn numpy

from sentence_transformers import SentenceTransformer
import numpy as np
from typing import List, Dict, Optional
from dataclasses import dataclass
import time
import hashlib
import json

class EmbeddingService:
    """Embedding service supporting multiple sentence-transformer models.

    Wraps a SentenceTransformer with a bounded, insert-only in-memory
    cache and simple usage statistics (requests / texts / cache hits).
    """

    # Short alias -> Hugging Face model id. Unknown aliases are passed
    # through unchanged, so any raw HF model id also works.
    MODELS = {
        "mini": "all-MiniLM-L6-v2",          # 384 dims, fast
        "base": "all-mpnet-base-v2",          # 768 dims, balanced
        "large": "BAAI/bge-large-en-v1.5",   # 1024 dims, accurate
        "multi": "paraphrase-multilingual-MiniLM-L12-v2",  # 384 dims, multilingual
    }

    def __init__(self, model_name="base", device="cpu", cache_size=10000):
        """Load the model and initialise cache and stats.

        Args:
            model_name: key in ``MODELS`` or a raw Hugging Face model id.
            device: torch device string ("cpu", "cuda", ...).
            cache_size: maximum number of cached embeddings (insert-only cap;
                once full, new embeddings are simply not cached).
        """
        model_id = self.MODELS.get(model_name, model_name)
        self.model = SentenceTransformer(model_id, device=device)
        self.model_name = model_name
        self.dims = self.model.get_sentence_embedding_dimension()
        self.cache = {}  # cache key -> embedding vector
        self.cache_size = cache_size
        self.stats = {"requests": 0, "cache_hits": 0, "total_texts": 0}

        print(f"Model: {model_id} ({self.dims} dims)")

    def embed(self, texts: List[str], normalize=True) -> np.ndarray:
        """Embed a batch of texts, reusing cached vectors where possible.

        Args:
            texts: texts to embed (may be empty).
            normalize: L2-normalize the embeddings (default True).

        Returns:
            Array of shape (len(texts), dims).
        """
        self.stats["requests"] += 1
        self.stats["total_texts"] += len(texts)

        uncached_texts = []
        uncached_indices = []
        results = [None] * len(texts)

        # Serve from cache first. The key includes `normalize` so that
        # normalized and raw vectors for the same text never collide
        # (previously the key was text-only, so a normalize=False call
        # could be served a cached *normalized* vector).
        for i, text in enumerate(texts):
            key = self._cache_key(text, normalize)
            if key in self.cache:
                results[i] = self.cache[key]
                self.stats["cache_hits"] += 1
            else:
                uncached_texts.append(text)
                uncached_indices.append(i)

        # Encode only the cache misses, in one batch.
        if uncached_texts:
            embeddings = self.model.encode(
                uncached_texts,
                normalize_embeddings=normalize,
                batch_size=32,
                show_progress_bar=False,
            )

            # Zip the three parallel sequences directly instead of the
            # O(n^2) `uncached_indices.index(idx)` lookup per item.
            for idx, text, emb in zip(uncached_indices, uncached_texts, embeddings):
                results[idx] = emb
                if len(self.cache) < self.cache_size:
                    self.cache[self._cache_key(text, normalize)] = emb

        return np.array(results)

    def similarity(self, text1: str, text2: str) -> float:
        """Cosine similarity between two texts.

        Embeddings are normalized by default, so the dot product equals
        cosine similarity.
        """
        embs = self.embed([text1, text2])
        return float(np.dot(embs[0], embs[1]))

    def search(self, query: str, documents: List[str], top_k=5):
        """Semantic search: rank ``documents`` against ``query``.

        Returns up to ``top_k`` dicts with keys ``document``, ``score``
        (cosine similarity, descending) and ``rank`` (1-based).
        """
        # Embed query and documents in a single batch call.
        all_texts = [query] + documents
        embs = self.embed(all_texts)
        query_emb = embs[0]
        doc_embs = embs[1:]

        scores = np.dot(doc_embs, query_emb)
        top_indices = np.argsort(scores)[::-1][:top_k]

        results = []
        for idx in top_indices:
            results.append({
                "document": documents[idx],
                "score": float(scores[idx]),
                "rank": len(results) + 1,
            })
        return results

    def _cache_key(self, text, normalize=True):
        """Cache key over (normalize flag, text).

        md5 is fine here: this is a non-security cache key, not a
        cryptographic use.
        """
        return hashlib.md5(f"{normalize}:{text}".encode()).hexdigest()

    def print_stats(self):
        """Print a short usage summary (model, requests, hit rate)."""
        hit_rate = (self.stats["cache_hits"] / self.stats["total_texts"] * 100
                    if self.stats["total_texts"] > 0 else 0)
        print(f"\nEmbedding Service Stats:")
        print(f"  Model: {self.model_name} ({self.dims}d)")
        print(f"  Requests: {self.stats['requests']}")
        print(f"  Texts: {self.stats['total_texts']}")
        print(f"  Cache Hits: {self.stats['cache_hits']} ({hit_rate:.1f}%)")

# ตัวอย่าง
# --- Demo: semantic search over a small document set ---
service = EmbeddingService("mini")

docs = [
    "Python programming language tutorial",
    "Machine learning with TensorFlow",
    "Web development with React",
    "Database optimization techniques",
    "Cloud computing with AWS",
    "DevOps automation pipeline",
    "Kubernetes container orchestration",
    "Data science and analytics",
]

query = "how to deploy applications"
hits = service.search(query, docs, top_k=3)
print(f"\nSearch: '{query}'")
for hit in hits:
    print("  [{rank}] {score:.3f} — {document}".format(**hit))

service.print_stats()

Distributed Vector Storage

# distributed_vectors.py — Distributed Vector Storage
# pip install qdrant-client numpy

from qdrant_client import QdrantClient
from qdrant_client.models import (
    Distance, VectorParams, PointStruct,
    Filter, FieldCondition, MatchValue,
    OptimizersConfigDiff, HnswConfigDiff,
)
import numpy as np
from typing import List, Dict, Optional
import uuid
import time

class DistributedVectorStore:
    """Distributed vector storage backed by a Qdrant cluster.

    Ensures a sharded, replicated collection exists, then offers
    upsert / nearest-neighbour search / info helpers.
    """

    DEFAULT_PORT = 6333  # Qdrant's default HTTP port

    def __init__(self, hosts=None, collection="embeddings", dims=384):
        """Connect to the cluster and ensure the collection exists.

        Args:
            hosts: list of "host[:port]" strings; the port now defaults
                to 6333 when omitted (previously a bare "host" crashed
                with IndexError).
                NOTE(review): only hosts[0] is contacted — presumably the
                cluster routes requests internally; confirm before relying
                on the remaining entries for failover.
            collection: collection name.
            dims: vector dimensionality.
        """
        if hosts is None:
            hosts = ["localhost:6333"]

        host, _, port = hosts[0].partition(":")
        self.client = QdrantClient(host=host,
                                   port=int(port) if port else self.DEFAULT_PORT)
        self.collection = collection
        self.dims = dims
        self._ensure_collection()

    def _ensure_collection(self):
        """Create the collection if it does not exist yet."""
        collections = [c.name for c in self.client.get_collections().collections]
        if self.collection not in collections:
            self.client.create_collection(
                collection_name=self.collection,
                vectors_config=VectorParams(
                    size=self.dims,
                    distance=Distance.COSINE,
                ),
                optimizers_config=OptimizersConfigDiff(
                    # Build the HNSW index only past 20k points; below that
                    # brute-force search is fast enough.
                    indexing_threshold=20000,
                ),
                hnsw_config=HnswConfigDiff(
                    m=16,               # graph connectivity
                    ef_construct=100,   # build-time accuracy/speed trade-off
                ),
                shard_number=3,  # Distribute across 3 shards
                replication_factor=2,  # 2 replicas for HA
            )

    def upsert(self, ids, vectors, payloads=None):
        """Insert or update vectors.

        Args:
            ids: point ids, parallel to ``vectors``.
            vectors: numpy arrays or plain Python sequences (both accepted;
                previously only objects with ``.tolist()`` worked).
            payloads: optional parallel list of metadata dicts.
        """
        points = []
        for i, (vid, vec) in enumerate(zip(ids, vectors)):
            payload = payloads[i] if payloads else {}
            values = vec.tolist() if hasattr(vec, "tolist") else list(vec)
            points.append(PointStruct(
                id=vid, vector=values, payload=payload,
            ))

        self.client.upsert(
            collection_name=self.collection,
            points=points,
        )

    def search(self, query_vector, top_k=10, filters=None):
        """Find the nearest neighbours of ``query_vector``.

        Args:
            query_vector: numpy array or plain sequence.
            top_k: maximum number of results.
            filters: optional {payload_key: value} equality filters,
                AND-combined.

        Returns:
            List of {"id", "score", "payload"} dicts.
        """
        query_filter = None
        if filters:
            conditions = [
                FieldCondition(key=key, match=MatchValue(value=value))
                for key, value in filters.items()
            ]
            query_filter = Filter(must=conditions)

        vector = (query_vector.tolist()
                  if hasattr(query_vector, "tolist") else list(query_vector))
        results = self.client.search(
            collection_name=self.collection,
            query_vector=vector,
            limit=top_k,
            query_filter=query_filter,
        )

        return [{"id": r.id, "score": r.score, "payload": r.payload}
                for r in results]

    def info(self):
        """Print collection status.

        NOTE(review): newer qdrant-client versions may report
        ``vectors_count`` as None — verify against the installed version.
        """
        info = self.client.get_collection(self.collection)
        print(f"\nCollection: {self.collection}")
        print(f"  Points: {info.points_count}")
        print(f"  Vectors: {info.vectors_count}")
        print(f"  Dimensions: {self.dims}")
        print(f"  Status: {info.status}")

# === Qdrant Cluster Setup (Docker Compose) ===
# version: '3.8'
# services:
#   qdrant-node1:
#     image: qdrant/qdrant:latest
#     ports:
#       - "6333:6333"
#     environment:
#       - QDRANT__CLUSTER__ENABLED=true
#       - QDRANT__CLUSTER__P2P__PORT=6335
#     volumes:
#       - qdrant1-data:/qdrant/storage
#
#   qdrant-node2:
#     image: qdrant/qdrant:latest
#     environment:
#       - QDRANT__CLUSTER__ENABLED=true
#       - QDRANT__CLUSTER__P2P__PORT=6335
#       - QDRANT__CLUSTER__BOOTSTRAP=http://qdrant-node1:6335
#     volumes:
#       - qdrant2-data:/qdrant/storage
#
#   qdrant-node3:
#     image: qdrant/qdrant:latest
#     environment:
#       - QDRANT__CLUSTER__ENABLED=true
#       - QDRANT__CLUSTER__P2P__PORT=6335
#       - QDRANT__CLUSTER__BOOTSTRAP=http://qdrant-node1:6335
#     volumes:
#       - qdrant3-data:/qdrant/storage
#
# volumes:
#   qdrant1-data:
#   qdrant2-data:
#   qdrant3-data:

# Status banner for the distributed store configuration above.
for banner_line in ("Distributed Vector Store configured",
                    "  Shards: 3 | Replication: 2"):
    print(banner_line)

Scaling Strategy

# scaling_vectors.py — Vector Database Scaling Strategy

import math

def calculate_resources(num_vectors, dims, precision="float32"):
    """Estimate storage, RAM and shard requirements for a vector collection.

    Args:
        num_vectors: number of vectors to store.
        dims: vector dimensionality.
        precision: "float32" | "float16" | "int8"; unknown values fall
            back to 4 bytes per component (float32).

    Returns:
        Dict of human-readable sizing figures (GB values rounded to 1
        decimal place) plus recommended shard and replica counts.
    """
    BYTES_PER_COMPONENT = {"float32": 4, "float16": 2, "int8": 1}
    bpc = BYTES_PER_COMPONENT.get(precision, 4)
    gib = 1024 ** 3

    # Raw vector storage.
    vector_gb = num_vectors * dims * bpc / gib

    # HNSW index overhead, estimated at 1.5x the raw vector size.
    index_gb = vector_gb * 1.5

    # Payload/metadata storage, estimated at ~1 KB per vector.
    payload_gb = num_vectors * 1024 / gib

    total_gb = vector_gb + index_gb + payload_gb

    # RAM: the full index plus ~20% of vectors kept hot.
    ram_gb = index_gb + vector_gb * 0.2

    # Cap shards at ~10M vectors each.
    shard_count = max(1, math.ceil(num_vectors / 10_000_000))

    return {
        "vectors": f"{num_vectors:,}",
        "dimensions": dims,
        "precision": precision,
        "vector_storage_gb": round(vector_gb, 1),
        "index_gb": round(index_gb, 1),
        "payload_gb": round(payload_gb, 1),
        "total_storage_gb": round(total_gb, 1),
        "ram_required_gb": round(ram_gb, 1),
        "recommended_shards": shard_count,
        "recommended_replicas": min(3, shard_count),
    }

# === ตัวอย่างขนาดต่างๆ ===
# === Example sizings across deployment scales ===
scenarios = [
    (100_000, 384, "float32", "Small — Startup"),
    (1_000_000, 768, "float32", "Medium — Growing Business"),
    (10_000_000, 768, "float16", "Large — Enterprise"),
    (100_000_000, 384, "int8", "Massive — Big Tech"),
]

print("Vector Database Sizing Guide")
print("=" * 60)

for count, dim, precision, label in scenarios:
    spec = calculate_resources(count, dim, precision)
    print(f"\n  {label}")
    print(f"    Vectors: {spec['vectors']} x {dim}d ({precision})")
    print(f"    Storage: {spec['total_storage_gb']} GB")
    print(f"    RAM: {spec['ram_required_gb']} GB")
    print(f"    Shards: {spec['recommended_shards']} | "
          f"Replicas: {spec['recommended_replicas']}")

# Practical scaling tips (Thai).
print("\nScaling Tips:")
print("  - ใช้ float16 ลด Storage 50% Accuracy ลดน้อยมาก")
print("  - ใช้ Product Quantization ลด Memory อีก 4-8x")
print("  - Shard ตาม Region ลด Latency")
print("  - Separate Read/Write Replicas")

Best Practices

Embedding Model คืออะไร

ML Model แปลงข้อมูล (Text Image Audio) เป็น Vector ตัวเลขเก็บความหมาย ข้อมูลคล้ายกัน Vector ใกล้กัน ใช้ Semantic Search Recommendation Clustering ตัวอย่าง OpenAI Embeddings Sentence-BERT CLIP

Distributed System คืออะไร

ระบบหลาย Nodes ทำงานร่วมกัน กระจาย Workload ข้าม Machines เพื่อ Scalability Availability Fault Tolerance เช่น Kubernetes Kafka Cassandra Elasticsearch ใช้ Consensus Protocols

ทำไมต้องใช้ Distributed System กับ Embeddings

Vectors ขนาดใหญ่ (768-4096 dims) หลายล้าน Records ใช้ RAM Storage มาก ไม่พอใน Server เดียว ต้องใช้ Distributed Vector Database เช่น Milvus Qdrant Weaviate Shard ข้าม Nodes

Vector Database เลือกอันไหนดี

Milvus Enterprise Scale Billions of Vectors, Qdrant Production ใช้ง่าย API สวย, Weaviate Multi-modal, Pinecone Managed Service, pgvector ทีมที่ใช้ PostgreSQL อยู่แล้ว

สรุป

Embedding Models ในระบบ Distributed ต้องมี Embedding Service สร้าง Vectors, Distributed Vector Database เก็บและค้นหา เลือก Model ที่เหมาะกับงาน ใช้ Quantization ลด Resources Shard ข้าม Nodes Replicate สำหรับ HA Cache Frequent Queries Batch Processing สำหรับ Performance

📖 บทความที่เกี่ยวข้อง

Embedding Model Real-time Processing — อ่านบทความ → | Python Pydantic Distributed System — อ่านบทความ → | Embedding Model Service Mesh Setup — อ่านบทความ → | Embedding Model Team Productivity — อ่านบทความ → | XDR Platform Distributed System — อ่านบทความ →

📚 ดูบทความทั้งหมด →