速率限制模式用于限制一定时间内某个用户、服务等对资源的使用,来避免系统过载。这里不会太详细地介绍该模式,更多地是描述代码实现,不过你可以访问如下资料了解更多:
- 速率限制模式 - Azure文档
- 限制模式 - Azure文档
- RateLimiter - failsafe
设计
这里主要用 Java 代码做出一个速率限制模式的实现——对 HTTP API 接口进行速率限制,限制一个用户一段时间对接口的有效访问次数。抽象流程如下:
/api/some-resources
Header | 说明 |
---|---|
X-RateLimit-Limit | 最大请求次数 |
X-RateLimit-Remaining | 剩余请求次数 |
X-RateLimit-Reset | 下次重置时间,单位秒时间戳 |
关于认证和鉴权可以浏览我的另一篇文章:应用安全性:认证和鉴权
实现用的 Web 框架是 Vert.X Web,速率限制器是自己实现写的,当然也有很多流行的第三方库,比如 guava RateLimiter 、failsafe RateLimiter,但是自己为了能记录和得到剩余请求次数和下次重置时间,就自己设计实现了两个:
- 本地速率限制 - 使用
Caffeine Cache
做存储,使用ReentrantReadWriteLock
保证并发操作时数据一致性;仅限单实例使用 - 分布式速率限制 - 使用
Redis
做存储,使用Lua script
保证并发操作时数据一致性;支持多实例使用
详细代码可以在我的 github 项目 microservices-patterns-in-action找到。
抽象速率限制处理器
为了符合 Vert.X 的规范,先写一个 RateLimitHandler
继承 Handler
,里面保存了 headers、工具方法和工厂方法。
/**
* RateLimitHandler
*
*
* 实现
* 速率限制模式
*/
public interface RateLimitHandler extends Handler {
public static final String HEADER_LIMIT = "X-RateLimit-Limit";
public static final String HEADER_REMAINING = "X-RateLimit-Remaining";
/** 下次重置时间,单位 s */
public static final String HEADER_RESET = "X-RateLimit-Reset";
public static final int STATUS_CODE_LIMITED = 429;
@Override
public void handle(RoutingContext context);
public static RateLimitHandler createLocal(int limit, Duration duration) {
return new LocalRateLimitHandler(limit, duration);
}
public static RateLimitHandler createRedis(Redis redis, int limit, Duration duration) {
return new RedisRateLimitHandler(redis, limit, duration);
}
public record LimitDetail(int limit, int count, long reset) {
public boolean canAccess() {
return count {
RateLimitHandler.addHeadersEndHandler(context, limitDetail);
if (limitDetail.canAccess()) {
context.next();
} else {
RateLimitHandler.endLimited(context);
}
}).onFailure(context::fail);
}
private Future tryAcquire(String userId) {
String key = "rl:userid:" + userId;
return sendEval(key, limit, duration)
.compose(Future::succeededFuture, t -> loadScript().compose(res -> sendEval(key, limit, duration)))
.map( res -> {
int count = res.get(0).toInteger();
long reset = res.get(1).toLong();
return new LimitDetail(limit, res.get(0).toInteger(), res.get(1).toLong());
});
}
private Future sendEval(String key, int limit, long duration) {
return redis.send(Request.cmd(Command.EVALSHA, scriptSHA.toString(), 1, key, limit, duration));
}
private Future loadScript() throws StatusCodeResponseException {
String script = null;
try {
var bytes = RedisRateLimitHandler.class.getResourceAsStream(SCRIPT_CLASSPATH).readAllBytes();
script = new String(bytes);
} catch (Exception e) {
log.error("Read script failed - {} {}", SCRIPT_CLASSPATH, e.getMessage());
return Future.failedFuture(StatusCodeResponseException.create(500));
}
return redis.send(Request.cmd(Command.SCRIPT, "load", script))
.onSuccess(res -> scriptSHA.set(res.toString()));
}
}
Lua 脚本,用于检查和更新速率限制情况:
-- rateLimiter.lua
local key = KEYS[1]
local limit = tonumber(ARGV[1])
local duration = tonumber(ARGV[2])
local exists = redis.call('exists', key)
local now = tonumber(redis.call('time')[1])
if (exists == 0) then
local reset = now + duration
redis.call('HSET', key, 'count', 1, 'reset', reset)
redis.call('EXPIREAT', key, reset)
return {1, reset}
else
local result = redis.call('HMGET', key, 'count', 'reset')
local count, reset = tonumber(result[1]), tonumber(result[2])
if (now >= reset) then
local newReset = now + duration
redis.call('HSET', key, 'count', 1, 'reset', newReset)
redis.call('EXPIREAT', key, newReset)
return {1, newReset}
end
redis.call("HINCRBY", key, 'count', 1)
return {count + 1, reset}
end
速率限制器的逻辑如 rateLimiter.lua
所描述的那样,因为 lua 脚本在redis中是串行执行的,所以能保证并发情况下数据的一致性。
因为如果每次都调用 eval big-script-text keys args
的话,请求体积太大了,所以可以使用 script load
先加载 script 到 redis 中,每次调用时使用 evalsha sha-key keys args
可大大减少请求体积。
使用和测试
在 Vert.X 配置 Router
时,添加 认证处理器 和 速率限制处理器,
router.route("/api/*").handler(SimpleAuthHandler.create());
// 限制每用户每分钟最多请求100次
// 本地速率限制处理器
router.route("/api/*").handler(RateLimitHandler.createLocal(100, Duration.ofSeconds(60)));
// 或者
// Redis 速率限制处理器
Redis redis = Redis.createClient(vertx);
RateLimitHandler rateLimitHandler = RateLimitHandler.createRedis(redis, 100, Duration.ofSeconds(60));
router.route("/api/*").handler(rateLimitHandler);
这里配置的是:限制每用户每分钟最多请求100次。
需要注意的是处理器的顺序,认证处理器 应该在 速率限制处理器 前面,速率限制处理器 应该在业务处理器前面,而日志处理器、超时处理器等应该在 认证处理器 之前。这里有一份官方的处理器顺序建议 whats-new-in-vert-x-4-3。
冒烟测试/功能测试
先测试下基本的功能。
认证:
GET {{VERTX_SERVICE}}/api/test-data
HTTP/1.1 401 Unauthorized
connection: close
content-length: 0
鉴权
GET {{VERTX_SERVICE}}/api/test-data
X-Auth-UserId: bad-user
HTTP/1.1 403 Forbidden
connection: close
content-length: 0
vertx用户请求一次
GET {{VERTX_SERVICE}}/api/test-data
X-Auth-UserId: vertx
HTTP/1.1 200 OK
content-type: application/json
connection: close
content-length: 20
X-RateLimit-Limit: 100
X-RateLimit-Remaining: 99
X-RateLimit-Reset: 1689133896
{ "data": "test-data" }
vertx用户请求好几次后
GET {{VERTX_SERVICE}}/api/test-data
X-Auth-UserId: vertx
HTTP/1.1 200 OK
content-type: application/json
connection: close
content-length: 20
X-RateLimit-Limit: 100
X-RateLimit-Remaining: 64
X-RateLimit-Reset: 1689133896
{ "data": "test-data" }
vertx用户请求一分钟内超过100次后
GET {{VERTX_SERVICE}}/api/test-data
X-Auth-UserId: vertx
HTTP/1.1 429 Too Many Requests
connection: close
content-length: 0
X-RateLimit-Limit: 100
X-RateLimit-Remaining: 0
X-RateLimit-Reset: 1689134120
换个用户spring
GET {{VERTX_SERVICE}}/api/test-data
X-Auth-UserId: spring
HTTP/1.1 200 OK
content-type: application/json
connection: close
content-length: 20
X-RateLimit-Limit: 100
X-RateLimit-Remaining: 99
X-RateLimit-Reset: 1689134062
{ "data": "test-data" }
一分钟后vertx再次请求
GET {{VERTX_SERVICE}}/api/test-data
X-Auth-UserId: vertx
HTTP/1.1 200 OK
content-type: application/json
connection: close
content-length: 20
X-RateLimit-Limit: 100
X-RateLimit-Remaining: 99
X-RateLimit-Reset: 1689134226
{ "data": "test-data" }
当然这个功能测试过程可以使用 K6 编写脚本来自动化。
负载测试
使用 K6 编写脚本来测试 Redis 速率限制器的性能。
k6脚本如下
import http from 'k6/http';
import execution from 'k6/execution'
import { sleep } from 'k6';
export const options = {
stages: [
{ duration: "30s", target: 200 },
{duration: "3m", target: 200},
{duration: "30s", target: 0}
],
};
export default function () {
const user = "LoadTestUser-" + execution.vu.idInTest
const params = {
"headers": {
"X-Auth-UserId": user,
"X-LoadTest": "true"
}
}
const res = http.get('http://localhost:7703/api/test-data', params)
console.log(user, "-", res.status_text, "-",
res.headers['X-Ratelimit-Limit'],
res.headers['X-Ratelimit-Remaining'],
res.headers['X-Ratelimit-Reset'])
}
负载测试有三个阶段
需要注意的是
- HTTP headers
X-LoadTest: ture
的请求会被服务器日志记录过滤掉 - 每个并发用户执行完请求后会继续执行下个请求,没有 sleep 时间
- 操作系统和JDK使用的是Windows和JDK17
- Redis 在 wsl 中启动,所以网络IO很短
- Vertx Redis 连接配置使用默认的,没有优化
- Vertx HTTP 服务器配置使用默认,没有优化
- K6 中的日志打印会影响客户端请求速度
这是其中的一段日志,可以看出功能还都正常
负载测试结果如下:
可以看出在服务器单实例情况下,QPS每秒 2000 多还行,平均迭代用时80ms,这可能是客户端日志打印用点时间,服务端redis连接池没有优化用点时间。85% 的失败率在意料之中,毕竟每用户每分钟只能请求100个,否则就返回429算作失败。
总结
速率限制模式中最大限制次数和单位时间应该可以根据不同的用户动态配置,比如一个用户A本来是普通用户,每小时限制使用2000次,但是称为VIP后,每小时限制提升至10000次,如果在同一个小时内发生,这时候 limit 从 2000 变成 10000,count 不变,那么用户将可以比较 count