
LangChain Agent Scaling Strategy วิธี Scale —
LangChain Agent Scaling

บทความนี้อธิบาย LangChain Agent Scaling Strategy ครอบคลุม Async Processing, Rate Limiting, Caching, Queue, Load Balancing, Streaming, Batch, Model Selection และ Production Deployment
| Strategy | Impact | Complexity | Cost Reduction | Latency Impact |
|---|---|---|---|---|
| Semantic Cache | ลด API Call 30-50% | ปานกลาง | สูง | -80% (cache hit) |
| Async Processing | เพิ่ม Throughput 5-10x | ปานกลาง | ไม่มี | ลด Wait Time |
| Queue System | จัดการ Burst Traffic | ปานกลาง | ไม่มี | เพิ่มเล็กน้อย |
| Model Routing | ลด Cost 40-60% | สูง | สูงมาก | ลด (smaller model) |
| Streaming | ลด Perceived Latency | ง่าย | ไม่มี | -70% TTFT |
| Rate Limiting | ป้องกัน Overload | ง่าย | ป้องกัน Overspend | ไม่มี |
Agent Architecture
# === LangChain Agent with Scaling ===
# pip install langchain langchain-openai redis celery
from langchain.agents import create_tool_calling_agent, AgentExecutor
from langchain_openai import ChatOpenAI
from langchain.tools import tool
from langchain.cache import RedisSemanticCache
from langchain_openai import OpenAIEmbeddings
import langchain
import asyncio
import redis
# Semantic Cache Setup
from langchain.globals import set_llm_cache

# Install a Redis-backed semantic cache as the process-wide LLM cache.
# The cache is registered through set_llm_cache() because assigning to the
# ``langchain.llm_cache`` module attribute is the deprecated legacy path.
# NOTE(review): score_threshold is passed through unchanged. Some LangChain
# releases interpret it as a vector *distance* (lower = stricter) rather than
# a similarity score — confirm that 0.95 matches the intended strictness.
set_llm_cache(
    RedisSemanticCache(
        redis_url="redis://localhost:6379",
        embedding=OpenAIEmbeddings(),
        score_threshold=0.95,  # Cache hit threshold
    )
)
# Model Router — Choose model by complexity
def get_model(complexity: str) -> ChatOpenAI:
    """Return a chat model sized for the given task complexity.

    Args:
        complexity: One of ``"simple"``, ``"medium"`` or ``"complex"``.
            Any other value falls back to the ``"medium"`` model.

    Returns:
        A ``ChatOpenAI`` client configured with ``temperature=0``.
    """
    model_names = {
        "simple": "gpt-3.5-turbo",
        "medium": "gpt-4o-mini",
        "complex": "gpt-4o",
    }
    # Only the selected client is constructed, so a call never pays for
    # building models it is not going to return.
    model_name = model_names.get(complexity, model_names["medium"])
    return ChatOpenAI(model=model_name, temperature=0)
# Tools
@tool
def search_database(query: str) -> str:
    """Search the product database"""
    # The docstring above is kept verbatim: @tool-style decorators usually
    # expose it as the tool description (confirm for the LangChain version
    # in use), so rewording it could change agent behaviour.
    #
    # Placeholder implementation: no database is queried yet, the incoming
    # query is simply echoed back in the result string.
    response = f"Results for: {query}"
    return response
@tool
def calculate(expression: str) -> str:
    """Calculate mathematical expression"""
    # The docstring above is kept verbatim: @tool-style decorators usually
    # expose it as the tool description (confirm for the LangChain version
    # in use).
    #
    # SECURITY: eval() is deliberately NOT used here. ``expression`` is text
    # produced by the model / end user, and eval() would execute arbitrary
    # Python. Instead the expression is parsed and only numeric literals and
    # basic arithmetic operators are evaluated; anything else raises
    # ValueError. Malformed input still raises SyntaxError from the parser.
    import ast
    import operator

    binary_ops = {
        ast.Add: operator.add,
        ast.Sub: operator.sub,
        ast.Mult: operator.mul,
        ast.Div: operator.truediv,
        ast.FloorDiv: operator.floordiv,
        ast.Mod: operator.mod,
        ast.Pow: operator.pow,
    }
    unary_ops = {ast.UAdd: operator.pos, ast.USub: operator.neg}
    # Cap exponents so inputs such as 9**9**9 cannot stall the worker.
    max_exponent = 1000

    def _evaluate(node):
        """Recursively evaluate one node of the whitelisted arithmetic AST."""
        if isinstance(node, ast.Constant):
            value = node.value
            if isinstance(value, (int, float)) and not isinstance(value, bool):
                return value
        elif isinstance(node, ast.BinOp) and type(node.op) in binary_ops:
            left = _evaluate(node.left)
            right = _evaluate(node.right)
            if isinstance(node.op, ast.Pow) and abs(right) > max_exponent:
                raise ValueError(f"Exponent too large in expression: {expression!r}")
            return binary_ops[type(node.op)](left, right)
        elif isinstance(node, ast.UnaryOp) and type(node.op) in unary_ops:
            return unary_ops[type(node.op)](_evaluate(node.operand))
        raise ValueError(f"Unsupported expression: {expression!r}")

    # strip() mirrors eval()'s tolerance for surrounding whitespace.
    tree = ast.parse(expression.strip(), mode="eval")
    return str(_evaluate(tree.body))
# Async Agent Executor
async def run_agent_async(query: str, complexity: str = "medium"):
    """Run a single query through a newly built tool-calling agent.

    Args:
        query: User input, passed to the executor under the ``"input"`` key.
        complexity: Forwarded to ``get_model`` to pick the backing LLM.

    Returns:
        Whatever ``AgentExecutor.ainvoke`` returns for the query.
    """
    tools = [search_database, calculate]
    # NOTE(review): `prompt` is not defined anywhere in the visible part of
    # this file; presumably a module-level prompt template — confirm it
    # exists before calling this function.
    agent = create_tool_calling_agent(get_model(complexity), tools, prompt)
    executor = AgentExecutor(agent=agent, tools=tools)
    return await executor.ainvoke({"input": query})
# Batch Processing
async def process_batch(queries: list[str], max_concurrency: int = 0):
    """Run several queries through the agent concurrently.

    Args:
        queries: Inputs to send to ``run_agent_async``, one call per item.
        max_concurrency: Upper bound on agent runs in flight at once.
            ``0`` (the default) or a negative value means no limit, i.e.
            every query is started immediately.

    Returns:
        A list with one entry per query, in input order. Because
        ``return_exceptions=True`` is used, a failed query yields its
        exception object in that position instead of aborting the batch.
    """
    if max_concurrency <= 0:
        return await asyncio.gather(
            *(run_agent_async(query) for query in queries),
            return_exceptions=True,
        )

    # Bound the fan-out so a large batch cannot open every request at once.
    semaphore = asyncio.Semaphore(max_concurrency)

    async def _bounded(query: str):
        async with semaphore:
            return await run_agent_async(query)

    return await asyncio.gather(
        *(_bounded(query) for query in queries),
        return_exceptions=True,
    )
from dataclasses import dataclass
@dataclass
class ScalingLayer:
    """One tier of the serving stack listed in the architecture summary."""

    layer: str  # tier name shown in brackets in the report
    technology: str  # product(s) used for the tier
    purpose: str  # what the tier is responsible for
    throughput: str  # free-text throughput figure
    config: str  # free-text configuration hint


# Tiers are listed in the order they are printed below.
layers = [
    ScalingLayer(
        layer="Load Balancer",
        technology="Nginx / Traefik",
        purpose="Distribute requests",
        throughput="10K+ rps",
        config="Round-robin / least-conn",
    ),
    ScalingLayer(
        layer="API Server",
        technology="FastAPI + uvicorn",
        purpose="Handle HTTP requests",
        throughput="1K+ rps per worker",
        config="4-8 workers per CPU",
    ),
    ScalingLayer(
        layer="Queue",
        technology="Redis + Celery",
        purpose="Buffer async tasks",
        throughput="50K+ msg/s",
        config="Multiple queues by priority",
    ),
    ScalingLayer(
        layer="Cache",
        technology="Redis Semantic Cache",
        purpose="Cache LLM responses",
        throughput="100K+ ops/s",
        config="TTL 1h, threshold 0.95",
    ),
    ScalingLayer(
        layer="LLM Gateway",
        technology="LiteLLM / custom",
        purpose="Route to models",
        throughput="Based on model",
        config="Fallback chain",
    ),
    ScalingLayer(
        layer="Agent Workers",
        technology="Celery workers",
        purpose="Execute agent logic",
        throughput="10-50 concurrent",
        config="Autoscale on queue",
    ),
]

print("=== Scaling Architecture ===")
for tier in layers:
    # Three report lines per tier, emitted with a single print call.
    report_lines = (
        f" [{tier.layer}] {tier.technology}",
        f" Purpose: {tier.purpose} | Throughput: {tier.throughput}",
        f" Config: {tier.config}",
    )
    print("\n".join(report_lines))
Rate Limiting and Caching

# === Rate Limiting & Caching ===
# Token Bucket Rate Limiter
import time
from collections import defaultdict
class TokenBucketLimiter:
    """In-memory token-bucket rate limiter with one bucket per key.

    Each bucket starts full (``capacity`` tokens) and is refilled at ``rate``
    tokens per second, never exceeding ``capacity``. Buckets are created on
    first use of a key and kept in ``self.buckets``.
    """

    def __init__(self, rate: float, capacity: int):
        """Create a limiter.

        Args:
            rate: Refill speed in tokens per second.
            capacity: Maximum number of tokens a bucket can hold (burst size).
        """
        self.rate = rate  # tokens per second
        self.capacity = capacity  # max tokens
        # A monotonic clock is used so wall-clock adjustments (NTP, DST,
        # manual changes) can never produce a negative elapsed time.
        self.buckets = defaultdict(
            lambda: {"tokens": self.capacity, "last": time.monotonic()}
        )

    def allow(self, key: str, tokens: int = 1) -> bool:
        """Try to consume ``tokens`` from the bucket belonging to ``key``.

        Args:
            key: Identifier of the bucket to charge.
            tokens: Number of tokens the request costs.

        Returns:
            True if the bucket held enough tokens (they are deducted),
            False otherwise (the bucket is left untouched apart from refill).
        """
        bucket = self.buckets[key]
        now = time.monotonic()
        # Clamp at zero so a stale or future timestamp can only skip a
        # refill, never drain the bucket.
        elapsed = max(0.0, now - bucket["last"])
        bucket["tokens"] = min(self.capacity, bucket["tokens"] + elapsed * self.rate)
        bucket["last"] = now
        if bucket["tokens"] >= tokens:
            bucket["tokens"] -= tokens
            return True
        return False


limiter = TokenBucketLimiter(rate=10, capacity=60)  # 10 req/s, burst 60
# Retry with Exponential Backoff
import openai
from tenacity import (
    retry,
    retry_if_exception_type,
    stop_after_attempt,
    wait_exponential,
)


@retry(
    stop=stop_after_attempt(5),
    wait=wait_exponential(multiplier=1, min=2, max=60),
    retry=retry_if_exception_type(openai.RateLimitError),
)
async def call_llm_with_retry(prompt: str):
    """Invoke the LLM, retrying when the provider reports a rate limit.

    The decorator retries only on ``openai.RateLimitError``, gives up after
    5 attempts, and waits an exponentially growing delay between 2 and 60
    seconds between attempts.

    Args:
        prompt: Text sent to the model.

    Returns:
        The result of ``llm.ainvoke(prompt)``.
    """
    # NOTE(review): `llm` is not defined at module level in the visible part
    # of this file — confirm where it is expected to come from.
    return await llm.ainvoke(prompt)
# Semantic Cache with Redis
from langchain.cache import RedisSemanticCache
# Standalone semantic-cache instance using a smaller embedding model.
# NOTE(review): this object is only constructed here; nothing in the visible
# file installs it as the active LLM cache.
# NOTE(review): confirm that the RedisSemanticCache class imported from
# langchain.cache accepts a `ttl` keyword in the installed version.
semantic_cache_embeddings = OpenAIEmbeddings(model="text-embedding-3-small")
cache = RedisSemanticCache(
    redis_url="redis://localhost:6379",
    embedding=semantic_cache_embeddings,
    score_threshold=0.92,
    ttl=3600,  # 1 hour
)
@dataclass
class CacheStrategy:
    """One row of the cache-strategy comparison printed below."""

    strategy: str  # strategy name shown in brackets
    hit_rate: str  # free-text hit-rate range
    latency_saved: str  # free-text latency saved per hit
    cost_saved: str  # free-text cost reduction range
    complexity: str  # implementation effort label
    use_case: str  # where the strategy applies


# Rows follow the field order of CacheStrategy.
strategies = [
    CacheStrategy(*row)
    for row in (
        ("Exact Match", "10-20%", "2-5s per hit", "10-20%", "ง่าย", "Repeated exact queries"),
        ("Semantic Cache", "30-50%", "2-5s per hit", "30-50%", "ปานกลาง", "Similar questions"),
        ("Tool Result Cache", "40-60%", "0.5-2s per hit", "20-30%", "ง่าย", "DB/API results"),
        ("Conversation Cache", "20-30%", "1-3s per hit", "15-25%", "ปานกลาง", "Ongoing sessions"),
        ("Embedding Cache", "80-90%", "0.1s per hit", "40-60%", "ง่าย", "Repeated embeddings"),
    )
]

print("\n=== Cache Strategies ===")
for entry in strategies:
    # Three report lines per strategy, emitted with a single print call.
    report_lines = (
        f" [{entry.strategy}] Hit Rate: {entry.hit_rate} | Saved: {entry.cost_saved}",
        f" Latency: {entry.latency_saved} | Complexity: {entry.complexity}",
        f" Use Case: {entry.use_case}",
    )
    print("\n".join(report_lines))
Production Deployment
# === Production Agent Deployment ===
# Docker Compose — Full Stack
# version: '3.8'
# services:
# api:
# build: .
# ports: ["8000:8000"]
# environment:
# - OPENAI_API_KEY=${OPENAI_API_KEY}
# - REDIS_URL=redis://redis:6379
# deploy:
# replicas: 4
# worker:
# build: .
# command: celery -A tasks worker -c 8 --autoscale=16,4
# environment:
# - OPENAI_API_KEY=${OPENAI_API_KEY}
# - REDIS_URL=redis://redis:6379
# redis:
# image: redis:7-alpine
# ports: ["6379:6379"]
# nginx:
# image: nginx:alpine
# ports: ["80:80", "443:443"]
import re


@dataclass
class ProdMetric:
    """A production metric with its current reading, target and alert rule."""

    metric: str  # display name
    value: str  # current reading as free text, e.g. "1.2s" or "$45"
    target: str  # desired range as free text, e.g. "<2s"
    alert: str  # alert rule as free text: comparison operator + threshold

    def is_alerting(self) -> bool:
        """Return True when ``value`` breaches the ``alert`` rule.

        The first number found in ``value`` is compared with the first number
        found in ``alert`` using the operator the rule starts with
        (``>``, ``>=``, ``<`` or ``<=``). Units are not converted, so both
        strings are expected to use the same unit. If either number or the
        operator cannot be determined, the metric is treated as not alerting.
        """
        rule = self.alert.strip()
        number = r"-?\d+(?:\.\d+)?"
        observed = re.search(number, self.value)
        limit = re.search(number, rule)
        if observed is None or limit is None:
            return False
        reading = float(observed.group())
        threshold = float(limit.group())
        # Two-character operators must be checked before their one-character
        # prefixes.
        if rule.startswith(">="):
            return reading >= threshold
        if rule.startswith("<="):
            return reading <= threshold
        if rule.startswith(">"):
            return reading > threshold
        if rule.startswith("<"):
            return reading < threshold
        return False


metrics = [
    ProdMetric("Agent Latency (p50)", "1.2s", "<2s", "> 3s"),
    ProdMetric("Agent Latency (p99)", "8.5s", "<10s", "> 15s"),
    ProdMetric("Cache Hit Rate", "42%", ">35%", "< 20%"),
    ProdMetric("Success Rate", "98.5%", ">98%", "< 95%"),
    ProdMetric("Throughput", "120 req/min", ">100", "< 50"),
    ProdMetric("LLM Token Cost/day", "$45", "<$100", "> $150"),
    ProdMetric("Queue Depth", "12", "<50", "> 100"),
    ProdMetric("Worker Utilization", "65%", "<80%", "> 90%"),
]

print("Production Metrics:")
for m in metrics:
    # Comparing the raw value and alert strings for inequality would always
    # yield "OK"; the status must come from evaluating the alert rule.
    status = "ALERT" if m.is_alerting() else "OK"
    print(f" [{status}] {m.metric}: {m.value} (Target: {m.target})")
# Cost-saving technique -> short description, printed in insertion order.
cost_optimization = {
    "Model Routing": "Simple→GPT-3.5 Complex→GPT-4o ลด 40-60%",
    "Semantic Cache": "Cache similar queries ลด 30-50% API calls",
    "Prompt Compression": "ลด Token ใน System Prompt ลด 10-20%",
    "Batch Embedding": "รวม Embedding requests ลด overhead",
    "Off-peak Processing": "Queue non-urgent tasks to off-peak",
    "Token Monitoring": "Track token usage per user per feature",
}

print("\n\nCost Optimization:")
# One line per technique; sep="\n" makes a single print call emit them all.
print(
    *(f" [{technique}]: {detail}" for technique, detail in cost_optimization.items()),
    sep="\n",
)
เคล็ดลับ
- Cache First: ใช้ Semantic Cache ก่อน ลด Cost ได้มาก
- Async: ใช้ Async ทุกที่ ไม่ Block ระหว่างรอ LLM
- Model Route: ใช้ Model เล็กสำหรับงานง่าย ประหยัด 40%+
- Queue: ใช้ Queue จัดการ Burst Traffic ป้องกัน Rate Limit
- Monitor: ดู Token Cost Latency Cache Hit Rate ทุกวัน
LangChain Agent คืออะไร
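LangChain Agent คือ AI Component ที่ให้ LLM ตัดสินใจเลือก Tool เองในแต่ละขั้นตอน รองรับรูปแบบ ReAct, Tool Calling และ Plan-and-Execute สามารถเรียกใช้ API และ Database ได้ เหมาะกับงาน Customer Support, Research และ Workflow Automation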