项目中需要对 api 的接口进行限流,但是麻烦的是,api 可能有多个节点,传统的本地限流无法处理这个问题。限流的算法有很多,比如计数器法,漏斗法,令牌桶法,等等。各有利弊,相关博文网上很多,这里不再赘述。
项目的要求主要有以下几点:
- 支持本地/分布式限流,接口统一
- 支持多种限流算法的切换
- 方便配置,配置方式不确定
go 语言不是很支持 OOP,我在实现的时候是按 Java 的思路走的,所以看起来有点不伦不类,希望能抛砖引玉。
1. 接口定义
package ratelimit
import "time"
// 限流器接口
type Limiter interface {
Acquire() error
TryAcquire() bool
}
// 限流定义接口
type Limit interface {
Name() string
Key() string
Period() time.Duration
Count() int32
LimitType() LimitType
}
// 支持 burst
type BurstLimit interface {
Limit
BurstCount() int32
}
// 分布式定义的 burst
type DistLimit interface {
Limit
ClusterNum() int32
}
type LimitType int32
const (
CUSTOM LimitType = iota
IP
)
Limiter 接口参考了 Google 的 guava 包里的 Limiter 实现。Acquire 接口是阻塞接口,其实还需要加上 context 来保证调用链安全,因为实际项目中并没有用到 Acquire 接口,所以没有实现完善;同理,超时时间的支持也可以通过添加新接口继承自 Limiter 接口来实现。TryAcquire 会立即返回。
Limit 抽象了一个限流定义,Key() 方法返回这个 Limit 的唯一标识,Name() 仅作辅助,Period() 表示周期,单位是秒,Count() 表示周期内的最大次数,LimitType()表示根据什么来做区分,如 IP,默认是 CUSTOM.
BurstLimit 提供突发的能力,一般是配合令牌桶算法。DistLimit 新增 ClusterNum() 方法,因为 mentor 要求分布式遇到错误的时候,需要退化为单机版本,退化的策略即是:2 节点总共 100QPS,如果出现分区,每个节点需要调整为各 50QPS
2. LocalCounterLimiter
package ratelimit
import (
"errors"
"fmt"
"math"
"sync"
"sync/atomic"
"time"
)
// todo timer 需要 stop
type localCounterLimiter struct {
limit Limit
limitCount int32 // 内部使用,对 limit.count 做了 lim.limitCount {
return false
}
return true
//keys := []string{lim.limit.Key()}
//
//log.Errorw("TryAcquire ", keys, lim.limit.Count(), lim.limit.Period().Seconds())
//count, err := lim.redisClient.Eval(lim.script, keys, lim.limit.Count(), lim.limit.Period().Seconds())
//if err != nil {
// log.Errorw("TryAcquire error", common.ERR, err)
// return false
//}
//
//
//typeName := reflect.TypeOf(count).Name()
//log.Errorw(typeName)
//
//if count != nil && count.(int32) = requested
local new_tokens = filled_tokens
if allowed then
new_tokens = filled_tokens - requested
end
redis.call("setex", tokens_key, ttl, new_tokens)
redis.call("setex", timestamp_key, ttl, now)
return { allowed, new_tokens }