ai

Embedding Model กับ Distributed System — วิธีใช้

Embedding Model กับ Distributed System — วิธีใช้

Embedding Models ในระบบ Distributed

Embedding Model กับ Distributed System — วิธีใช้

Embedding Models แปลงข้อมูลเป็น Vector ตัวเลขที่เก็บความหมาย ใช้สำหรับ Semantic Search, Recommendation และ Similarity Matching เมื่อข้อมูลมีหลายล้าน Records ต้องใช้ Distributed System กระจาย Vectors ข้าม Nodes เพื่อ Performance และ Scalability

เนื้อหาเกี่ยวข้อง — แนะนำให้อ่าน container html คือ — ข้อมูลครบถ้วน 2026

ระบบ Distributed สำหรับ Embeddings ประกอบด้วย Embedding Service (สร้าง Vectors), Vector Database (เก็บและค้นหา), API Gateway (รับ Requests) และ Monitoring (ดูแล Performance)

เนื้อหาเกี่ยวข้อง — ทำความเข้าใจ Semgrep SAST Identity Access Management

Embedding Service

# embedding_service.py — Distributed Embedding Service

# pip install sentence-transformers fastapi uvicorn numpy



from sentence_transformers import SentenceTransformer

import numpy as np

from typing import List, Dict, Optional

from dataclasses import dataclass

import time

import hashlib

import json



class EmbeddingService:

    """Embedding Service รองรับหลาย Models"""



    MODELS = {

        "mini": "all-MiniLM-L6-v2",          # 384 dims, เร็ว

        "base": "all-mpnet-base-v2",          # 768 dims, สมดุล

        "large": "BAAI/bge-large-en-v1.5",   # 1024 dims, แม่นยำ

        "multi": "paraphrase-multilingual-MiniLM-L12-v2",  # 384 dims, หลายภาษา

    }



    def __init__(self, model_name="base", device="cpu", cache_size=10000):

        model_id = self.MODELS.get(model_name, model_name)

        self.model = SentenceTransformer(model_id, device=device)

        self.model_name = model_name

        self.dims = self.model.get_sentence_embedding_dimension()

        self.cache = {}

        self.cache_size = cache_size

        self.stats = {"requests": 0, "cache_hits": 0, "total_texts": 0}



        print(f"Model: {model_id} ({self.dims} dims)")



    def embed(self, texts: List[str], normalize=True) -> np.ndarray:

        """สร้าง Embeddings"""

        self.stats["requests"] += 1

        self.stats["total_texts"] += len(texts)



        # Check Cache

        uncached_texts = []

        uncached_indices = []

        results = [None] * len(texts)



        for i, text in enumerate(texts):

            key = self._cache_key(text)

            if key in self.cache:

                results[i] = self.cache[key]

                self.stats["cache_hits"] += 1

            else:

                uncached_texts.append(text)

                uncached_indices.append(i)



        # Embed uncached texts

        if uncached_texts:

            embeddings = self.model.encode(

                uncached_texts,

                normalize_embeddings=normalize,

                batch_size=32,

                show_progress_bar=False,

            )



            for idx, emb in zip(uncached_indices, embeddings):

                results[idx] = emb

                key = self._cache_key(uncached_texts[uncached_indices.index(idx)])

                if len(self.cache) < self.cache_size:

                    self.cache[key] = emb



        return np.array(results)



    def similarity(self, text1: str, text2: str) -> float:

        """คำนวณ Cosine Similarity"""

        embs = self.embed([text1, text2])

        return float(np.dot(embs[0], embs[1]))



    def search(self, query: str, documents: List[str], top_k=5):

        """Semantic Search"""

        all_texts = [query] + documents

        embs = self.embed(all_texts)

        query_emb = embs[0]

        doc_embs = embs[1:]



        scores = np.dot(doc_embs, query_emb)

        top_indices = np.argsort(scores)[::-1][:top_k]



        results = []

        for idx in top_indices:

            results.append({

                "document": documents[idx],

                "score": float(scores[idx]),

                "rank": len(results) + 1,

            })

        return results



    def _cache_key(self, text):

        return hashlib.md5(text.encode()).hexdigest()



    def print_stats(self):

        hit_rate = (self.stats["cache_hits"] / self.stats["total_texts"] * 100

                    if self.stats["total_texts"] > 0 else 0)

        print(f"\nEmbedding Service Stats:")

        print(f"  Model: {self.model_name} ({self.dims}d)")

        print(f"  Requests: {self.stats['requests']}")

        print(f"  Texts: {self.stats['total_texts']}")

        print(f"  Cache Hits: {self.stats['cache_hits']} ({hit_rate:.1f}%)")



