SiamCafe · Blog
LangChain Agent Scaling Strategy วิธี Scale —
บทความ

LangChain Agent Scaling Strategy วิธี Scale —

เผยแพร่ 28 พฤษภาคม 2569

LangChain Agent Scaling

LangChain Agent Scaling Strategy วิธี Scale —

LangChain Agent Scaling Strategy Async Processing Rate Limiting Caching Queue Load Balancing Streaming Batch Model Selection Production Deployment

Strategy | Impact | Complexity | Cost Reduction | Latency Impact
Semantic Cache | ลด API Call 30-50% | ปานกลาง | สูง | -80% (cache hit)
Async Processing | เพิ่ม Throughput 5-10x | ปานกลาง | ไม่มี | ลด Wait Time
Queue System | จัดการ Burst Traffic | ปานกลาง | ไม่มี | เพิ่มเล็กน้อย
Model Routing | ลด Cost 40-60% | สูง | สูงมาก | ลด (smaller model)
Streaming | ลด Perceived Latency | ง่าย | ไม่มี | -70% TTFT
Rate Limiting | ป้องกัน Overload | ง่าย | ป้องกัน Overspend | ไม่มี

Agent Architecture

=== LangChain Agent with Scaling ===

pip install langchain langchain-openai redis celery tenacity

from langchain.agents import create_tool_calling_agent, AgentExecutor

from langchain_openai import ChatOpenAI

from langchain.tools import tool

from langchain.cache import RedisSemanticCache

from langchain_openai import OpenAIEmbeddings

import langchain

import asyncio

import redis

# Semantic Cache Setup

# Install a process-wide semantic cache for LLM calls.
#
# RedisSemanticCache hands `score_threshold` to the vector search as a maximum
# *distance* (smaller = stricter), not as a minimum similarity. A value of 0.95
# would therefore treat almost any two prompts as a cache hit; 0.05 expresses
# the intended "about 0.95 similar" cut-off.
langchain.llm_cache = RedisSemanticCache(
    redis_url="redis://localhost:6379",
    embedding=OpenAIEmbeddings(),
    score_threshold=0.05,  # max vector distance that still counts as a hit
)

# Model Router — Choose model by complexity

def get_model(complexity: str) -> ChatOpenAI:
    """Return a chat model sized for the requested complexity tier.

    Args:
        complexity: One of ``"simple"``, ``"medium"`` or ``"complex"``.
            Any other value falls back to the ``"medium"`` tier.

    Returns:
        A ``ChatOpenAI`` client configured with ``temperature=0``.
    """
    # Map tiers to model *names* so that only the selected client is built,
    # instead of constructing all three clients on every call.
    model_by_tier = {
        "simple": "gpt-3.5-turbo",
        "medium": "gpt-4o-mini",
        "complex": "gpt-4o",
    }
    model_name = model_by_tier.get(complexity, model_by_tier["medium"])
    return ChatOpenAI(model=model_name, temperature=0)

# Tools

@tool
def search_database(query: str) -> str:
    """Search the product database"""
    # The docstring is left untouched: it is the text the @tool decorator
    # attaches to the tool, so rewording it would change runtime behaviour.
    #
    # Placeholder implementation: no database is queried here; the query is
    # simply echoed back in a fixed result string.
    return "Results for: {}".format(query)

@tool
def calculate(expression: str) -> str:
    """Calculate mathematical expression"""
    # The docstring is left untouched: it is the text the @tool decorator
    # attaches to the tool, so rewording it would change runtime behaviour.
    #
    # SECURITY: the expression is written by the model (and indirectly by the
    # end user), so it must never reach eval(). It is parsed into an AST and
    # only numeric literals combined with + - * / // % ** and unary +/- are
    # evaluated.
    #
    # Raises:
    #   SyntaxError: the text is not a valid Python expression.
    #   ValueError:  the expression uses anything outside the arithmetic
    #                subset above, or an exponent larger than `max_exponent`.
    import ast
    import operator

    binary_ops = {
        ast.Add: operator.add,
        ast.Sub: operator.sub,
        ast.Mult: operator.mul,
        ast.Div: operator.truediv,
        ast.FloorDiv: operator.floordiv,
        ast.Mod: operator.mod,
        ast.Pow: operator.pow,
    }
    unary_ops = {ast.UAdd: operator.pos, ast.USub: operator.neg}
    max_exponent = 1000  # keeps inputs like 9**9**9 from exhausting CPU/memory

    def evaluate(node):
        # type(...) rather than isinstance so that bool literals are rejected.
        if isinstance(node, ast.Constant) and type(node.value) in (int, float):
            return node.value
        if isinstance(node, ast.BinOp) and type(node.op) in binary_ops:
            left = evaluate(node.left)
            right = evaluate(node.right)
            if isinstance(node.op, ast.Pow) and abs(right) > max_exponent:
                raise ValueError(f"Exponent too large in expression: {expression!r}")
            return binary_ops[type(node.op)](left, right)
        if isinstance(node, ast.UnaryOp) and type(node.op) in unary_ops:
            return unary_ops[type(node.op)](evaluate(node.operand))
        raise ValueError(f"Unsupported expression: {expression!r}")

    parsed = ast.parse(expression.strip(), mode="eval")
    return str(evaluate(parsed.body))

