SiamCafe.net Blog

LangChain Agent Scaling Strategy: How to Scale

2025-12-04 · อ. บอม — SiamCafe.net · 10,948 words

LangChain Agent Scaling

Topics covered: Async Processing, Rate Limiting, Caching, Queues, Load Balancing, Streaming, Batching, Model Selection, and Production Deployment.

Strategy         | Impact                 | Complexity | Cost Reduction     | Latency Impact
Semantic Cache   | Cuts API calls 30-50%  | Medium     | High               | -80% (on cache hit)
Async Processing | 5-10x throughput       | Medium     | None               | Less wait time
Queue System     | Absorbs burst traffic  | Medium     | None               | Slight increase
Model Routing    | Cuts cost 40-60%       | High       | Very high          | Lower (smaller model)
Streaming        | Cuts perceived latency | Easy       | None               | -70% TTFT
Rate Limiting    | Prevents overload      | Easy       | Prevents overspend | None
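As a sanity check, the table's headline numbers can be combined in a back-of-envelope cost model. The rates below are illustrative assumptions, not measurements:

```python
# Back-of-envelope estimate of combined savings from caching + routing.

def estimated_cost(base_cost: float, cache_hit_rate: float,
                   routed_fraction: float, routed_price_ratio: float) -> float:
    """Monthly LLM cost after semantic caching and model routing.

    cache_hit_rate: fraction of calls served from cache (e.g. 0.4)
    routed_fraction: fraction of cache misses sent to a cheaper model
    routed_price_ratio: cheap-model price as a fraction of the default model's
    """
    misses = base_cost * (1 - cache_hit_rate)
    routed = misses * routed_fraction * routed_price_ratio
    unrouted = misses * (1 - routed_fraction)
    return routed + unrouted

# Example: $100/month baseline, 40% cache hits, 60% of misses routed
# to a model that costs 20% as much.
cost = estimated_cost(100.0, 0.40, 0.60, 0.20)
print(f"${cost:.2f}")  # → $31.20
```

The two levers multiply rather than add, which is why the stack in this article layers them.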

Agent Architecture

# === LangChain Agent with Scaling ===

# pip install langchain langchain-openai redis celery

# from langchain.agents import create_tool_calling_agent, AgentExecutor
# from langchain_openai import ChatOpenAI
# from langchain.tools import tool
# from langchain_community.cache import RedisSemanticCache
# from langchain_openai import OpenAIEmbeddings
# from langchain.globals import set_llm_cache
# import asyncio
#
# # Semantic Cache Setup
# set_llm_cache(RedisSemanticCache(
#     redis_url="redis://localhost:6379",
#     embedding=OpenAIEmbeddings(),
#     score_threshold=0.95,  # similarity threshold for a cache hit
# ))
#
# # Model Router — Choose model by complexity
# def get_model(complexity: str) -> ChatOpenAI:
#     models = {
#         "simple": ChatOpenAI(model="gpt-3.5-turbo", temperature=0),
#         "medium": ChatOpenAI(model="gpt-4o-mini", temperature=0),
#         "complex": ChatOpenAI(model="gpt-4o", temperature=0),
#     }
#     return models.get(complexity, models["medium"])
#
# # Tools
# @tool
# def search_database(query: str) -> str:
#     """Search the product database"""
#     # DB query logic
#     return f"Results for: {query}"
#
# @tool
# def calculate(expression: str) -> str:
#     """Calculate a mathematical expression"""
#     # eval() is unsafe on untrusted input — use a restricted parser in production
#     return str(eval(expression))
#
# # Async Agent Executor
# from langchain_core.prompts import ChatPromptTemplate
#
# prompt = ChatPromptTemplate.from_messages([
#     ("system", "You are a helpful assistant."),
#     ("human", "{input}"),
#     ("placeholder", "{agent_scratchpad}"),
# ])
#
# async def run_agent_async(query: str, complexity: str = "medium"):
#     llm = get_model(complexity)
#     agent = create_tool_calling_agent(llm, [search_database, calculate], prompt)
#     executor = AgentExecutor(agent=agent, tools=[search_database, calculate])
#     result = await executor.ainvoke({"input": query})
#     return result
#
# # Batch Processing
# async def process_batch(queries: list[str]):
#     tasks = [run_agent_async(q) for q in queries]
#     results = await asyncio.gather(*tasks, return_exceptions=True)
#     return results
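The `get_model` router above needs a complexity label from somewhere. A minimal heuristic classifier is sketched below; the keyword lists and thresholds are assumptions (production systems often use a small, cheap classifier model instead):

```python
# Heuristic query-complexity classifier for model routing.
# Keyword lists and word-count thresholds are illustrative, not tuned.
SIMPLE_HINTS = ("what is", "define", "translate")
COMPLEX_HINTS = ("analyze", "compare", "plan", "debug")

def classify_complexity(query: str) -> str:
    q = query.lower()
    # Long or analytically phrased queries go to the strongest model.
    if any(h in q for h in COMPLEX_HINTS) or len(q.split()) > 50:
        return "complex"
    # Short lookup-style questions can take the cheapest model.
    if any(h in q for h in SIMPLE_HINTS) and len(q.split()) <= 12:
        return "simple"
    return "medium"

print(classify_complexity("What is LangChain?"))                 # simple
print(classify_complexity("Analyze these logs and plan a fix"))  # complex
print(classify_complexity("Summarize this ticket thread"))       # medium
```

Wiring it up is then `get_model(classify_complexity(query))`, so cheap queries never touch the expensive model.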

from dataclasses import dataclass

@dataclass
class ScalingLayer:
    layer: str
    technology: str
    purpose: str
    throughput: str
    config: str

layers = [
    ScalingLayer("Load Balancer", "Nginx / Traefik", "Distribute requests", "10K+ rps", "Round-robin / least-conn"),
    ScalingLayer("API Server", "FastAPI + uvicorn", "Handle HTTP requests", "1K+ rps per worker", "4-8 workers per CPU"),
    ScalingLayer("Queue", "Redis + Celery", "Buffer async tasks", "50K+ msg/s", "Multiple queues by priority"),
    ScalingLayer("Cache", "Redis Semantic Cache", "Cache LLM responses", "100K+ ops/s", "TTL 1h, threshold 0.95"),
    ScalingLayer("LLM Gateway", "LiteLLM / custom", "Route to models", "Based on model", "Fallback chain"),
    ScalingLayer("Agent Workers", "Celery workers", "Execute agent logic", "10-50 concurrent", "Autoscale on queue"),
]

print("=== Scaling Architecture ===")
for layer in layers:
    print(f"  [{layer.layer}] {layer.technology}")
    print(f"    Purpose: {layer.purpose} | Throughput: {layer.throughput}")
    print(f"    Config: {layer.config}")
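The Queue layer's "multiple queues by priority" config can be illustrated with the stdlib. In production Redis + Celery plays this role, but the ordering contract is the same (task names here are made up):

```python
import queue

# Priority buffering sketch: lower number = higher priority.
task_queue: "queue.PriorityQueue[tuple[int, str]]" = queue.PriorityQueue()
task_queue.put((2, "batch report"))
task_queue.put((0, "paying-customer chat"))
task_queue.put((1, "internal dashboard query"))

# Workers always drain the highest-priority task first,
# regardless of arrival order.
while not task_queue.empty():
    priority, task = task_queue.get()
    print(priority, task)
# 0 paying-customer chat
# 1 internal dashboard query
# 2 batch report
```

With Celery the equivalent is routing tasks to separate named queues and pointing dedicated workers at the urgent one.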

Rate Limiting and Caching

# === Rate Limiting & Caching ===

# Token Bucket Rate Limiter
# import time
# from collections import defaultdict
#
# class TokenBucketLimiter:
#     def __init__(self, rate: float, capacity: int):
#         self.rate = rate          # tokens per second
#         self.capacity = capacity  # max tokens
#         self.buckets = defaultdict(lambda: {"tokens": capacity, "last": time.time()})
#
#     def allow(self, key: str, tokens: int = 1) -> bool:
#         bucket = self.buckets[key]
#         now = time.time()
#         elapsed = now - bucket["last"]
#         bucket["tokens"] = min(self.capacity, bucket["tokens"] + elapsed * self.rate)
#         bucket["last"] = now
#         if bucket["tokens"] >= tokens:
#             bucket["tokens"] -= tokens
#             return True
#         return False
#
# limiter = TokenBucketLimiter(rate=10, capacity=60)  # 10 req/s, burst 60
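The commented sketch above can be exercised deterministically by injecting a fake clock. This variant adds a `now` callable (our addition) so the refill math is testable without real sleeps:

```python
import time
from collections import defaultdict

class TokenBucketLimiter:
    """Same token-bucket logic as above, with an injectable clock."""
    def __init__(self, rate: float, capacity: int, now=time.time):
        self.rate = rate          # tokens refilled per second
        self.capacity = capacity  # burst size
        self.now = now
        self.buckets = defaultdict(lambda: {"tokens": capacity, "last": self.now()})

    def allow(self, key: str, tokens: int = 1) -> bool:
        bucket = self.buckets[key]
        t = self.now()
        # Refill proportionally to elapsed time, capped at capacity.
        bucket["tokens"] = min(self.capacity,
                               bucket["tokens"] + (t - bucket["last"]) * self.rate)
        bucket["last"] = t
        if bucket["tokens"] >= tokens:
            bucket["tokens"] -= tokens
            return True
        return False

# Fake clock advanced by hand, so the refill is exact.
clock = {"t": 0.0}
limiter = TokenBucketLimiter(rate=10, capacity=5, now=lambda: clock["t"])

print(sum(limiter.allow("user-1") for _ in range(10)))  # 5 (burst capacity)
clock["t"] += 1.0  # 1 s passes: refills 10 tokens, capped at capacity 5
print(sum(limiter.allow("user-1") for _ in range(10)))  # 5
```

The same injection trick makes the limiter unit-testable in CI without flaky timing-dependent tests.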

# Retry with Exponential Backoff
# import openai
# from tenacity import retry, stop_after_attempt, wait_exponential, retry_if_exception_type
#
# @retry(
#     stop=stop_after_attempt(5),
#     wait=wait_exponential(multiplier=1, min=2, max=60),
#     retry=retry_if_exception_type(openai.RateLimitError),
# )
# async def call_llm_with_retry(prompt: str):
#     # `llm` is any LangChain chat model instance (e.g. ChatOpenAI)
#     return await llm.ainvoke(prompt)
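If you'd rather avoid the tenacity dependency, the same policy fits in a few stdlib lines. This is a sketch: `TransientError` stands in for the provider's rate-limit exception, and the delay constants are assumptions:

```python
import random
import time

class TransientError(Exception):
    """Stand-in for a rate-limit / transient API error."""

def retry_with_backoff(fn, attempts=5, base=2.0, cap=60.0, sleep=time.sleep):
    """Call fn(), retrying on TransientError with jittered exponential backoff."""
    for attempt in range(attempts):
        try:
            return fn()
        except TransientError:
            if attempt == attempts - 1:
                raise  # out of attempts — surface the error
            delay = min(cap, base * (2 ** attempt))
            sleep(delay * random.uniform(0.5, 1.0))  # jitter avoids thundering herd

# Demo: fails twice, then succeeds.
calls = {"n": 0}
def flaky():
    calls["n"] += 1
    if calls["n"] < 3:
        raise TransientError("429")
    return "ok"

print(retry_with_backoff(flaky, sleep=lambda _: None))  # ok (after 2 retries)
```

The injectable `sleep` keeps the demo (and any unit test) instant while production code uses real delays.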

# Semantic Cache with Redis
# from langchain_community.cache import RedisSemanticCache
#
# cache = RedisSemanticCache(
#     redis_url="redis://localhost:6379",
#     embedding=OpenAIEmbeddings(model="text-embedding-3-small"),
#     score_threshold=0.92,
# )
# # Entry TTL (e.g. 1 hour) is handled via Redis key expiry / eviction policy;
# # check your langchain version for whether the constructor accepts a ttl argument.

@dataclass
class CacheStrategy:
    strategy: str
    hit_rate: str
    latency_saved: str
    cost_saved: str
    complexity: str
    use_case: str

strategies = [
    CacheStrategy("Exact Match", "10-20%", "2-5s per hit", "10-20%", "Easy", "Repeated exact queries"),
    CacheStrategy("Semantic Cache", "30-50%", "2-5s per hit", "30-50%", "Medium", "Similar questions"),
    CacheStrategy("Tool Result Cache", "40-60%", "0.5-2s per hit", "20-30%", "Easy", "DB/API results"),
    CacheStrategy("Conversation Cache", "20-30%", "1-3s per hit", "15-25%", "Medium", "Ongoing sessions"),
    CacheStrategy("Embedding Cache", "80-90%", "0.1s per hit", "40-60%", "Easy", "Repeated embeddings"),
]

print("\n=== Cache Strategies ===")
for s in strategies:
    print(f"  [{s.strategy}] Hit Rate: {s.hit_rate} | Saved: {s.cost_saved}")
    print(f"    Latency: {s.latency_saved} | Complexity: {s.complexity}")
    print(f"    Use Case: {s.use_case}")
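The "Embedding Cache" row is the cheapest win on the list: embeddings for identical text are deterministic, so a plain memoization layer in front of the embedding call is enough. A sketch, with a stand-in embed function instead of a real API call:

```python
from functools import lru_cache

# embed_uncached stands in for a real embedding API call;
# the caching pattern is what matters here.
call_count = {"n": 0}

def embed_uncached(text: str) -> tuple[float, ...]:
    call_count["n"] += 1                          # count "API calls"
    return tuple(float(ord(c)) for c in text[:4])  # fake vector

@lru_cache(maxsize=10_000)
def embed(text: str) -> tuple[float, ...]:
    return embed_uncached(text)

embed("hello"); embed("hello"); embed("world")
print(call_count["n"])  # 2 — the repeated text hit the cache
```

For multi-process deployments the same idea moves into Redis, keyed by a hash of the input text, which is how the 80-90% hit rates in the table become shared across workers.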

Production Deployment

# === Production Agent Deployment ===

# Docker Compose: full stack
# version: '3.8'
# services:
#   api:
#     build: .
#     ports: ["8000:8000"]
#     environment:
#       - OPENAI_API_KEY=${OPENAI_API_KEY}  # passed through from the host env
#       - REDIS_URL=redis://redis:6379
#     depends_on: [redis]
#     deploy:
#       replicas: 4
#   worker:
#     build: .
#     command: celery -A tasks worker -c 8 --autoscale=16,4
#     environment:
#       - OPENAI_API_KEY=${OPENAI_API_KEY}
#       - REDIS_URL=redis://redis:6379
#     depends_on: [redis]
#   redis:
#     image: redis:7-alpine
#     ports: ["6379:6379"]
#   nginx:
#     image: nginx:alpine
#     ports: ["80:80", "443:443"]

@dataclass
class ProdMetric:
    metric: str
    value: str
    target: str
    alert: str

metrics = [
    ProdMetric("Agent Latency (p50)", "1.2s", "<2s", "> 3s"),
    ProdMetric("Agent Latency (p99)", "8.5s", "<10s", "> 15s"),
    ProdMetric("Cache Hit Rate", "42%", ">35%", "< 20%"),
    ProdMetric("Success Rate", "98.5%", ">98%", "< 95%"),
    ProdMetric("Throughput", "120 req/min", ">100", "< 50"),
    ProdMetric("LLM Token Cost/day", "$45", "<$100", "> $150"),
    ProdMetric("Queue Depth", "12", "<50", "> 100"),
    ProdMetric("Worker Utilization", "65%", "<80%", "> 90%"),
]

print("Production Metrics:")
for m in metrics:
    # The alert field is a threshold expression, not a literal value,
    # so it is displayed rather than compared directly.
    print(f"  {m.metric}: {m.value} (target {m.target}, alert when {m.alert})")

cost_optimization = {
    "Model Routing": "Simple queries to GPT-3.5, complex to GPT-4o (cuts cost 40-60%)",
    "Semantic Cache": "Cache similar queries (cuts API calls 30-50%)",
    "Prompt Compression": "Trim tokens from the system prompt (cuts 10-20%)",
    "Batch Embedding": "Combine embedding requests to cut per-call overhead",
    "Off-peak Processing": "Queue non-urgent tasks for off-peak hours",
    "Token Monitoring": "Track token usage per user and per feature",
}

print("\nCost Optimization:")
for k, v in cost_optimization.items():
    print(f"  [{k}]: {v}")

Tips

What is a LangChain Agent?

An AI component in which the LLM decides which tool to call next, using patterns such as ReAct, Tool Calling, or Plan-and-Execute. Typical uses include calling APIs and databases, customer support, research, and workflow automation.

How do you scale a LangChain Agent?

Run agents asynchronously and concurrently, buffer work through a queue (Celery + Redis), add a semantic cache, apply rate limiting and load balancing, stream responses, batch requests, select models by query complexity, and autoscale workers.

How do you handle LLM API rate limits?

Use a token-bucket limiter, retry with exponential backoff, buffer requests in a queue, fall back to an alternate model, cache responses, monitor token budgets with alerts, and batch similar requests.

How do you optimize agent performance?

Use semantic caching, prompt optimization, smaller models where they suffice, careful tool selection, parallel tool execution, streaming, connection pooling, and non-blocking async I/O.

Summary

Scaling a LangChain agent to production combines async execution, semantic caching, rate limiting, a Celery + Redis queue, model routing, streaming, and load balancing, with continuous monitoring of latency, cost, and cache hit rate driving ongoing cost optimization.
