LangChain Load Testing
LangChain Agent Load Testing Locust Token Throughput Latency Rate Limiting Scaling Concurrent Users Cost Optimization Production
| Metric | Target | Measurement | Impact |
|---|---|---|---|
| TTFT | < 500ms | Time to first token | User experience |
| Total Latency | < 10s (simple), < 30s (complex) | End-to-end response time | User satisfaction |
| Token Throughput | > 50 tokens/s per user | Output tokens per second | Streaming speed |
| Concurrent Users | > 50 simultaneous | Users before degradation | Capacity planning |
| Error Rate | < 1% | Failed requests percentage | Reliability |
| Cost per Request | < $0.05 average | LLM tokens × price | Budget planning |
Locust Load Test
# === Locust Load Test for LangChain Agent ===
# pip install locust langchain openai
# locustfile.py
# from locust import HttpUser, task, between
# import json
# import random
# import time
#
# class AgentUser(HttpUser):
# wait_time = between(2, 5)
# host = "http://localhost:8000"
#
# simple_queries = [
# "What is Python?",
# "Explain Docker containers",
# "How does Kubernetes work?",
# ]
#
# complex_queries = [
# "Compare AWS Lambda vs ECS for microservices, include pricing",
# "Design a CI/CD pipeline for a Python monorepo with 10 services",
# "Analyze the trade-offs of using PostgreSQL vs MongoDB for an e-commerce app",
# ]
#
# @task(3)
# def simple_query(self):
# query = random.choice(self.simple_queries)
# start = time.time()
# with self.client.post("/chat",
# json={"query": query, "stream": False},
# catch_response=True) as response:
# if response.status_code == 200:
# data = response.json()
# latency = time.time() - start
# tokens = data.get("usage", {}).get("total_tokens", 0)
# response.success()
# else:
# response.failure(f"Status {response.status_code}")
#
# @task(1)
# def complex_query(self):
# query = random.choice(self.complex_queries)
# with self.client.post("/chat",
# json={"query": query, "stream": False},
# timeout=60,
# catch_response=True) as response:
# if response.status_code == 200:
# response.success()
# else:
# response.failure(f"Status {response.status_code}")
# Run: locust -f locustfile.py --users 50 --spawn-rate 5
from dataclasses import dataclass


@dataclass
class LoadProfile:
    """One phase of a progressive load-testing plan."""

    phase: str      # human-readable phase name, e.g. "Smoke Test"
    users: int      # peak number of concurrent simulated users
    ramp_rate: str  # spawn-rate description, e.g. "5 users/s"
    duration: str   # how long the phase runs
    purpose: str    # what the phase is meant to reveal


# Canonical progression: smoke -> load -> stress -> spike -> soak.
_PROFILE_ROWS = [
    ("Smoke Test", 5, "1 user/s", "5 min", "Verify basic functionality"),
    ("Load Test", 50, "5 users/s", "30 min", "Normal load capacity"),
    ("Stress Test", 200, "10 users/s", "15 min", "Find breaking point"),
    ("Spike Test", 500, "50 users/s", "5 min", "Sudden traffic surge"),
    ("Soak Test", 30, "3 users/s", "4 hours", "Memory leaks, stability"),
]
profiles = [LoadProfile(*row) for row in _PROFILE_ROWS]

print("=== Load Test Profiles ===")
for profile in profiles:
    print(f" [{profile.phase}] Users: {profile.users} | Ramp: {profile.ramp_rate}")
    print(f" Duration: {profile.duration} | Purpose: {profile.purpose}")
Token and Cost Analysis
# === Token Usage and Cost Calculator ===
@dataclass
class TokenMetric:
    """Average token/latency/cost profile for one class of query."""

    query_type: str          # label, e.g. "Simple Q&A"
    avg_input_tokens: int    # prompt tokens per request
    avg_output_tokens: int   # completion tokens per request
    avg_tool_calls: int      # tool invocations per request
    avg_latency_s: float     # end-to-end latency in seconds
    cost_per_request: float  # USD per request

metrics = [
    TokenMetric("Simple Q&A", 150, 300, 0, 2.5, 0.002),
    TokenMetric("RAG Query", 800, 500, 1, 5.0, 0.008),
    TokenMetric("Agent (1 tool)", 500, 400, 1, 8.0, 0.006),
    TokenMetric("Agent (3 tools)", 1200, 800, 3, 15.0, 0.015),
    TokenMetric("Agent (complex)", 2000, 1500, 5, 25.0, 0.030),
    TokenMetric("Code Generation", 300, 1000, 0, 6.0, 0.008),
]

print("=== Token Usage per Query Type ===")
# NOTE(review): accumulated nowhere below — kept for compatibility;
# presumably intended as cost per 1k requests. Confirm and wire up or drop.
total_cost_1k = 0
for m in metrics:
    total_tokens = m.avg_input_tokens + m.avg_output_tokens
    tps = m.avg_output_tokens / m.avg_latency_s  # output tokens per second
    print(f" [{m.query_type}]")
    print(f" Tokens: {m.avg_input_tokens} in + {m.avg_output_tokens} out = {total_tokens}")
    print(f" Tools: {m.avg_tool_calls} | Latency: {m.avg_latency_s}s | TPS: {tps:.0f}")
    # FIX: the dollar interpolation was lost ("Cost: /request"); restore it.
    print(f" Cost: ${m.cost_per_request}/request")

