LangChain Agent Scaling
Strategies for scaling LangChain agents: async processing, rate limiting, caching, queueing, load balancing, streaming, batching, model selection, and production deployment.
| Strategy | Impact | Complexity | Cost Reduction | Latency Impact |
|---|---|---|---|---|
| Semantic Cache | ลด API Call 30-50% | ปานกลาง | สูง | -80% (cache hit) |
| Async Processing | เพิ่ม Throughput 5-10x | ปานกลาง | ไม่มี | ลด Wait Time |
| Queue System | จัดการ Burst Traffic | ปานกลาง | ไม่มี | เพิ่มเล็กน้อย |
| Model Routing | ลด Cost 40-60% | สูง | สูงมาก | ลด (smaller model) |
| Streaming | ลด Perceived Latency | ง่าย | ไม่มี | -70% TTFT |
| Rate Limiting | ป้องกัน Overload | ง่าย | ป้องกัน Overspend | ไม่มี |
Agent Architecture
# === LangChain Agent with Scaling ===
# pip install langchain langchain-openai redis celery
# from langchain.agents import create_tool_calling_agent, AgentExecutor
# from langchain_openai import ChatOpenAI
# from langchain.tools import tool
# from langchain.cache import RedisSemanticCache
# from langchain_openai import OpenAIEmbeddings
# import langchain
# import asyncio
# import redis
#
# # Semantic Cache Setup
# langchain.llm_cache = RedisSemanticCache(
# redis_url="redis://localhost:6379",
# embedding=OpenAIEmbeddings(),
# score_threshold=0.95, # Cache hit threshold
# )
#
# # Model Router — Choose model by complexity
# def get_model(complexity: str) -> ChatOpenAI:
# models = {
# "simple": ChatOpenAI(model="gpt-3.5-turbo", temperature=0),
# "medium": ChatOpenAI(model="gpt-4o-mini", temperature=0),
# "complex": ChatOpenAI(model="gpt-4o", temperature=0),
# }
# return models.get(complexity, models["medium"])
#
# # Tools
# @tool
# def search_database(query: str) -> str:
# """Search the product database"""
# # DB query logic
# return f"Results for: {query}"
#
# @tool
# def calculate(expression: str) -> str:
# """Calculate mathematical expression"""
# return str(eval(expression))
#
# # Async Agent Executor
# async def run_agent_async(query: str, complexity: str = "medium"):
# llm = get_model(complexity)
#     agent = create_tool_calling_agent(llm, [search_database, calculate], prompt)  # `prompt` must be a ChatPromptTemplate defined elsewhere
# executor = AgentExecutor(agent=agent, tools=[search_database, calculate])
# result = await executor.ainvoke({"input": query})
# return result
#
# # Batch Processing
# async def process_batch(queries: list[str]):
# tasks = [run_agent_async(q) for q in queries]
# results = await asyncio.gather(*tasks, return_exceptions=True)
# return results
from dataclasses import dataclass
@dataclass
class ScalingLayer:
    """One tier of the agent-scaling stack (display data only)."""

    layer: str       # tier name, e.g. "Load Balancer"
    technology: str  # tooling used at this tier
    purpose: str     # what the tier is responsible for
    throughput: str  # rough capacity estimate (free text)
    config: str      # key configuration hint


# The stack, ordered from the network edge inward.
layers = [
    ScalingLayer("Load Balancer", "Nginx / Traefik", "Distribute requests", "10K+ rps", "Round-robin / least-conn"),
    ScalingLayer("API Server", "FastAPI + uvicorn", "Handle HTTP requests", "1K+ rps per worker", "4-8 workers per CPU"),
    ScalingLayer("Queue", "Redis + Celery", "Buffer async tasks", "50K+ msg/s", "Multiple queues by priority"),
    ScalingLayer("Cache", "Redis Semantic Cache", "Cache LLM responses", "100K+ ops/s", "TTL 1h, threshold 0.95"),
    ScalingLayer("LLM Gateway", "LiteLLM / custom", "Route to models", "Based on model", "Fallback chain"),
    ScalingLayer("Agent Workers", "Celery workers", "Execute agent logic", "10-50 concurrent", "Autoscale on queue"),
]