# ตัวอย่าง

svc = EmbeddingService("mini")



docs = [

    "Python programming language tutorial",

    "Machine learning with TensorFlow",

    "Web development with React",

    "Database optimization techniques",

    "Cloud computing with AWS",

    "DevOps automation pipeline",

    "Kubernetes container orchestration",

    "Data science and analytics",

]



results = svc.search("how to deploy applications", docs, top_k=3)

print("\nSearch: 'how to deploy applications'")

for r in results:

    print(f"  [{r['rank']}] {r['score']:.3f} — {r['document']}")



svc.print_stats()

Distributed Vector Storage

Embedding Model กับ Distributed System — วิธีใช้
# distributed_vectors.py — Distributed Vector Storage

# pip install qdrant-client numpy



from qdrant_client import QdrantClient

from qdrant_client.models import (

    Distance, VectorParams, PointStruct,

    Filter, FieldCondition, MatchValue,

    OptimizersConfigDiff, HnswConfigDiff,

)

import numpy as np

from typing import List, Dict, Optional

import uuid

import time



class DistributedVectorStore:

    """Distributed Vector Storage ด้วย Qdrant"""



    def __init__(self, hosts=None, collection="embeddings", dims=384):

        if hosts is None:

            hosts = ["localhost:6333"]



        # เชื่อมต่อ Qdrant Cluster

        self.client = QdrantClient(host=hosts[0].split(":")[0],

                                    port=int(hosts[0].split(":")[1]))

        self.collection = collection

        self.dims = dims

        self._ensure_collection()



    def _ensure_collection(self):

        """สร้าง Collection ถ้ายังไม่มี"""

        collections = [c.name for c in self.client.get_collections().collections]

        if self.collection not in collections:

            self.client.create_collection(

                collection_name=self.collection,

                vectors_config=VectorParams(

                    size=self.dims,

                    distance=Distance.COSINE,

                ),

                optimizers_config=OptimizersConfigDiff(

                    indexing_threshold=20000,

                ),

                hnsw_config=HnswConfigDiff(

                    m=16,

                    ef_construct=100,

                ),

                shard_number=3,  # Distribute across 3 shards

                replication_factor=2,  # 2 replicas for HA

            )



    def upsert(self, ids, vectors, payloads=None):

        """เพิ่ม/อัพเดท Vectors"""

        points = []

        for i, (vid, vec) in enumerate(zip(ids, vectors)):

            payload = payloads[i] if payloads else {}

            points.append(PointStruct(

                id=vid, vector=vec.tolist(), payload=payload,

            ))



        self.client.upsert(

            collection_name=self.collection,

            points=points,

        )



    def search(self, query_vector, top_k=10, filters=None):

        """ค้นหา Nearest Neighbors"""

        query_filter = None

        if filters:

            conditions = []

            for key, value in filters.items():

                conditions.append(FieldCondition(

                    key=key, match=MatchValue(value=value),

                ))

            query_filter = Filter(must=conditions)



        results = self.client.search(

            collection_name=self.collection,

            query_vector=query_vector.tolist(),

            limit=top_k,

            query_filter=query_filter,

        )



        return [{"id": r.id, "score": r.score, "payload": r.payload}

                for r in results]



    def info(self):

        """แสดงข้อมูล Collection"""

        info = self.client.get_collection(self.collection)

        print(f"\nCollection: {self.collection}")

        print(f"  Points: {info.points_count}")

        print(f"  Vectors: {info.vectors_count}")

        print(f"  Dimensions: {self.dims}")

        print(f"  Status: {info.status}")



# === Qdrant Cluster Setup (Docker Compose) ===

# version: '3.8'

# services:

#   qdrant-node1:

#     image: qdrant/qdrant:latest

#     ports:

#       - "6333:6333"

#     environment:

#       - QDRANT__CLUSTER__ENABLED=true

#       - QDRANT__CLUSTER__P2P__PORT=6335

#     volumes:

#       - qdrant1-data:/qdrant/storage

#

#   qdrant-node2:

#     image: qdrant/qdrant:latest

#     environment:

#       - QDRANT__CLUSTER__ENABLED=true

#       - QDRANT__CLUSTER__P2P__PORT=6335

#       - QDRANT__CLUSTER__BOOTSTRAP=http://qdrant-node1:6335

#     volumes:

#       - qdrant2-data:/qdrant/storage

#

#   qdrant-node3:

#     image: qdrant/qdrant:latest

#     environment:

#       - QDRANT__CLUSTER__ENABLED=true

#       - QDRANT__CLUSTER__P2P__PORT=6335

#       - QDRANT__CLUSTER__BOOTSTRAP=http://qdrant-node1:6335

