Home > Blog > tech

Rate Limiting Algorithms เจาะลึก Token Bucket, Sliding Window, Leaky Bucket พร้อมโค้ด 2026

api rate limiting algorithms deep dive
Rate Limiting Algorithms Deep Dive 2026
2026-04-11 | tech | 3600 words

Rate Limiting คือการจำกัดจำนวน Request ที่ Client สามารถส่งไปยัง API ได้ในช่วงเวลาหนึ่ง เป็นกลไกสำคัญที่ทุก API ต้องมีเพื่อป้องกัน Abuse, DDoS, Resource exhaustion และรักษาคุณภาพบริการให้ผู้ใช้ทุกคน แต่เบื้องหลังคำว่า Rate Limiting นั้นมี Algorithm หลายตัวที่มีข้อดีข้อเสียต่างกัน การเลือก Algorithm ที่เหมาะสมมีผลต่อ Accuracy, Memory usage, และ User experience อย่างมาก

บทความนี้จะเจาะลึก 5 Algorithms หลัก พร้อม Implementation จริงด้วย Redis, Python และ TypeScript เพื่อให้คุณเข้าใจทั้งทฤษฎีและปฏิบัติ

1. Fixed Window Counter

หลักการ

แบ่งเวลาเป็น Window ขนาดคงที่ (เช่น 1 นาที) นับจำนวน Request ในแต่ละ Window ถ้าเกิน Limit ก็ปฏิเสธ เมื่อเข้า Window ใหม่ Counter จะ Reset เป็น 0

ข้อดี: เข้าใจง่าย, Implement ง่ายที่สุด, ใช้ Memory น้อย (แค่ Counter + Timestamp)

ข้อเสีย: มีปัญหา Boundary problem — ถ้า Request มาตอนท้าย Window เก่า + ต้น Window ใหม่ อาจได้ Request เป็น 2 เท่าของ Limit ในช่วง 2 วินาที

# Python + Redis Implementation
import redis
import time

r = redis.Redis(host='localhost', port=6379, db=0)