# Async Agent Executor

async def run_agent_async(query: str, complexity: str = "medium"):
    """Run one query through a freshly built tool-calling agent.

    Args:
        query: Text passed to the agent under the ``"input"`` key.
        complexity: Tier forwarded to ``get_model`` to pick the LLM.

    Returns:
        Whatever ``AgentExecutor.ainvoke`` returns for the query.
    """
    # NOTE(review): `prompt` is not defined in the visible code; it has to
    # exist at module scope before this coroutine is called.
    available_tools = (search_database, calculate)
    agent = create_tool_calling_agent(
        get_model(complexity), list(available_tools), prompt
    )
    executor = AgentExecutor(agent=agent, tools=list(available_tools))
    return await executor.ainvoke({"input": query})

# Batch Processing

async def process_batch(queries: list[str]):
    """Run every query concurrently and collect the outcomes in input order.

    Each query is run with ``run_agent_async`` at its default complexity.
    Because ``return_exceptions=True`` is used, a failing query yields its
    exception object in the result list instead of aborting the batch.
    """
    pending = (run_agent_async(item) for item in queries)
    return await asyncio.gather(*pending, return_exceptions=True)

from dataclasses import dataclass

@dataclass
class ScalingLayer:
    """One row of the scaling-architecture overview printed by this script."""

    # Name of the architectural layer, e.g. "Load Balancer".
    layer: str
    # Product(s) used for the layer, e.g. "Nginx / Traefik".
    technology: str
    # Short statement of what the layer is for.
    purpose: str
    # Free-text throughput figure, e.g. "10K+ rps".
    throughput: str
    # Free-text configuration hint.
    config: str

# Scaling stack, in the order the rows are listed and printed below.
layers = [
    ScalingLayer(*row)
    for row in (
        ("Load Balancer", "Nginx / Traefik", "Distribute requests", "10K+ rps", "Round-robin / least-conn"),
        ("API Server", "FastAPI + uvicorn", "Handle HTTP requests", "1K+ rps per worker", "4-8 workers per CPU"),
        ("Queue", "Redis + Celery", "Buffer async tasks", "50K+ msg/s", "Multiple queues by priority"),
        ("Cache", "Redis Semantic Cache", "Cache LLM responses", "100K+ ops/s", "TTL 1h, threshold 0.95"),
        ("LLM Gateway", "LiteLLM / custom", "Route to models", "Based on model", "Fallback chain"),
        ("Agent Workers", "Celery workers", "Execute agent logic", "10-50 concurrent", "Autoscale on queue"),
    )
]

# Print a three-line summary per layer.
print("=== Scaling Architecture ===")
for entry in layers:
    print(
        f" [{entry.layer}] {entry.technology}",
        f" Purpose: {entry.purpose} | Throughput: {entry.throughput}",
        f" Config: {entry.config}",
        sep="\n",
    )

Rate Limiting and Caching

LangChain Agent Scaling Strategy วิธี Scale —

=== Rate Limiting & Caching ===

Token Bucket Rate Limiter

import time

from collections import defaultdict

