一、原理:
令牌桶算法是一种常用的流量控制算法,用于限制请求或事件的发生速率,以防止系统过载。它的原理是基于令牌桶的数据结构和一定的令牌产生规则。
在令牌桶算法中,可以将令牌桶看作是一个具有固定容量的桶,以一定的速率产生令牌,并按照一定的规则进行存放。每当有请求或事件发生时,首先需要从桶中获取一个令牌,如果桶中没有可用的令牌,则需要等待或丢弃请求。当桶中的令牌数达到最大容量时,产生的令牌也不会继续增加。
具体工作原理如下:
令牌桶中有两个关键参数:令牌产生速率(token generation rate)和令牌容量(token bucket capacity)。
在每个固定时间间隔(例如,每秒),系统会向令牌桶中添加一定数量的令牌(即产生令牌),直到桶的容量达到最大值。
当有请求或事件发生时,需要先从令牌桶中获取一个令牌。
- 如果桶中有可用的令牌,则允许请求通过,并移除一个令牌。
- 如果桶中没有令牌,则需要等待,或者直接拒绝请求,这取决于具体的限流策略。
令牌桶算法的优点在于可以对请求的速率进行平滑的控制,且具备较好的适应性,可以应对突发流量。由于令牌的产生速率是固定的,因此可以精确控制系统的请求处理速率,防止系统的过载和资源耗尽。
在分布式系统中,令牌桶算法也常被用于实现分布式限流,保护后端服务免受过多请求的影响,确保系统的稳定性和可靠性。
二、基于redis实现令牌桶算法
这个算法,设计的时候主要是考虑到要支持分布式系统的令牌桶资源共享,因此这样设计,下面就是具体的实战代码
首先是配置:
application.yml:
server:
port: 8081
spring:
#数据库连接配置
datasource:
driver-class-name: com.mysql.cj.jdbc.Driver
url: jdbc:mysql://ip:3306/study?characterEncoding=utf-8&useSSL=false
username:
password:
redis:
# 地址
host:
# 端口,默认为6379
port: 6379
# 数据库索引
database: 8
# 密码
password:
# 连接超时时间
timeout: 10s
#mybatis的相关配置
mybatis:
#mapper配置文件
mapper-locations: classpath:mapper/*.xml
type-aliases-package: com.zhg.demo.mybatis.entity
#开启驼峰命名
configuration:
map-underscore-to-camel-case: true
maven依赖:
org.aspectj
aspectjrt
1.9.7
org.aspectj
aspectjtools
1.9.7
provided
org.springframework.boot
spring-boot-starter-data-redis
true
org.springframework.boot
spring-boot-starter-data-redis
2.5.14
redis配置:
package com.jlstest.springbootdemo.config;
import org.springframework.cache.annotation.CachingConfigurerSupport;
import org.springframework.cache.annotation.EnableCaching;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.core.script.DefaultRedisScript;
import org.springframework.data.redis.serializer.StringRedisSerializer;
/**
* redis配置
*
* @author admin
*/
@Configuration
@EnableCaching
public class RedisConfig extends CachingConfigurerSupport
{
@Bean
@SuppressWarnings(value = { "unchecked", "rawtypes" })
public RedisTemplate redisTemplate(RedisConnectionFactory connectionFactory)
{
RedisTemplate template = new RedisTemplate();
template.setConnectionFactory(connectionFactory);
FastJson2JsonRedisSerializer serializer = new FastJson2JsonRedisSerializer(Object.class);
// 使用StringRedisSerializer来序列化和反序列化redis的key值
template.setKeySerializer(new StringRedisSerializer());
template.setValueSerializer(serializer);
// Hash的key也采用StringRedisSerializer的序列化方式
template.setHashKeySerializer(new StringRedisSerializer());
template.setHashValueSerializer(serializer);
template.afterPropertiesSet();
return template;
}
@Bean
public DefaultRedisScript limitScript()
{
DefaultRedisScript redisScript = new DefaultRedisScript();
redisScript.setScriptText(limitScriptText());
redisScript.setResultType(Long.class);
return redisScript;
}
/**
* 限流脚本
*/
private String limitScriptText()
{
return "local key = KEYS[1]\n" +
"local count = tonumber(ARGV[1])\n" +
"local time = tonumber(ARGV[2])\n" +
"local current = redis.call('get', key);\n" +
"if current and tonumber(current) > count then\n" +
" return tonumber(current);\n" +
"end\n" +
"current = redis.call('incr', key)\n" +
"if tonumber(current) == 1 then\n" +
" redis.call('expire', key, time)\n" +
"end\n" +
"return tonumber(current);";
}
}
package com.jlstest.springbootdemo.config;
import java.nio.charset.Charset;
import org.springframework.data.redis.serializer.RedisSerializer;
import org.springframework.data.redis.serializer.SerializationException;
import com.alibaba.fastjson2.JSON;
import com.alibaba.fastjson2.JSONReader;
import com.alibaba.fastjson2.JSONWriter;
/**
* Redis使用FastJson序列化
*
* @author admin
*/
public class FastJson2JsonRedisSerializer implements RedisSerializer
{
public static final Charset DEFAULT_CHARSET = Charset.forName("UTF-8");
private Class clazz;
public FastJson2JsonRedisSerializer(Class clazz)
{
super();
this.clazz = clazz;
}
@Override
public byte[] serialize(T t) throws SerializationException
{
if (t == null)
{
return new byte[0];
}
return JSON.toJSONString(t, JSONWriter.Feature.WriteClassName).getBytes(DEFAULT_CHARSET);
}
@Override
public T deserialize(byte[] bytes) throws SerializationException
{
if (bytes == null || bytes.length 0) {
tokenCount--;
log.info("扣除之后tokenCount:{}", tokenCount);
// 保存令牌桶数量
redisCache.setCacheObject(key, tokenCount, 60, TimeUnit.MINUTES);
// 保存最近一次更新token的时间
redisCache.setCacheObject(LAST_REFILL_TIME_KEY + resourceName, System.currentTimeMillis(), 60, TimeUnit.MINUTES);
return true;
} else {
return false;
}
}
/**
* 初始化令牌桶
*/
private void initializeTokenBucket() {
// 当资源为空时,则进行新建
if (redisCache.getCacheObject(RATE_LIMIT_KEY + resourceName) == null) {
// 保存最近一次更新token的时间
redisCache.setCacheObject(LAST_REFILL_TIME_KEY + resourceName, System.currentTimeMillis(), 60, TimeUnit.MINUTES);
// 保存令牌桶数量,设置默认值,设置默认值
redisCache.setCacheObject(RATE_LIMIT_KEY + resourceName, maxTokens, 60, TimeUnit.MINUTES);
// 设置过期时间,当长期不用则进行释放
redisCache.expire(resourceName, 3600L);
}
}
}
对应讲解:
核心主要是通过redis来保存对应的令牌桶实例名以及对应的上次的更新token的时间,每次调用到令牌桶则重新计算令牌数量。当然这个设计比较毛糙,比如在规定时间中未必会有对应数量的令牌数量,主要是由于每次计算令牌数量,当计算成功时是不管是否整除都默认是整除来保存时间,所以会有数量偏少的情况
接口上的放置
package com.jlstest.springbootdemo.controller;
import java.util.concurrent.TimeUnit;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.ResponseBody;
import org.springframework.web.bind.annotation.RestController;
import com.jlstest.springbootdemo.aop.RateLimit;
import com.jlstest.springbootdemo.common.response.BaseController;
import com.jlstest.springbootdemo.common.response.JlsTestResponse;
/**
* @author JLS
* @description:
* @since 2023-03-22 19:09
*/
@RestController
@RequestMapping("/test")
public class TestController extends BaseController {
@GetMapping("/test")
@ResponseBody
@RateLimit(resourceName = "test", initialCapacity = 10, refillRate = 2, refillTimeUnit = TimeUnit.SECONDS)
public JlsTestResponse test() {
return sendSuccessData("success");
}
}
如图所示,放在接口上就行。
三、基于redisson实现
redisson本身就已经封装了限流器RRateLimiter,只要稍加封装即可使用,
对应的代码:
package com.jlstest.springbootdemo.aop.newLimit;
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.METHOD)
public @interface RateLimitNew {
// 资源名
String resourceName();
// 令牌总数设置
int permits();
// 恢复速率,一边填写个数单位默认秒。
int restoreRate();
}
package com.jlstest.springbootdemo.aop.newLimit;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import javax.annotation.Resource;
import org.aspectj.lang.JoinPoint;
import org.aspectj.lang.annotation.Aspect;
import org.aspectj.lang.annotation.Before;
import org.redisson.api.RRateLimiter;
import org.redisson.api.RateIntervalUnit;
import org.redisson.api.RateType;
import org.redisson.api.RedissonClient;
import org.springframework.context.annotation.Scope;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;
import com.jlstest.springbootdemo.common.exception.ServiceException;
/**
* @author JLS
* @description:
* @since 2023-08-01 14:56
*/
@Aspect
@Component
@Scope
public class RateLimitAspectNew {
private final Map rateLimiterMap = new ConcurrentHashMap();
private final Map lastAccessTimeMap = new ConcurrentHashMap();
// @Value("${redis.address}")
// private String redisAddress; // Redis连接地址,可以从配置文件中读取
@Resource
private RedissonClient redissonClient;
@Before("@annotation(rateLimitNew)")
public void before(JoinPoint joinPoint, RateLimitNew rateLimitNew) {
String resourceName = rateLimitNew.resourceName();
int permits = rateLimitNew.permits();
int restoreRate = rateLimitNew.restoreRate();
// 创建或获取令牌桶
RRateLimiter rateLimiter = rateLimiterMap.computeIfAbsent(resourceName, key -> {
// 获取对应资源名的实例,当资源不存在时会新建一个
RRateLimiter limiter = redissonClient.getRateLimiter(resourceName);
// 使用 trySetRate 方法设置令牌桶的速率。,只有新建限流器的时候才会设置属性
limiter.trySetRate(RateType.OVERALL, permits, restoreRate, RateIntervalUnit.SECONDS);
// 返回对应实例
return limiter;
});
// 当时消费令牌
if (!rateLimiter.tryAcquire()) {
throw new ServiceException("Rate limit exceeded for resource: " + resourceName);
}
lastAccessTimeMap.put(resourceName, System.currentTimeMillis());
}
// 定期清除不活跃的令牌桶
@Scheduled(fixedDelay = 60000) // 每分钟执行一次清理任务
public void cleanUpRateLimiters() {
long inactiveDuration = 5 * 60 * 1000; // 5分钟不活跃则清除
long currentTime = System.currentTimeMillis();
rateLimiterMap.entrySet().removeIf(entry -> {
String resourceName = entry.getKey();
Long lastAccessTime = lastAccessTimeMap.get(resourceName);
// 判断是否超过不活跃时间
if (lastAccessTime != null && currentTime - lastAccessTime > inactiveDuration) {
// 移除令牌桶实例
RRateLimiter rateLimiter = entry.getValue();
rateLimiter.delete();
// 移除资源名的记录
lastAccessTimeMap.remove(resourceName);
return true; // 移除该令牌桶实例
}
return false; // 不需要移除该令牌桶实例
});
}
}
测试接口:
package com.jlstest.springbootdemo.controller;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.ResponseBody;
import org.springframework.web.bind.annotation.RestController;
import com.jlstest.springbootdemo.aop.newLimit.RateLimitNew;
import com.jlstest.springbootdemo.common.response.BaseController;
import com.jlstest.springbootdemo.common.response.JlsTestResponse;
/**
* @author JLS
* @description:
* @since 2023-03-22 19:09
*/
@RestController
@RequestMapping("/test")
public class TestController extends BaseController {
@GetMapping("/test")
@ResponseBody
// @RateLimit(resourceName = "test", initialCapacity = 10, refillRate = 2, refillTimeUnit =
// TimeUnit.SECONDS)
@RateLimitNew(resourceName = "test1", permits = 1, restoreRate = 10)
public JlsTestResponse test() {
return sendSuccessData("success");
}
}
其他的一些配置代码,同redis的实现,就不再重复写出,
每个服务实例可以独立管理自己的限流器,但令牌桶的状态和数据是存储在 Redis 中的,这意味着所有实例共享相同的令牌桶信息。当一个实例获取或更新令牌桶的状态时,其他实例也可以立即感知到这些变化,从而实现在分布式系统中的限流效果。
结合 Redisson 的 RRateLimiter
和 Redis 缓存,可以实现分布式系统的限流,确保系统稳定性和资源的合理利用。
四、现成的工具-sentinel实现
Sentinel是阿里巴巴开源的一款分布式系统的流量控制组件,用于保护后端服务免受过多请求的影响,确保系统的稳定性和可靠性。Sentinel提供了多种限流策略和流量控制规则,能够灵活地适应不同场景的需求。
以下是Sentinel组件限流的基本使用步骤:
使用Sentinel进行限流的示例代码(基于Spring Boot):
Maven:
xmlCopy code
com.alibaba.csp
sentinel-core
1.8.1
yamlCopy codespring:
cloud:
sentinel:
transport:
dashboard: localhost:8080 # Sentinel Dashboard 地址
javaCopy codeimport org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
@SpringBootApplication
public class Application {
public static void main(String[] args) {
// 初始化 Sentinel
initSentinel();
SpringApplication.run(Application.class, args);
}
private static void initSentinel() {
// 这里的参数可以根据实际情况进行调整
System.setProperty("csp.sentinel.api.port", "8720");
System.setProperty("csp.sentinel.dashboard.server", "localhost:8080");
System.setProperty("project.name", "your-project-name");
}
}
javaCopy codeimport com.alibaba.csp.sentinel.annotation.SentinelResource;
@Service
public class YourService {
@SentinelResource(value = "yourResourceName", blockHandler = "blockHandlerMethod")
public void yourMethod() {
// 业务逻辑
}
// 定义限流策略的处理方法
public void blockHandlerMethod(BlockException ex) {
// 限流处理逻辑
}
}
上述示例代码中,我们使用了Sentinel的注解@SentinelResource
来标注需要进行限流保护的方法。当达到限流阈值时,会调用blockHandler
指定的方法进行限流处理。在blockHandlerMethod
中,可以自定义限流策略的处理逻辑。
需要注意的是,Sentinel的流控规则可以在Dashboard中进行配置和管理,也可以通过代码进行动态配置,使得限流策略可以根据实际情况进行灵活调整。同时,Dashboard提供了实时的监控和统计功能,方便查看应用程序的流量控制情况和系统状态。
sentinel虽然好 ,但是这个组件所包含的东西过大,有些时候只需要用到限流功能,则会显得有点大材小用,没有必要。