def fixed_window_rate_limit(key: str, limit: int, window_seconds: int) -> bool:
    """
    Returns True if request is allowed, False if rate limited.
    """
    # สร้าง key ตาม window ปัจจุบัน
    current_window = int(time.time() // window_seconds)
    redis_key = f"ratelimit:{key}:{current_window}"

    # Increment counter
    current = r.incr(redis_key)

    # ตั้ง TTL สำหรับ key ใหม่
    if current == 1:
        r.expire(redis_key, window_seconds)

    # ตรวจสอบ limit
    if current > limit:
        return False  # Rate limited

    return True  # Allowed

# ใช้งาน: 100 requests per minute
allowed = fixed_window_rate_limit("user:123", limit=100, window_seconds=60)
if not allowed:
    print("429 Too Many Requests")

2. Sliding Window Log

หลักการ

เก็บ Timestamp ของทุก Request ไว้ใน Sorted Set เมื่อ Request ใหม่เข้ามา ลบ Timestamps ที่เก่ากว่า Window ออก แล้วนับจำนวนที่เหลือ ถ้าเกิน Limit ก็ปฏิเสธ

ข้อดี: แม่นยำมาก ไม่มี Boundary problem

ข้อเสีย: ใช้ Memory มาก (เก็บ Timestamp ทุก Request), O(n) สำหรับการนับ

# Python + Redis Sorted Set
import redis
import time

r = redis.Redis(host='localhost', port=6379, db=0)

def sliding_window_log(key: str, limit: int, window_seconds: int) -> bool:
    """
    Accurate sliding window using Redis Sorted Set.
    """
    now = time.time()
    window_start = now - window_seconds
    redis_key = f"ratelimit:log:{key}"

    # Atomic operation with pipeline
    pipe = r.pipeline()

    # ลบ entries ที่เก่ากว่า window
    pipe.zremrangebyscore(redis_key, 0, window_start)

    # นับ entries ปัจจุบัน
    pipe.zcard(redis_key)

    # เพิ่ม request ปัจจุบัน
    pipe.zadd(redis_key, {str(now): now})

    # ตั้ง TTL
    pipe.expire(redis_key, window_seconds + 1)

    results = pipe.execute()
    current_count = results[1]

    if current_count >= limit:
        # ลบ entry ที่เพิ่งเพิ่ม (เพราะจะ reject)
        r.zrem(redis_key, str(now))
        return False

    return True

3. Sliding Window Counter (Hybrid)

หลักการ

เป็นการผสม Fixed Window กับ Sliding Window โดยใช้ Counter ของ Window ปัจจุบัน + Counter ของ Window ก่อนหน้า แล้วคำนวณด้วย Weight ตามเวลาที่ผ่านไปใน Window ปัจจุบัน

สูตร: estimated_count = previous_window_count * (1 - elapsed_ratio) + current_window_count

ข้อดี: แม่นยำกว่า Fixed Window, ใช้ Memory น้อยกว่า Sliding Log, เป็นที่นิยมมากใน Production

ข้อเสีย: เป็นการประมาณ (approximation) ไม่แม่นยำ 100%

# Python + Redis Implementation
import redis
import time

r = redis.Redis(host='localhost', port=6379, db=0)

def sliding_window_counter(key: str, limit: int, window_seconds: int) -> bool:
    """
    Hybrid approach: weighted average of current and previous windows.
    """
    now = time.time()
    current_window = int(now // window_seconds)
    previous_window = current_window - 1

    # Time elapsed in current window (0.0 to 1.0)
    elapsed = (now % window_seconds) / window_seconds

    current_key = f"ratelimit:swc:{key}:{current_window}"
    previous_key = f"ratelimit:swc:{key}:{previous_window}"

    # Get counts
    pipe = r.pipeline()
    pipe.get(current_key)
    pipe.get(previous_key)
    results = pipe.execute()

    current_count = int(results[0] or 0)
    previous_count = int(results[1] or 0)

    # Weighted estimate
    estimated = previous_count * (1 - elapsed) + current_count

    if estimated >= limit:
        return False

    # Increment current window
    pipe = r.pipeline()
    pipe.incr(current_key)
    pipe.expire(current_key, window_seconds * 2)
    pipe.execute()

    return True

4. Token Bucket

หลักการ

จินตนาการว่ามี Bucket ที่เก็บ Tokens อยู่ Tokens จะถูกเติมเข้า Bucket ในอัตราคงที่ (เช่น 10 tokens/วินาที) แต่ละ Request ใช้ 1 Token ถ้า Bucket มี Token เหลือ → อนุญาต ถ้า Bucket ว่าง → ปฏิเสธ Bucket มีขนาดจำกัด (Max capacity) Tokens ที่เกิน Capacity จะถูกทิ้ง

ข้อดี: รองรับ Burst traffic ได้ (ถ้า Bucket เต็ม), ให้ผลลัพธ์ที่ Smooth, ใช้กันแพร่หลาย (AWS, Stripe, GitHub API)

ข้อเสีย: ซับซ้อนกว่า Fixed Window, ต้องเก็บ state 2 ค่า (tokens, last_refill)

# Python + Redis Lua Script (Atomic)
import redis
import time

r = redis.Redis(host='localhost', port=6379, db=0)

# Lua script สำหรับ Token Bucket (atomic operation)
TOKEN_BUCKET_SCRIPT = """
local key = KEYS[1]
local capacity = tonumber(ARGV[1])
local refill_rate = tonumber(ARGV[2])  -- tokens per second
local now = tonumber(ARGV[3])
local requested = tonumber(ARGV[4])

local data = redis.call('hmget', key, 'tokens', 'last_refill')
local tokens = tonumber(data[1])
local last_refill = tonumber(data[2])

if tokens == nil then
    -- Initialize bucket
    tokens = capacity
    last_refill = now
end

-- Refill tokens
local elapsed = now - last_refill
local new_tokens = elapsed * refill_rate
tokens = math.min(capacity, tokens + new_tokens)
last_refill = now

if tokens >= requested then
    tokens = tokens - requested
    redis.call('hmset', key, 'tokens', tokens, 'last_refill', last_refill)
    redis.call('expire', key, math.ceil(capacity / refill_rate) * 2)
    return 1  -- Allowed
else
    redis.call('hmset', key, 'tokens', tokens, 'last_refill', last_refill)
    redis.call('expire', key, math.ceil(capacity / refill_rate) * 2)
    return 0  -- Denied
end
"""

# Register script
token_bucket = r.register_script(TOKEN_BUCKET_SCRIPT)

def token_bucket_rate_limit(
    key: str,
    capacity: int = 100,
    refill_rate: float = 10.0,  # tokens/second
    tokens_needed: int = 1
) -> bool:
    result = token_bucket(
        keys=[f"ratelimit:tb:{key}"],
        args=[capacity, refill_rate, time.time(), tokens_needed]
    )
    return bool(result)

# ใช้งาน: Bucket ขนาด 100, เติม 10 tokens/วินาที
allowed = token_bucket_rate_limit("api:user:123", capacity=100, refill_rate=10)

5. Leaky Bucket

หลักการ

จินตนาการว่ามี Bucket ที่มีรู้รั่วที่ก้น Request ที่เข้ามาจะถูกใส่ใน Bucket (Queue) Bucket จะปล่อย Request ออกในอัตราคงที่ (เช่น 10 requests/วินาที) ถ้า Bucket เต็ม (Queue เต็ม) → ปฏิเสธ Request ใหม่

ข้อดี: Output rate คงที่เสมอ (smooth), ป้องกัน Burst ได้ดี, เหมาะสำหรับ backend ที่ต้องการ predictable throughput

ข้อเสีย: ไม่รองรับ Burst (ต่างจาก Token Bucket), อาจทำให้ Response time สูงถ้า Queue ยาว

# Python Implementation (Queue-based)
import redis
import time

r = redis.Redis(host='localhost', port=6379, db=0)

LEAKY_BUCKET_SCRIPT = """
local key = KEYS[1]
local capacity = tonumber(ARGV[1])
local leak_rate = tonumber(ARGV[2])  -- requests per second
local now = tonumber(ARGV[3])

local data = redis.call('hmget', key, 'water', 'last_leak')
local water = tonumber(data[1]) or 0
local last_leak = tonumber(data[2]) or now

-- Leak water
local elapsed = now - last_leak
local leaked = elapsed * leak_rate
water = math.max(0, water - leaked)
last_leak = now

if water < capacity then
    water = water + 1
    redis.call('hmset', key, 'water', water, 'last_leak', last_leak)
    redis.call('expire', key, math.ceil(capacity / leak_rate) * 2)
    return 1  -- Allowed
else
    redis.call('hmset', key, 'water', water, 'last_leak', last_leak)
    return 0  -- Denied (bucket full)
end
"""

leaky_bucket = r.register_script(LEAKY_BUCKET_SCRIPT)

def leaky_bucket_rate_limit(key: str, capacity: int = 60, leak_rate: float = 1.0) -> bool:
    result = leaky_bucket(
        keys=[f"ratelimit:lb:{key}"],
        args=[capacity, leak_rate, time.time()]
    )
    return bool(result)

เปรียบเทียบ 5 Algorithms

AlgorithmAccuracyMemoryBurst SupportComplexityUse Case
Fixed Windowต่ำ (boundary issue)O(1)ไม่ง่ายมากInternal API ง่ายๆ
Sliding Window LogสูงมากO(n)ไม่ปานกลางต้องการความแม่นยำสูง
Sliding Window Counterสูง (ประมาณ)O(1)ไม่ปานกลางใช้ทั่วไป (แนะนำ)
Token BucketสูงO(1)ได้ปานกลางAPI ที่ต้อง burst (แนะนำ)
Leaky BucketสูงO(1)ไม่ปานกลางต้องการ smooth output

Distributed Rate Limiting

ในระบบ Production ที่มีหลาย Server (Load balanced) Rate limiting ต้องทำแบบ Distributed ไม่ใช่ต่อ Server เพราะถ้าแต่ละ Server นับแยก User จะได้ Limit เท่ากับจำนวน Server คูณ Limit

# Redis Cluster สำหรับ Distributed Rate Limiting

# 1. ใช้ Redis เป็น Central counter (ทุก Server ชี้ไป Redis เดียวกัน)
# 2. ใช้ Redis Cluster สำหรับ High Availability
# 3. ใช้ Lua Script เพื่อ Atomic operations

# TypeScript Implementation (Node.js + ioredis)
import Redis from 'ioredis'

const redis = new Redis.Cluster([
  { host: 'redis-1', port: 6379 },
  { host: 'redis-2', port: 6379 },
  { host: 'redis-3', port: 6379 },
])

// Token Bucket ใน TypeScript
async function tokenBucket(
  key: string,
  capacity: number,
  refillRate: number
): Promise<boolean> {
  const script = `
    local key = KEYS[1]
    local capacity = tonumber(ARGV[1])
    local refill_rate = tonumber(ARGV[2])
    local now = tonumber(ARGV[3])
    -- ... (same Lua script as above)
  `
  const result = await redis.eval(
    script, 1, key, capacity, refillRate, Date.now() / 1000
  )
  return result === 1
}

Rate Limit Headers

API ที่ดีควรส่ง Rate Limit Headers กลับไปให้ Client รู้สถานะ ตามมาตรฐาน IETF draft

# Standard Rate Limit Headers
# (IETF draft-ietf-httpapi-ratelimit-headers)

HTTP/1.1 200 OK
RateLimit-Limit: 100           # Maximum requests per window
RateLimit-Remaining: 42        # Remaining requests in current window
RateLimit-Reset: 1712847600    # Unix timestamp when window resets

# เมื่อถูก Rate Limited:
HTTP/1.1 429 Too Many Requests
RateLimit-Limit: 100
RateLimit-Remaining: 0
RateLimit-Reset: 1712847600
Retry-After: 30                # วินาทีที่ต้องรอ
Content-Type: application/json

{
  "error": "rate_limit_exceeded",
  "message": "Too many requests. Please retry after 30 seconds.",
  "retry_after": 30
}

# Go Implementation (middleware)
# func RateLimitMiddleware(next http.Handler) http.Handler {
#     return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
#         key := r.RemoteAddr
#         allowed, remaining, resetAt := rateLimiter.Check(key)
#
#         w.Header().Set("RateLimit-Limit", "100")
#         w.Header().Set("RateLimit-Remaining", strconv.Itoa(remaining))
#         w.Header().Set("RateLimit-Reset", strconv.FormatInt(resetAt, 10))
#
#         if !allowed {
#             w.Header().Set("Retry-After", "30")
#             w.WriteHeader(http.StatusTooManyRequests)
#             return
#         }
#         next.ServeHTTP(w, r)
#     })
# }

Adaptive Rate Limiting

Adaptive Rate Limiting ปรับ Limit อัตโนมัติตามสถานะของระบบ เช่น ถ้า Server load สูง → ลด Limit ลง ถ้า Server ว่าง → เพิ่ม Limit ได้

# Python: Adaptive Rate Limiting based on server load
import psutil

class AdaptiveRateLimiter:
    def __init__(self, base_limit=100, min_limit=10, max_limit=500):
        self.base_limit = base_limit
        self.min_limit = min_limit
        self.max_limit = max_limit

    def get_current_limit(self) -> int:
        cpu_percent = psutil.cpu_percent(interval=0.1)
        memory_percent = psutil.virtual_memory().percent

        # ลด limit เมื่อ resource usage สูง
        if cpu_percent > 90 or memory_percent > 90:
            return self.min_limit
        elif cpu_percent > 70 or memory_percent > 70:
            return int(self.base_limit * 0.5)
        elif cpu_percent > 50:
            return int(self.base_limit * 0.75)
        else:
            return min(self.max_limit, self.base_limit)

    def check(self, key: str) -> bool:
        current_limit = self.get_current_limit()
        return token_bucket_rate_limit(key, capacity=current_limit)

Rate Limiting Strategies

Per-User Rate Limiting

จำกัดตาม User ID หรือ API Key เหมาะสำหรับ authenticated API ผู้ใช้แต่ละคนมี Limit เท่ากัน (หรือต่างกันตาม Plan)

Per-IP Rate Limiting

จำกัดตาม IP Address เหมาะสำหรับ public API ที่ไม่ต้อง authentication ระวัง shared IP (NAT, VPN, corporate proxy) ที่อาจทำให้ผู้ใช้หลายคนถูก limit รวมกัน

Per-API-Key Rate Limiting

แต่ละ API Key มี Limit เฉพาะ เหมาะสำหรับ B2B API ที่ลูกค้าแต่ละรายมี quota ต่างกัน

Tiered Rate Limiting

# Tiered Limits ตาม Plan
RATE_LIMITS = {
    "free":       {"requests_per_minute": 30,   "requests_per_day": 1000},
    "basic":      {"requests_per_minute": 100,  "requests_per_day": 10000},
    "pro":        {"requests_per_minute": 500,  "requests_per_day": 100000},
    "enterprise": {"requests_per_minute": 5000, "requests_per_day": 1000000},
}

def check_tiered_limit(user_id: str, plan: str) -> bool:
    limits = RATE_LIMITS[plan]

    # Check per-minute limit
    minute_ok = sliding_window_counter(
        f"user:{user_id}:minute",
        limits["requests_per_minute"],
        60
    )

    # Check per-day limit
    day_ok = sliding_window_counter(
        f"user:{user_id}:day",
        limits["requests_per_day"],
        86400
    )

    return minute_ok and day_ok

Testing Rate Limiters

# Python: Load test rate limiter
import asyncio
import aiohttp
import time

async def test_rate_limiter(url: str, total_requests: int, concurrency: int):
    """Send requests and measure rate limiting behavior."""
    results = {"allowed": 0, "limited": 0, "errors": 0}
    start = time.time()

    async with aiohttp.ClientSession() as session:
        semaphore = asyncio.Semaphore(concurrency)

        async def send_request(i):
            async with semaphore:
                try:
                    async with session.get(url) as resp:
                        if resp.status == 200:
                            results["allowed"] += 1
                        elif resp.status == 429:
                            results["limited"] += 1
                        else:
                            results["errors"] += 1
                except Exception:
                    results["errors"] += 1

        await asyncio.gather(*[send_request(i) for i in range(total_requests)])

    elapsed = time.time() - start
    print(f"Total: {total_requests} in {elapsed:.2f}s")
    print(f"Allowed: {results['allowed']}")
    print(f"Limited: {results['limited']}")
    print(f"Errors:  {results['errors']}")
    print(f"Rate:    {total_requests / elapsed:.1f} req/s")

# asyncio.run(test_rate_limiter("http://localhost:8080/api", 1000, 50))

สรุป

Rate Limiting เป็นกลไกที่ทุก API ต้องมี Algorithm แต่ละตัวมีข้อดีข้อเสียต่างกัน สำหรับ API ทั่วไป แนะนำ Sliding Window Counter เพราะสมดุลระหว่าง accuracy กับ memory สำหรับ API ที่ต้องรองรับ burst traffic แนะนำ Token Bucket (ใช้โดย AWS, GitHub, Stripe) และสำหรับระบบที่ต้องการ smooth output แนะนำ Leaky Bucket

สิ่งสำคัญที่ต้องจำคือ ใช้ Redis หรือ Central store สำหรับ Distributed rate limiting ส่ง Rate Limit Headers กลับไปให้ Client ทดสอบด้วย Load test เสมอ และพิจารณา Adaptive rate limiting สำหรับระบบที่ load ไม่คงที่ การเลือก Algorithm ที่ถูกต้องตั้งแต่แรกจะช่วยให้ API ของคุณรองรับ Scale ได้ดีในระยะยาว


Back to Blog | iCafe Forex | SiamLanCard | Siam2R