class TokenBucketLimiter:
    """Per-key token-bucket rate limiter kept in process memory.

    Every key gets its own bucket, created full on first use. Buckets refill
    continuously at ``rate`` tokens per second up to ``capacity``.

    No locking is performed and buckets are never evicted, so the ``buckets``
    mapping grows with the number of distinct keys seen.
    """

    def __init__(self, rate: float, capacity: int):
        """Create a limiter.

        Args:
            rate: Tokens added to each bucket per second.
            capacity: Maximum tokens a bucket can hold (the burst size).
        """
        self.rate = rate
        self.capacity = capacity
        # A monotonic clock is used so that wall-clock adjustments (NTP, DST,
        # manual changes) can neither drain nor over-fill a bucket.
        self.buckets = defaultdict(
            lambda: {"tokens": capacity, "last": time.monotonic()}
        )

    def allow(self, key: str, tokens: int = 1) -> bool:
        """Try to spend ``tokens`` from the bucket belonging to ``key``.

        Args:
            key: Identifier of the caller being limited.
            tokens: Number of tokens the request costs.

        Returns:
            True if the bucket held enough tokens (they are deducted),
            False otherwise (the bucket is only refilled, nothing is spent).
        """
        bucket = self.buckets[key]
        now = time.monotonic()
        # Clamp at zero: a timestamp that lies in the future must not turn
        # into a negative refill.
        elapsed = max(0.0, now - bucket["last"])
        bucket["tokens"] = min(self.capacity, bucket["tokens"] + elapsed * self.rate)
        bucket["last"] = now

        if bucket["tokens"] >= tokens:
            bucket["tokens"] -= tokens
            return True
        return False


limiter = TokenBucketLimiter(rate=10, capacity=60)  # 10 req/s, burst 60

Retry with Exponential Backoff

import openai
from tenacity import (
    retry,
    retry_if_exception_type,  # used by the decorator below; was never imported
    stop_after_attempt,
    wait_exponential,
)


@retry(
    stop=stop_after_attempt(5),
    wait=wait_exponential(multiplier=1, min=2, max=60),
    retry=retry_if_exception_type(openai.RateLimitError),
)
async def call_llm_with_retry(prompt: str):
    """Invoke the LLM, retrying only when OpenAI reports a rate limit.

    Up to five attempts are made, with an exponential wait bounded between
    2 and 60 seconds between them. Any exception other than
    ``openai.RateLimitError`` is not retried.

    Args:
        prompt: Text passed unchanged to ``llm.ainvoke``.

    Returns:
        The value returned by ``llm.ainvoke``.
    """
    # NOTE(review): `llm` is not defined at module scope in the visible code;
    # a chat model must be bound to that name before this coroutine runs.
    return await llm.ainvoke(prompt)

Semantic Cache with Redis

from langchain.cache import RedisSemanticCache

# Stand-alone semantic cache instance.
#
# * `score_threshold` is handed to the vector search as a maximum *distance*
#   (smaller = stricter), so the intended "0.92 similarity" is written as 0.08.
# * `langchain.cache.RedisSemanticCache` accepts only `redis_url`, `embedding`
#   and `score_threshold`; passing `ttl=` raises TypeError. The desired 1 hour
#   expiry has to be enforced in Redis itself.
#   NOTE(review): the separate `langchain-redis` package appears to offer a
#   cache class with a `ttl` option — confirm before switching.
cache = RedisSemanticCache(
    redis_url="redis://localhost:6379",
    embedding=OpenAIEmbeddings(model="text-embedding-3-small"),
    score_threshold=0.08,
)

@dataclass
class CacheStrategy:
    """One row of the cache-strategy comparison printed by this script."""

    # Name of the caching approach, e.g. "Semantic Cache".
    strategy: str
    # Free-text hit-rate range, e.g. "30-50%".
    hit_rate: str
    # Free-text latency saved per hit, e.g. "2-5s per hit".
    latency_saved: str
    # Free-text cost saving range, e.g. "30-50%".
    cost_saved: str
    # Free-text implementation difficulty label.
    complexity: str
    # Short description of when the strategy applies.
    use_case: str

# Cache strategies, in the order the rows are listed and printed below.
strategies = [
    CacheStrategy(*row)
    for row in (
        ("Exact Match", "10-20%", "2-5s per hit", "10-20%", "ง่าย", "Repeated exact queries"),
        ("Semantic Cache", "30-50%", "2-5s per hit", "30-50%", "ปานกลาง", "Similar questions"),
        ("Tool Result Cache", "40-60%", "0.5-2s per hit", "20-30%", "ง่าย", "DB/API results"),
        ("Conversation Cache", "20-30%", "1-3s per hit", "15-25%", "ปานกลาง", "Ongoing sessions"),
        ("Embedding Cache", "80-90%", "0.1s per hit", "40-60%", "ง่าย", "Repeated embeddings"),
    )
]

# Print a three-line summary per strategy.
print("\n=== Cache Strategies ===")
for entry in strategies:
    print(
        f" [{entry.strategy}] Hit Rate: {entry.hit_rate} | Saved: {entry.cost_saved}",
        f" Latency: {entry.latency_saved} | Complexity: {entry.complexity}",
        f" Use Case: {entry.use_case}",
        sep="\n",
    )

Production Deployment

# === Production Agent Deployment ===

