🔧 High-performance Python rate limiting library with multiple algorithms (Fixed Window, Sliding Window, Token Bucket, Leaky Bucket & GCRA) and storage backends (Redis, In-Memory).
🔰 Installation | 🎨 Quick Start | 📝 Usage | ⚙️ Data Models | 📊 Benchmarks | 🍃 Inspiration | 📚 Version History | 📄 License
- Supports both synchronous and asynchronous usage (`async` / `await`).
- Provides thread-safe storage backends: Redis and In-Memory (with support for key expiration and eviction).
- Supports multiple rate limiting algorithms: Fixed Window, Sliding Window, Token Bucket, Leaky Bucket & Generic Cell Rate Algorithm (GCRA).
- Supports configuration of rate limiting algorithms and provides flexible quota configuration.
- Supports immediate response and wait-retry modes, and provides function call, decorator, and context manager modes.
- Supports integration with the MCP Python SDK to provide rate limiting for model conversation flows.
- Excellent performance: the execution time for a single rate limiting API call is equivalent to (see Benchmarks for details):
  - In-Memory: ~2.5-4.5x `dict[key] += 1` operations.
  - Redis: ~1.06-1.37x `INCRBY key increment` operations.
```bash
$ pip install throttled-py
```
Starting from v2.0.0, only core dependencies are installed by default.
To enable additional features, install optional dependencies as follows (multiple extras can be comma-separated):
$ pip install "throttled-py[redis]"
$ pip install "throttled-py[redis,in-memory]"
| Extra | Description |
|---|---|
| `all` | Install all extras. |
| `in-memory` | Use In-Memory as the storage backend. |
| `redis` | Use Redis as the storage backend. |
- `limit`: Deducts requests for a key and returns a `RateLimitResult`.
- `peek`: Checks the current rate limit state for a key (returns a `RateLimitState`).
```python
from throttled import RateLimiterType, Throttled, rate_limiter, store, utils

throttle = Throttled(
    # 📈 Use the Token Bucket algorithm.
    using=RateLimiterType.TOKEN_BUCKET.value,
    # 🪣 Set quota: 1,000 tokens per second (limit), bucket size 1,000 (burst).
    quota=rate_limiter.per_sec(1_000, burst=1_000),
    # 📁 Use In-Memory storage.
    store=store.MemoryStore(),
)


def call_api() -> bool:
    # 💧 Deduct 1 token for key="/ping".
    result = throttle.limit("/ping", cost=1)
    return result.limited


if __name__ == "__main__":
    # 💻 Python 3.12.10, Linux 5.4.119-1-tlinux4-0009.1, Arch: x86_64, Specs: 2C4G.
    # ✅ Total: 100000, 🕒 Latency: 0.0068 ms/op, 🚀 Throughput: 122513 req/s (--)
    # ❌ Denied: 98000 requests
    benchmark: utils.Benchmark = utils.Benchmark()
    denied_num: int = sum(benchmark.serial(call_api, 100_000))
    print(f"❌ Denied: {denied_num} requests")
```
The core API is identical for synchronous and asynchronous code: just replace `from throttled import ...` with `from throttled.asyncio import ...` in your code.

For example, to rewrite the example above asynchronously:
```python
import asyncio

from throttled.asyncio import RateLimiterType, Throttled, rate_limiter, store, utils

throttle = Throttled(
    using=RateLimiterType.TOKEN_BUCKET.value,
    quota=rate_limiter.per_sec(1_000, burst=1_000),
    store=store.MemoryStore(),
)


async def call_api() -> bool:
    result = await throttle.limit("/ping", cost=1)
    return result.limited


async def main():
    benchmark: utils.Benchmark = utils.Benchmark()
    denied_num: int = sum(await benchmark.async_serial(call_api, 100_000))
    print(f"❌ Denied: {denied_num} requests")


if __name__ == "__main__":
    asyncio.run(main())
```
```python
from throttled import Throttled

# Default: In-Memory storage, Token Bucket algorithm, 60 reqs / min.
throttle = Throttled()

# Deduct 1 request -> RateLimitResult(limited=False,
# state=RateLimitState(limit=60, remaining=59, reset_after=1, retry_after=0))
print(throttle.limit("key", 1))

# Check state -> RateLimitState(limit=60, remaining=59, reset_after=1, retry_after=0)
print(throttle.peek("key"))

# Deduct 60 requests (limited) -> RateLimitResult(limited=True,
# state=RateLimitState(limit=60, remaining=59, reset_after=1, retry_after=60))
print(throttle.limit("key", 60))
```
```python
from throttled import Throttled, rate_limiter, exceptions


@Throttled(key="/ping", quota=rate_limiter.per_min(1))
def ping() -> str:
    return "ping"


ping()
try:
    ping()  # Raises LimitedError.
except exceptions.LimitedError as exc:
    print(exc)  # Rate limit exceeded: remaining=0, reset_after=60, retry_after=60
```
You can use the context manager to rate limit a code block. When access is allowed, it returns a `RateLimitResult`; if the limit is exceeded or the retry timeout is reached, it raises `LimitedError`.
```python
from throttled import Throttled, exceptions, rate_limiter


def call_api():
    print("doing something...")


throttle: Throttled = Throttled(key="/api/v1/users/", quota=rate_limiter.per_min(1))

with throttle as rate_limit_result:
    print(f"limited: {rate_limit_result.limited}")
    call_api()

try:
    with throttle:
        call_api()
except exceptions.LimitedError as exc:
    print(exc)  # Rate limit exceeded: remaining=0, reset_after=60, retry_after=60
```
By default, rate limiting returns a `RateLimitResult` immediately. You can specify a `timeout` to enable wait-and-retry behavior: the rate limiter waits according to the `retry_after` value in `RateLimitState` and retries automatically, returning the final `RateLimitResult` once the request is allowed or the timeout is reached.
```python
from throttled import RateLimiterType, Throttled, rate_limiter, utils

throttle = Throttled(
    using=RateLimiterType.GCRA.value,
    quota=rate_limiter.per_sec(100, burst=100),
    # ⏳ Set timeout=1 to enable wait-and-retry (max wait 1 second).
    timeout=1,
)


def call_api() -> bool:
    # ⬆️⏳ The per-call timeout overrides the global timeout.
    result = throttle.limit("/ping", cost=1, timeout=1)
    return result.limited


if __name__ == "__main__":
    # 👇 The actual QPS is close to the preset quota (100 req/s):
    # ✅ Total: 1000, 🕒 Latency: 35.8103 ms/op, 🚀 Throughput: 111 req/s (--)
    # ❌ Denied: 8 requests
    benchmark: utils.Benchmark = utils.Benchmark()
    denied_num: int = sum(benchmark.concurrent(call_api, 1_000, workers=4))
    print(f"❌ Denied: {denied_num} requests")
```
The following example uses Redis as the storage backend. `options` supports all Redis configuration items; see RedisStore Options.
```python
from throttled import RateLimiterType, Throttled, rate_limiter, store


@Throttled(
    key="/api/products",
    using=RateLimiterType.TOKEN_BUCKET.value,
    quota=rate_limiter.per_min(1),
    store=store.RedisStore(server="redis://127.0.0.1:6379/0", options={"PASSWORD": ""}),
)
def products() -> list:
    return [{"name": "iPhone"}, {"name": "MacBook"}]


products()  # Success.
products()  # Raises LimitedError.
```
If you want to throttle the same key at different places in your program, make sure that each `Throttled` instance receives the same `MemoryStore` and uses a consistent `Quota`.

The following example uses memory as the storage backend and throttles the same key across `ping` and `pong`:
```python
from throttled import Throttled, rate_limiter, store

mem_store = store.MemoryStore()


@Throttled(key="ping-pong", quota=rate_limiter.per_min(1), store=mem_store)
def ping() -> str:
    return "ping"


@Throttled(key="ping-pong", quota=rate_limiter.per_min(1), store=mem_store)
def pong() -> str:
    return "pong"


ping()  # Success.
pong()  # Raises LimitedError.
```
The rate limiting algorithm is specified by the `using` parameter. The supported algorithms are:

- Fixed Window: `RateLimiterType.FIXED_WINDOW.value`
- Sliding Window: `RateLimiterType.SLIDING_WINDOW.value`
- Token Bucket: `RateLimiterType.TOKEN_BUCKET.value`
- Leaky Bucket: `RateLimiterType.LEAKING_BUCKET.value`
- Generic Cell Rate Algorithm (GCRA): `RateLimiterType.GCRA.value`
```python
from throttled import RateLimiterType, Throttled, rate_limiter, store

throttle = Throttled(
    # 🌟 Specify the rate limiting algorithm.
    using=RateLimiterType.FIXED_WINDOW.value,
    quota=rate_limiter.per_min(1),
    store=store.MemoryStore(),
)
assert throttle.limit("key", 2).limited is True
```
```python
from throttled import rate_limiter

rate_limiter.per_sec(60)   # 60 req/sec
rate_limiter.per_min(60)   # 60 req/min
rate_limiter.per_hour(60)  # 60 req/hour
rate_limiter.per_day(60)   # 60 req/day
rate_limiter.per_week(60)  # 60 req/week
```
The `burst` parameter adjusts the rate limiter's ability to handle burst traffic. It takes effect for the following algorithms:

- `TOKEN_BUCKET`
- `LEAKING_BUCKET`
- `GCRA`
```python
from throttled import rate_limiter

# Allow up to 120 burst requests.
# When burst is not specified, it defaults to the given limit.
rate_limiter.per_min(60, burst=120)
```
```python
from datetime import timedelta

from throttled import rate_limiter

# Allow 120 requests in total over two minutes, with a burst of 150 requests.
rate_limiter.per_duration(timedelta(minutes=2), limit=120, burst=150)
```
RateLimitResult represents the result after executing the RateLimiter for the given key.

| Field | Type | Description |
|---|---|---|
| `limited` | `bool` | Whether this request is limited; `True` means the request is denied. |
| `state` | `RateLimitState` | The current state of the rate limiter for the given key. |
RateLimitState represents the current state of the rate limiter for the given key.
| Field | Type | Description |
|---|---|---|
| `limit` | `int` | The maximum number of requests allowed to pass in the initial state. |
| `remaining` | `int` | The maximum number of requests allowed to pass for the given key in the current state. |
| `reset_after` | `float` | The time in seconds until the RateLimiter returns to its initial state, in which `limit == remaining`. |
| `retry_after` | `float` | The time in seconds after which the request can be retried; `0` if the request is allowed. |
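As a quick illustration of the fields above, here is a minimal sketch based on the field tables and the Quick Start defaults (In-Memory store, Token Bucket, 60 req/min); the attribute names mirror the tables:

```python
from throttled import Throttled

throttle = Throttled()  # Default: In-Memory store, Token Bucket, 60 req/min.
result = throttle.limit("key", 1)

print(result.limited)       # False: the request was allowed.
state = result.state
print(state.limit)          # 60
print(state.remaining)      # 59 after deducting one request.
print(state.reset_after)    # Seconds until limit == remaining again.
print(state.retry_after)    # 0 because the request was allowed.
```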
Quota represents the quota limit configuration.
| Field | Type | Description |
|---|---|---|
| `burst` | `int` | Optional burst capacity that allows momentarily exceeding the rate limit (supported by Token Bucket, Leaky Bucket, and GCRA). |
| `rate` | `Rate` | The base rate limit configuration. |
Rate represents the rate limit configuration.
| Field | Type | Description |
|---|---|---|
| `period` | `datetime.timedelta` | The time period for which the rate limit applies. |
| `limit` | `int` | The maximum number of requests allowed within the specified period. |
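Putting the two tables together: assuming the `rate_limiter` helpers return a `Quota` structured as described above, a call like `per_min(60, burst=120)` should decompose as follows (a sketch of the data model, not additional API):

```python
from datetime import timedelta

from throttled import rate_limiter

quota = rate_limiter.per_min(60, burst=120)
# Field access per the Quota / Rate tables above (assumed attribute names).
assert quota.burst == 120
assert quota.rate.limit == 60
assert quota.rate.period == timedelta(minutes=1)
```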
| Param | Description | Default |
|---|---|---|
| `server` | Redis connection URL. | `"redis://localhost:6379/0"` |
| `options` | Storage-specific configuration items. | `{}` |
RedisStore is built on the Redis API provided by redis-py. Its connection configuration largely follows the option naming of django-redis to reduce the learning curve.
| Parameter | Description | Default |
|---|---|---|
| `CONNECTION_FACTORY_CLASS` | ConnectionFactory is used to create and maintain the ConnectionPool. | `"throttled.store.ConnectionFactory"` |
| `CONNECTION_POOL_CLASS` | ConnectionPool import path. | `"redis.connection.ConnectionPool"` |
| `CONNECTION_POOL_KWARGS` | ConnectionPool construction parameters. | `{}` |
| `REDIS_CLIENT_CLASS` | RedisClient import path; `redis.client.Redis` is used by default. | `"redis.client.Redis"` |
| `REDIS_CLIENT_KWARGS` | RedisClient construction parameters. | `{}` |
| `PASSWORD` | Password. | `null` |
| `SOCKET_TIMEOUT` | ConnectionPool parameter. | `null` |
| `SOCKET_CONNECT_TIMEOUT` | ConnectionPool parameter. | `null` |
| `SENTINELS` | List of `(host, port)` tuples for Sentinel mode; use `SentinelConnectionFactory` and provide this configuration. | `[]` |
| `SENTINEL_KWARGS` | Sentinel construction parameters. | `{}` |
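For illustration, a configuration sketch that combines several of the options above (values are examples; `max_connections` is a standard redis-py `ConnectionPool` argument):

```python
from throttled import store

redis_store = store.RedisStore(
    server="redis://127.0.0.1:6379/0",
    options={
        "PASSWORD": "",
        "SOCKET_TIMEOUT": 5,  # Seconds, passed through to the ConnectionPool.
        "CONNECTION_POOL_KWARGS": {"max_connections": 32},
    },
)
```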
MemoryStore is essentially an in-memory LRU cache with support for key expiration.
| Parameter | Description | Default |
|---|---|---|
| `MAX_SIZE` | Maximum capacity. When the number of stored key-value pairs exceeds `MAX_SIZE`, entries are evicted according to the LRU policy. | `1024` |
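A sizing sketch, assuming `MemoryStore` accepts an `options` mapping in the same way `RedisStore` does:

```python
from throttled import Throttled, rate_limiter, store

# Assumption: MAX_SIZE is passed via the options mapping.
mem_store = store.MemoryStore(options={"MAX_SIZE": 10_000})
throttle = Throttled(quota=rate_limiter.per_min(60), store=mem_store)
```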
All exceptions inherit from `throttled.exceptions.BaseThrottledError`.

Thrown when a request is throttled, e.g.: `Rate limit exceeded: remaining=0, reset_after=60, retry_after=60`.
| Field | Type | Description |
|---|---|---|
| `rate_limit_result` | `RateLimitResult` | The result after executing the RateLimiter for the given key. |
Thrown when a parameter is invalid, e.g.: `Invalid key: None, must be a non-empty key`.
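Since only the base class is named here, a sketch that catches `BaseThrottledError` covers both throttling and parameter errors:

```python
from throttled import Throttled, exceptions

throttle = Throttled()
try:
    throttle.limit(None)  # Invalid key, per the example message above.
except exceptions.BaseThrottledError as exc:
    print(exc)  # Invalid key: None, must be a non-empty key.
```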
- Python Version: 3.13.1 (CPython implementation)
- Operating System: macOS Darwin 23.6.0 (ARM64 architecture)
- Redis Version: 7.x (local connection)
Throughput in req/s, Latency in ms/op.
| Algorithm Type | In-Memory (Single-thread) | In-Memory (16 threads) | Redis (Single-thread) | Redis (16 threads) |
|---|---|---|---|---|
| Baseline [1] | 1,692,307 / 0.0002 | 135,018 / 0.0004 [2] | 17,324 / 0.0571 | 16,803 / 0.9478 |
| Fixed Window | 369,635 / 0.0023 | 57,275 / 0.2533 | 16,233 / 0.0610 | 15,835 / 1.0070 |
| Sliding Window | 265,215 / 0.0034 | 49,721 / 0.2996 | 12,605 / 0.0786 | 13,371 / 1.1923 |
| Token Bucket | 365,678 / 0.0023 | 54,597 / 0.2821 | 13,643 / 0.0727 | 13,219 / 1.2057 |
| Leaky Bucket | 364,296 / 0.0023 | 54,136 / 0.2887 | 13,628 / 0.0727 | 12,579 / 1.2667 |
| GCRA | 373,906 / 0.0023 | 53,994 / 0.2895 | 12,901 / 0.0769 | 12,861 / 1.2391 |
- [1] Baseline: In-Memory - `dict[key] += 1`; Redis - `INCRBY key increment`.
- [2] The In-Memory concurrent baseline uses `threading.RLock` for thread safety.
- [3] Performance: In-Memory - ~2.5-4.5x `dict[key] += 1` operations; Redis - ~1.06-1.37x `INCRBY key increment` operations.
- [4] Benchmark code: tests/benchmarks/test_throttled.py.