
API Rate Limiting: Strategies, Algorithms, and Production Implementation Guide

Rate limiting protects APIs from abuse, ensures fair usage, and maintains service availability. This guide covers fixed window, sliding window, token bucket, and leaky bucket algorithms, with production implementations in Redis, Nginx, and API Gateway, plus distributed rate limiting across clusters.


Alex Thompson

CEO & Cloud Architecture Expert at ZeonEdge with 15+ years building enterprise infrastructure.

March 24, 2026
21 min read

Why Rate Limiting Is Critical Infrastructure

Rate limiting is one of those features that seems optional until it isn't. Without it, a single runaway client can monopolize server resources; a credential-stuffing attack can try 50,000 passwords against your auth endpoint in an hour; scrapers can consume your entire database through your search API; a misconfigured client can send 10,000 webhook calls per second. All of these have happened to production systems without rate limiting.

The challenge is implementing rate limiting correctly. Naive implementations create race conditions. Distributed systems require coordination. The wrong algorithm causes legitimate users to hit limits while abusive ones slip through. This guide covers the theory and production implementation of rate limiting at every layer.

Rate Limiting Algorithms Compared

1. Fixed Window Counter

"""
Fixed Window: Count requests in fixed time windows
E.g., "100 requests per minute" — window resets every minute at :00

Problem: Boundary attack
  User sends 100 requests at 11:59:59
  Window resets at 12:00:00
  User sends 100 more requests at 12:00:01
  Result: 200 requests in 2 seconds, but no limit triggered
"""
import redis
import time

r = redis.Redis()

def check_rate_limit_fixed_window(
    key: str, 
    limit: int, 
    window_seconds: int
) -> tuple[bool, dict]:
    current_window = int(time.time() / window_seconds)
    redis_key = f"ratelimit:{key}:{current_window}"
    
    pipe = r.pipeline()
    pipe.incr(redis_key)
    pipe.expire(redis_key, window_seconds * 2)  # Keep for 2 windows
    count, _ = pipe.execute()
    
    remaining = max(0, limit - count)
    reset_at = (current_window + 1) * window_seconds
    
    return count <= limit, {
        "limit": limit,
        "remaining": remaining,
        "reset": reset_at,
    }
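The boundary attack described in the comment is easy to reproduce with a small in-memory sketch (no Redis required; the class and names here are illustrative only, not part of the production code above):

```python
# In-memory fixed-window counter, just enough to demonstrate the flaw
class FixedWindowLimiter:
    def __init__(self, limit: int, window_seconds: int):
        self.limit = limit
        self.window_seconds = window_seconds
        self.counts: dict[int, int] = {}

    def allow(self, now: float) -> bool:
        window = int(now / self.window_seconds)
        self.counts[window] = self.counts.get(window, 0) + 1
        return self.counts[window] <= self.limit

limiter = FixedWindowLimiter(limit=100, window_seconds=60)

# 100 requests at the very end of one window, 100 at the start of the next
allowed = sum(limiter.allow(59.99) for _ in range(100))
allowed += sum(limiter.allow(60.01) for _ in range(100))
print(allowed)  # 200: all pass, despite a "100 per minute" limit
```

Both bursts land in different windows, so neither trips the counter.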

2. Sliding Window Log

"""
Sliding Window Log: Track exact timestamps of each request
Most accurate, but memory-intensive for high-volume APIs
"""
import redis
import time
import uuid

r = redis.Redis()

def check_rate_limit_sliding_log(
    key: str,
    limit: int,
    window_seconds: int
) -> tuple[bool, dict]:
    now = time.time()
    window_start = now - window_seconds
    redis_key = f"ratelimit:sliding:{key}"
    
    pipe = r.pipeline()
    # Remove old entries outside window
    pipe.zremrangebyscore(redis_key, 0, window_start)
    # Count current entries
    pipe.zcard(redis_key)
    # Add current request (unique member so concurrent requests with the
    # same timestamp don't overwrite each other in the sorted set)
    pipe.zadd(redis_key, {f"{now}:{uuid.uuid4().hex}": now})
    # Set expiry
    pipe.expire(redis_key, window_seconds)
    _, count, _, _ = pipe.execute()
    
    # count was taken before adding the current request. Note the request
    # is logged even when denied, so rejected traffic also consumes window slots.
    allowed = count < limit
    
    return allowed, {
        "limit": limit,
        "remaining": max(0, limit - count - (1 if allowed else 0)),
        "reset": now + window_seconds,
    }
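For contrast, an in-memory sketch of the sliding log (again illustrative, not the Redis implementation above) rejects the same boundary-attack traffic that the fixed window lets through:

```python
from collections import deque

class SlidingLogLimiter:
    def __init__(self, limit: int, window_seconds: float):
        self.limit = limit
        self.window_seconds = window_seconds
        self.log: deque[float] = deque()

    def allow(self, now: float) -> bool:
        # Drop timestamps that have aged out of the trailing window
        while self.log and self.log[0] <= now - self.window_seconds:
            self.log.popleft()
        if len(self.log) < self.limit:
            self.log.append(now)
            return True
        return False

limiter = SlidingLogLimiter(limit=100, window_seconds=60)
allowed = sum(limiter.allow(59.99) for _ in range(100))
allowed += sum(limiter.allow(60.01) for _ in range(100))
print(allowed)  # 100: the second burst is rejected
```

The log still holds 100 timestamps from 59.99 when the second burst arrives at 60.01, so the trailing 60-second window stays full.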

3. Sliding Window Counter (Best Balance)

"""
Sliding Window Counter: Approximate sliding window using two fixed windows
Memory efficient, more accurate than fixed window, no boundary attack
Formula: current_window_count + previous_window_count * (overlap_ratio)
"""
import redis
import time
import math

r = redis.Redis()

def check_rate_limit_sliding_counter(
    key: str,
    limit: int,
    window_seconds: int
) -> tuple[bool, dict]:
    now = time.time()
    current_window = math.floor(now / window_seconds)
    previous_window = current_window - 1
    
    # How far through current window we are (0.0 to 1.0)
    elapsed_in_window = (now % window_seconds) / window_seconds
    
    current_key = f"ratelimit:{key}:{current_window}"
    previous_key = f"ratelimit:{key}:{previous_window}"
    
    pipe = r.pipeline()
    pipe.get(previous_key)
    pipe.incr(current_key)
    pipe.expire(current_key, window_seconds * 2)
    
    prev_count_str, current_count, _ = pipe.execute()
    
    prev_count = int(prev_count_str or 0)
    
    # Approximate sliding window: weight previous window by remaining time
    estimated_count = prev_count * (1 - elapsed_in_window) + current_count
    
    if estimated_count > limit:
        # Roll back the increment
        r.decr(current_key)
        return False, {
            "limit": limit,
            "remaining": 0,
            "reset": math.ceil(now / window_seconds) * window_seconds,
            "retry_after": 1 / (limit / window_seconds) if limit > 0 else window_seconds,
        }
    
    return True, {
        "limit": limit,
        "remaining": max(0, int(limit - estimated_count)),
        "reset": math.ceil(now / window_seconds) * window_seconds,
    }
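A quick worked example of the weighting formula (the helper name is made up for illustration): halfway through the current window, the previous window still contributes 50% of its count.

```python
def estimate_sliding_count(prev_count: int, current_count: int,
                           elapsed_ratio: float) -> float:
    """Approximate requests in the trailing window (elapsed_ratio in [0, 1])."""
    return prev_count * (1 - elapsed_ratio) + current_count

# Halfway through the window: previous window still weighs 50%
print(estimate_sliding_count(60, 50, 0.5))   # 80.0
# Three quarters through: the previous window's influence fades
print(estimate_sliding_count(60, 50, 0.75))  # 65.0
```

With a limit of 100, the first case would be allowed (80 < 100) even though the raw current-window count is only 50, which is exactly how the boundary attack gets absorbed.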

4. Token Bucket (Burst-Friendly)

"""
Token Bucket: Allows bursts up to bucket capacity
Tokens refill at a steady rate. Allows occasional bursts.
Best for: user-facing APIs where occasional bursts are OK
"""
import redis
import time
import math

r = redis.Redis()

def check_rate_limit_token_bucket(
    key: str,
    capacity: int,      # Max tokens (burst size)
    refill_rate: float, # Tokens per second
    cost: int = 1       # Tokens consumed per request
) -> tuple[bool, dict]:
    now = time.time()
    redis_key = f"ratelimit:tb:{key}"
    
    # Lua script for atomic token bucket operation
    lua_script = """
    local key = KEYS[1]
    local capacity = tonumber(ARGV[1])
    local refill_rate = tonumber(ARGV[2])
    local cost = tonumber(ARGV[3])
    local now = tonumber(ARGV[4])
    
    local data = redis.call('HMGET', key, 'tokens', 'last_refill')
    local tokens = tonumber(data[1]) or capacity
    local last_refill = tonumber(data[2]) or now
    
    -- Calculate new tokens based on elapsed time
    local elapsed = now - last_refill
    local new_tokens = math.min(capacity, tokens + elapsed * refill_rate)
    
    if new_tokens >= cost then
        new_tokens = new_tokens - cost
        redis.call('HMSET', key, 'tokens', new_tokens, 'last_refill', now)
        redis.call('EXPIRE', key, math.ceil(capacity / refill_rate) + 1)
        return {1, math.floor(new_tokens)}  -- allowed, remaining tokens
    else
        redis.call('HMSET', key, 'tokens', new_tokens, 'last_refill', now)
        redis.call('EXPIRE', key, math.ceil(capacity / refill_rate) + 1)
        return {0, math.floor(new_tokens)}  -- denied, remaining tokens
    end
    """
    
    script = r.register_script(lua_script)
    allowed, remaining = script(keys=[redis_key], args=[capacity, refill_rate, cost, now])
    
    return bool(allowed), {
        "limit": capacity,
        "remaining": remaining,
        "refill_rate": refill_rate,
    }
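The refill arithmetic is easier to reason about with an in-memory sketch that takes an injected clock (illustrative names; the Lua script above is the production path):

```python
class TokenBucket:
    def __init__(self, capacity: int, refill_rate: float):
        self.capacity = capacity
        self.refill_rate = refill_rate  # tokens per second
        self.tokens = float(capacity)
        self.last_refill = 0.0

    def allow(self, now: float, cost: int = 1) -> bool:
        # Refill based on elapsed time, capped at capacity
        elapsed = now - self.last_refill
        self.tokens = min(self.capacity, self.tokens + elapsed * self.refill_rate)
        self.last_refill = now
        if self.tokens >= cost:
            self.tokens -= cost
            return True
        return False

bucket = TokenBucket(capacity=10, refill_rate=1.0)  # 10-burst, 1 req/s sustained
burst = sum(bucket.allow(0.0) for _ in range(15))
print(burst)              # 10 (burst capped at capacity)
print(bucket.allow(5.0))  # True (5 tokens refilled after 5 seconds)
```

This is why token bucket suits user-facing APIs: a client that is quiet for a while earns back burst headroom, while the sustained rate never exceeds refill_rate.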

FastAPI Rate Limiting Middleware

from fastapi import FastAPI, Request, HTTPException
from fastapi.responses import JSONResponse
import time

app = FastAPI()

class RateLimitMiddleware:
    def __init__(self, app, redis_client, rules: list[dict]):
        """
        rules: [
            {"path": "/api/auth", "limit": 10, "window": 60},
            {"path": "/api/", "limit": 1000, "window": 60},
            {"path": "/", "limit": 5000, "window": 60},
        ]
        """
        self.app = app
        self.redis = redis_client
        self.rules = sorted(rules, key=lambda r: len(r["path"]), reverse=True)
    
    async def __call__(self, scope, receive, send):
        if scope["type"] == "http":
            request = Request(scope, receive)
            
            # Find matching rule (most specific path wins)
            rule = next(
                (r for r in self.rules if request.url.path.startswith(r["path"])),
                None
            )
            
            if rule:
                # Key: user ID if authenticated, IP otherwise. Only trust
                # X-User-ID when your own auth layer sets it, never a raw
                # client-supplied header.
                client_ip = request.client.host
                user_id = request.headers.get("X-User-ID", "")
                rate_key = f"{user_id or client_ip}:{rule['path']}"
                
                allowed, info = check_rate_limit_sliding_counter(
                    rate_key, rule["limit"], rule["window"]
                )
                
                if not allowed:
                    response = JSONResponse(
                        status_code=429,
                        content={"error": "Rate limit exceeded", "retry_after": info.get("retry_after", 1)},
                        headers={
                            "X-RateLimit-Limit": str(info["limit"]),
                            "X-RateLimit-Remaining": "0",
                            "X-RateLimit-Reset": str(int(info["reset"])),
                            "Retry-After": str(int(info.get("retry_after", 1))),
                        }
                    )
                    await response(scope, receive, send)
                    return
                
                # Inject rate limit headers for allowed requests
                # (done via response middleware in production)
        
        await self.app(scope, receive, send)

app.add_middleware(
    RateLimitMiddleware,
    redis_client=r,
    rules=[
        {"path": "/api/auth/login", "limit": 5, "window": 300},   # 5 per 5 mins
        {"path": "/api/auth/", "limit": 20, "window": 60},         # 20 per minute
        {"path": "/api/upload", "limit": 10, "window": 3600},      # 10 per hour
        {"path": "/api/", "limit": 500, "window": 60},             # 500 per minute
    ]
)

Nginx Rate Limiting

# Nginx built-in rate limiting (efficient, no Redis needed for basic use)

http {
    # Define rate limit zones
    # $binary_remote_addr: more efficient than $remote_addr
    
    # Login endpoint: 5 requests per minute per IP
    limit_req_zone $binary_remote_addr zone=login:10m rate=5r/m;
    
    # API: 100 requests per second per IP
    limit_req_zone $binary_remote_addr zone=api:10m rate=100r/s;
    
    # Static assets: 1000 requests per second per IP
    limit_req_zone $binary_remote_addr zone=static:10m rate=1000r/s;
    
    # Per user-ID (header-based — use with auth)
    limit_req_zone $http_x_user_id zone=user_api:10m rate=200r/s;
    
    # Global rate limiting (protect backend from ALL clients combined)
    limit_req_zone $server_name zone=global:10m rate=5000r/s;
    
    server {
        location /api/auth/login {
            limit_req zone=login burst=3 nodelay;
            limit_req_status 429;
            
            # Custom error response
            error_page 429 @rate_limited;
            
            proxy_pass http://backend;
        }
        
        location /api/ {
            limit_req zone=api burst=50 nodelay;
            limit_req zone=global burst=500;
            limit_req_status 429;
            
            # Nginx does not expose a remaining-request counter, so only
            # a static limit header can be set at this layer
            add_header X-RateLimit-Limit 100 always;
            
            proxy_pass http://backend;
        }
        
        location @rate_limited {
            default_type application/json;
            return 429 '{"error":"Rate limit exceeded","retry_after":60}';
        }
    }
}

Distributed Rate Limiting Across Multiple Servers

"""
Challenge: With multiple API servers, each server has independent counters.
Server A: user has made 80/100 requests
Server B: user has made 80/100 requests
Total: 160 requests — limit bypassed!

Solution: Centralized Redis cluster with atomic Lua scripts
"""
from redis.cluster import RedisCluster, ClusterNode
import time

# Redis Cluster for HA distributed rate limiting
redis_cluster = RedisCluster(
    startup_nodes=[
        ClusterNode("redis-node-1", 6379),
        ClusterNode("redis-node-2", 6379),
        ClusterNode("redis-node-3", 6379),
    ],
    decode_responses=True,
)

# Hash the key to ensure all rate limit keys for a user
# route to the same Redis shard (using hash tags)
def make_rate_limit_key(user_id: str, endpoint: str, window: int) -> str:
    current_window = int(time.time() / window)
    # Hash tag ensures this key routes to the same cluster slot
    return f"{{rl:{user_id}}}:{endpoint}:{current_window}"

# Redis Cluster handles coordination — single atomic operation
# All operations on keys with same hash tag go to same shard

# For multi-region: use a globally replicated store (e.g., ElastiCache
# Global Datastore or Cloudflare Workers KV)
# Trade-off: slight staleness (~1s) for global rate limits is usually acceptable

def check_rate_limit_distributed(
    user_id: str,
    endpoint: str, 
    limit: int,
    window_seconds: int
) -> bool:
    key = make_rate_limit_key(user_id, endpoint, window_seconds)
    
    pipe = redis_cluster.pipeline()
    pipe.incr(key)
    pipe.expire(key, window_seconds * 2)
    count, _ = pipe.execute()
    
    return count <= limit
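The undercounting failure mode from the comment above is easy to simulate with a toy round-robin model (illustrative only, not production code): independent per-server counters admit far more than the shared limit.

```python
LIMIT = 100

def run(counters: list[int], requests: int) -> int:
    """Each counter models one server enforcing LIMIT against local state."""
    allowed = 0
    for i in range(requests):
        shard = i % len(counters)  # round-robin load balancing
        if counters[shard] < LIMIT:
            counters[shard] += 1
            allowed += 1
    return allowed

print(run([0, 0], 160))  # 160: independent counters, limit bypassed
print(run([0], 160))     # 100: shared (centralized) counter, enforced
```

With two servers, each sees only 80 of the 160 requests and happily admits all of them; a single shared counter stops at 100.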

AWS API Gateway Rate Limiting

# Terraform: API Gateway with usage plans and rate limiting

resource "aws_api_gateway_usage_plan" "basic" {
  name = "basic-plan"

  api_stages {
    api_id = aws_api_gateway_rest_api.main.id
    stage  = aws_api_gateway_stage.prod.stage_name
  }

  quota_settings {
    limit  = 10000    # 10,000 requests per month
    period = "MONTH"
  }

  throttle_settings {
    burst_limit = 50   # Allow burst of 50 concurrent requests
    rate_limit  = 100  # 100 requests per second steady state
  }
}

resource "aws_api_gateway_usage_plan" "premium" {
  name = "premium-plan"
  
  api_stages {
    api_id = aws_api_gateway_rest_api.main.id
    stage  = aws_api_gateway_stage.prod.stage_name
    
    # Per-method throttling (override plan defaults)
    throttle {
      path        = "/api/export/POST"
      burst_limit = 5    # Expensive export: much lower limit
      rate_limit  = 2
    }
  }

  quota_settings {
    limit  = 1000000  # 1M requests per month
    period = "MONTH"
  }

  throttle_settings {
    burst_limit = 500
    rate_limit  = 1000
  }
}

# Create API key for a client
resource "aws_api_gateway_api_key" "client_1" {
  name = "client-1-api-key"
}

# Associate key with usage plan
resource "aws_api_gateway_usage_plan_key" "client_1" {
  key_id        = aws_api_gateway_api_key.client_1.id
  key_type      = "API_KEY"
  usage_plan_id = aws_api_gateway_usage_plan.premium.id
}

Rate Limit Response Best Practices

from fastapi.responses import JSONResponse
from datetime import datetime, timezone

def rate_limit_response(
    limit: int, 
    remaining: int, 
    reset_timestamp: float,
    retry_after_seconds: int
) -> JSONResponse:
    """
    Standard rate limit response following RFC 6585 and IETF draft-ietf-httpapi-ratelimit-headers
    """
    # The draft ratelimit headers use delta-seconds for reset, not an HTTP date
    reset_delta = max(0, int(reset_timestamp - datetime.now(timezone.utc).timestamp()))
    
    return JSONResponse(
        status_code=429,
        content={
            "error": {
                "code": "RATE_LIMIT_EXCEEDED",
                "message": "Too many requests. Please slow down.",
                "documentation": "https://docs.company.com/api/rate-limiting",
                "retry_after": retry_after_seconds,
            }
        },
        headers={
            # RFC 6585: 429 Too Many Requests
            "Retry-After": str(retry_after_seconds),
            
            # IETF draft-ietf-httpapi-ratelimit-headers (widely adopted)
            "RateLimit-Limit": str(limit),
            "RateLimit-Remaining": str(remaining),
            "RateLimit-Reset": str(reset_delta),
            
            # Common additions
            "X-RateLimit-Limit": str(limit),
            "X-RateLimit-Remaining": str(remaining), 
            "X-RateLimit-Reset": str(int(reset_timestamp)),
            
            # CORS headers (if needed)
            "Access-Control-Expose-Headers": 
                "RateLimit-Limit, RateLimit-Remaining, RateLimit-Reset, Retry-After",
        }
    )

# Add rate limit headers to ALL responses (not just 429)
@app.middleware("http")
async def add_rate_limit_headers(request: Request, call_next):
    response = await call_next(request)
    
    # Add to successful responses too (so clients know their status)
    if hasattr(request.state, "rate_limit_info"):
        info = request.state.rate_limit_info
        response.headers["X-RateLimit-Limit"] = str(info["limit"])
        response.headers["X-RateLimit-Remaining"] = str(info["remaining"])
        response.headers["X-RateLimit-Reset"] = str(int(info["reset"]))
    
    return response

Rate Limiting Anti-Patterns

Anti-Pattern 1: Rate Limiting by IP Alone

# BAD: IP-based rate limiting behind a load balancer
# All requests appear to come from the load balancer IP!
# Also: shared IPs (NAT, corporate proxy, mobile) punish legitimate users

# GOOD: Use user ID (authenticated) first, IP as fallback
def get_rate_limit_key(request: Request) -> str:
    # Prefer authenticated user ID (the attribute may not be set)
    if user_id := getattr(request.state, "user_id", None):
        return f"user:{user_id}"
    
    # Fall back to IP. Only trust X-Forwarded-For when your own proxy
    # sets it; clients can spoof the header otherwise.
    forwarded_for = request.headers.get("X-Forwarded-For")
    if forwarded_for:
        # Take first IP (client's real IP, as appended by the trusted proxy)
        client_ip = forwarded_for.split(",")[0].strip()
    else:
        client_ip = request.client.host
    
    return f"ip:{client_ip}"

Anti-Pattern 2: Not Returning Retry-After Headers

Without Retry-After headers, clients in a retry loop hammer your API continuously after hitting a rate limit. A good 429 response tells clients exactly when to retry — most HTTP clients honor Retry-After automatically.

Anti-Pattern 3: Same Limits for All Endpoints

Different endpoints have different costs and risk profiles. Your health check endpoint can handle 10,000 requests/second without strain. Your ML inference endpoint can handle maybe 10. Your login endpoint should have aggressive limits to prevent brute force. Design limits per endpoint.
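One way to encode this is a longest-prefix policy table, mirroring the rule matching in the middleware earlier (all values here are illustrative):

```python
# Limits scale inversely with endpoint cost and risk
ENDPOINT_LIMITS = {
    "/health":         {"limit": 10_000, "window": 1},    # cheap, high limit
    "/api/search":     {"limit": 100,    "window": 60},
    "/api/inference":  {"limit": 10,     "window": 60},   # expensive ML call
    "/api/auth/login": {"limit": 5,      "window": 300},  # brute-force target
}
DEFAULT_POLICY = {"limit": 500, "window": 60}

def policy_for(path: str) -> dict:
    # Longest matching prefix wins, so specific endpoints override broad ones
    matches = [p for p in ENDPOINT_LIMITS if path.startswith(p)]
    if not matches:
        return DEFAULT_POLICY
    return ENDPOINT_LIMITS[max(matches, key=len)]

print(policy_for("/api/auth/login"))  # {'limit': 5, 'window': 300}
print(policy_for("/api/unknown"))     # {'limit': 500, 'window': 60}
```

Keeping the table in one place also makes the limits reviewable: a security review can audit every endpoint's policy at a glance.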

Monitoring Rate Limiting

import prometheus_client as prom

rate_limit_hits = prom.Counter(
    "rate_limit_hits_total",
    "Total rate limit hits",
    ["endpoint", "client_type"]
)

rate_limit_remaining = prom.Histogram(
    "rate_limit_remaining_tokens",
    "Remaining rate limit tokens at request time",
    ["endpoint"],
    buckets=[0, 10, 25, 50, 75, 90, 100]
)

# Alert: sustained rate limit hits may indicate attack
# alert: rate_limit_hits_total > 1000 in 5m for single client_type=ip
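The alert comment above can be sketched as a rolling per-client 429 counter (thresholds and names are illustrative; in production this logic belongs in Prometheus/Alertmanager rules, not application code):

```python
from collections import defaultdict, deque

class RateLimitHitTracker:
    def __init__(self, threshold: int, window_seconds: float):
        self.threshold = threshold
        self.window_seconds = window_seconds
        self.hits: dict[str, deque[float]] = defaultdict(deque)

    def record(self, client: str, now: float) -> bool:
        """Record a 429 for client; return True once it crosses the threshold."""
        log = self.hits[client]
        log.append(now)
        # Prune hits older than the rolling window
        while log and log[0] <= now - self.window_seconds:
            log.popleft()
        return len(log) > self.threshold

tracker = RateLimitHitTracker(threshold=1000, window_seconds=300)
alerts = [tracker.record("ip:203.0.113.7", t * 0.1) for t in range(1100)]
print(alerts.count(True))  # 100: every hit past the 1000th raises a flag
```

Sustained flags for a single key are a strong signal of an attack or a broken client retry loop, and worth paging on.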

Conclusion

Rate limiting is critical infrastructure that protects your API's availability and fairness. The sliding window counter algorithm offers the best balance of accuracy, memory efficiency, and performance for most use cases. Token bucket is better when you want to allow short bursts from legitimate clients.

Implement rate limiting at multiple layers: Nginx at the edge for basic IP-based protection, middleware for per-user limits, and API Gateway for customer-facing quota management. Always return proper headers, always test your implementation under load, and monitor rate limit hits as a leading indicator of attacks or misbehaving clients.

