令牌桶算法实现限流
令牌桶算法是一种常见的限流算法,它通过维护一个固定容量的令牌桶来控制流量。每个请求需要获取一个令牌,如果桶中没有足够的令牌,则请求会被限制。
首先,你需要在Redis中设置一个计数器和一个定时器来模拟令牌桶:
import redis
import time
# 连接Redis
r = redis.StrictRedis(host='localhost', port=6379, db=0)
# 设置令牌桶容量和每秒生成的令牌数
bucket_capacity = 10
tokens_per_second = 2
# 初始化令牌桶
r.set('tokens', bucket_capacity)
r.set('last_time', int(time.time()))
# 请求令牌的函数
def request_token():
current_time = int(time.time())
last_time = int(r.get('last_time'))
elapsed_time = current_time - last_time
# 计算新增的令牌数量
new_tokens = elapsed_time * tokens_per_second
current_tokens = int(r.get('tokens'))
# 更新令牌数量
if new_tokens + current_tokens > bucket_capacity:
r.set('tokens', bucket_capacity)
else:
r.set('tokens', new_tokens + current_tokens)
r.set('last_time', current_time)
# 使用令牌的代码
def process_request():
if int(r.get('tokens')) > 0:
# 执行你的请求处理逻辑
print('请求通过')
r.decr('tokens') # 消耗一个令牌
else:
print('请求被限制')
# 测试请求
for _ in range(15):
request_token()
process_request()
time.sleep(1)
这个示例中,我们通过Redis来维护令牌桶的状态,并在请求到来时检查是否有足够的令牌。如果有足够的令牌,请求将被处理,否则请求将被限制。
漏桶算法实现限流
漏桶算法是另一种流量控制算法,它维护一个固定容量的漏桶,请求进来后,会以固定速率从漏桶中排出。
以下是使用Redis实现漏桶算法的示例:
import redis
import time
# 连接Redis
r = redis.StrictRedis(host='localhost', port=6379, db=0)
# 设置漏桶容量和漏出速率(每秒排出的请求数)
bucket_capacity = 10
leak_rate = 2
# 初始化漏桶
r.set('bucket_capacity', bucket_capacity)
r.set('last_leak_time', int(time.time()))
# 请求处理函数
def process_request():
current_time = int(time.time())
last_leak_time = int(r.get('last_leak_time'))
time_elapsed = current_time - last_leak_time
# 计算漏出的请求数
leaked_requests = min(int(r.get('bucket_capacity')) * (time_elapsed // 1), int(r.get('bucket_capacity')))
# 更新漏桶状态
r.incrby('bucket_capacity', leaked_requests)
r.set('last_leak_time', current_time)
# 处理请求
if int(r.get('bucket_capacity')) >= 1:
print('请求通过')
r.decr('bucket_capacity')
else:
print('请求被限制')
# 测试请求
for _ in range(15):
process_request()
time.sleep(1)
在漏桶算法中,请求会被排入漏桶中,然后以固定速率漏出。如果漏桶中有请求,则请求会被处理,否则请求会被限制。
以上两个案例虽然能够实现限流,但是存在一定的问题,无法满足生产的要求,下面讲一下其他思路。
有序集合zset实现限流
使用Redis的有序集合(ZSET)也可以实现限流功能。有序集合中的成员可以关联一个分数,我们可以使用分数来表示每个请求的权重或时间戳,并利用有序集合的排序特性来判断请求是否被允许。
以下是使用有序集合实现基于时间窗口的限流示例:
import redis
import time
# 连接Redis
r = redis.StrictRedis(host='localhost', port=6379, db=0)
# 限流配置
max_requests = 10 # 在时间窗口内允许的最大请求数
window_duration = 60 # 时间窗口的持续时间(秒)
# 请求处理函数
def process_request(user_id):
current_time = time.time()
zset_key = "requests:" + user_id
# 删除时间窗口之外的请求记录
r.zremrangebyscore(zset_key, '-inf', current_time - window_duration)
# 获取当前时间窗口内的请求数
requests_in_window = r.zcard(zset_key)
if requests_in_window < max_requests:
# 如果请求数在限制范围内,允许请求并记录请求时间
r.zadd(zset_key, {str(current_time): current_time})
print('请求通过')
else:
print('请求被限制')
# 测试请求
user_id = "user123"
for _ in range(15):
process_request(user_id)
time.sleep(2)
在这个示例中,我们为每个用户维护一个有序集合,其中成员是请求的时间戳,分数也设置为时间戳。在处理请求时,我们首先删除时间窗口之外的请求记录,然后检查时间窗口内的请求数是否超过了限制。如果没有超过限制,允许请求并记录请求时间戳。
这种方法可以实现基于时间窗口的限流,你可以根据需要调整max_requests和window_duration来配置限流策略。
但是这样又引发了一个并发性问题
在分布式系统中,处理请求的并发性是一个重要考虑因素,特别是在多个客户端同时发送请求的情况下。以下是一些常见的方法来确保process_request操作的并发安全性:
互斥锁(Mutex Lock):使用互斥锁可以确保在同一时刻只有一个线程或进程可以执行process_request操作。这可以通过在关键部分的代码周围放置锁来实现。在Redis中,你可以使用Redis的SETNX(Set If Not Exists)命令来实现互斥锁,确保只有一个客户端可以获取锁并执行请求处理操作。
def process_request(user_id):
lock_key = "lock:" + user_id
acquired_lock = r.setnx(lock_key, "1")
if acquired_lock:
try:
# 在获取锁后,执行请求处理操作
current_time = time.time()
zset_key = "requests:" + user_id
# 删除时间窗口之外的请求记录
r.zremrangebyscore(zset_key, '-inf', current_time - window_duration)
# 获取当前时间窗口内的请求数
requests_in_window = r.zcard(zset_key)
if requests_in_window < max_requests:
# 如果请求数在限制范围内,允许请求并记录请求时间
r.zadd(zset_key, {str(current_time): current_time})
print('请求通过')
else:
print('请求被限制')
finally:
# 释放锁
r.delete(lock_key)
else:
print('无法获取锁,请求被限制')
分布式锁:如果你的系统是分布式的,你可以考虑使用分布式锁来确保不同节点上的请求处理代码不会同时执行。一些常见的分布式锁实现包括基于ZooKeeper或Redis的分布式锁。这些锁可以协调不同节点之间的并发执行。
事务:Redis支持事务,你可以使用MULTI和EXEC命令将多个操作包装在一个事务中。在这种情况下,Redis会确保整个事务要么全部成功执行,要么全部失败,从而保证一致性。
这种虽然能解决问题,但是并不是最优解
EXEC + lua 实现
使用Redis的EXEC命令和Lua脚本可以确保多个Redis命令在一个事务中执行,从而保证一致性。下面是一个使用EXEC和Lua脚本来实现请求处理的示例:
import redis
# 连接Redis
r = redis.StrictRedis(host='localhost', port=6379, db=0)
# 限流配置
max_requests = 10 # 在时间窗口内允许的最大请求数
window_duration = 60 # 时间窗口的持续时间(秒)
# Lua脚本,用于限流处理
lua_script = """
local user_id = KEYS[1]
local max_requests = tonumber(ARGV[1])
local window_duration = tonumber(ARGV[2])
local current_time = tonumber(ARGV[3])
-- 删除时间窗口之外的请求记录
redis.call('ZREMRANGEBYSCORE', 'requests:'..user_id, '-inf', current_time - window_duration)
-- 获取当前时间窗口内的请求数
local requests_in_window = redis.call('ZCARD', 'requests:'..user_id)
if requests_in_window < max_requests then
-- 如果请求数在限制范围内,允许请求并记录请求时间
redis.call('ZADD', 'requests:'..user_id, current_time, current_time)
return 'ALLOWED'
else
return 'LIMITED'
end
"""
# 请求处理函数
def process_request(user_id):
current_time = int(time.time())
result = r.eval(lua_script, 1, user_id, max_requests, window_duration, current_time)
if result == b'ALLOWED':
print('请求通过')
else:
print('请求被限制')
# 测试请求
user_id = "user123"
for _ in range(15):
process_request(user_id)
time.sleep(2)
在上述示例中,我们使用Lua脚本编写了一个与之前的请求处理逻辑相同的限流处理逻辑。然后,我们通过eval命令将Lua脚本传递给Redis,并在一个事务中执行它。这样可以确保在同一事务内执行多个Redis命令,从而保证了一致性。
请注意,在Lua脚本中,我们使用了Redis的命令来执行限流逻辑,然后根据结果返回相应的值,以便在Python中进行处理。如果请求被限制,Lua脚本返回'LIMITED',否则返回'ALLOWED'。
通过这种方式,你可以使用Redis实现基于令牌桶算法的限流功能。可以根据需要调整令牌桶容量和生成速率来满足你的应用需求。此外,需要注意在高并发情况下,需要谨慎处理并发问题。