SiamCafe · Blog
LangChain Agent Load Testing Strategy — ทดสอบ
บทความ

LangChain Agent Load Testing Strategy — ทดสอบ

เผยแพร่ 28 พฤษภาคม 2569

LangChain Load Testing

LangChain Agent Load Testing Locust Token Throughput Latency Rate Limiting Scaling Concurrent Users Cost Optimization Production

Metric | Target | Measurement | Impact
TTFT | < 500ms | Time to first token | User experience
Total Latency | < 10s (simple) < 30s (complex) | End-to-end response time | User satisfaction
Token Throughput | > 50 tokens/s per user | Output tokens per second | Streaming speed
Concurrent Users | > 50 simultaneous | Users before degradation | Capacity planning
Error Rate | < 1% | Failed requests percentage | Reliability
Cost per Request | < $0.05 average | LLM tokens × price | Budget planning

Locust Load Test

# === Locust Load Test for LangChain Agent ===

# pip install locust langchain openai

# locustfile.py
# from locust import HttpUser, task, between
# import json
# import random
# import time
#
# class AgentUser(HttpUser):
#     wait_time = between(2, 5)
#     host = "http://localhost:8000"
#
#     simple_queries = [
#         "What is Python?",
#         "Explain Docker containers",
#         "How does Kubernetes work?",
#     ]
#
#     complex_queries = [
#         "Compare AWS Lambda vs ECS for microservices, include pricing",
#         "Design a CI/CD pipeline for a Python monorepo with 10 services",
#         "Analyze the trade-offs of using PostgreSQL vs MongoDB for an e-commerce app",
#     ]
#
#     @task(3)
#     def simple_query(self):
#         query = random.choice(self.simple_queries)
#         start = time.time()
#         with self.client.post("/chat",
#             json={"query": query, "stream": False},
#             catch_response=True) as response:
#             if response.status_code == 200:
#                 data = response.json()
#                 latency = time.time() - start
#                 tokens = data.get("usage", {}).get("total_tokens", 0)
#                 response.success()
#             else:
#                 response.failure(f"Status {response.status_code}")
#
#     @task(1)
#     def complex_query(self):
#         query = random.choice(self.complex_queries)
#         with self.client.post("/chat",
#             json={"query": query, "stream": False},
#             timeout=60,
#             catch_response=True) as response:
#             if response.status_code == 200:
#                 response.success()
#             else:
#                 response.failure(f"Status {response.status_code}")

# Run: locust -f locustfile.py --users 50 --spawn-rate 5

from dataclasses import dataclass

@dataclass
class LoadProfile:
    """One phase of a load-test plan, stored as display-ready values.

    Fields are positional in the order declared below; the ramp rate and
    duration are kept as free-text labels rather than parsed numbers.
    """

    phase: str  # phase label, e.g. "Smoke Test"
    users: int  # user count reported for the phase
    ramp_rate: str  # ramp-up label, e.g. "5 users/s"
    duration: str  # run-length label, e.g. "30 min"
    purpose: str  # short note on what the phase is for

# Load-test phases, listed in the order they are reported below.
profiles = [
    LoadProfile(phase="Smoke Test", users=5, ramp_rate="1 user/s",
                duration="5 min", purpose="Verify basic functionality"),
    LoadProfile(phase="Load Test", users=50, ramp_rate="5 users/s",
                duration="30 min", purpose="Normal load capacity"),
    LoadProfile(phase="Stress Test", users=200, ramp_rate="10 users/s",
                duration="15 min", purpose="Find breaking point"),
    LoadProfile(phase="Spike Test", users=500, ramp_rate="50 users/s",
                duration="5 min", purpose="Sudden traffic surge"),
    LoadProfile(phase="Soak Test", users=30, ramp_rate="3 users/s",
                duration="4 hours", purpose="Memory leaks, stability"),
]

print("=== Load Test Profiles ===")
for p in profiles:
    # Two report lines per phase, emitted by a single print call.
    print(
        f"  [{p.phase}] Users: {p.users} | Ramp: {p.ramp_rate}",
        f"    Duration: {p.duration} | Purpose: {p.purpose}",
        sep="\n",
    )

Token and Cost Analysis

# === Token Usage and Cost Calculator ===

@dataclass
class TokenMetric:
    """Average token, tool-call, latency and cost figures for one query type.

    Fields are positional in the order declared below.
    """

    query_type: str  # label for the kind of request, e.g. "RAG Query"
    avg_input_tokens: int  # average input (prompt) tokens per request
    avg_output_tokens: int  # average output tokens per request
    avg_tool_calls: int  # average number of tool calls per request
    avg_latency_s: float  # average latency in seconds
    cost_per_request: float  # average cost per request (presumably USD — confirm)

# Sample per-query-type averages used for the report below.
metrics = [
    TokenMetric("Simple Q&A", 150, 300, 0, 2.5, 0.002),
    TokenMetric("RAG Query", 800, 500, 1, 5.0, 0.008),
    TokenMetric("Agent (1 tool)", 500, 400, 1, 8.0, 0.006),
    TokenMetric("Agent (3 tools)", 1200, 800, 3, 15.0, 0.015),
    TokenMetric("Agent (complex)", 2000, 1500, 5, 25.0, 0.030),
    TokenMetric("Code Generation", 300, 1000, 0, 6.0, 0.008),
]