print("=== Scaling Architecture ===")
for tier in layers:
    # One print per tier; embedded newlines reproduce the original 3-line layout.
    print(
        f" [{tier.layer}] {tier.technology}\n"
        f" Purpose: {tier.purpose} | Throughput: {tier.throughput}\n"
        f" Config: {tier.config}"
    )
Rate Limiting and Caching
# === Rate Limiting & Caching ===
# Token Bucket Rate Limiter
# import time
# from collections import defaultdict
#
# class TokenBucketLimiter:
# def __init__(self, rate: float, capacity: int):
# self.rate = rate # tokens per second
# self.capacity = capacity # max tokens
# self.buckets = defaultdict(lambda: {"tokens": capacity, "last": time.time()})
#
# def allow(self, key: str, tokens: int = 1) -> bool:
# bucket = self.buckets[key]
# now = time.time()
# elapsed = now - bucket["last"]
# bucket["tokens"] = min(self.capacity, bucket["tokens"] + elapsed * self.rate)
# bucket["last"] = now
# if bucket["tokens"] >= tokens:
# bucket["tokens"] -= tokens
# return True
# return False
#
# limiter = TokenBucketLimiter(rate=10, capacity=60) # 10 req/s, burst 60
# Retry with Exponential Backoff
# import openai
# from tenacity import retry, stop_after_attempt, wait_exponential, retry_if_exception_type
#
# @retry(
# stop=stop_after_attempt(5),
# wait=wait_exponential(multiplier=1, min=2, max=60),
# retry=retry_if_exception_type(openai.RateLimitError),
# )
# async def call_llm_with_retry(prompt: str):
# return await llm.ainvoke(prompt)
# Semantic Cache with Redis
# from langchain.cache import RedisSemanticCache
#
# cache = RedisSemanticCache(
# redis_url="redis://localhost:6379",
# embedding=OpenAIEmbeddings(model="text-embedding-3-small"),
# score_threshold=0.92,
# ttl=3600, # 1 hour
# )
@dataclass
class CacheStrategy:
    """One caching approach and its expected payoff (display data only)."""

    strategy: str       # name of the caching approach
    hit_rate: str       # expected fraction of requests served from cache
    latency_saved: str  # time saved per cache hit
    cost_saved: str     # expected API-cost reduction
    complexity: str     # implementation effort (Thai: easy / moderate)
    use_case: str       # when to apply this strategy


strategies = [
    CacheStrategy("Exact Match", "10-20%", "2-5s per hit", "10-20%", "ง่าย", "Repeated exact queries"),
    CacheStrategy("Semantic Cache", "30-50%", "2-5s per hit", "30-50%", "ปานกลาง", "Similar questions"),
    CacheStrategy("Tool Result Cache", "40-60%", "0.5-2s per hit", "20-30%", "ง่าย", "DB/API results"),
    CacheStrategy("Conversation Cache", "20-30%", "1-3s per hit", "15-25%", "ปานกลาง", "Ongoing sessions"),
    CacheStrategy("Embedding Cache", "80-90%", "0.1s per hit", "40-60%", "ง่าย", "Repeated embeddings"),
]

print("\n=== Cache Strategies ===")
for entry in strategies:
    # One print per strategy; embedded newlines reproduce the original 3-line layout.
    print(
        f" [{entry.strategy}] Hit Rate: {entry.hit_rate} | Saved: {entry.cost_saved}\n"
        f" Latency: {entry.latency_saved} | Complexity: {entry.complexity}\n"
        f" Use Case: {entry.use_case}"
    )
Production Deployment
# === Production Agent Deployment ===
# Docker Compose — Full Stack
# version: '3.8'
# services:
# api:
# build: .
# ports: ["8000:8000"]
# environment:
# - OPENAI_API_KEY=
# - REDIS_URL=redis://redis:6379
# deploy:
# replicas: 4
# worker:
# build: .
# command: celery -A tasks worker -c 8 --autoscale=16,4
# environment:
# - OPENAI_API_KEY=
# - REDIS_URL=redis://redis:6379
# redis:
# image: redis:7-alpine
# ports: ["6379:6379"]
# nginx:
# image: nginx:alpine
# ports: ["80:80", "443:443"]
@dataclass
class ProdMetric:
    """A production health metric: current value, target, and alert rule."""

    metric: str  # human-readable metric name
    value: str   # current measured value (free text, e.g. "1.2s", "42%", "$45")
    target: str  # desired operating range (free text)
    alert: str   # alert condition: "<" or ">" followed by a threshold