# Monthly cost projection
daily_requests = 10000
avg_cost = 0.01  # blended average USD per request
monthly = daily_requests * avg_cost * 30
print(f"\n Monthly Cost Projection:")
# FIX: avg_cost and monthly were computed but never printed — the ${...}
# placeholders were stripped ("× /req × 30 days = /month"); restore them.
print(f" {daily_requests:,} requests/day × ${avg_cost}/req × 30 days = ${monthly:,.0f}/month")
# Cost optimization strategies and their expected impact.
_OPTIMIZATION_PAIRS = [
    ("Cache frequent queries", "Save 30-50% on repeated questions"),
    ("Use GPT-3.5 for simple routing", "Save 90% on router decisions"),
    ("Limit max_tokens", "Prevent runaway token usage"),
    ("Batch similar requests", "Reduce overhead per request"),
    ("Use smaller model for tools", "Save 50-70% on tool selection"),
]
optimizations = dict(_OPTIMIZATION_PAIRS)

print(f"\n Cost Optimization:")
for strategy, impact in optimizations.items():
    print(f" [{strategy}]: {impact}")
Production Scaling
# === Scaling Architecture ===
# FastAPI Server with LangChain
# from fastapi import FastAPI
# from langchain.agents import AgentExecutor
# import asyncio
#
# app = FastAPI()
# semaphore = asyncio.Semaphore(10) # Max concurrent LLM calls
#
# @app.post("/chat")
# async def chat(request: ChatRequest):
# async with semaphore:
# result = await agent.ainvoke({"input": request.query})
# return {"response": result["output"],
# "usage": result.get("usage", {})}
# Kubernetes HPA
# apiVersion: autoscaling/v2
# kind: HorizontalPodAutoscaler
# metadata:
# name: agent-api-hpa
# spec:
#   scaleTargetRef:
#     apiVersion: apps/v1
#     kind: Deployment
#     name: agent-api
# minReplicas: 3
# maxReplicas: 20
# metrics:
# - type: Pods
# pods:
# metric:
# name: concurrent_requests
# target:
# type: AverageValue
# averageValue: "8"
@dataclass
class ScaleConfig:
    """Replica bounds and autoscaling trigger for one infrastructure component."""

    component: str
    min_replicas: int
    max_replicas: int
    scale_metric: str  # metric driving autoscaling, or "fixed"
    scale_target: str  # target value / policy for that metric

scaling = [
    ScaleConfig(*row)
    for row in (
        ("Agent API", 3, 20, "concurrent_requests", "8 per pod"),
        ("Redis Cache", 3, 3, "fixed", "Sentinel HA"),
        ("Vector DB", 2, 6, "query_latency_p99", "< 100ms"),
        ("Tool Services", 2, 10, "cpu_utilization", "70%"),
        ("Load Balancer", 2, 2, "fixed", "Active-passive"),
    )
]

print("Scaling Configuration:")
for cfg in scaling:
    print(f" [{cfg.component}] Replicas: {cfg.min_replicas}-{cfg.max_replicas}")
    print(f" Metric: {cfg.scale_metric} | Target: {cfg.scale_target}")
# Pre-flight performance checklist for a production agent deployment.
checklist = {
    "Streaming enabled": "Reduce TTFT, improve UX",
    "Async LLM calls": "Don't block event loop",
    "Semaphore for concurrency": "Prevent LLM API rate limit",
    "Redis cache for repeated queries": "Save tokens and latency",
    "Timeout on all external calls": "Prevent hanging requests",
    "Circuit breaker on LLM API": "Graceful degradation",
    "Prometheus metrics": "Track latency, tokens, errors",
}

print(f"\n\nPerformance Checklist:")
# One joined print emits the same lines as the original per-item loop.
print("\n".join(f" [{item}]: {why}" for item, why in checklist.items()))
เคล็ดลับ
- Streaming: เปิด Streaming ลด TTFT ให้ User เห็นผลเร็วขึ้น
- Cache: Cache คำถามที่ถามบ่อย ลด Token Cost 30-50%
- Semaphore: จำกัด Concurrent LLM Calls ป้องกัน Rate Limit
- Soak Test: รัน Soak Test 4+ ชั่วโมง หา Memory Leak
- Cost Alert: ตั้ง Budget Alert ป้องกัน Token Cost เกินงบ
ทำไมต้อง Load Test LangChain Agent
Latency สูง LLM หลายรอบ Tool Call Concurrent Users Token Throughput Cost Bottleneck Rate Limiting Memory Leak
ใช้เครื่องมืออะไร Load Test
Locust Python k6 JavaScript Artillery YAML Custom Script LangSmith Trace Prometheus Grafana Real-time Metrics
วัด Metrics อะไรบ้าง
TTFT Total Latency Token Throughput Tokens per Request Tool Calls Error Rate Cost per Request Concurrent Users
Optimize Performance อย่างไร
Streaming TTFT Caching Smaller Model Router Async Parallel Timeout Connection Pool Rate Limiter Batch Scale Horizontal
สรุป
LangChain Agent Load Testing Locust Token Throughput Latency Cost Scaling Streaming Cache Semaphore HPA Prometheus Production Operations