# Docker Compose — Full Stack
# version: '3.8'
# services:
#   api:
#     build: .
#     ports: ["8000:8000"]
#     environment:
#       - OPENAI_API_KEY=
#       - REDIS_URL=redis://redis:6379
#     deploy:
#       replicas: 4
#   worker:
#     build: .
#     command: celery -A tasks worker -c 8 --autoscale=16,4
#     environment:
#       - OPENAI_API_KEY=
#       - REDIS_URL=redis://redis:6379
#   redis:
#     image: redis:7-alpine
#     ports: ["6379:6379"]
#   nginx:
#     image: nginx:alpine
#     ports: ["80:80", "443:443"]

@dataclass
class ProdMetric:
    """A production metric with its current reading and thresholds.

    All fields are display strings as used in the table below, e.g.
    ``value="1.2s"``, ``target="<2s"``, ``alert="> 3s"``.
    """

    metric: str  # display name of the metric
    value: str   # current reading; the first number in it is used for checks
    target: str  # desired range, shown in the report only
    alert: str   # alert rule: a comparison operator followed by a number

    def is_alerting(self) -> bool:
        """Return True when ``value`` satisfies the ``alert`` rule.

        The rule is read as ``<operator> <number>`` (``>``, ``>=``, ``<`` or
        ``<=``); units and currency signs around the numbers are ignored, and
        thousands separators are not understood. If either string contains
        no usable number, the check falls back to plain string equality
        between ``value`` and ``alert``.
        """
        import operator
        import re

        comparators = {
            ">": operator.gt,
            ">=": operator.ge,
            "<": operator.lt,
            "<=": operator.le,
        }
        number = r"[-+]?\d+(?:\.\d+)?"
        rule = re.match(rf"\s*(>=|<=|>|<)\D*?({number})", self.alert)
        reading = re.search(number, self.value)
        if rule is None or reading is None:
            return self.value == self.alert
        compare = comparators[rule.group(1)]
        return compare(float(reading.group()), float(rule.group(2)))


metrics = [
    ProdMetric("Agent Latency (p50)", "1.2s", "<2s", "> 3s"),
    ProdMetric("Agent Latency (p99)", "8.5s", "<10s", "> 15s"),
    ProdMetric("Cache Hit Rate", "42%", ">35%", "< 20%"),
    ProdMetric("Success Rate", "98.5%", ">98%", "< 95%"),
    ProdMetric("Throughput", "120 req/min", ">100", "< 50"),
    ProdMetric("LLM Token Cost/day", "$45", "<$100", "> $150"),
    ProdMetric("Queue Depth", "12", "<50", "> 100"),
    ProdMetric("Worker Utilization", "65%", "<80%", "> 90%"),
]

print("Production Metrics:")
for m in metrics:
    # Previously `m.value != m.alert` compared two unrelated strings, so the
    # status could never be "ALERT"; the reading is now checked numerically.
    status = "ALERT" if m.is_alerting() else "OK"
    print(f"  [{status}] {m.metric}: {m.value} (Target: {m.target})")

# Cost-saving levers mapped to a one-line description of each.
cost_optimization = {
    "Model Routing": "Simple→GPT-3.5 Complex→GPT-4o ลด 40-60%",
    "Semantic Cache": "Cache similar queries ลด 30-50% API calls",
    "Prompt Compression": "ลด Token ใน System Prompt ลด 10-20%",
    "Batch Embedding": "รวม Embedding requests ลด overhead",
    "Off-peak Processing": "Queue non-urgent tasks to off-peak",
    "Token Monitoring": "Track token usage per user per feature",
}

# Two blank lines precede the heading; then one line per lever is printed.
print("\n\nCost Optimization:")
for lever, detail in cost_optimization.items():
    print("  [{}]: {}".format(lever, detail))

เคล็ดลับ

  • Cache First: ใช้ Semantic Cache ก่อน ลด Cost ได้มาก
  • Async: ใช้ Async ทุกที่ ไม่ Block ระหว่างรอ LLM
  • Model Route: ใช้ Model เล็กสำหรับงานง่าย ประหยัด 40%+
  • Queue: ใช้ Queue จัดการ Burst Traffic ป้องกัน Rate Limit
  • Monitor: ดู Token Cost Latency Cache Hit Rate ทุกวัน

LangChain Agent คืออะไร

LangChain Agent คือ AI Component ที่ให้ LLM ตัดสินใจเลือก Tool เอง รองรับรูปแบบ ReAct, Tool Calling และ Plan-and-Execute เชื่อมต่อกับ API และ Database ได้ เหมาะกับงาน Customer Support, Research และ Workflow Automation