def extract_number(text: str):
    """Return the first number found in *text* as a float, or None.

    Handles strings like "8.5s", "$45", "98.5%", "> 100", "120 req/min".
    """
    digits = []
    for ch in text:
        if ch.isdigit() or (digits and ch == "." and "." not in digits):
            digits.append(ch)
        elif digits:
            break  # stop at the end of the first numeric token
    return float("".join(digits)) if digits else None


def is_alert_breached(value: str, alert: str) -> bool:
    """True when *value* violates the *alert* condition (e.g. "8.5s" vs "> 3s").

    Bug fix: the original computed ``"OK" if m.value != m.alert else "ALERT"``,
    i.e. it compared the measurement string to the condition string for
    equality.  Those strings can never be equal, so every metric always
    printed OK.  Here the numeric parts are extracted and the actual
    "<" / ">" comparison is evaluated.
    """
    measured = extract_number(value)
    threshold = extract_number(alert)
    if measured is None or threshold is None:
        return False  # unparsable metric: report OK rather than false-alarm
    op = alert.lstrip()[:1]
    if op == ">":
        return measured > threshold
    if op == "<":
        return measured < threshold
    return False


metrics = [
    ProdMetric("Agent Latency (p50)", "1.2s", "<2s", "> 3s"),
    ProdMetric("Agent Latency (p99)", "8.5s", "<10s", "> 15s"),
    ProdMetric("Cache Hit Rate", "42%", ">35%", "< 20%"),
    ProdMetric("Success Rate", "98.5%", ">98%", "< 95%"),
    ProdMetric("Throughput", "120 req/min", ">100", "< 50"),
    ProdMetric("LLM Token Cost/day", "$45", "<$100", "> $150"),
    ProdMetric("Queue Depth", "12", "<50", "> 100"),
    ProdMetric("Worker Utilization", "65%", "<80%", "> 90%"),
]

print("Production Metrics:")
for m in metrics:
    status = "ALERT" if is_alert_breached(m.value, m.alert) else "OK"
    print(f" [{status}] {m.metric}: {m.value} (Target: {m.target})")
# Cost-reduction techniques with their expected impact (Thai: "ลด" = reduce).
cost_optimization = {
    "Model Routing": "Simple→GPT-3.5 Complex→GPT-4o ลด 40-60%",
    "Semantic Cache": "Cache similar queries ลด 30-50% API calls",
    "Prompt Compression": "ลด Token ใน System Prompt ลด 10-20%",
    "Batch Embedding": "รวม Embedding requests ลด overhead",
    "Off-peak Processing": "Queue non-urgent tasks to off-peak",
    "Token Monitoring": "Track token usage per user per feature",
}

print("\n\nCost Optimization:")
for technique, effect in cost_optimization.items():
    print(f" [{technique}]: {effect}")
เคล็ดลับ
- Cache First: ใช้ Semantic Cache ก่อน ลด Cost ได้มาก
- Async: ใช้ Async ทุกที่ ไม่ Block ระหว่างรอ LLM
- Model Route: ใช้ Model เล็กสำหรับงานง่าย ประหยัด 40%+
- Queue: ใช้ Queue จัดการ Burst Traffic ป้องกัน Rate Limit
- Monitor: ดู Token Cost Latency Cache Hit Rate ทุกวัน
LangChain Agent คืออะไร
AI Component LLM ตัดสินใจเลือก Tool ReAct Tool Calling Plan-and-Execute API Database Customer Support Research Workflow Automation
Scale LangChain Agent อย่างไร
Async Concurrent Queue Celery Redis Cache Semantic Rate Limiting Load Balancing Streaming Batch Model Selection Worker Autoscale
จัดการ Rate Limit ของ LLM API อย่างไร
Token Bucket Retry Exponential Backoff Queue Buffer Fallback Model Cache Response Monitor Token Budget Alert Batch Similar
Optimize Agent Performance อย่างไร
Semantic Cache Prompt Optimization Smaller Model Tool Selection Parallel Execution Streaming Connection Pooling Async I/O ไม่ Block
สรุป
LangChain Agent Scaling Strategy Async Cache Rate Limiting Queue Model Routing Streaming Load Balancing Celery Redis Production Cost Optimization