print("=== Token Usage per Query Type ===")
for m in metrics:
    total_tokens = m.avg_input_tokens + m.avg_output_tokens
    # Output tokens divided by the average latency for the whole request.
    tps = m.avg_output_tokens / m.avg_latency_s
    print(f"  [{m.query_type}]")
    print(f"    Tokens: {m.avg_input_tokens} in + {m.avg_output_tokens} out = {total_tokens}")
    print(f"    Tools: {m.avg_tool_calls} | Latency: {m.avg_latency_s}s | TPS: {tps:.0f}")
    # Show the actual per-request cost; three decimals keep sub-cent values visible.
    print(f"    Cost: ${m.cost_per_request:.3f}/request")

# Monthly cost projection
daily_requests = 10000
avg_cost = 0.01  # average cost per request (presumably USD — confirm)
monthly = daily_requests * avg_cost * 30  # projected over a 30-day month

# Build the summary line first so the computed figures actually appear in the
# output; it previously printed "× /req × 30 days = /month" with no amounts.
projection_line = (
    f"    {daily_requests:,} requests/day × ${avg_cost:.2f}/req"
    f" × 30 days = ${monthly:,.0f}/month"
)
print("\n  Monthly Cost Projection:")
print(projection_line)

# Cost optimization strategies: each strategy name maps to its expected benefit.
optimizations = dict(
    [
        ("Cache frequent queries", "Save 30-50% on repeated questions"),
        ("Use GPT-3.5 for simple routing", "Save 90% on router decisions"),
        ("Limit max_tokens", "Prevent runaway token usage"),
        ("Batch similar requests", "Reduce overhead per request"),
        ("Use smaller model for tools", "Save 50-70% on tool selection"),
    ]
)

print("\n  Cost Optimization:")
# One line per strategy, in insertion order, written with a single print call.
print("\n".join(f"    [{k}]: {v}" for k, v in optimizations.items()))

Production Scaling

# === Scaling Architecture ===

# FastAPI Server with LangChain
# from fastapi import FastAPI
# from langchain.agents import AgentExecutor
# import asyncio
#
# app = FastAPI()
# semaphore = asyncio.Semaphore(10)  # Max concurrent LLM calls
#
# @app.post("/chat")
# async def chat(request: ChatRequest):
#     async with semaphore:
#         result = await agent.ainvoke({"input": request.query})
#         return {"response": result["output"],
#                 "usage": result.get("usage", {})}

# Kubernetes HPA
# apiVersion: autoscaling/v2
# kind: HorizontalPodAutoscaler
# metadata:
#   name: agent-api-hpa
# spec:
#   scaleTargetRef:
#     apiVersion: apps/v1
#     kind: Deployment
#     name: agent-api
#   minReplicas: 3
#   maxReplicas: 20
#   metrics:
#     - type: Pods
#       pods:
#         metric:
#           name: concurrent_requests
#         target:
#           type: AverageValue
#           averageValue: "8"

@dataclass
class ScaleConfig:
    """Replica bounds and scaling trigger for one deployed component.

    Fields are positional in the order declared below; the metric and target
    are free-text labels used only for display.
    """

    component: str  # component label, e.g. "Agent API"
    min_replicas: int  # lower replica bound
    max_replicas: int  # upper replica bound
    scale_metric: str  # metric label, or "fixed" as used by some entries
    scale_target: str  # target label, e.g. "8 per pod"

# Per-component scaling settings, listed in the order they are reported below.
scaling = [
    ScaleConfig(component="Agent API", min_replicas=3, max_replicas=20,
                scale_metric="concurrent_requests", scale_target="8 per pod"),
    ScaleConfig(component="Redis Cache", min_replicas=3, max_replicas=3,
                scale_metric="fixed", scale_target="Sentinel HA"),
    ScaleConfig(component="Vector DB", min_replicas=2, max_replicas=6,
                scale_metric="query_latency_p99", scale_target="< 100ms"),
    ScaleConfig(component="Tool Services", min_replicas=2, max_replicas=10,
                scale_metric="cpu_utilization", scale_target="70%"),
    ScaleConfig(component="Load Balancer", min_replicas=2, max_replicas=2,
                scale_metric="fixed", scale_target="Active-passive"),
]

print("Scaling Configuration:")
for s in scaling:
    # Two report lines per component, emitted by a single print call.
    print(
        f"  [{s.component}] Replicas: {s.min_replicas}-{s.max_replicas}",
        f"    Metric: {s.scale_metric} | Target: {s.scale_target}",
        sep="\n",
    )

# Performance checklist: each practice maps to the reason for adopting it.
checklist = dict(
    [
        ("Streaming enabled", "Reduce TTFT, improve UX"),
        ("Async LLM calls", "Don't block event loop"),
        ("Semaphore for concurrency", "Prevent LLM API rate limit"),
        ("Redis cache for repeated queries", "Save tokens and latency"),
        ("Timeout on all external calls", "Prevent hanging requests"),
        ("Circuit breaker on LLM API", "Graceful degradation"),
        ("Prometheus metrics", "Track latency, tokens, errors"),
    ]
)

print("\n\nPerformance Checklist:")
# One line per checklist item, in insertion order, written with a single print call.
print("\n".join(f"  [{k}]: {v}" for k, v in checklist.items()))

เคล็ดลับ

  • Streaming: เปิด Streaming ลด TTFT ให้ User เห็นผลเร็วขึ้น
  • Cache: Cache คำถามที่ถามบ่อย ลด Token Cost 30-50%
  • Semaphore: จำกัด Concurrent LLM Calls ป้องกัน Rate Limit
  • Soak Test: รัน Soak Test 4+ ชั่วโมง หา Memory Leak
  • Cost Alert: ตั้ง Budget Alert ป้องกัน Token Cost เกินงบ

ทำไมต้อง Load Test LangChain Agent

Latency สูง LLM หลายรอบ Tool Call Concurrent Users Token Throughput Cost Bottleneck Rate Limiting Memory Leak