#     volumes:

#       - qdrant3-data:/qdrant/storage

#

# volumes:

#   qdrant1-data:

#   qdrant2-data:

#   qdrant3-data:



print("Distributed Vector Store configured")

print("  Shards: 3 | Replication: 2")

Scaling Strategy

# scaling_vectors.py — Vector Database Scaling Strategy



import math



def calculate_resources(num_vectors, dims, precision="float32"):

    """คำนวณ Resources ที่ต้องการ"""

    bytes_per_dim = {"float32": 4, "float16": 2, "int8": 1}

    bpd = bytes_per_dim.get(precision, 4)



    # Vector Storage

    vector_bytes = num_vectors * dims * bpd

    vector_gb = vector_bytes / (1024**3)



    # HNSW Index (ประมาณ 1.5x vector size)

    index_gb = vector_gb * 1.5



    # Payload Storage (ประมาณ 1KB per vector)

    payload_gb = num_vectors * 1024 / (1024**3)



    total_gb = vector_gb + index_gb + payload_gb



    # RAM Requirement (Index + Hot Data)

    ram_gb = index_gb + (vector_gb * 0.2)  # 20% hot data



    # Shard Calculation (max 10M vectors per shard)

    shards = max(1, math.ceil(num_vectors / 10_000_000))



    return {

        "vectors": f"{num_vectors:,}",

        "dimensions": dims,

        "precision": precision,

        "vector_storage_gb": round(vector_gb, 1),

        "index_gb": round(index_gb, 1),

        "payload_gb": round(payload_gb, 1),

        "total_storage_gb": round(total_gb, 1),

        "ram_required_gb": round(ram_gb, 1),

        "recommended_shards": shards,

        "recommended_replicas": min(3, shards),

    }



# === ตัวอย่างขนาดต่างๆ ===

scenarios = [

    (100_000, 384, "float32", "Small — Startup"),

    (1_000_000, 768, "float32", "Medium — Growing Business"),

    (10_000_000, 768, "float16", "Large — Enterprise"),

    (100_000_000, 384, "int8", "Massive — Big Tech"),

]



print("Vector Database Sizing Guide")

print("=" * 60)



for num, dims, prec, label in scenarios:

    res = calculate_resources(num, dims, prec)

    print(f"\n  {label}")

    print(f"    Vectors: {res['vectors']} x {dims}d ({prec})")

    print(f"    Storage: {res['total_storage_gb']} GB")

    print(f"    RAM: {res['ram_required_gb']} GB")

    print(f"    Shards: {res['recommended_shards']} | "

          f"Replicas: {res['recommended_replicas']}")



# Scaling Tips

print(f"\nScaling Tips:")

print(f"  - ใช้ float16 ลด Storage 50% Accuracy ลดน้อยมาก")

print(f"  - ใช้ Product Quantization ลด Memory อีก 4-8x")

print(f"  - Shard ตาม Region ลด Latency")

print(f"  - Separate Read/Write Replicas")

Best Practices

  • Model Selection: เลือก Model ที่เหมาะ mini (384d เร็ว) base (768d สมดุล) large (1024d แม่นยำ)
  • Quantization: ใช้ float16 หรือ int8 ลด Storage และ RAM โดย Accuracy ลดน้อยมาก
  • Sharding: Shard ข้อมูลข้าม Nodes สำหรับ Parallel Search ลด Latency
  • Replication: Replicate อย่างน้อย 2x สำหรับ High Availability
  • Caching: Cache Embeddings ของ Frequent Queries ลด Computation
  • Batch Processing: ส่ง Texts เป็น Batch (32-64) สร้าง Embeddings ได้เร็วกว่าทีละอัน

Embedding Model คืออะไร

ML Model แปลงข้อมูล (Text Image Audio) เป็น Vector ตัวเลขเก็บความหมาย ข้อมูลคล้ายกัน Vector ใกล้กัน ใช้ Semantic Search Recommendation Clustering ตัวอย่าง OpenAI Embeddings Sentence-BERT CLIP

แนะนำเพิ่มเติม — อีบุ๊กการลงทุน SiamCafeBook

เนื้อหาเกี่ยวข้อง — อ่านต่อ: LLM Inference vLLM Tech Conference 2026

XM Legend · เทรดเดอร์ & ผู้สอน Forex 13 ปี

ผู้ก่อตั้ง SiamCafe ตั้งแต่ปี 1997 · เทรดเดอร์สาย Forex มากกว่า 13 ปี ได้รับการยกย่องเป็น XM Legend · แบ่งปันความรู้ Forex, ไอที, AI และการเทรด จากประสบการณ์จริงในตลาดจริง