Rate Limiting คือการจำกัดจำนวน Request ที่ Client สามารถส่งไปยัง API ได้ในช่วงเวลาหนึ่ง เป็นกลไกสำคัญที่ทุก API ต้องมีเพื่อป้องกัน Abuse, DDoS, Resource exhaustion และรักษาคุณภาพบริการให้ผู้ใช้ทุกคน แต่เบื้องหลังคำว่า Rate Limiting นั้นมี Algorithm หลายตัวที่มีข้อดีข้อเสียต่างกัน การเลือก Algorithm ที่เหมาะสมมีผลต่อ Accuracy, Memory usage, และ User experience อย่างมาก
บทความนี้จะเจาะลึก 5 Algorithms หลัก พร้อม Implementation จริงด้วย Redis, Python และ TypeScript เพื่อให้คุณเข้าใจทั้งทฤษฎีและปฏิบัติ
1. Fixed Window Counter
หลักการ
แบ่งเวลาเป็น Window ขนาดคงที่ (เช่น 1 นาที) นับจำนวน Request ในแต่ละ Window ถ้าเกิน Limit ก็ปฏิเสธ เมื่อเข้า Window ใหม่ Counter จะ Reset เป็น 0
ข้อดี: เข้าใจง่าย, Implement ง่ายที่สุด, ใช้ Memory น้อย (แค่ Counter + Timestamp)
ข้อเสีย: มีปัญหา Boundary problem — ถ้า Request มาตอนท้าย Window เก่า + ต้น Window ใหม่ อาจได้ Request เป็น 2 เท่าของ Limit ในช่วง 2 วินาที
# Python + Redis Implementation
import redis
import time
r = redis.Redis(host='localhost', port=6379, db=0)
def fixed_window_rate_limit(key: str, limit: int, window_seconds: int) -> bool:
"""
Returns True if request is allowed, False if rate limited.
"""
# สร้าง key ตาม window ปัจจุบัน
current_window = int(time.time() // window_seconds)
redis_key = f"ratelimit:{key}:{current_window}"
# Increment counter
current = r.incr(redis_key)
# ตั้ง TTL สำหรับ key ใหม่
if current == 1:
r.expire(redis_key, window_seconds)
# ตรวจสอบ limit
if current > limit:
return False # Rate limited
return True # Allowed
# ใช้งาน: 100 requests per minute
allowed = fixed_window_rate_limit("user:123", limit=100, window_seconds=60)
if not allowed:
print("429 Too Many Requests")
2. Sliding Window Log
หลักการ
เก็บ Timestamp ของทุก Request ไว้ใน Sorted Set เมื่อ Request ใหม่เข้ามา ลบ Timestamps ที่เก่ากว่า Window ออก แล้วนับจำนวนที่เหลือ ถ้าเกิน Limit ก็ปฏิเสธ
ข้อดี: แม่นยำมาก ไม่มี Boundary problem
ข้อเสีย: ใช้ Memory มาก (เก็บ Timestamp ทุก Request), O(n) สำหรับการนับ
# Python + Redis Sorted Set
import redis
import time
r = redis.Redis(host='localhost', port=6379, db=0)
def sliding_window_log(key: str, limit: int, window_seconds: int) -> bool:
"""
Accurate sliding window using Redis Sorted Set.
"""
now = time.time()
window_start = now - window_seconds
redis_key = f"ratelimit:log:{key}"
# Atomic operation with pipeline
pipe = r.pipeline()
# ลบ entries ที่เก่ากว่า window
pipe.zremrangebyscore(redis_key, 0, window_start)
# นับ entries ปัจจุบัน
pipe.zcard(redis_key)
# เพิ่ม request ปัจจุบัน
pipe.zadd(redis_key, {str(now): now})
# ตั้ง TTL
pipe.expire(redis_key, window_seconds + 1)
results = pipe.execute()
current_count = results[1]
if current_count >= limit:
# ลบ entry ที่เพิ่งเพิ่ม (เพราะจะ reject)
r.zrem(redis_key, str(now))
return False
return True
3. Sliding Window Counter (Hybrid)
หลักการ
เป็นการผสม Fixed Window กับ Sliding Window โดยใช้ Counter ของ Window ปัจจุบัน + Counter ของ Window ก่อนหน้า แล้วคำนวณด้วย Weight ตามเวลาที่ผ่านไปใน Window ปัจจุบัน
สูตร: estimated_count = previous_window_count * (1 - elapsed_ratio) + current_window_count
ข้อดี: แม่นยำกว่า Fixed Window, ใช้ Memory น้อยกว่า Sliding Log, เป็นที่นิยมมากใน Production
ข้อเสีย: เป็นการประมาณ (approximation) ไม่แม่นยำ 100%
# Python + Redis Implementation
import redis
import time
r = redis.Redis(host='localhost', port=6379, db=0)
def sliding_window_counter(key: str, limit: int, window_seconds: int) -> bool:
"""
Hybrid approach: weighted average of current and previous windows.
"""
now = time.time()
current_window = int(now // window_seconds)
previous_window = current_window - 1
# Time elapsed in current window (0.0 to 1.0)
elapsed = (now % window_seconds) / window_seconds
current_key = f"ratelimit:swc:{key}:{current_window}"
previous_key = f"ratelimit:swc:{key}:{previous_window}"
# Get counts
pipe = r.pipeline()
pipe.get(current_key)
pipe.get(previous_key)
results = pipe.execute()
current_count = int(results[0] or 0)
previous_count = int(results[1] or 0)
# Weighted estimate
estimated = previous_count * (1 - elapsed) + current_count
if estimated >= limit:
return False
# Increment current window
pipe = r.pipeline()
pipe.incr(current_key)
pipe.expire(current_key, window_seconds * 2)
pipe.execute()
return True
4. Token Bucket
หลักการ
จินตนาการว่ามี Bucket ที่เก็บ Tokens อยู่ Tokens จะถูกเติมเข้า Bucket ในอัตราคงที่ (เช่น 10 tokens/วินาที) แต่ละ Request ใช้ 1 Token ถ้า Bucket มี Token เหลือ → อนุญาต ถ้า Bucket ว่าง → ปฏิเสธ Bucket มีขนาดจำกัด (Max capacity) Tokens ที่เกิน Capacity จะถูกทิ้ง
ข้อดี: รองรับ Burst traffic ได้ (ถ้า Bucket เต็ม), ให้ผลลัพธ์ที่ Smooth, ใช้กันแพร่หลาย (AWS, Stripe, GitHub API)
ข้อเสีย: ซับซ้อนกว่า Fixed Window, ต้องเก็บ state 2 ค่า (tokens, last_refill)
# Python + Redis Lua Script (Atomic)
import redis
import time
r = redis.Redis(host='localhost', port=6379, db=0)
# Lua script สำหรับ Token Bucket (atomic operation)
TOKEN_BUCKET_SCRIPT = """
local key = KEYS[1]
local capacity = tonumber(ARGV[1])
local refill_rate = tonumber(ARGV[2]) -- tokens per second
local now = tonumber(ARGV[3])
local requested = tonumber(ARGV[4])
local data = redis.call('hmget', key, 'tokens', 'last_refill')
local tokens = tonumber(data[1])
local last_refill = tonumber(data[2])
if tokens == nil then
-- Initialize bucket
tokens = capacity
last_refill = now
end
-- Refill tokens
local elapsed = now - last_refill
local new_tokens = elapsed * refill_rate
tokens = math.min(capacity, tokens + new_tokens)
last_refill = now
if tokens >= requested then
tokens = tokens - requested
redis.call('hmset', key, 'tokens', tokens, 'last_refill', last_refill)
redis.call('expire', key, math.ceil(capacity / refill_rate) * 2)
return 1 -- Allowed
else
redis.call('hmset', key, 'tokens', tokens, 'last_refill', last_refill)
redis.call('expire', key, math.ceil(capacity / refill_rate) * 2)
return 0 -- Denied
end
"""
# Register script
token_bucket = r.register_script(TOKEN_BUCKET_SCRIPT)
def token_bucket_rate_limit(
key: str,
capacity: int = 100,
refill_rate: float = 10.0, # tokens/second
tokens_needed: int = 1
) -> bool:
result = token_bucket(
keys=[f"ratelimit:tb:{key}"],
args=[capacity, refill_rate, time.time(), tokens_needed]
)
return bool(result)
# ใช้งาน: Bucket ขนาด 100, เติม 10 tokens/วินาที
allowed = token_bucket_rate_limit("api:user:123", capacity=100, refill_rate=10)
5. Leaky Bucket
หลักการ
จินตนาการว่ามี Bucket ที่มีรู้รั่วที่ก้น Request ที่เข้ามาจะถูกใส่ใน Bucket (Queue) Bucket จะปล่อย Request ออกในอัตราคงที่ (เช่น 10 requests/วินาที) ถ้า Bucket เต็ม (Queue เต็ม) → ปฏิเสธ Request ใหม่
ข้อดี: Output rate คงที่เสมอ (smooth), ป้องกัน Burst ได้ดี, เหมาะสำหรับ backend ที่ต้องการ predictable throughput
ข้อเสีย: ไม่รองรับ Burst (ต่างจาก Token Bucket), อาจทำให้ Response time สูงถ้า Queue ยาว
# Python Implementation (Queue-based)
import redis
import time
r = redis.Redis(host='localhost', port=6379, db=0)
LEAKY_BUCKET_SCRIPT = """
local key = KEYS[1]
local capacity = tonumber(ARGV[1])
local leak_rate = tonumber(ARGV[2]) -- requests per second
local now = tonumber(ARGV[3])
local data = redis.call('hmget', key, 'water', 'last_leak')
local water = tonumber(data[1]) or 0
local last_leak = tonumber(data[2]) or now
-- Leak water
local elapsed = now - last_leak
local leaked = elapsed * leak_rate
water = math.max(0, water - leaked)
last_leak = now
if water < capacity then
water = water + 1
redis.call('hmset', key, 'water', water, 'last_leak', last_leak)
redis.call('expire', key, math.ceil(capacity / leak_rate) * 2)
return 1 -- Allowed
else
redis.call('hmset', key, 'water', water, 'last_leak', last_leak)
return 0 -- Denied (bucket full)
end
"""
leaky_bucket = r.register_script(LEAKY_BUCKET_SCRIPT)
def leaky_bucket_rate_limit(key: str, capacity: int = 60, leak_rate: float = 1.0) -> bool:
result = leaky_bucket(
keys=[f"ratelimit:lb:{key}"],
args=[capacity, leak_rate, time.time()]
)
return bool(result)
เปรียบเทียบ 5 Algorithms
| Algorithm | Accuracy | Memory | Burst Support | Complexity | Use Case |
|---|---|---|---|---|---|
| Fixed Window | ต่ำ (boundary issue) | O(1) | ไม่ | ง่ายมาก | Internal API ง่ายๆ |
| Sliding Window Log | สูงมาก | O(n) | ไม่ | ปานกลาง | ต้องการความแม่นยำสูง |
| Sliding Window Counter | สูง (ประมาณ) | O(1) | ไม่ | ปานกลาง | ใช้ทั่วไป (แนะนำ) |
| Token Bucket | สูง | O(1) | ได้ | ปานกลาง | API ที่ต้อง burst (แนะนำ) |
| Leaky Bucket | สูง | O(1) | ไม่ | ปานกลาง | ต้องการ smooth output |
Distributed Rate Limiting
ในระบบ Production ที่มีหลาย Server (Load balanced) Rate limiting ต้องทำแบบ Distributed ไม่ใช่ต่อ Server เพราะถ้าแต่ละ Server นับแยก User จะได้ Limit เท่ากับจำนวน Server คูณ Limit
# Redis Cluster สำหรับ Distributed Rate Limiting
# 1. ใช้ Redis เป็น Central counter (ทุก Server ชี้ไป Redis เดียวกัน)
# 2. ใช้ Redis Cluster สำหรับ High Availability
# 3. ใช้ Lua Script เพื่อ Atomic operations
# TypeScript Implementation (Node.js + ioredis)
import Redis from 'ioredis'
const redis = new Redis.Cluster([
{ host: 'redis-1', port: 6379 },
{ host: 'redis-2', port: 6379 },
{ host: 'redis-3', port: 6379 },
])
// Token Bucket ใน TypeScript
async function tokenBucket(
key: string,
capacity: number,
refillRate: number
): Promise<boolean> {
const script = `
local key = KEYS[1]
local capacity = tonumber(ARGV[1])
local refill_rate = tonumber(ARGV[2])
local now = tonumber(ARGV[3])
-- ... (same Lua script as above)
`
const result = await redis.eval(
script, 1, key, capacity, refillRate, Date.now() / 1000
)
return result === 1
}
Rate Limit Headers
API ที่ดีควรส่ง Rate Limit Headers กลับไปให้ Client รู้สถานะ ตามมาตรฐาน IETF draft
# Standard Rate Limit Headers
# (IETF draft-ietf-httpapi-ratelimit-headers)
HTTP/1.1 200 OK
RateLimit-Limit: 100 # Maximum requests per window
RateLimit-Remaining: 42 # Remaining requests in current window
RateLimit-Reset: 1712847600 # Unix timestamp when window resets
# เมื่อถูก Rate Limited:
HTTP/1.1 429 Too Many Requests
RateLimit-Limit: 100
RateLimit-Remaining: 0
RateLimit-Reset: 1712847600
Retry-After: 30 # วินาทีที่ต้องรอ
Content-Type: application/json
{
"error": "rate_limit_exceeded",
"message": "Too many requests. Please retry after 30 seconds.",
"retry_after": 30
}
# Go Implementation (middleware)
# func RateLimitMiddleware(next http.Handler) http.Handler {
# return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
# key := r.RemoteAddr
# allowed, remaining, resetAt := rateLimiter.Check(key)
#
# w.Header().Set("RateLimit-Limit", "100")
# w.Header().Set("RateLimit-Remaining", strconv.Itoa(remaining))
# w.Header().Set("RateLimit-Reset", strconv.FormatInt(resetAt, 10))
#
# if !allowed {
# w.Header().Set("Retry-After", "30")
# w.WriteHeader(http.StatusTooManyRequests)
# return
# }
# next.ServeHTTP(w, r)
# })
# }
Adaptive Rate Limiting
Adaptive Rate Limiting ปรับ Limit อัตโนมัติตามสถานะของระบบ เช่น ถ้า Server load สูง → ลด Limit ลง ถ้า Server ว่าง → เพิ่ม Limit ได้
# Python: Adaptive Rate Limiting based on server load
import psutil
class AdaptiveRateLimiter:
def __init__(self, base_limit=100, min_limit=10, max_limit=500):
self.base_limit = base_limit
self.min_limit = min_limit
self.max_limit = max_limit
def get_current_limit(self) -> int:
cpu_percent = psutil.cpu_percent(interval=0.1)
memory_percent = psutil.virtual_memory().percent
# ลด limit เมื่อ resource usage สูง
if cpu_percent > 90 or memory_percent > 90:
return self.min_limit
elif cpu_percent > 70 or memory_percent > 70:
return int(self.base_limit * 0.5)
elif cpu_percent > 50:
return int(self.base_limit * 0.75)
else:
return min(self.max_limit, self.base_limit)
def check(self, key: str) -> bool:
current_limit = self.get_current_limit()
return token_bucket_rate_limit(key, capacity=current_limit)
Rate Limiting Strategies
Per-User Rate Limiting
จำกัดตาม User ID หรือ API Key เหมาะสำหรับ authenticated API ผู้ใช้แต่ละคนมี Limit เท่ากัน (หรือต่างกันตาม Plan)
Per-IP Rate Limiting
จำกัดตาม IP Address เหมาะสำหรับ public API ที่ไม่ต้อง authentication ระวัง shared IP (NAT, VPN, corporate proxy) ที่อาจทำให้ผู้ใช้หลายคนถูก limit รวมกัน
Per-API-Key Rate Limiting
แต่ละ API Key มี Limit เฉพาะ เหมาะสำหรับ B2B API ที่ลูกค้าแต่ละรายมี quota ต่างกัน
Tiered Rate Limiting
# Tiered Limits ตาม Plan
RATE_LIMITS = {
"free": {"requests_per_minute": 30, "requests_per_day": 1000},
"basic": {"requests_per_minute": 100, "requests_per_day": 10000},
"pro": {"requests_per_minute": 500, "requests_per_day": 100000},
"enterprise": {"requests_per_minute": 5000, "requests_per_day": 1000000},
}
def check_tiered_limit(user_id: str, plan: str) -> bool:
limits = RATE_LIMITS[plan]
# Check per-minute limit
minute_ok = sliding_window_counter(
f"user:{user_id}:minute",
limits["requests_per_minute"],
60
)
# Check per-day limit
day_ok = sliding_window_counter(
f"user:{user_id}:day",
limits["requests_per_day"],
86400
)
return minute_ok and day_ok
Testing Rate Limiters
# Python: Load test rate limiter
import asyncio
import aiohttp
import time
async def test_rate_limiter(url: str, total_requests: int, concurrency: int):
"""Send requests and measure rate limiting behavior."""
results = {"allowed": 0, "limited": 0, "errors": 0}
start = time.time()
async with aiohttp.ClientSession() as session:
semaphore = asyncio.Semaphore(concurrency)
async def send_request(i):
async with semaphore:
try:
async with session.get(url) as resp:
if resp.status == 200:
results["allowed"] += 1
elif resp.status == 429:
results["limited"] += 1
else:
results["errors"] += 1
except Exception:
results["errors"] += 1
await asyncio.gather(*[send_request(i) for i in range(total_requests)])
elapsed = time.time() - start
print(f"Total: {total_requests} in {elapsed:.2f}s")
print(f"Allowed: {results['allowed']}")
print(f"Limited: {results['limited']}")
print(f"Errors: {results['errors']}")
print(f"Rate: {total_requests / elapsed:.1f} req/s")
# asyncio.run(test_rate_limiter("http://localhost:8080/api", 1000, 50))
สรุป
Rate Limiting เป็นกลไกที่ทุก API ต้องมี Algorithm แต่ละตัวมีข้อดีข้อเสียต่างกัน สำหรับ API ทั่วไป แนะนำ Sliding Window Counter เพราะสมดุลระหว่าง accuracy กับ memory สำหรับ API ที่ต้องรองรับ burst traffic แนะนำ Token Bucket (ใช้โดย AWS, GitHub, Stripe) และสำหรับระบบที่ต้องการ smooth output แนะนำ Leaky Bucket
สิ่งสำคัญที่ต้องจำคือ ใช้ Redis หรือ Central store สำหรับ Distributed rate limiting ส่ง Rate Limit Headers กลับไปให้ Client ทดสอบด้วย Load test เสมอ และพิจารณา Adaptive rate limiting สำหรับระบบที่ load ไม่คงที่ การเลือก Algorithm ที่ถูกต้องตั้งแต่แรกจะช่วยให้ API ของคุณรองรับ Scale ได้ดีในระยะยาว
