基于令牌桶的限流器注解的简单实现

2023年 8月 9日 52.2k 0

一、原理:

令牌桶算法是一种常用的流量控制算法,用于限制请求或事件的发生速率,以防止系统过载。它的原理是基于令牌桶的数据结构和一定的令牌产生规则。

在令牌桶算法中,可以将令牌桶看作是一个具有固定容量的桶,以一定的速率产生令牌,并按照一定的规则进行存放。每当有请求或事件发生时,首先需要从桶中获取一个令牌,如果桶中没有可用的令牌,则需要等待或丢弃请求。当桶中的令牌数达到最大容量时,产生的令牌也不会继续增加。

具体工作原理如下:

  • 令牌桶中有两个关键参数:令牌产生速率(token generation rate)和令牌容量(token bucket capacity)。

  • 在每个固定时间间隔(例如,每秒),系统会向令牌桶中添加一定数量的令牌(即产生令牌),直到桶的容量达到最大值。

  • 当有请求或事件发生时,需要先从令牌桶中获取一个令牌。

    • 如果桶中有可用的令牌,则允许请求通过,并移除一个令牌。
    • 如果桶中没有令牌,则需要等待,或者直接拒绝请求,这取决于具体的限流策略。
  • 令牌桶算法的优点在于可以对请求的速率进行平滑的控制,且具备较好的适应性,可以应对突发流量。由于令牌的产生速率是固定的,因此可以精确控制系统的请求处理速率,防止系统的过载和资源耗尽。

    在分布式系统中,令牌桶算法也常被用于实现分布式限流,保护后端服务免受过多请求的影响,确保系统的稳定性和可靠性。

    二、基于redis实现令牌桶算法

    这个算法,设计的时候主要是考虑到要支持分布式系统的令牌桶资源共享,因此这样设计,下面就是具体的实战代码

    首先是配置:

    application.yml:

    server:
      port: 8081
    spring:
      #数据库连接配置
      datasource:
        driver-class-name: com.mysql.cj.jdbc.Driver
        url: jdbc:mysql://ip:3306/study?characterEncoding=utf-8&useSSL=false
        username:
        password:
    ​
      redis:
        # 地址
        host:
        # 端口,默认为6379
        port: 6379
        # 数据库索引
        database: 8
        # 密码
        password:
        # 连接超时时间
        timeout: 10s
    ​
    #mybatis的相关配置
    mybatis:
      #mapper配置文件
      mapper-locations: classpath:mapper/*.xml
      type-aliases-package: com.zhg.demo.mybatis.entity
      #开启驼峰命名
      configuration:
        map-underscore-to-camel-case: true
    

    maven依赖:

    
            
                org.aspectj
                aspectjrt
                1.9.7
            
    ​
            
            
                org.aspectj
                aspectjtools
                1.9.7
                provided
            
    ​
            
                org.springframework.boot
                spring-boot-starter-data-redis
                true
            
    ​
            
            
                org.springframework.boot
                spring-boot-starter-data-redis
                2.5.14
            
    

    redis配置:

    package com.jlstest.springbootdemo.config;
    ​
    import org.springframework.cache.annotation.CachingConfigurerSupport;
    import org.springframework.cache.annotation.EnableCaching;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    import org.springframework.data.redis.connection.RedisConnectionFactory;
    import org.springframework.data.redis.core.RedisTemplate;
    import org.springframework.data.redis.core.script.DefaultRedisScript;
    import org.springframework.data.redis.serializer.StringRedisSerializer;
    ​
    /**
     * redis配置
     *
     * @author admin
     */
    @Configuration
    @EnableCaching
    public class RedisConfig extends CachingConfigurerSupport
    {
        @Bean
        @SuppressWarnings(value = { "unchecked", "rawtypes" })
        public RedisTemplate redisTemplate(RedisConnectionFactory connectionFactory)
        {
            RedisTemplate template = new RedisTemplate();
            template.setConnectionFactory(connectionFactory);
    ​
            FastJson2JsonRedisSerializer serializer = new FastJson2JsonRedisSerializer(Object.class);
    ​
            // 使用StringRedisSerializer来序列化和反序列化redis的key值
            template.setKeySerializer(new StringRedisSerializer());
            template.setValueSerializer(serializer);
    ​
            // Hash的key也采用StringRedisSerializer的序列化方式
            template.setHashKeySerializer(new StringRedisSerializer());
            template.setHashValueSerializer(serializer);
    ​
            template.afterPropertiesSet();
            return template;
        }
    ​
        @Bean
        public DefaultRedisScript limitScript()
        {
            DefaultRedisScript redisScript = new DefaultRedisScript();
            redisScript.setScriptText(limitScriptText());
            redisScript.setResultType(Long.class);
            return redisScript;
        }
    ​
        /**
         * 限流脚本
         */
        private String limitScriptText()
        {
            return "local key = KEYS[1]\n" +
                    "local count = tonumber(ARGV[1])\n" +
                    "local time = tonumber(ARGV[2])\n" +
                    "local current = redis.call('get', key);\n" +
                    "if current and tonumber(current) > count then\n" +
                    "    return tonumber(current);\n" +
                    "end\n" +
                    "current = redis.call('incr', key)\n" +
                    "if tonumber(current) == 1 then\n" +
                    "    redis.call('expire', key, time)\n" +
                    "end\n" +
                    "return tonumber(current);";
        }
    }
    ​
    
    package com.jlstest.springbootdemo.config;
    ​
    import java.nio.charset.Charset;
    ​
    import org.springframework.data.redis.serializer.RedisSerializer;
    import org.springframework.data.redis.serializer.SerializationException;
    ​
    import com.alibaba.fastjson2.JSON;
    import com.alibaba.fastjson2.JSONReader;
    import com.alibaba.fastjson2.JSONWriter;
    ​
    /**
     * Redis使用FastJson序列化
     * 
     * @author admin
     */
    public class FastJson2JsonRedisSerializer implements RedisSerializer
    {
        public static final Charset DEFAULT_CHARSET = Charset.forName("UTF-8");
    ​
        private Class clazz;
    ​
        public FastJson2JsonRedisSerializer(Class clazz)
        {
            super();
            this.clazz = clazz;
        }
    ​
        @Override
        public byte[] serialize(T t) throws SerializationException
        {
            if (t == null)
            {
                return new byte[0];
            }
            return JSON.toJSONString(t, JSONWriter.Feature.WriteClassName).getBytes(DEFAULT_CHARSET);
        }
    ​
        @Override
        public T deserialize(byte[] bytes) throws SerializationException
        {
            if (bytes == null || bytes.length  0) {
                tokenCount--;
                log.info("扣除之后tokenCount:{}", tokenCount);
                // 保存令牌桶数量
                redisCache.setCacheObject(key, tokenCount, 60, TimeUnit.MINUTES);
                // 保存最近一次更新token的时间
                redisCache.setCacheObject(LAST_REFILL_TIME_KEY + resourceName, System.currentTimeMillis(), 60, TimeUnit.MINUTES);
                return true;
            } else {
                return false;
            }
        }
    ​
        /**
         * 初始化令牌桶
         */
        private void initializeTokenBucket() {
            // 当资源为空时,则进行新建
            if (redisCache.getCacheObject(RATE_LIMIT_KEY + resourceName) == null) {
                // 保存最近一次更新token的时间
                redisCache.setCacheObject(LAST_REFILL_TIME_KEY + resourceName, System.currentTimeMillis(), 60, TimeUnit.MINUTES);
                // 保存令牌桶数量,设置默认值,设置默认值
                redisCache.setCacheObject(RATE_LIMIT_KEY + resourceName, maxTokens, 60, TimeUnit.MINUTES);
                // 设置过期时间,当长期不用则进行释放
                redisCache.expire(resourceName, 3600L);
            }
        }
    }
    ​
    ​
    
    对应讲解:

    核心主要是通过redis来保存对应的令牌桶实例名以及对应的上次的更新token的时间,每次调用到令牌桶则重新计算令牌数量。当然这个设计比较毛糙,比如在规定时间中未必会有对应数量的令牌数量,主要是由于每次计算令牌数量,当计算成功时是不管是否整除都默认是整除来保存时间,所以会有数量偏少的情况

    接口上的放置

    package com.jlstest.springbootdemo.controller;
    ​
    import java.util.concurrent.TimeUnit;
    ​
    import org.springframework.web.bind.annotation.GetMapping;
    import org.springframework.web.bind.annotation.RequestMapping;
    import org.springframework.web.bind.annotation.ResponseBody;
    import org.springframework.web.bind.annotation.RestController;
    ​
    import com.jlstest.springbootdemo.aop.RateLimit;
    import com.jlstest.springbootdemo.common.response.BaseController;
    import com.jlstest.springbootdemo.common.response.JlsTestResponse;
    ​
    /**
     * @author JLS
     * @description:
     * @since 2023-03-22 19:09
     */
    @RestController
    @RequestMapping("/test")
    public class TestController extends BaseController {
    ​
        @GetMapping("/test")
        @ResponseBody
        @RateLimit(resourceName = "test", initialCapacity = 10, refillRate = 2, refillTimeUnit = TimeUnit.SECONDS)
        public JlsTestResponse test() {
            return sendSuccessData("success");
        }
    ​
    }
    ​
    

    如图所示,放在接口上就行。

    三、基于redisson实现

    redisson本身就已经封装了限流器RRateLimiter,只要稍加封装即可使用,

    对应的代码:

    package com.jlstest.springbootdemo.aop.newLimit;
    ​
    import java.lang.annotation.ElementType;
    import java.lang.annotation.Retention;
    import java.lang.annotation.RetentionPolicy;
    import java.lang.annotation.Target;
    ​
    @Retention(RetentionPolicy.RUNTIME)
    @Target(ElementType.METHOD)
    public @interface RateLimitNew {
        // 资源名
        String resourceName();
        // 令牌总数设置
        int permits();
        // 恢复速率,一边填写个数单位默认秒。
        int restoreRate();
    }
    ​
    
    package com.jlstest.springbootdemo.aop.newLimit;
    ​
    import java.util.Map;
    import java.util.concurrent.ConcurrentHashMap;
    ​
    import javax.annotation.Resource;
    ​
    import org.aspectj.lang.JoinPoint;
    import org.aspectj.lang.annotation.Aspect;
    import org.aspectj.lang.annotation.Before;
    import org.redisson.api.RRateLimiter;
    import org.redisson.api.RateIntervalUnit;
    import org.redisson.api.RateType;
    import org.redisson.api.RedissonClient;
    import org.springframework.context.annotation.Scope;
    import org.springframework.scheduling.annotation.Scheduled;
    import org.springframework.stereotype.Component;
    ​
    import com.jlstest.springbootdemo.common.exception.ServiceException;
    ​
    /**
     * @author JLS
     * @description:
     * @since 2023-08-01 14:56
     */
    @Aspect
    @Component
    @Scope
    public class RateLimitAspectNew {
    ​
        private final Map rateLimiterMap = new ConcurrentHashMap();
    ​
        private final Map lastAccessTimeMap = new ConcurrentHashMap();
    ​
        // @Value("${redis.address}")
        // private String redisAddress; // Redis连接地址,可以从配置文件中读取
    ​
        @Resource
        private RedissonClient redissonClient;
    ​
        @Before("@annotation(rateLimitNew)")
        public void before(JoinPoint joinPoint, RateLimitNew rateLimitNew) {
            String resourceName = rateLimitNew.resourceName();
            int permits = rateLimitNew.permits();
            int restoreRate = rateLimitNew.restoreRate();
    ​
            // 创建或获取令牌桶
            RRateLimiter rateLimiter = rateLimiterMap.computeIfAbsent(resourceName, key -> {
                // 获取对应资源名的实例,当资源不存在时会新建一个
                RRateLimiter limiter = redissonClient.getRateLimiter(resourceName);
                // 使用 trySetRate 方法设置令牌桶的速率。,只有新建限流器的时候才会设置属性
                limiter.trySetRate(RateType.OVERALL, permits, restoreRate, RateIntervalUnit.SECONDS);
                // 返回对应实例
                return limiter;
            });
    ​
            // 当时消费令牌
            if (!rateLimiter.tryAcquire()) {
                throw new ServiceException("Rate limit exceeded for resource: " + resourceName);
            }
    ​
            lastAccessTimeMap.put(resourceName, System.currentTimeMillis());
        }
    ​
        // 定期清除不活跃的令牌桶
        @Scheduled(fixedDelay = 60000) // 每分钟执行一次清理任务
        public void cleanUpRateLimiters() {
            long inactiveDuration = 5 * 60 * 1000; // 5分钟不活跃则清除
            long currentTime = System.currentTimeMillis();
            rateLimiterMap.entrySet().removeIf(entry -> {
                String resourceName = entry.getKey();
                Long lastAccessTime = lastAccessTimeMap.get(resourceName);
                // 判断是否超过不活跃时间
                if (lastAccessTime != null && currentTime - lastAccessTime > inactiveDuration) {
                    // 移除令牌桶实例
                    RRateLimiter rateLimiter = entry.getValue();
                    rateLimiter.delete();
                    // 移除资源名的记录
                    lastAccessTimeMap.remove(resourceName);
                    return true; // 移除该令牌桶实例
                }
                return false; // 不需要移除该令牌桶实例
            });
        }
    }
    ​
    

    测试接口:

    package com.jlstest.springbootdemo.controller;
    ​
    import org.springframework.web.bind.annotation.GetMapping;
    import org.springframework.web.bind.annotation.RequestMapping;
    import org.springframework.web.bind.annotation.ResponseBody;
    import org.springframework.web.bind.annotation.RestController;
    ​
    import com.jlstest.springbootdemo.aop.newLimit.RateLimitNew;
    import com.jlstest.springbootdemo.common.response.BaseController;
    import com.jlstest.springbootdemo.common.response.JlsTestResponse;
    ​
    /**
     * @author JLS
     * @description:
     * @since 2023-03-22 19:09
     */
    @RestController
    @RequestMapping("/test")
    public class TestController extends BaseController {
    ​
        @GetMapping("/test")
        @ResponseBody
        // @RateLimit(resourceName = "test", initialCapacity = 10, refillRate = 2, refillTimeUnit =
        // TimeUnit.SECONDS)
        @RateLimitNew(resourceName = "test1", permits = 1, restoreRate = 10)
        public JlsTestResponse test() {
            return sendSuccessData("success");
        }
    ​
    }
    ​
    

    其他的一些配置代码,同redis的实现,就不再重复写出,

    每个服务实例可以独立管理自己的限流器,但令牌桶的状态和数据是存储在 Redis 中的,这意味着所有实例共享相同的令牌桶信息。当一个实例获取或更新令牌桶的状态时,其他实例也可以立即感知到这些变化,从而实现在分布式系统中的限流效果。

    结合 Redisson 的 RRateLimiter 和 Redis 缓存,可以实现分布式系统的限流,确保系统稳定性和资源的合理利用。

    四、现成的工具-sentinel实现

    Sentinel是阿里巴巴开源的一款分布式系统的流量控制组件,用于保护后端服务免受过多请求的影响,确保系统的稳定性和可靠性。Sentinel提供了多种限流策略和流量控制规则,能够灵活地适应不同场景的需求。

    以下是Sentinel组件限流的基本使用步骤:

  • 引入依赖:首先,需要将Sentinel的依赖添加到项目中。可以通过Maven或Gradle引入相关的Sentinel依赖。
  • 配置流控规则:在项目的配置文件中,配置需要的流控规则。流控规则可以通过配置文件、注解或代码进行配置。
  • 初始化Sentinel:在应用程序启动时,需要初始化Sentinel组件,确保其正常工作。
  • 设置限流策略:选择适合场景的限流策略,例如,根据QPS、线程数、并发数等设置限流策略。
  • 编写业务逻辑:在业务逻辑中,使用Sentinel的限流功能来保护对资源的访问。当达到限流阈值时,Sentinel会根据配置的限流策略进行处理,例如拒绝请求、等待或者返回默认值等。
  • 监控和统计:Sentinel提供了丰富的监控和统计功能,可以通过Dashboard查看实时的流量控制情况和系统状态,帮助及时发现问题并进行调整。
  • 使用Sentinel进行限流的示例代码(基于Spring Boot):

  • 引入Sentinel依赖:
  • Maven:

    xmlCopy code
        com.alibaba.csp
        sentinel-core
        1.8.1
    
    
  • 在配置文件中配置限流规则(可以根据实际需求配置):
  • yamlCopy codespring:
      cloud:
        sentinel:
          transport:
            dashboard: localhost:8080 # Sentinel Dashboard 地址
    
  • 在启动类中初始化Sentinel组件:
  • javaCopy codeimport org.springframework.boot.SpringApplication;
    import org.springframework.boot.autoconfigure.SpringBootApplication;
    ​
    @SpringBootApplication
    public class Application {
    ​
        public static void main(String[] args) {
            // 初始化 Sentinel
            initSentinel();
            SpringApplication.run(Application.class, args);
        }
    ​
        private static void initSentinel() {
            // 这里的参数可以根据实际情况进行调整
            System.setProperty("csp.sentinel.api.port", "8720");
            System.setProperty("csp.sentinel.dashboard.server", "localhost:8080");
            System.setProperty("project.name", "your-project-name");
        }
    }
    
  • 在业务代码中添加限流注解:
  • javaCopy codeimport com.alibaba.csp.sentinel.annotation.SentinelResource;
    ​
    @Service
    public class YourService {
    ​
        @SentinelResource(value = "yourResourceName", blockHandler = "blockHandlerMethod")
        public void yourMethod() {
            // 业务逻辑
        }
    ​
        // 定义限流策略的处理方法
        public void blockHandlerMethod(BlockException ex) {
            // 限流处理逻辑
        }
    }
    

    上述示例代码中,我们使用了Sentinel的注解@SentinelResource来标注需要进行限流保护的方法。当达到限流阈值时,会调用blockHandler指定的方法进行限流处理。在blockHandlerMethod中,可以自定义限流策略的处理逻辑。

    需要注意的是,Sentinel的流控规则可以在Dashboard中进行配置和管理,也可以通过代码进行动态配置,使得限流策略可以根据实际情况进行灵活调整。同时,Dashboard提供了实时的监控和统计功能,方便查看应用程序的流量控制情况和系统状态。

    sentinel虽然好 ,但是这个组件所包含的东西过大,有些时候只需要用到限流功能,则会显得有点大材小用,没有必要。

    相关文章

    JavaScript2024新功能:Object.groupBy、正则表达式v标志
    PHP trim 函数对多字节字符的使用和限制
    新函数 json_validate() 、randomizer 类扩展…20 个PHP 8.3 新特性全面解析
    使用HTMX为WordPress增效:如何在不使用复杂框架的情况下增强平台功能
    为React 19做准备:WordPress 6.6用户指南
    如何删除WordPress中的所有评论

    发布评论