四种常见分布式限流算法实现!

2023年 7月 14日 87.9k 0

大家好,我是老三,最近公司在搞年终大促,随着各种营销活动“组合拳”打出,进站流量时不时会有一个小波峰,一般情况下,当然是流量越多越好,前提是系统能杠地住。大家都知道,一个分布式系统,有两个“弃车保帅”的策略:限流和熔断,这期,我们就来讨论一下分布式系统的限流。

探探限流

带着问题走近限流

为什么要限流呢?

就像我上面说的,流量多,的确是一件好事,但是如果过载,把系统打挂了,那大家都要吃席了。

没逝吧

所以,在各种大促活动之前,要对系统进行压测,评估整个系统的峰值QPS,要做一些限流的设置,超过一定阈值,就拒绝处理或者延后处理,避免把系统打挂的情况出现。

限流和熔断有什么区别?

限流发生在流量进来之前,超过的流量进行限制。

熔断是一种应对故障的机制,发生在流量进来之后,如果系统发生故障或者异常,熔断会自动切断请求,防止故障进一步扩展,导致服务雪崩。

限流和削峰有什么区别?

削峰是对流量的平滑处理,通过缓慢地增加请求的处理速率来避免系统瞬时过载。

削峰大概就是水库,把流量储存起来,慢慢流,限流大概就是闸口,拒绝超出的流量。

限流的通用流程

那么具体限流怎么实现呢?可以概括为以下几个步骤:

限流通用流程

  • 统计请求流量:记录请求的数量或速率,可以通过计数器、滑动窗口等方式进行统计。
  • 判断是否超过限制:根据设定的限制条件,判断当前请求流量是否超过限制。
  • 执行限流策略:如果请求流量超过限制,执行限流策略,如拒绝请求、延迟处理、返回错误信息等。
  • 更新统计信息:根据请求的处理结果,更新统计信息,如增加计数器的值、更新滑动窗口的数据等。
  • 重复执行以上步骤:不断地统计请求流量、判断是否超过限制、执行限流策略、更新统计信息
  • 需要注意的是,具体的限流算法实现可能会根据不同的场景和需求进行调整和优化,比如使用令牌桶算法、漏桶算法等。

    单机限流和分布式限流

    我们注意到,在限流的通用流程里,需要统计请求量、更新统计量,那么这个请求量的统计和更新就必须维护在一个存储里。

    假如只是一个单机版的环境,那就很好办了,直接储存到本地。

    单机vs集群

    但是一般来讲,我们的服务都是集群部署的,如何来实现多台机器之间整体的限流呢?

    这时候就可以把我们的统计信息放到Tair或Redis等分布式的K-V存储中。

    四种限流算法与分布式实现

    接下来,我们开始实现一些常见的限流算法,这里使用Redis作为分布式存储,Redis不用多说了吧,最流行的分布式缓存DB;Redission作为Redis客户端,Redission单纯只是用来做分布式锁,有些”屈才“,其实用来作为Redis的客户端也非常好用。

    五种限流算法分布式实现

    在开始之前,我们先简单准备一下环境,Redis安装和项目创建就不多说了。

    • 添加依赖
            
                org.redisson
                redisson
                3.16.2
            
    
    • 用单例模式获取RedissonClient,这里就不注册成bean了,跑单测太慢
    public class RedissonConfig {
    
        private static final String REDIS_ADDRESS = "redis://127.0.0.1:6379";
    
        private static volatile  RedissonClient redissonClient;
    
       public static RedissonClient getInstance(){
            if (redissonClient==null){
                synchronized (RedissonConfig.class){
                    if (redissonClient==null){
                        Config config = new Config();
                        config.useSingleServer().setAddress(REDIS_ADDRESS);
                        redissonClient = Redisson.create(config);
                        return redissonClient;
                    }
                }
            }
            return redissonClient;
        }
    }
    

    固定窗口限流算法

    算法原理

    固定窗口算法,很多参考资料也称之为计数器算法,当然我个人理解,计数器算法是固定窗口算法的一种特例,当然我们不纠结那么多。

    固定窗口算法,是一种比较简单的限流算法,它把时间划分为固定的时间窗口,每个窗口内允许的请求次数设置限制。如果在一个时间窗口内,请求次数超过了上限,那么就会触发限流。

    在这里插入图片描述

    算法实现

    基于Redisson的实现固定窗口相当简单。在每个窗口期内,我们可以通过incrementAndGet操作来统计请求的数量。一旦窗口期结束,我们可以利用Redis的键过期功能来自动重置计数。

    • 来看下代码实现:
    public class FixedWindowRateLimiter {
        public static final String KEY = "fixedWindowRateLimiter:";
        /**
         * 请求限制数量
         */
        private Long limit;
        /**
         * 窗口大小(单位:S)
         */
        private Long windowSize;
    
        public FixedWindowRateLimiter(Long limit, Long windowSize) {
            this.limit = limit;
            this.windowSize = windowSize;
        }
    
        /**
         * 固定窗口限流
         */
        public boolean triggerLimit(String path) {
            RedissonClient redissonClient = RedissonConfig.getInstance();
            //加分布式锁,防止并发情况下窗口初始化时间不一致问题
            RLock rLock = redissonClient.getLock(KEY + "LOCK:" + path);
            try {
                rLock.lock(100, TimeUnit.MILLISECONDS);
                String redisKey = KEY + path;
                RAtomicLong counter = redissonClient.getAtomicLong(redisKey);
                //计数
                long count = counter.incrementAndGet();
                //如果为1的话,就说明窗口刚初始化
                if (count == 1) {
                    //直接设置过期时间,作为窗口
                    counter.expire(windowSize, TimeUnit.SECONDS);
                }
                //触发限流
                if (count > limit) {
                    //触发限流的不记在请求数量中
                    counter.decrementAndGet();
                    return true;
                }
                return false;
            } finally {
                rLock.unlock();
            }
        }
    
    }
    

    这里还额外用了一个分布式锁,来解决并发情况下,窗口的初始化问题。

    • 再来测试一下
    class FixedWindowRateLimiterTest {
        ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(20, 50, 10, TimeUnit.SECONDS, new LinkedBlockingDeque(10));
    
        @Test
        @DisplayName("1min限制10次请求固定窗口测试")
        void triggerLimit() throws InterruptedException {
            FixedWindowRateLimiter fixedWindowRateLimiter = new FixedWindowRateLimiter(10L,60L);
            //模拟不同窗口内的调用
            for (int i = 0; i < 3; i++) {
                CountDownLatch countDownLatch = new CountDownLatch(20);
                //20个线程并发调用
                for (int j = 0; j  {
                        boolean isLimit = fixedWindowRateLimiter.triggerLimit("/test");
                        System.out.println(isLimit);
                        countDownLatch.countDown();
                    });
                }
                countDownLatch.await();
                //休眠1min
                TimeUnit.MINUTES.sleep(1);
            }
        }
    }
    

    当然大家也可以写个接口,用Jmeter之类的压测工具来进行测试。

    固定窗口算法的优点是实现简单,占用空间小,但是它存在临界问题,由于窗口的切换是瞬间完成的,因此请求的处理并不平滑,可能会在窗口切换的瞬间出现流量的剧烈波动。

    比如这个例子,假如在00:02,突然有大量请求过来,但是我们这时候计数重置了,那么就没法限制突发的这些流量。

    临界值问题

    滑动窗口算法

    为了缓解固定窗口的突发流量问题,可以采用滑动窗口算法,计算机网络中TCP的流量控制就是采用滑动窗口算法。

    算法原理

    滑动窗口限流算法的原理是将一个大的时间窗口划分为多个小的时间窗口,每个小的窗口都有独立的计数。

    请求过来的时候,判断请求的次数是否超过整个窗口的限制。窗口的移动是每次向前滑动一个小的单元窗口。

    例如下面这个滑动窗口,将大时间窗口1min分成了5个小窗口,每个小窗口的时间是12s。

    每个单元格有自己独立的计数器,每过12s就会向前移动一格。

    假如有请求在00:01的时候过来,这时候窗口的计数就是3+12+9+15=39,也能起到限流的作用。

    滑动窗口算法示意图

    这就是为什么滑动窗口能解决临界问题,滑的格子越多,那么整体的滑动就会越平滑,限流的效果就会越精准。

    算法实现

    那么我们这里怎么实现滑动窗口限流算法呢?非常简单,我们可以直接使用Redis的有序集合(zset)结构。

    我们使用时间戳作为score和member,有请求过来的时候,就把当前时间戳添加到有序集合里。那么窗口之外的请求,我们可以根据窗口大小,计算出起始时间戳,删除窗口外的请求。这样,有序集合的大小,就是我们这个窗口的请求数了。

    zset实现滑动窗口

    • 代码实现
    public class SlidingWindowRateLimiter {
        public static final String KEY = "slidingWindowRateLimiter:";
    
        /**
         * 请求次数限制
         */
        private Long limit;
        /**
         * 窗口大小(单位:S)
         */
        private Long windowSize;
    
        public SlidingWindowRateLimiter(Long limit, Long windowSize) {
            this.limit = limit;
            this.windowSize = windowSize;
        }
    
    
        public boolean triggerLimit(String path) {
            RedissonClient redissonClient = RedissonConfig.getInstance();
            //窗口计数
            RScoredSortedSet counter = redissonClient.getScoredSortedSet(KEY + path);
            //使用分布式锁,避免并发设置初始值的时候,导致窗口计数被覆盖
            RLock rLock = redissonClient.getLock(KEY + "LOCK:" + path);
            try {
                rLock.lock(200, TimeUnit.MILLISECONDS);
                // 当前时间戳
                long currentTimestamp = System.currentTimeMillis();
                // 窗口起始时间戳
                long windowStartTimestamp = currentTimestamp - windowSize * 1000;
                // 移除窗口外的时间戳,左闭右开
                counter.removeRangeByScore(0, true, windowStartTimestamp, false);
                // 将当前时间戳作为score,也作为member,
                // TODO:高并发情况下可能没法保证唯一,可以加一个唯一标识
                counter.add(currentTimestamp, currentTimestamp);
                //使用zset的元素个数,作为请求计数
                long count = counter.size();
                // 判断时间戳数量是否超过限流阈值
                if (count > limit) {
                    System.out.println("[triggerLimit] path:" + path + " count:" + count + " over limit:" + limit);
                    return true;
                }
                return false;
            } finally {
                rLock.unlock();
            }
        }
    
    }
    
    

    这里还有一个小的可以完善的点,zset在member相同的情况下,是会覆盖的,也就是说高并发情况下,时间戳可能会重复,那么就有可能统计的请求偏少,这里可以用时间戳+随机数来缓解,也可以生成唯一序列来解决,比如UUID、雪花算法等等。

    • 还是来测试一下
    class SlidingWindowRateLimiterTest {
        ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(30, 50, 10, TimeUnit.SECONDS, new LinkedBlockingDeque(10));
    
        @Test
        @DisplayName("滑动窗口限流")
        void triggerLimit() throws InterruptedException {
            SlidingWindowRateLimiter slidingWindowRateLimiter = new SlidingWindowRateLimiter(10L, 1L);
            //模拟在不同时间片内的请求
            for (int i = 0; i < 8; i++) {
                CountDownLatch countDownLatch = new CountDownLatch(20);
                for (int j = 0; j  {
                        boolean isLimit = slidingWindowRateLimiter.triggerLimit("/test");
                        System.out.println(isLimit);
                        countDownLatch.countDown();
                    });
                }
                countDownLatch.await();
                //休眠10s
                TimeUnit.SECONDS.sleep(10L);
            }
        }
    }
    

    用Redis实现了滑动窗口限流,解决了固定窗口限流的边界问题,当然这里也带来了新的问题,因为我们存储了窗口期的所有请求,所以高并发的情况下,可能会比较占内存。

    漏桶算法

    我们可以看到,计数器类的限流,体现的是一个“戛然而止”,超过限制,立马决绝,但是有时候,我们可能只是希望请求平滑一些,追求的是“波澜不惊”,这时候就可以考虑使用其它的限流算法。

    算法原理

    漏桶算法(Leaky Bucket),名副其实,就是请求就像水一样以任意速度注入漏桶,而桶会按照固定的速率将水漏掉。

    漏桶算法

    当进水速率大于出水速率的时候,漏桶会变满,此时新进入的请求将会被丢弃。

    漏桶算法的两大作用是网络流量整形(Traffic Shaping)和速度限制(Rate Limiting)。

    算法实现

    我们接着看看具体应该怎么实现。

    在滑动窗口限流算法里我们用到了RScoredSortedSet,非常好用对不对,这里也可以用这个结构,直接使用ZREMRANGEBYSCORE命令来删除旧的请求。

    进水就不用多说了,请求进来,判断桶有没有满,满了就拒绝,没满就往桶里丢请求。

    那么出水怎么办呢?得保证稳定速率出水,可以用一个定时任务,来定时去删除旧的请求。

    • 代码实现
    public class LeakyBucketRateLimiter {
        private RedissonClient redissonClient = RedissonConfig.getInstance();
        private static final String KEY_PREFIX = "LeakyBucket:";
    
        /**
         * 桶的大小
         */
        private Long bucketSize;
        /**
         * 漏水速率,单位:个/秒
         */
        private Long leakRate;
    
    
        public LeakyBucketRateLimiter(Long bucketSize, Long leakRate) {
            this.bucketSize = bucketSize;
            this.leakRate = leakRate;
            //这里启动一个定时任务,每s执行一次
            ScheduledExecutorService executorService = Executors.newScheduledThreadPool(1);
            executorService.scheduleAtFixedRate(this::leakWater, 0, 1, TimeUnit.SECONDS);
        }
    
        /**
         * 漏水
         */
        public void leakWater() {
            RSet pathSet=redissonClient.getSet(KEY_PREFIX+":pathSet");
            //遍历所有path,删除旧请求
            for(String path:pathSet){
                String redisKey = KEY_PREFIX + path;
                RScoredSortedSet bucket = redissonClient.getScoredSortedSet(KEY_PREFIX + path);
                // 获取当前时间
                long now = System.currentTimeMillis();
                // 删除旧的请求
                bucket.removeRangeByScore(0, true,now - 1000 * leakRate,true);
            }
        }
    
        /**
         * 限流
         */
        public boolean triggerLimit(String path) {
            //加锁,防止并发初始化问题
            RLock rLock = redissonClient.getLock(KEY_PREFIX + "LOCK:" + path);
            try {
                rLock.lock(100,TimeUnit.MILLISECONDS);
                String redisKey = KEY_PREFIX + path;
                RScoredSortedSet bucket = redissonClient.getScoredSortedSet(redisKey);
                //这里用一个set,来存储所有path
                RSet pathSet=redissonClient.getSet(KEY_PREFIX+":pathSet");
                pathSet.add(path);
                // 获取当前时间
                long now = System.currentTimeMillis();
                // 检查桶是否已满
                if (bucket.size() < bucketSize) {
                    // 桶未满,添加一个元素到桶中
                    bucket.add(now,now);
                    return false;
                }
                // 桶已满,触发限流
                System.out.println("[triggerLimit] path:"+path+" bucket size:"+bucket.size());
                return true;
            }finally {
                rLock.unlock();
            }
        }
        
    }
    
    

    在代码实现里,我们用了RSet来存储path,这样一来,一个定时任务,就可以搞定所有path对应的桶的出水,而不用每个桶都创建一个一个定时任务。

    这里我直接用ScheduledExecutorService启动了一个定时任务,1s跑一次,当然集群环境下,每台机器都跑一个定时任务,对性能是极大的浪费,而且不好管理,我们可以用分布式定时任务,比如xxl-job去执行leakWater

    • 最后还是大家熟悉的测试
    class LeakyBucketRateLimiterTest {
    
        ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(30, 50, 10, TimeUnit.SECONDS, new LinkedBlockingDeque(10));
    
        @Test
        @DisplayName("漏桶算法")
        void triggerLimit() throws InterruptedException {
            LeakyBucketRateLimiter leakyBucketRateLimiter = new LeakyBucketRateLimiter(10L, 1L);
            for (int i = 0; i < 8; i++) {
                CountDownLatch countDownLatch = new CountDownLatch(20);
                for (int j = 0; j  {
                        boolean isLimit = leakyBucketRateLimiter.triggerLimit("/test");
                        System.out.println(isLimit);
                        countDownLatch.countDown();
                    });
                }
                countDownLatch.await();
                //休眠10s
                TimeUnit.SECONDS.sleep(10L);
            }
        }
    }
    

    漏桶算法能够有效防止网络拥塞,实现也比较简单。

    但是,因为漏桶的出水速率是固定的,假如突然来了大量的请求,那么只能丢弃超量的请求,即使下游能处理更大的流量,没法充分利用系统资源

    令牌桶算法

    令牌桶算法来了!

    算法原理

    令牌桶算法是对漏桶算法的一种改进。

    它的主要思想是:系统以一种固定的速率向桶中添加令牌,每个请求在发送前都需要从桶中取出一个令牌,只有取到令牌的请求才被通过。因此,令牌桶算法允许请求以任意速率发送,只要桶中有足够的令牌。

    令牌桶算法

    算法实现

    我们继续看怎么实现,首先是要发放令牌,要固定速率,那我们又得开个线程,定时往桶里投令牌,然后……

    ——然后Redission提供了令牌桶算法的实现,舒不舒服?

    拿来吧你

    拿来就用!

    • 代码实现
    public class TokenBucketRateLimiter {
    
        public static final String KEY = "TokenBucketRateLimiter:";
    
        /**
         * 阈值
         */
        private Long limit;
        /**
         * 添加令牌的速率,单位:个/秒
         */
        private Long tokenRate;
    
        public TokenBucketRateLimiter(Long limit, Long tokenRate) {
            this.limit = limit;
            this.tokenRate = tokenRate;
        }
    
        /**
         * 限流算法
         */
        public boolean triggerLimit(String path){
            RedissonClient redissonClient=RedissonConfig.getInstance();
            RRateLimiter rateLimiter = redissonClient.getRateLimiter(KEY+path);
            // 初始化,设置速率模式,速率,间隔,间隔单位
            rateLimiter.trySetRate(RateType.OVERALL, limit, tokenRate, RateIntervalUnit.SECONDS);
            // 获取令牌
            return rateLimiter.tryAcquire();
        }
    }
    

    Redisson实现的,还是比较稳的,这里就不测试了。

    关于Redission是怎么实现这个限速器的,大家可以看一下参考[3],还是Redisson家的老传统——Lua脚本,设计相当巧妙。

    总结

    在这篇文章里,我们对四(三)种限流算法进行了分布式实现,采用了非常好用的Redission客户端,当然我们也有不完善的地方:

    • 并发处理采用了分布式锁,高并发情况下,对性能有一定损耗,逻辑最好还是直接采用Lua脚本实现,来提高性能
    • 可以提供更加优雅的调用方式,比如利用aop实现注解式调用,代码设计也可以更加优雅,继承体系可以完善一下
    • 没有实现限流的拒绝策略,比如抛异常、缓存、丢进MQ打散……限流是一种方法,最终的目的还是尽可能保证系统平稳

    如果后面有机会,希望可以继续完善这个简单的Demo,达到工程级的应用。

    除此之外,市面上也有很多好用的开源限流工具:

    • Guava RateLimiter ,基于令牌桶算法限流,当然是单机的;
    • Sentinel ,基于滑动窗口限流,支持单机,也支持集群
    • 网关限流,很多网关自带限流方法,比如Spring Cloud GatewayNginx

    ……

    好了,这期文章就到这里了,我们下期见。

    参考:

    [1]. 面试官:来,年轻人!请手撸5种常见限流算法!

    [2].zhuanlan.zhihu.com/p/479956069

    [3].github.com/oneone1995/…

    相关文章

    JavaScript2024新功能:Object.groupBy、正则表达式v标志
    PHP trim 函数对多字节字符的使用和限制
    新函数 json_validate() 、randomizer 类扩展…20 个PHP 8.3 新特性全面解析
    使用HTMX为WordPress增效:如何在不使用复杂框架的情况下增强平台功能
    为React 19做准备:WordPress 6.6用户指南
    如何删除WordPress中的所有评论

    发布评论