redis10万字超详细讲解

2023年 10月 14日 228.0k 0

redis原理

1.基于事件模型

redis本身是个事件驱动程序,通过监听文件事件和时间事件来完成相应的功能。其中文件事件其实就是对socket的抽象,把一个个socket事件抽象成文件事件.

2.通信

基于socket的TCP三次握手,发送数据到socket缓冲区,等待系统从socket缓冲区获取数据,然后通过网卡把数据发送出去,接收方收到网卡数据后,会把数据copy到socket,然后等待应用程序来取

因为涉及到系统调用,整个过程可以发现一份数据需要先从用户态拷贝到内核态的socket,然后又要从内核态的socket拷贝到用户态的进程中去,这就是数据拷贝的开销。

2.1数据怎么知道发给哪个socket

内核维护的socket那么多,网卡过来的数据怎么知道投递给哪个socket?答案是端口,socket是一个四元组:ip(client)+ port(client)+ip(server)+port(server)(注意千万不要说一台机器的理论最大并发是65535个,除了端口,还有ip,应该是端口数*ip数),这也是为什么一台电脑可以同时打开多个软件的原因。

2.2 socket的数据怎么通知程序来取

当数据已经从网卡copy到了对应的socket缓冲区中,怎么通知程序来取?假如socket数据还没到达,这时程序在干嘛?这里其实涉及到cpu对进程的调度的问题。从cpu的角度来看,进程存在运行态、就绪态、阻塞态。

  • 就绪态:进程等待被执行,资源都已经准备好了,剩下的就等待cpu的调度了。
  • 运行态:正在运行的进程,cpu正在调度的进程。
  • 阻塞态:因为某些情况导致阻塞,不占有cpu,正在等待某些事件的完成。

当存在多个运行态的进程时,由于cpu的时间片技术,运行态的进程都会被cpu执行一段时间,看着好似同时运行一样,这就是所谓的并发。当我们创建一个socket连接时,它大概会这样

sockfd = socket(AF_INET, SOCK_STREAM, 0)

connect(sockfd, ....)

recv(sockfd, ...)

doSometing()

操作系统会为每个socket建立一个fd句柄,这个fd就指向我们创建的socket对象,这个对象包含缓冲区、进程的等待队列...。对于一个创建socket的进程来说,如果数据没到达,那么他会卡在recv处,这个进程会挂在socket对象的等待队列中,对cpu来说,这个进程就是阻塞的,它其实不占有cpu,它在等待数据的到来。

当数据到来时,网卡会告诉cpu,cpu执行中断程序,把网卡的数据copy到对应的socket的缓冲区中,然后唤醒等待队列中的进程,把这个进程重新放回运行队列中,当这个进程被cpu运行的时候,它就可以执行最后的读取操作了。这种模式有两个问题:

  • recv只能接收一个fd,如果要recv多个fd怎么办?通过while循环效率稍低。
  • 进程除了读取数据,还要处理接下里的逻辑,在数据没到达时,进程处于阻塞态,即使用了while循环来监听多个fd,其它的socket是不是因为其中一个recv阻塞,而导致整个进程的阻塞。
  • 3.Reactor

    Reactor是一种高性能处理IO的模式,Reactor模式下主程序只负责监听文件描述符上是否有事件发生,这一点很重要,主程序并不处理文件描述符的读写。那么文件描述符的可读可写谁来做?答案是其他的工作程序,当某个socket发生可读可写的事件后,主程序会通知工作程序,真正从socket里面读取数据和写入数据的是工作程序。这种模式的好处就是就是主程序可以扛并发,不阻塞,主程序非常的轻便。事件可以通过队列的方式等待被工作程序执行。通过Reactor模式,我们只需要把事件和事件对应的handler(callback func),注册到Reactor中就行了,比如:s

    type Reactor interface{
    
       RegisterHandler(WriteCallback func(), "writeEvent");
    
       RegisterHandler(ReadCallback func(), "readEvent");
    
    }
    

    当一个客户端向redis发起set key value的命令,这时候会向socket缓冲区写入这样的命令请求,当Reactor监听到对应的socket缓冲区有数据了,那么此时的socket是可读的,Reactor就会触发读事件,通过事先注入的ReadCallback回调函数来完成命令的解析、命令的执行。当socket的缓冲区有足够的空间可以被写,那么对应的Reactor就会产生可写事件,此时就会执行事先注入的WriteCallback回调函数。当发起的set key value执行完毕后,此时工作程序会向socket缓冲区中写入OK,最后客户端会从socket缓冲区中取走写入的OK。在redis中不管是ReadCallback,还是WriteCallback,它们都是一个线程完成的,如果它们同时到达那么也得排队,这就是redis6.0之前的默认模式,也是最广为流传的单线程redis。

    整个流程下来可以发现Reactor主程序非常快,因为它不需要执行真正的读写,剩下的都是工作程序干的事:IO的读写、命令的解析、命令的执行、结果的返回..,这一点很重要

    4.基于多路复用的高性能I/O模型

    Linux的IO多路复用机制是指一个线程处理多个IO流,也就是select/epoll机制。

    在Redis运行单线程下,该机制允许内核中,同时存在多个监听套接字和已连接套接字

    为了在请求到达时能通知到Redis线程,select/epoll提供了基于事件的回调机制,即针对不同事件的发生,调用相应的处理函数。

    回调机制的工作流程:

  • select/epoll一旦临听到FD上有请求到达,就会触发相应的事件,并放进一个事件队列中。
  • Redis单线程对事件队列进行处理即可,无需一直轮询是否有请求发生,避免CPU资源浪费。
  • 因为Redis一直在对事件队列进行处理,所以能及时响应客户端请求,提升Redis的响应性能。

    不过,需要注意的是,在不同的操作系统上,多路复用机制也是适用的

    在“Redis基本IO模型”图中,有哪些潜在的性能瓶颈?

    Redis单线程处理IO请求性能瓶颈主要包括2个方面:

    1、任意一个请求在server中一旦发生耗时,都会影响整个server的性能 也就是说后面的请求都要等前面这个耗时请求处理完成,自己才能被处理到。

    耗时的操作包括:

    • 操作bigkey:写入一个bigkey在分配内存时需要消耗更多的时间,同样,删除bigkey释放内存同样会产生耗时
    • 使用复杂度过高的命令:例如SORT/SUNION/ZUNIONSTORE,或者O(N)命令,但是N很大,例如lrange key 0 -1一次查询全量数据
    • 大量key集中过期:Redis的过期机制也是在主线程中执行的,大量key集中过期会导致处理一个请求时,耗时都在删除过期key,耗时变长
    • 淘汰策略:溜达策略也是在主线程执行的,当内存超过Redis内存上限后,每次写入都需要淘汰一些key,也会 造成耗时变长。
    • AOF刷盘开启always机制:每次写入都需要把这个操作刷到磁盘,写磁盘的速度远比写内存慢,会拖慢Redis的性能
    • 主从全量同步生成RDB:虽然采用fork子进程生成数据快照,但fork这一瞬间也是会阻塞整个线程的,实例越大,阻塞时间越久

    解决办法:

    • 需要业务人员去规避
    • Redis在4.0推出了lazy-free机制,把bigkey释放内存的耗时操作放在了异步线程中执行,降低对主线程的影响

    2、并发量非常大时,单线程读写客户端IO数据存在性能瓶颈,虽然采用IO多路复用机制,但是读写客户端数据依旧是同步IO,只能单线程依次读取客户端的数据,无法利用到CPU多核。

    解决办法:

    • Redis在6.0推出了多线程,可以在高并发场景下利用CPU多核多线程读写客户端数据,进一步提升server性能
    • 当然,只针对客户端的读写是并行的,每个命令的真正操作依旧是单线程的

    redis常用数据类型

    1.string

    string表示的是一个可变的字节数组,我们初始化字符串的内容、可以拿到字符串的长度,可以获取string的子串,可以覆盖string的子串内容,可以追加子串。

    Redis的字符串是动态字符串,是可以修改的字符串,内部结构实现上类似于Java的ArrayList, 采用预分配冗余空间的方式来减少内存的频繁分配,如图中所示,内部为当前字符串实际分配的空间capacity一般要高于实际字符串长度len。当字符串长度小于1M时,扩容都是加倍现有的空间,如果超过1M,扩容时一次只会多扩1M的空间。需要注意的是字符串最大长度为512M。

    初始化字符串 需要提供「变量名称」和「变量的内容」

    > set ireader beijing.zhangyue.keji.gufen.youxian.gongsi
    OK
    

    获取字符串的内容 提供「变量名称」

    > get ireader
    "beijing.zhangyue.keji.gufen.youxian.gongsi"
    

    获取字符串的长度 提供「变量名称」

    > strlen ireader
    (integer) 42
    

    获取子串 提供「变量名称」以及开始和结束位置[start, end]

    > getrange ireader 28 34
    "youxian"
    

    覆盖子串 提供「变量名称」以及开始位置和目标子串

    > setrange ireader 28 wooxian
    (integer) 42  # 返回长度
    > get ireader
    "beijing.zhangyue.keji.gufen.wooxian.gongsi"
    

    追加子串

    > append ireader .hao
    (integer) 46 # 返回长度
    > get ireader
    "beijing.zhangyue.keji.gufen.wooxian.gongsi.hao"
    

    计数器 如果字符串的内容是一个整数,那么还可以将字符串当成计数器来使用

    > set ireader 42
    
    OK
    
    > get ireader
    
    "42"
    
    > incrby ireader 100
    
    (integer) 142
    
    > get ireader
    
    "142"
    
    > decrby ireader 100
    
    (integer) 42
    
    > get ireader
    
    "42"
    
    > incr ireader  # 等价于incrby ireader 1
    
    (integer) 43
    
    > decr ireader  # 等价于decrby ireader 1
    
    (integer) 42
    

    计数器是有范围的,它不能超过Long.Max,不能低于Long.MIN

    > set ireader 9223372036854775807
    
    OK
    
    > incr ireader
    
    (error) ERR increment or decrement would overflow
    
    > set ireader -9223372036854775808
    
    OK
    
    > decr ireader
    
    (error) ERR increment or decrement would overflow
    

    过期和删除 字符串可以使用del指令进行主动删除,可以使用expire指令设置过期时间,到点会自动删除,这属于被动删除。可以使用ttl指令获取字符串的寿命。

    > expire ireader 60
    
    (integer) 1  # 1表示设置成功,0表示变量ireader不存在
    
    > ttl ireader
    
    (integer) 50  # 还有50秒的寿命,返回-2表示变量不存在,-1表示没有设置过期时间
    
    > del ireader
    
    (integer) 1  # 删除成功返回1
    
    > get ireader
    
    (nil)  # 变量ireader没有了
    

    2.hash

    哈希等价于Java语言的HashMap或者是Python语言的dict,在实现结构上它使用二维结构,第一维是数组,第二维是链表,hash的内容key和value存放在链表中,数组里存放的是链表的头指针。通过key查找元素时,先计算key的hashcode,然后用hashcode对数组的长度进行取模定位到链表的表头,再对链表进行遍历获取到相应的value值,链表的作用就是用来将产生了「hash碰撞」的元素串起来

    获取元素 可以通过hget定位具体key对应的value,可以通过hmget获取多个key对应的value,可以使用hgetall获取所有的键值对,可以使用hkeys和hvals分别获取所有的key列表和value列表。这些操作和Java语言的Map接口是类似的。

    > hmset ireader go fast java fast python slow
    
    OK
    
    > hget ireader go
    
    "fast"
    
    > hmget ireader go python
    
    1) "fast"
    
    2) "slow"
    
    > hgetall ireader
    
    1) "go"
    
    2) "fast"
    
    3) "java"
    
    4) "fast"
    
    5) "python"
    
    6) "slow"
    
    > hkeys ireader
    
    1) "go"
    
    2) "java"
    
    3) "python"
    
    > hvals ireader
    
    1) "fast"
    
    2) "fast"
    
    3) "slow"
    

    删除元素 可以使用hdel删除指定key,hdel支持同时删除多个key

    > hmset ireader go fast java fast python slow
    
    OK
    
    > hdel ireader go
    
    (integer) 1
    
    > hdel ireader java python
    
    (integer) 2
    

    判断元素是否存在 通常我们使用hget获得key对应的value是否为空就直到对应的元素是否存在了,不过如果value的字符串长度特别大,通过这种方式来判断元素存在与否就略显浪费,这时可以使用hexists指令。

    > hmset ireader go fast java fast python slow
    
    OK
    
    > hexists ireader go
    
    (integer) 1
    

    计数器 hash结构还可以当成计数器来使用,对于内部的每一个key都可以作为独立的计数器。如果value值不是整数,调用hincrby指令会出错。

    > hincrby ireader go 1
    
    (integer) 1
    
    > hincrby ireader python 4
    
    (integer) 4
    
    > hincrby ireader java 4
    
    (integer) 4
    
    > hgetall ireader
    
    1) "go"
    
    2) "1"
    
    3) "python"
    
    4) "4"
    
    5) "java"
    
    6) "4"
    
    > hset ireader rust good
    
    (integer) 1
    
    > hincrby ireader rust 1
    
    (error) ERR hash value is not an integer
    

    扩容 当hash内部的元素比较拥挤时(hash碰撞比较频繁),就需要进行扩容。扩容需要申请新的两倍大小的数组,然后将所有的键值对重新分配到新的数组下标对应的链表中(rehash)。如果hash结构很大,比如有上百万个键值对,那么一次完整rehash的过程就会耗时很长。这对于单线程的Redis里来说有点压力山大。所以Redis采用了渐进式rehash的方案。它会同时保留两个新旧hash结构,在后续的定时任务以及hash结构的读写指令中将旧结构的元素逐渐迁移到新的结构中。这样就可以避免因扩容导致的线程卡顿现象。

    缩容 Redis的hash结构不但有扩容还有缩容,从这一点出发,它要比Java的HashMap要厉害一些,Java的HashMap只有扩容。缩容的原理和扩容是一致的,只不过新的数组大小要比旧数组小一倍。

    3.Set

    象。Redis的set结构也是一样,它的内部也使用hash结构,所有的value都指向同一个内部值。

    增加元素 可以一次增加多个元素

    > sadd ireader go java python
    (integer) 3
    

    读取元素 使用smembers列出所有元素,使用scard获取集合长度,使用~~srandmember获取随机count个元素~~,如果不提供count参数,默认为1

    > sadd ireader go java python
    
    (integer) 3
    
    > smembers ireader
    
    1) "java"
    
    2) "python"
    
    3) "go"
    
    > scard ireader
    
    (integer) 3
    
    > srandmember ireader
    
    "java"
    

    删除元素 使用srem删除一到多个元素,使用spop删除随机一个元素

    > sadd ireader go java python rust erlang
    
    (integer) 5
    
    > srem ireader go java
    
    (integer) 2
    
    > spop ireader
    
    "erlang"
    

    判断元素是否存在 使用sismember指令,只能接收单个元素

    > sadd ireader go java python rust erlang
    
    (integer) 5
    
    > sismember ireader rust
    
    (integer) 1
    
    > sismember ireader javascript
    
    (integer) 0
    

    4.List

    Redis将列表数据结构命名为list而不是array,是因为列表的存储结构用的是链表而不是数组,而且链表还是双向链表。因为它是链表,所以随机定位性能较弱,首尾插入删除性能较优。如果list的列表长度很长,使用时我们一定要关注链表相关操作的时间复杂度。

    负下标 链表元素的位置使用自然数0,1,2,....n-1表示,还可以使用负数-1,-2,...-n来表示,-1表示「倒数第一」,-2表示「倒数第二」,那么-n就表示第一个元素,对应的下标为0

    队列/堆栈 链表可以从表头和表尾追加和移除元素,结合使用rpush/rpop/lpush/lpop四条指令,可以将链表作为队列或堆栈使用,左向右向进行都可以

    # 右进左出
    > rpush ireader go
    (integer) 1
    > rpush ireader java python
    (integer) 3
    > lpop ireader
    "go"
    > lpop ireader
    "java"
    > lpop ireader
    "python"
    # 左进右出
    > lpush ireader go java python
    (integer) 3
    > rpop ireader
    "go"
    ...
    # 右进右出
    > rpush ireader go java python
    (integer) 3
    > rpop ireader 
    "python"
    ...
    # 左进左出
    > lpush ireader go java python
    (integer) 3
    > lpop ireader
    "python"
    

    在日常应用中,列表常用来作为异步队列来使用。

    长度 使用llen指令获取链表长度

    > rpush ireader go java python
    
    (integer) 3
    
    > llen ireader
    
    (integer) 3
    

    随机读 可以使用lindex指令访问指定位置的元素,使用lrange指令来获取链表子元素列表,提供start和end下标参数

    > rpush ireader go java python
    
    (integer) 3
    
    > lindex ireader 1
    
    "java"
    
    > lrange ireader 0 2
    
    1) "go"
    
    2) "java"
    
    3) "python"
    
    > lrange ireader 0 -1  # -1表示倒数第一
    
    1) "go"
    
    2) "java"
    
    3) "python"
    

    修改元素 使用lset指令在指定位置修改元素。

    > rpush ireader go java python
    
    (integer) 3
    
    > lset ireader 1 javascript
    
    OK
    
    > lrange ireader 0 -1
    
    1) "go"
    
    2) "javascript"
    
    3) "python"
    

    插入元素 使用linsert指令在列表的中间位置插入元素,有经验的程序员都知道在插入元素时,我们经常搞不清楚是在指定位置的前面插入还是后面插入,所以antirez在linsert指令里增加了方向参数before/after来显示指示前置和后置插入。不过让人意想不到的是linsert指令并不是通过指定位置来插入,而是通过指定具体的值。这是因为在分布式环境下,列表的元素总是频繁变动的,意味着上一时刻计算的元素下标在下一时刻可能就不是你所期望的下标了。

    > rpush ireader go java python
    
    (integer) 3
    
    > linsert ireader before java ruby
    
    (integer) 4
    
    > lrange ireader 0 -1
    
    1) "go"
    
    2) "ruby"
    
    3) "java"
    
    4) "python"
    

    删除元素 列表的删除操作也不是通过指定下标来确定元素的,你需要指定删除的最大个数以及元素的值

    > rpush ireader go java python
    
    (integer) 3
    
    > lrem ireader 1 java
    
    (integer) 1
    
    > lrange ireader 0 -1
    
    1) "go"
    
    2) "python"
    

    定长列表 在实际应用场景中,我们有时候会遇到「定长列表」的需求。比如要以走马灯的形式实时显示中奖用户名列表,因为中奖用户实在太多,能显示的数量一般不超过100条,那么这里就会使用到定长列表。维持定长列表的指令是ltrim,需要提供两个参数start和end,表示需要保留列表的下标范围,范围之外的所有元素都将被移除。

    > rpush ireader go java python javascript ruby erlang rust cpp
    
    (integer) 8
    
    > ltrim ireader -3 -1
    
    OK
    
    > lrange ireader 0 -1
    
    1) "erlang"
    
    2) "rust"
    
    3) "cpp"
    

    如果指定参数的end对应的真实下标小于start,其效果等价于del指令,因为这样的参数表示需要需要保留列表元素的下标范围为空。

    5.sortedSet

    SortedSet(zset)是Redis提供的一个非常特别的数据结构,一方面它等价于Java的数据结构Map,可以给每一个元素value赋予一个权重score,另一方面它又类似于TreeSet,内部的元素会按照权重score进行排序,可以得到每个元素的名次,还可以通过score的范围来获取元素的列表。

    zset底层实现使用了两个数据结构,第一个是hash,第二个是跳跃列表,hash的作用就是关联元素value和权重score,保障元素value的唯一性,可以通过元素value找到相应的score值。跳跃列表的目的在于给元素value排序,根据score的范围获取元素列表。

    增加元素 通过zadd指令可以增加一到多个value/score对,score放在前面

    > zadd ireader 4.0 python
    
    (integer) 1
    
    > zadd ireader 4.0 java 1.0 go
    
    (integer) 2
    

    长度 通过指令zcard可以得到zset的元素个数

    > zcard ireader
    
    (integer) 3
    

    1

    2

    3

    4

    Plain Text

    删除元素 通过指令zrem可以删除zset中的元素,可以一次删除多个

    > zrem ireader go python
    
    (integer) 2
    

    计数器 同hash结构一样,zset也可以作为计数器使用。

    > zadd ireader 4.0 python 4.0 java 1.0 go
    
    (integer) 3
    
    > zincrby ireader 1.0 python
    
    "5"
    

    获取排名和分数 通过zscore指令获取指定元素的权重,通过zrank指令获取指定元素的正向排名,通过zrevrank指令获取指定元素的反向排名[倒数第一名]。正向是由小到大,负向是由大到小。

    > zscore ireader python
    
    "5"
    
    > zrank ireader go  # 分数低的排名考前,rank值小
    
    (integer) 0
    
    > zrank ireader java
    
    (integer) 1
    
    > zrank ireader python
    
    (integer) 2
    
    > zrevrank ireader python
    
    (integer) 0
    

    根据排名范围获取元素列表 通过zrange指令指定排名范围参数获取对应的元素列表,携带withscores参数可以一并获取元素的权重。通过zrevrange指令按负向排名获取元素列表[倒数]。正向是由小到大,负向是由大到小。

    > zrange ireader 0 -1  # 获取所有元素
    
    1) "go"
    
    2) "java"
    
    3) "python"
    
    > zrange ireader 0 -1 withscores
    
    1) "go"
    
    2) "1"
    
    3) "java"
    
    4) "4"
    
    5) "python"
    
    6) "5"
    
    > zrevrange ireader 0 -1 withscores
    
    1) "python"
    
    2) "5"
    
    3) "java"
    
    4) "4"
    
    5) "go"
    
    6) "1"
    

    根据score范围获取列表 通过zrangebyscore指令指定score范围获取对应的元素列表。通过zrevrangebyscore指令获取倒排元素列表。正向是由小到大,负向是由大到小。参数-inf表示负无穷,+inf表示正无穷。

    > zrangebyscore ireader 0 5
    
    1) "go"
    
    2) "java"
    
    3) "python"
    
    > zrangebyscore ireader -inf +inf withscores
    
    1) "go"
    
    2) "1"
    
    3) "java"
    
    4) "4"
    
    5) "python"
    
    6) "5"
    
    > zrevrangebyscore ireader +inf -inf withscores  # 注意正负反过来了
    
    1) "python"
    
    2) "5"
    
    3) "java"
    
    4) "4"
    
    5) "go"
    
    6) "1"
    

    根据范围移除元素列表 可以通过排名范围,也可以通过score范围来一次性移除多个元素

    > zremrangebyrank ireader 0 1
    
    (integer) 2  # 删掉了2个元素
    
    > zadd ireader 4.0 java 1.0 go
    
    (integer) 2
    
    > zremrangebyscore ireader -inf 4
    
    (integer) 2
    
    > zrange ireader 0 -1
    
    1) "python"
    

    跳跃列表 zset内部的排序功能是通过「跳跃列表」数据结构来实现的,它的结构非常特殊,也比较复杂。这一块的内容深度读者要有心理准备。

    因为zset要支持随机的插入和删除,所以它不好使用数组来表示。我们先看一个普通的链表结构。

    redis10万字超详细讲解-1

    我们需要这个链表按照score值进行排序。这意味着当有新元素需要插入时,需要定位到特定位置的插入点,这样才可以继续保证链表是有序的。通常我们会通过二分查找来找到插入点,但是二分查找的对象必须是数组,只有数组才可以支持快速位置定位,链表做不到,那该怎么办?

    想想一个创业公司,刚开始只有几个人,团队成员之间人人平等,都是联合创始人。随着公司的成长,人数渐渐变多,团队沟通成本随之增加。这时候就会引入组长制,对团队进行划分。每个团队会有一个组长。开会的时候分团队进行,多个组长之间还会有自己的会议安排。公司规模进一步扩展,需要再增加一个层级——部门,每个部门会从组长列表中推选出一个代表来作为部长。部长们之间还会有自己的高层会议安排。

    跳跃列表就是类似于这种层级制,最下面一层所有的元素都会串起来。然后每隔几个元素挑选出一个代表来,再将这几个代表使用另外一级指针串起来。然后在这些代表里再挑出二级代表,再串起来。最终就形成了金字塔结构。

    想想你老家在世界地图中的位置:亚洲-->中国->安徽省->安庆市->枞阳县->汤沟镇->田间村->xxxx号,也是这样一个类似的结构。

    「跳跃列表」之所以「跳跃」,是因为内部的元素可能「身兼数职」,比如上图中间的这个元素,同时处于L0、L1和L2层,可以快速在不同层次之间进行「跳跃」。

    定位插入点时,先在顶层进行定位,然后下潜到下一级定位,一直下潜到最底层找到合适的位置,将新元素插进去。你也许会问那新插入的元素如何才有机会「身兼数职」呢?

    跳跃列表采取一个随机策略来决定新元素可以兼职到第几层,首先L0层肯定是100%了,L1层只有50%的概率,L2层只有25%的概率,L3层只有12.5%的概率,一直随机到最顶层L31层。绝大多数元素都过不了几层,只有极少数元素可以深入到顶层。列表中的元素越多,能够深入的层次就越深,能进入到顶层的概率就会越大。

    这还挺公平的,能不能进入中央不是靠拼爹,而是看运气

    redis缓存一致性

    1.Cache-Aside Pattern

    Cache-Aside Pattern,即旁路缓存模式,它的提出是为了尽可能地解决缓存与数据库的数据不一致问题。

  • 读的时候,先读缓存,缓存命中的话,直接返回数据
  • 缓存没有命中的话,就去读数据库,从数据库取出数据,放入缓存后,同时返回响应。
  • Cache-Aside 写流程

    Cache-Aside Pattern的写请求流程如下:

    2.操作缓存的时候,删除缓存呢,还是更新缓存?

    一般业务场景,我们使用的就是Cache-Aside模式。 有些小伙伴可能会问, Cache-Aside在写入请求的时候,为什么是删除缓存而不是更新缓存呢?

  • 线程A先发起一个写操作,第一步先更新数据库
  • 线程B再发起一个写操作,第二步更新了数据库
  • 由于网络等原因,线程B先更新了缓存
  • 线程A更新缓存。
  • 这时候,缓存保存的是A的数据(老数据),数据库保存的是B的数据(新数据),数据不一致了,脏数据出现啦。如果是删除缓存取代更新缓存则不会出现这个脏数据问题。

    不管是延时双删还是Cache-Aside的先操作数据库再删除缓存,如果第二步的删除缓存失败呢,删除失败会导致脏数据哦~

    删除失败就多删除几次呀,保证删除缓存成功呀~ 所以可以引入删除缓存重试机制

  • 写请求更新数据库
  • 缓存因为某些原因,删除失败
  • 把删除失败的key放到消息队列
  • 消费消息队列的消息,获取要删除的key
  • 重试删除缓存操作
  • 3.读取biglog异步删除缓存

    重试删除缓存机制还可以,就是会造成好多业务代码入侵。其实,还可以通过数据库的binlog来异步淘汰key

    以mysql为例 可以使用阿里的canal将binlog日志采集发送到MQ队列里面,然后通过ACK机制确认处理这条更新消息,删除缓存,保证数据缓存一致性

    redis分布式锁

    日常开发中,秒杀下单、抢红包等等业务场景,都需要用到分布式锁。而Redis非常适合作为分布式锁使用。本文将分七个方案展开

    • 什么是分布式锁
    • 方案一:SETNX + EXPIRE
    • 方案二:SETNX + value值是(系统时间+过期时间)
    • 方案三:使用Lua脚本(包含SETNX + EXPIRE两条指令)
    • 方案四:SET的扩展命令(SET EX PX NX)
    • 方案五:SET EX PX NX + 校验唯一随机值,再释放锁
    • 方案六: 开源框架:Redisson
    • 方案七:多机实现的分布式锁Redlock

    什么是分布式锁

    分布式锁其实就是,控制分布式系统不同进程共同访问共享资源的一种锁的实现。如果不同的系统或同一个系统的不同主机之间共享了某个临界资源,往往需要互斥来防止彼此干扰,以保证一致性。

    我们先来看下,一把靠谱的分布式锁应该有哪些特征:

    • 互斥性: 任意时刻,只有一个客户端能持有锁。
    • 锁超时释放:持有锁超时,可以释放,防止不必要的资源浪费,也可以防止死锁。
    • 可重入性:一个线程如果获取了锁之后,可以再次对其请求加锁。
    • 高性能和高可用:加锁和解锁需要开销尽可能低,同时也要保证高可用,避免分布式锁失效。
    • 安全性:锁只能被持有的客户端删除,不能被其他客户端删除

    Redis分布式锁方案一:SETNX + EXPIRE

    提到Redis的分布式锁,很多小伙伴马上就会想到setnx+ expire命令。即先用setnx来抢锁,如果抢到之后,再用expire给锁设置一个过期时间,防止锁忘记了释放。

    SETNX 是SET IF NOT EXISTS的简写.日常命令格式是SETNX key value,如果 key不存在,则SETNX成功返回1,如果这个key已经存在了,则返回0。

    假设某电商网站的某商品做秒杀活动,key可以设置为key_resource_id,value设置任意值,伪代码如下:

    if(jedis.setnx(key_resource_id,lock_value) == 1){ //加锁
        expire(key_resource_id,100); //设置过期时间
        try {
            do something  //业务请求
        }catch(){
      }
      finally {
           jedis.del(key_resource_id); //释放锁
        }
    }
    复制代码
    

    但是这个方案中,setnxexpire两个命令分开了,不是原子操作。如果执行完setnx加锁,正要执行expire设置过期时间时,进程crash或者要重启维护了,那么这个锁就“长生不老”了,别的线程永远获取不到锁啦。

    Redis分布式锁方案二:SETNX + value值是(系统时间+过期时间)

    为了解决方案一,发生异常锁得不到释放的场景,有小伙伴认为,可以把过期时间放到setnx的value值里面。如果加锁失败,再拿出value值校验一下即可。加锁代码如下:

    long expires = System.currentTimeMillis() + expireTime; //系统时间+设置的过期时间
    String expiresStr = String.valueOf(expires);
    
    
    // 如果当前锁不存在,返回加锁成功
    if (jedis.setnx(key_resource_id, expiresStr) == 1) {
            return true;
    } 
    // 如果锁已经存在,获取锁的过期时间
    String currentValueStr = jedis.get(key_resource_id);
    
    
    // 如果获取到的过期时间,小于系统当前时间,表示已经过期
    if (currentValueStr != null && Long.parseLong(currentValueStr) < System.currentTimeMillis()) {
    
    
         // 锁已过期,获取上一个锁的过期时间,并设置现在锁的过期时间(不了解redis的getSet命令的小伙伴,可以去官网看下哈)
        String oldValueStr = jedis.getSet(key_resource_id, expiresStr);
        
        if (oldValueStr != null && oldValueStr.equals(currentValueStr)) {
             // 考虑多线程并发的情况,只有一个线程的设置值和当前值相同,它才可以加锁
             return true;
        }
    }
            
    //其他情况,均返回加锁失败
    return false;
    }
    复制代码
    

    这个方案的优点是,巧妙移除expire单独设置过期时间的操作,把过期时间放到setnx的value值里面来。解决了方案一发生异常,锁得不到释放的问题。但是这个方案还有别的缺点:

    Redis分布式锁方案三:使用Lua脚本(包含SETNX + EXPIRE两条指令)

    实际上,我们还可以使用Lua脚本来保证原子性(包含setnx和expire两条指令),lua脚本如下:

    if redis.call('setnx',KEYS[1],ARGV[1]) == 1 then
       redis.call('expire',KEYS[1],ARGV[2])
    else
       return 0
    end;
    复制代码
    

    加锁代码如下:

     String lua_scripts = "if redis.call('setnx',KEYS[1],ARGV[1]) == 1 then" +
                " redis.call('expire',KEYS[1],ARGV[2]) return 1 else return 0 end";   
    Object result = jedis.eval(lua_scripts, Collections.singletonList(key_resource_id), Collections.singletonList(values));
    //判断是否成功
    return result.equals(1L);
    复制代码
    

    这个方案还是有缺点的哦,至于哪些缺点,你先思考一下。也可以想下。跟方案二对比,哪个更好?

    Redis分布式锁方案方案四:SET的扩展命令(SET EX PX NX)

    除了使用,使用Lua脚本,保证SETNX + EXPIRE两条指令的原子性,我们还可以巧用Redis的SET指令扩展参数!(SET key value[EX seconds][PX milliseconds][NX|XX]),它也是原子性的!

    SET key value[EX seconds][PX milliseconds][NX|XX]

    伪代码demo如下:

    if(jedis.set(key_resource_id, lock_value, "NX", "EX", 100s) == 1){ //加锁
        try {
            do something  //业务处理
        }catch(){
      }
      finally {
           jedis.del(key_resource_id); //释放锁
        }
    }
    复制代码
    

    但是呢,这个方案还是可能存在问题:

    • 问题一:锁过期释放了,业务还没执行完。假设线程a获取锁成功,一直在执行临界区的代码。但是100s过去后,它还没执行完。但是,这时候锁已经过期了,此时线程b又请求过来。显然线程b就可以获得锁成功,也开始执行临界区的代码。那么问题就来了,临界区的业务代码都不是严格串行执行的啦。
    • 问题二:锁被别的线程误删。假设线程a执行完后,去释放锁。但是它不知道当前的锁可能是线程b持有的(线程a去释放锁时,有可能过期时间已经到了,此时线程b进来占有了锁)。那线程a就把线程b的锁释放掉了,但是线程b临界区业务代码可能都还没执行完呢。

    方案五:SET EX PX NX + 校验唯一随机值,再删除

    既然锁可能被别的线程误删,那我们给value值设置一个标记当前线程唯一的随机数,在删除的时候,校验一下,不就OK了嘛。伪代码如下:

    if(jedis.set(key_resource_id, uni_request_id, "NX", "EX", 100s) == 1){ //加锁
        try {
            do something  //业务处理
        }catch(){
      }
      finally {
           //判断是不是当前线程加的锁,是才释放
           if (uni_request_id.equals(jedis.get(key_resource_id))) {
            jedis.del(lockKey); //释放锁
            }
        }
    }
    复制代码
    

    在这里,判断是不是当前线程加的锁和释放锁不是一个原子操作。如果调用jedis.del()释放锁的时候,可能这把锁已经不属于当前客户端,会解除他人加的锁。

    为了更严谨,一般也是用lua脚本代替。lua脚本如下:

    if redis.call('get',KEYS[1]) == ARGV[1] then 
       return redis.call('del',KEYS[1]) 
    else
       return 0
    end;
    复制代码
    

    Redis分布式锁方案六:Redisson框架

    方案五还是可能存在锁过期释放,业务没执行完的问题。有些小伙伴认为,稍微把锁过期时间设置长一些就可以啦。其实我们设想一下,是否可以给获得锁的线程,开启一个定时守护线程,每隔一段时间检查锁是否还存在,存在则对锁的过期时间延长,防止锁过期提前释放。

    当前开源框架Redisson解决了这个问题。我们一起来看下Redisson底层原理图吧:

    只要线程一加锁成功,就会启动一个watch dog看门狗,它是一个后台线程,会每隔10秒检查一下,如果线程1还持有锁,那么就会不断的延长锁key的生存时间。因此,Redisson就是使用Redisson解决了锁过期释放,业务没执行完问题。

    Redis分布式锁方案七:多机实现的分布式锁Redlock+Redisson

    前面六种方案都只是基于单机版的讨论,还不是很完美。其实Redis一般都是集群部署的:

    如果线程一在Redis的master节点上拿到了锁,但是加锁的key还没同步到slave节点。恰好这时,master节点发生故障,一个slave节点就会升级为master节点。线程二就可以获取同个key的锁啦,但线程一也已经拿到锁了,锁的安全性就没了。

    为了解决这个问题,Redis作者 antirez提出一种高级的分布式锁算法:Redlock。Redlock核心思想是这样的:

    搞多个Redis master部署,以保证它们不会同时宕掉。并且这些master节点是完全相互独立的,相互之间不存在数据同步。同时,需要确保在这多个master实例上,是与在Redis单实例,使用相同方法来获取和释放锁。

    我们假设当前有5个Redis master节点,在5台服务器上面运行这些Redis实例。

    RedLock的实现步骤:如下

    简化下步骤就是:

    • 按顺序向5个master节点请求加锁
    • 根据设置的超时时间来判断,是不是要跳过该master节点。
    • 如果大于等于三个节点加锁成功,并且使用的时间小于锁的有效期,即可认定加锁成功啦。
    • 如果获取锁失败,解锁!

    Redisson实现了redLock版本的锁,有兴趣的小伙伴,可以去了解一下哈~

    redis主从复制

    1.概念

    Redis的主从复制概念和MySQL的主从复制大概类似。一台主机master,一台从机slaver。master主机数据更新后根据配置和策略,自动同步到slaver从机,Master以写为主,Slave以读为主

    2.主要用途

    • 读写分离:适用于读多写少的应用,增加多个从机,提高读的速度,提高程序并发
    • 数据容灾恢复:从机复制主机的数据,相当于数据备份,如果主机数据丢失,那么可以通过从机存储的数据进行恢复。
    • 高并发、高可用集群实现的基础:在高并发的场景下,就算主机挂了,从机可以进行主从切换,从机自动成为主机对外提供服务。

    3.环境准备

    老哥太穷了,就用一台机器模拟三个机器。

    • 第一步:将redis.conf复制3份,分别是redis6379.conf、redis6380.conf、redis6381.conf
    • 第二步: 修改三个redis.conf文件里的port端口、pid文件名、日志文件名、rdb文件名
    • 第三步: 分别打开三个窗口模拟三台服务器,并开启redis服务。

    查看当前3台机器主从角色

    先用命令info replication看看3台机器目前的角色是什么。

    # 三台机器都是这个状态
    
    127.0.0.1:6379> info replication
    
    # 角色是master主机
    
    role:master
    
    # 从机个数为0
    
    connected_slaves:0
    

    设置主从关系

    这里注意,我们只设置从机就可以了,不用设置主机。我们选择63806381作为从机6379作为主机

    # 6380 端口
    
    127.0.0.1:6380> SLAVEOF 127.0.0.1 6379
    
    
    
    # 6381 端口
    
    127.0.0.1:6381> SLAVEOF 127.0.0.1 6379
    

    再次查看3台机器目前角色

    再次执行命令:info replication

    # 主机127.0.0.1:6379> 
    info replicationrole:master 
    # 角色:
    主机connected_slaves:2 
    #连接的从机个数,以及从机IP和端口
    slave0:ip=127.0.0.1,port=6380,state=online,offset=98,lag=1
    slave1:ip=127.0.0.1,port=6381,state=online,offset=98,lag=1
    
    
    # 从机1
    
    127.0.0.1:6380> info replication
    
    role:slave # 角色:从机
    
    master_host:127.0.0.1 # 主机的IP和端口
    
    master_port:6379
    
    
    # 从机2
    
    127.0.0.1:6381> info replication
    
    role:slave # 角色:从机
    
    master_host:127.0.0.1 # 主机的IP和端口
    
    master_port:6379
    

    4.搭建成功,试验一把

    • 全量复制: 从机会把主机之前的数据全部都同步过来,大家可以在从机上get 某key试试。
    • 增量复制: 当主机新增数据时,从机会将该新增数据同步过来,大家可以在主机上执行命令set key value,然后在从机上get 该key,看是否能获取到。

    5.读写分离

    Redis的从机默认不允许进行写操作,大家可以在从机上执行命令set key value,会报错。

    # 6380从机
    
    127.0.0.1:6380> set k3 v3
    
    (error) READONLY You can't write against a read only slave.
    

    6.主从复制原理

    7.全量复制

    【**①」**slave发送psync,由于是第一次复制,不知道master的runid,自然也不知道offset,所以发送psync ? -1

    **「②」**master收到请求,发送master的runid和offset给从节点。

    **「③」**从节点slave保存master的信息

    **「④」**主节点bgsave保存rdb文件

    **「⑤」**主机点发送rdb文件

    并且在**「④」和「⑤」**的这个过程中产生的数据,会写到复制缓冲区repl_back_buffer之中去。

    **「⑥」**主节点发送上面两个步骤产生的buffer到从节点slave

    **「⑦」**从节点清空原来的数据,如果它之前有数据,那么久会清空数据

    **「⑧」**从节点slave把rdb文件的数据装载进自身。

    8.全量复制的开销

    **「①」**bgsave时间

    **「②」**rdb文件网络传输时间

    **「③」**从节点清空数据的

    **「④」**从节点加载rdb的时间

    **「⑤」**可能的aof重写时间,这是针对从节点,例如开启了aof之后,从节点添加buffer数据时候,可能需要aof重写

    基于上面的原因,有的情况下不适合使用全量复制,例如网络抖动之后,从节点只需要传送一部分数据,不需要传送全部数据,redis2.8之后实现了部分复制功能

    9.部分复制

    【**①」**假设发送网络抖动或者别的情况,暂时失去了连接

    **「②」**这个时候,master还在继续往buffer里面写数据

    **「③」**slave重新连接上了master

    **「④」**slave向master发送自己的offset和runid

    **「⑤」**master判断slave的offset是否在buffer的队列里面,如果是,那就返回continue给slave,否则需要进行全量复制(因为这说明已经错过了很多数据了)

    **「⑥」**master发送从slave的offset开始到缓冲区队列结尾的数据给slave

    redis哨兵模式

    1.概述

    主从切换技术的方法是:当主服务器宕机后,需要手动把一台从服务器切换为主服务器,这就需要人工干预,费事费力,还会造成一段时间内服务不可用。这不是一种推荐的方式,更多时候,我们优先考虑 哨兵模式 。

    redis从2.8开始正式提供了Sentinel(哨兵) 架构来解决这个问题。

    谋朝篡位 的自动版,能够后台监控主机是否故障,如果故障了根据投票数自动将从库转换为主库。

    哨兵模式是一种特殊的模式,首先Redis提供了哨兵的命令,哨兵是一个独立的 进程 ,作为进程,它会独立运行。其原理是哨兵通过发送命令,等待Redis服务器响应,从而监控运行的多个Redis实例。

    2.配置哨兵

    • 添加哨兵配置文件 sentinel.conf

    内容如下:

    # sentinel monitor 被监控的名称 host port 1 (代表自动投票选举大哥!)
    
    sentinel monitor myredis 127.0.0.1 6379 1
    

    3.启动哨兵

    redis-sentinel dyjConfig/sentinel.conf   #和启动Redis一致
    

    启动成功后如下图!

    3、前提准备条件:

    开启一台主机,两台从机,一主二从时最基本的!

    4、测试主机宕机后自动选取大哥,如果主机此时回来了,只能归并到新的主机下,当做从机,这就是哨兵模式的规则!

    等待哨兵的默认配置时间时 30 秒!

    再次查看redis信息:

    可以发现8381变成主机,8380依旧是从机!

    我们将老大哥主机连接试试!可以发现6379变成从机了,由大哥变为小弟!

    而6381成功成为主机大哥大!

    4.总结

    • 优点

    ①哨兵集群,基于主从复制模式 ,所有的主从配置优点,它全有

    ②主从可以切换,故障可以转移 ,系统的 可用性 就会更好

    ③哨兵模式就是主从模式的升级,手动到自动,更加健壮!

    • 缺点

    ①Redis 不好在线扩容 的,集群容量一旦到达上限,在线扩容就十分麻烦!

    ②实现哨兵模式的配置其实是很 麻烦 的,里面有很多选择!

    • 注意点:以上所有的配置因为条件所限都是基于单机集群的前提下!有兴趣的可以自己搭建下正式集群下的多哨兵模式来监控!如下图:

    • 哨兵的配置文件解析
    # Example sentinel.conf 
    
    
    
    # 哨兵sentinel实例运行的端口 默认26379 
    
    port 26379 
    
    
    
    # 哨兵sentinel的工作目录 
    
    dir /tmp 
    
    
    
    # 哨兵sentinel监控的redis主节点的 ip port 
    
    # master-name 可以自己命名的主节点名字 只能由字母A-z、数字0-9 、这三个字符".-_"组成。 
    
    # quorum 配置多少个sentinel哨兵统一认为master主节点失联 那么这时客观上认为主节点失联了 
    
    # sentinel monitor     sentinel monitor mymaster 127.0.0.1 6379 2 
    
    
    
    # 当在Redis实例中开启了requirepass foobared 授权密码 这样所有连接Redis实例的客户端都要提供 密码
    
    # 设置哨兵sentinel 连接主从的密码 注意必须为主从设置一样的验证密码 
    
    # sentinel auth-pass   
    
    sentinel auth-pass mymaster MySUPER--secret-0123passw0rd 
    
    
    
    # 指定多少毫秒之后 主节点没有应答哨兵sentinel 此时 哨兵主观上认为主节点下线 默认30秒 
    
    # sentinel down-after-milliseconds   
    
    sentinel down-after-milliseconds mymaster 30000 
    
    
    
    # 这个配置项指定了在发生failover主备切换时最多可以有多少个slave同时对新的master进行 同步
    
    #这个数字越小,完成failover所需的时间就越长,
    
    # 但是如果这个数字越大,就意味着越 多的slave因为replication而不可用。 
    
    #可以通过将这个值设为 1 来保证每次只有一个slave 处于不能处理命令请求的状态。 
    
    # sentinel parallel-syncs   
    
    sentinel parallel-syncs mymaster 1 
    
    
    
    
    
    # 故障转移的超时时间 failover-timeout 可以用在以下这些方面: 
    
    #1. 同一个sentinel对同一个master两次failover之间的间隔时间。 
    
    #2. 当一个slave从一个错误的master那里同步数据开始计算时间。直到slave被纠正为向正确的master那 里同步数据时。 
    
    #3.当想要取消一个正在进行的failover所需要的时间。 
    
    #4.当进行failover时,配置所有slaves指向新的master所需的最大时间。不过,即使过了这个超时, slaves依然会被正确配置为指向master,但是就不按parallel-syncs所配置的规则来了 
    
    # 默认三分钟 # sentinel failover-timeout  
    
    sentinel failover-timeout mymaster 180000 
    
    
    
    
    
    # SCRIPTS EXECUTION #配置当某一事件发生时所需要执行的脚本,可以通过脚本来通知管理员,例如当系统运行不正常时发邮件通知 相关人员。 
    
    #对于脚本的运行结果有以下规则: 
    
    #若脚本执行后返回1,那么该脚本稍后将会被再次执行,重复次数目前默认为10 #若脚本执行后返回2,或者比2更高的一个返回值,脚本将不会重复执行。 
    
    #如果脚本在执行过程中由于收到系统中断信号被终止了,则同返回值为1时的行为相同。 
    
    #一个脚本的最大执行时间为60s,如果超过这个时间,脚本将会被一个SIGKILL信号终止,之后重新执行。 
    
    #通知型脚本:当sentinel有任何警告级别的事件发生时(比如说redis实例的主观失效和客观失效等等), 将会去调用这个脚本,这时这个脚本应该通过邮件,SMS等方式去通知系统管理员关于系统不正常运行的信 息。调用该脚本时,将传给脚本两个参数,一个是事件的类型,一个是事件的描述。如果sentinel.conf配 置文件中配置了这个脚本路径,那么必须保证这个脚本存在于这个路径,并且是可执行的,否则sentinel无 法正常启动成功。 
    
    #通知脚本 
    
    # shell编程 
    
    # sentinel notification-script   
    
    sentinel notification-script mymaster /var/redis/notify.sh 
    
    
    
    
    
    # 客户端重新配置主节点参数脚本 
    
    # 当一个master由于failover而发生改变时,这个脚本将会被调用,通知相关的客户端关于master地址已 经发生改变的信息。 
    
    # 以下参数将会在调用脚本时传给脚本: 
    
    #        
    
    # 目前总是“failover”, 
    
    # 是“leader”或者“observer”中的一个。 
    
    # 参数 from-ip, from-port, to-ip, to-port是用来和旧的master和新的master(即旧的slave)通 信的
    
    # 这个脚本应该是通用的,能被多次调用,不是针对性的。 
    
    # sentinel client-reconfig-script   
    
    sentinel client-reconfig-script mymaster /var/redis/reconfig.sh 
    
    # 一般都是由运维来配置!
    

    redis持久化

    1.RDB

    RDB是一种快照存储持久化方式,具体就是将Redis某一时刻的内存数据保存到硬盘的文件当中,默认保存的文件名为dump.rdb,而在Redis服务器启动时,会重新加载dump.rdb文件的数据到内存当中恢复数据。

    1.开启RDB持久化方式

    开启RDB持久化方式很简单,客户端可以通过向Redis服务器发送savebgsave命令让服务器生成rdb文件,或者通过服务器配置文件指定触发RDB条件。

    • save命令

    save命令是一个同步操作。

    同步数据到磁盘上

    save

    当客户端向服务器发送save命令请求进行持久化时,服务器会阻塞save命令之后的其他客户端的请求,直到数据同步完成。

    如果数据量太大,同步数据会执行很久,而这期间Redis服务器也无法接收其他请求,所以,最好不要在生产环境使用save命令。

    • bgsave

    save命令不同,bgsave命令是一个异步操作。

    # 异步保存数据集到磁盘上
    
    > bgsave
    
    • 服务器配置自动触发

    除了通过客户端发送命令外,还有一种方式,就是在Redis配置文件中的save指定到达触发RDB持久化的条件,比如【多少秒内至少达到多少写操作】就开启RDB数据同步。

    例如我们可以在配置文件redis.conf指定如下的选项:

    # 900s内至少达到一条写命令
    
    save 900 1
    
    # 300s内至少达至10条写命令
    
    save 300 10
    
    # 60s内至少达到10000条写命令
    
    save 60 10000
    

    这种通过服务器配置文件触发RDB的方式,与bgsave命令类似,达到触发条件时,会forks一个子进程进行数据同步,不过最好不要通过这方式来触发RDB持久化,因为设置触发的时间太短,则容易频繁写入rdb文件,影响服务器性能,时间设置太长则会造成数据丢失。

    2.rdb文件

    前面介绍了三种让服务器生成rdb文件的方式,无论是由主进程生成还是子进程来生成,其过程如下:

  • 生成临时rdb文件,并写入数据。
  • 完成数据写入,用临时文代替代正式rdb文件。
  • 删除原来的db文件。
  • RDB默认生成的文件名为dump.rdb,当然,我可以通过配置文件进行更加详细配置,比如在单机下启动多个redis服务器进程时,可以通过端口号配置不同的rdb名称,如下所示:

    # 是否压缩rdb文件
    
    rdbcompression yes
    
    
    
    # rdb文件的名称
    
    dbfilename redis-6379.rdb
    
    
    
    # rdb文件保存目录
    
    dir ~/redis/
    

    3.RDB的几个优点

  • 与AOF方式相比,通过rdb文件恢复数据比较快。
  • rdb文件非常紧凑,适合于数据备份。
  • 通过RDB进行数据备,由于使用子进程生成,所以对Redis服务器性能影响较小。
  • 4.RDB的几个缺点

  • 如果服务器宕机的话,采用RDB的方式会造成某个时段内数据的丢失,比如我们设置10分钟同步一次或5分钟达到1000次写入就同步一次,那么如果还没达到触发条件服务器就死机了,那么这个时间段的数据会丢失。
  • 使用save命令会造成服务器阻塞,直接数据同步完成才能接收后续请求。
  • 使用bgsave命令在forks子进程时,如果数据量太大,forks的过程也会发生阻塞,另外,forks子进程会耗费内存
  • 5.RDB内部原理

    • 基于liunx fork子进程,创建一个子进程时,只会修改指针
    • 基于写时复制

    2.AOF

    AOF(Append-only file)。与RDB存储某个时刻的快照不同,AOF持久化方式会记录客户端对服务器的每一次写操作命令,并将这些写操作以Redis协议追加保存到以后缀为aof文件末尾,在Redis服务器重启时,会加载并运行aof文件的命令,以达到恢复数据的目的。

    1.开启AOF持久化方式

    Redis默认不开启AOF持久化方式,我们可以在配置文件中开启并进行更加详细的配置,如下面的redis.conf文件:

    # 开启aof机制
    
    appendonly yes
    
    
    
    # aof文件名
    
    appendfilename "appendonly.aof"
    
    
    
    # 写入策略,always表示每个写操作都保存到aof文件中,也可以是everysec或no
    
    appendfsync always
    
    
    
    # 默认不重写aof文件
    
    no-appendfsync-on-rewrite no
    
    
    
    # 保存目录
    
    dir ~/redis/
    

    2.三种写入策略

    在上面的配置文件中,我们可以通过appendfsync选项指定写入策略,有三个选项

    appendfsync always
    
    # appendfsync everysec
    
    # appendfsync no
    
    • always

    客户端的每一个写操作都保存到aof文件当,这种策略很安全,但是每个写请注都有IO操作,所以也很慢。

    • everysec

    appendfsync的默认写入策略,每秒写入一次aof文件,因此,最多可能会丢失1s的数据。

    • no

    Redis服务器不负责写入aof,而是交由操作系统来处理什么时候写入aof文件。更快,但也是最不安全的选择,不推荐使用。

    • AOF文件重写

    AOF将客户端的每一个写操作都追加到aof文件末尾,比如对一个key多次执行incr命令,这时候,aof保存每一次命令到aof文件中,aof文件会变得非常大。

    incr num 1
    
    incr num 2
    
    incr num 3
    
    incr num 4
    
    incr num 5
    
    incr num 6
    
    ...
    
    incr num 100000
    

    aof文件太大,加载aof文件恢复数据时,就会非常慢,为了解决这个问题,Redis支持aof文件重写,通过重写aof,可以生成一个恢复当前数据的最少命令集,比如上面的例子中那么多条命令,可以重写为:

    set num 100000
    

    aof文件是一个二进制文件,并不是像上面的例子一样,直接保存每个命令,而使用Redis自己的格式,上面只是方便演示。

    两种重写方式

    通过在redis.conf配置文件中的选项no-appendfsync-on-rewrite可以设置是否开启重写,这种方式会在每次fsync时都重写,影响服务器性以,因此默认值为no,不推荐使用。

    默认不重写aof文件

    no-appendfsync-on-rewrite no

    AOF重写方式也是异步操作,即如果要写入aof文件,则Redis主进程会forks一个子进程来处理,如下所示:

    3.重写aof文件的好处

  • 压缩aof文件,减少磁盘占用量。
  • 将aof的命令压缩为最小命令集,加快了数据恢复的速度。
  • 4.AOF文件损坏

    在写入aof日志文件时,如果Redis服务器宕机,则aof日志文件文件会出格式错误,在重启Redis服务器时,Redis服务器会拒绝载入这个aof文件,可以通过以下步骤修复aof并恢复数据。

  • 备份现在aof文件,以防万一。
  • 使用redis-check-aof命令修复aof文件,该命令格式如下:
  • # 修复aof日志文件
    
    $ redis-check-aof -fix file.aof
    

    5.AOF的优点

    AOF只是追加日志文件,因此对服务器性能影响较小,速度比RDB要快,消耗的内存较少。

    6.AOF的缺点

    • AOF方式生成的日志文件太大,即使通过AFO重写,文件体积仍然很大。
    • 恢复数据的速度比RDB慢。

    7.如何选择持久化方案

    通过上面的介绍,我们了解了RDB与AOF各自的优点与缺点,到底要如何选择呢?

    通过下面的表示,我们可以从几个方面对比一下RDB与AOF,在应用时,要根本自己的实际需求,选择RDB或者AOF,其实,如果想要数据足够安全,可以两种方式都开启,但两种持久化方式同时进行IO操作,会严重影响服务器性能,因此有时候不得不做出选择。

    8.RDB和AOF到底该如何选择

    1)不要仅仅使用RDB,因为那样会导致你丢失很多数据

    (2)也不要仅仅使用AOF,因为那样有两个问题,第一,你通过AOF做冷备,没有RDB做冷备,来的恢复速度更快; 第二,RDB每次简单粗暴生成数据快照,更加健壮,可以避免AOF这种复杂的备份和恢复机制的bug

    (3)综合使用AOF和RDB两种持久化机制,用AOF来保证数据不丢失,作为数据恢复的第一选择; 用RDB来做不同程度的冷备,在AOF文件都丢失或损坏不可用的时候,还可以使用RDB来进行快速的数据恢复

    redis限流

    限流算法 常见的限流算法有:计数器,时间窗口,漏桶、令牌桶。

    1.计数器

    顾名思义就是来一个记一个,然后判断在有限时间窗口内的数量是否超过限制即可

    function isActionAllowed($userId, $action, $period, $maxCount) 
    
    {
    
        $redis = new Redis();
    
        $redis->connect('127.0.0.1', 6379);
    
        $key = sprintf('hist:%s:%s', $userId, $action);
    
        $now = msectime();   # 毫秒时间戳
    
    
    
        $pipe=$redis->multi(Redis::PIPELINE); //使用管道提升性能
    
        $pipe->zadd($key, $now, $now); //value 和 score 都使用毫秒时间戳
    
        $pipe->zremrangebyscore($key, 0, $now - $period); //移除时间窗口之前的行为记录,剩下的都是时间窗口内的
    
        $pipe->zcard($key);  //获取窗口内的行为数量
    
        $pipe->expire($key, $period + 1);  //多加一秒过期时间
    
        $replies = $pipe->exec();
    
        return $replies[2]  cl.throttle mylimit 15 30 60
    
    1)(integer)0 # 0 表示获取成功,1 表示拒绝
    
    2)(integer)15 # 漏斗容量
    
    3)(integer)14 # 漏斗剩余容量
    
    4)(integer)-1 # 被拒绝之后,多长时间之后再试(单位:秒)-1 表示无需重试
    
    5)(integer)2 # 多久之后漏斗完全空出来
    

    4.令牌算法

    在令牌桶算法中有一个程序以某种恒定的速度生成令牌,并存入令牌桶中,而每个请求需要先获取令牌才能执行,如果没有获取到令牌的请求可以选择等待或者放弃执行,如下图所示:

    我们可以使用 Google 开源的 guava 包,很方便的实现令牌桶算法,首先在 pom.xml 添加 guava 引用,配置如下:

    
    
    
    
        com.google.guava
    
        guava
    
        28.2-jre
    
    
    

    具体实现代码如下:

    import com.google.common.util.concurrent.RateLimiter;
    
    
    
    import java.time.Instant;
    
    
    
    /**
    
     * Guava 实现限流
    
     */
    
    public class RateLimiterExample {
    
        public static void main(String[] args) {
    
            // 每秒产生 10 个令牌(每 100 ms 产生一个)
    
            RateLimiter rt = RateLimiter.create(10);
    
            for (int i = 0; i  {
    
                    // 获取 1 个令牌
    
                    rt.acquire();
    
                    System.out.println("正常执行方法,ts:" + Instant.now());
    
                }).start();
    
            }
    
        }
    
    }
    

    从以上结果可以看出令牌确实是每 100ms 产生一个,而 acquire() 方法为阻塞等待获取令牌,它可以传递一个 int 类型的参数,用于指定获取令牌的个数。它的替代方法还有 tryAcquire(),此方法在没有可用令牌时就会返回 false 这样就不会阻塞等待了。当然 tryAcquire() 方法也可以设置超时时间,未超过最大等待时间会阻塞等待获取令牌,如果超过了最大等待时间,还没有可用的令牌就会返回 false。

    注意:使用 guava 实现的令牌算法属于程序级别的单机限流方案,而上面使用 Redis-Cell 的是分布式的限流方案。

    5.tomcat限流

    Tomcat 8.5 版本的最大线程数在 conf/server.xml 配置中,如下所示:

    
    

    其中 maxThreads 就是 Tomcat 的最大线程数,当请求的并发大于此值(maxThreads)时,请求就会排队执行,这样就完成了限流的目的。

    小贴士:maxThreads 的值可以适当的调大一些,此值默认为 150(Tomcat 版本 8.5.42),但这个值也不是越大越好,要看具体的硬件配置,需要注意的是每开启一个线程需要耗用 1MB 的 JVM 内存空间用于作为线程栈之用,并且线程越多 GC 的负担也越重。最后需要注意一下,操作系统对于进程中的线程数有一定的限制,Windows 每个进程中的线程数不允许超过 2000,Linux 每个进程中的线程数不允许超过 1000

    6.nginx限流

    Nginx 提供了两种限流手段:一是控制速率,二是控制并发连接数。

    控制速率

    我们需要使用 limit_req_zone 用来限制单位时间内的请求数,即速率限制,示例配置如下:

    limit_req_zone $binary_remote_addr zone=mylimit:10m rate=2r/s;
    
    server { 
    
        location / { 
    
            limit_req zone=mylimit;
    
        }
    
    }
    

    以上配置表示,限制每个 IP 访问的速度为 2r/s,因为 Nginx 的限流统计是基于毫秒的,我们设置的速度是 2r/s,转换一下就是 500ms 内单个 IP 只允许通过 1 个请求,从 501ms 开始才允许通过第 2 个请求。

    我们使用单 IP 在 10ms 内发并发送了 6 个请求的执行结果如下:

    从以上结果可以看出他的执行符合我们的预期,只有 1 个执行成功了,其他的 5 个被拒绝了(第 2 个在 501ms 才会被正常执行)。

    速率限制升级版

    上面的速率控制虽然很精准但是应用于真实环境未免太苛刻了,真实情况下我们应该控制一个 IP 单位总时间内的总访问次数,而不是像上面那么精确但毫秒,我们可以使用 burst 关键字开启此设置,示例配置如下:

    limit_req_zone $binary_remote_addr zone=mylimit:10m rate=2r/s;
    
    server { 
    
        location / { 
    
            limit_req zone=mylimit burst=4;
    
        }
    
    }
    

    burst=4 表示每个 IP 最多允许4个突发请求,如果单个 IP 在 10ms 内发送 6 次请求的结果如下

    从以上结果可以看出,有 1 个请求被立即处理了,4 个请求被放到 burst 队列里排队执行了,另外 1 个请求被拒绝了。

    控制并发数

    利用 limit_conn_zonelimit_conn 两个指令即可控制并发数,示例配置如下:

    limit_conn_zone $binary_remote_addr zone=perip:10m;
    
    limit_conn_zone $server_name zone=perserver:10m;
    
    server {
    
        ...
    
        limit_conn perip 10;
    
        limit_conn perserver 100;
    
    }
    

    其中 limit_conn perip 10 表示限制单个 IP 同时最多能持有 10 个连接;limit_conn perserver 100 表示 server 同时能处理并发连接的总数为 100 个。

    7.进阶 lua+redis令牌桶

    限流是一个很大的话题,准备把其中的所有限流器都实现一遍,以此也算全都写过了,到时候再用也不至于会心虚,毕竟真实写完成过。本文主要讲述了如何基于 Redis 与 Lua实现分布式令牌桶的限流方案。

    读前提问

    我觉得学习任何东西前都应该有自己的反问,这种反问基于标题给你的大概印象。带着问题来看文章,最后应该比盲目的看有收获,先提出几个基础的问题。

    限流是什么

    通过某种手段对某个时间段的并发访问请求进行流量限制,一旦流量达到限制阈值则可以拒绝服务,排队或等待,目的是防止系统因大流量或突发流量导致服务不可用或崩溃,是一种确保系统高可用的手段。

    限流的简单了解

    限流常见场景

    • 对外限流:
    • 电商秒杀(因秒杀业务特性,需要限流):到达开卖时间瞬间大流量,此时下单人数>商品库存,服务器不可能同时全部消费,需要进行限流,卖完了之后就拒绝后续下单请求。
    • 微博热搜(因产品特性,需要限流):突然出现了几个大瓜,那微博是不是突然流量激增,重灾区就是微博热搜,此时所有服务满载运行,必须有个限流策略保证服务的高可用。
    • 防止恶意攻击(突发恶意攻击,需要限流):比如某一个 API 被疯狂请求,或者某一个 IP 疯狂请求公司的 API,此时就需要进行限流,常见措施是先告警,再限流。为了不影响其他服务的正常使用,需要设计限流方案。
    • API有偿调用:用户认证+限流策略,顾名思义没啥好说的,一般是 SAAS 公司最常见的业务,常见于 OPEN-API 相关的小组负责的。
    • 对内限流
    • BUG预防:核心服务的高可用是十分重要的,千万不能挂。如果内部应用出现 bug,一直调用核心服务,核心服务就有被击垮的风险,限流也十分重要。
    • 缓存雪崩:请求直接打到 DB,那就哦豁完蛋了,所以需要根据业务场景来实现限流后是排队还是丢弃。

    综上所得,需要进行限流的场景可以分为三种:

  • 公共的 API ,限流策略用于open-api 网关与相关服务的可用性,同时可以防止恶意攻击。
  • 内部的核心应用,应对 bug 或其他突发情况,目的就是保证突发情况下核心应用的高可用。
  • 产品具备突发大流量请求的特性,妥妥的都给加上限流策略,保证整个系统的高可用。
  • 限流解决了什么问题

    保证服务高可用,牺牲一部分的流量,换取服务的可用性。对于被限流器直接作用的应用来说,除了保证自身不被流量击垮,还保护了依赖它的下游应用。

    限流带来的问题

    任何技术都是双刃剑,没有绝对的好用,能带来优点必然也会带来问题。

    • 限流组件保证了高可用,牺牲了性能,增加了一层 IO 环节的开销,单机限流在本地,分布式限流还要通过网络协议。
    • 限流组件保证了高可用,牺牲了一致性,在大流量的情况下,请求的处理会出现延迟的情况,这种场景便无法保证强一致性。特殊情况下,还无法保证最终一致性,部分请求直接被抛弃。
    • 限流组件拥有流控权,若限流组件挂了,会引起雪崩效应,导致请求与业务的大批量失败。
    • 引入限流组件,增加系统的复杂程度,开发难度增加,限流中间件的设计本身就是一个复杂的体系,需要综合业务与技术去思考与权衡,同时还要确保限流组件本身的高可用与性能,极大增加工作量,甚至需要一个团队去专门开发。

    设计限流组件本身需要考虑的点

    如果我来设计限流组件,我大致会确认如下几个点:

  • 明确限流器的目的:
  • 用在哪些模块?
  • 应对哪些场景下的什么问题?
  • 是单机限流还是分布式限流?
  • 确定限流模块的使用层面?例如:单应用维度、业务域维度、网关维度
  • 明确限流器的维度,例如 IP 维度,用户授权 token 维度,API 维度等
  • 怎么保证限流组件的高可用?
  • 怎么解决使用限流组件后带来的一致性问题?
  • 怎么缩小限流器的粒度,实现平滑限流?
  • 常见的限流实现

    • 单机
    • 基于Java 并发工具
    • 信号量
    • concurrentHashMap
    • 基于Google Guava RateLimiter
    • 稳定模式(SmoothBursty:令牌生成速度恒定)
    • 渐进模式(SmoothWarmingUp:令牌生成速度缓慢提升直到维持在一个稳定值)
    • 分布式
    • Redis + Lua
    • Nginx + Lua

    常见限流器种类

    这四种限流器虽然网上介绍的很多,但是我写给自己看的 ^_^,自己要每次遇到都能够脱口而出,而不是“我经常看到过,但是我记不起来了”或者“我知道是什么意思,但是我就是说不出来,也说不清楚”。后续, 等API网关的限流模块代码完成后, 对着代码和实践会仔细展开说说 ~

    • 计数器(固定窗口限流器)
    • 滑动窗口限流器
    • 令牌桶限流器
    • 漏桶限流器

    开始实践

    模拟的场景

    模拟API 网关中的一个 API 接口在某个时刻突然接收到 100 个并发请求,但是该 API 配置的令牌桶限流器每1分钟生成一个,每次限流间隔为 1 小时,限流上限为 60,则通过代码模拟出最终效果,并输出日志。

    实现的效果

    构建请求

    通过参数可知,限流器的类别LimiterType选择的是令牌桶,限流的时间单位timeUnit是每小时,每个限流时间内的令牌桶内令牌的最大数量value是 60.

    {
        "id": 3,
        "apiId": 3,
        "apiName": "测试API",
        "ip": "127.0.0.1",
        "dimensionName": "app_id",
        "dimensionValue": "testid1234",
        "timeUnit": 2,
        "value": 60,
        "LimiterType": 1
    }
    
    
    复制代码
    

    使用 PostMan 中的迭代器功能,进行循环请求:

    计算令牌桶与推测

    • 限流间隔是 1 小时
    • 桶内最大令牌是 60 个
    • 计算得出令牌的生成间隔是 1 个/1 分钟
    • 模拟并发请求 100 个,每个请求的间隔时间是 0ms
    • 此时令牌并未来得及生成令牌,所以在第 61 个并发的时候请求,令牌用光被限流

    请求的结果

    通过下图可知与上面推测相符合,第 61 个请求被限流。

    关键代码

    总的来说,这个模块的流程比较简单,所以直接理解关键代码就 ok 了,实现起来也很容易。

    限流器的抽象设计

    预计实现四种限流器,目前本文内实现的是令牌桶限流器。限流器的抽象设计是经典的三层结构,也采用了模板方法的思想,也就是最上层的接口,实现一些公共方法与公共抽象的顶层抽象类,最后是每个限流器的独有逻辑放在各自类中来做。

    限流业务的实现

    这里贴出限流业务的核心方法,通过调用doFilter 方法实现判断是否需要进行限流。具体调用哪一种限流器通过这两个对象实现的:LimiterStrategyLimiterStrategy 分别是具体的限流算法与限流策略。

    @Override
        public boolean doFilter(FlowControlConfig flowControlConfig) {
            if (Objects.isNull(flowControlConfig)) {
                log.error("[{}] 流控参数为空", this.getClass().getSimpleName());
                return true;
            }
            String key;
            boolean filterRes = true;
            try {
                key = generateRedisLimiterKey(flowControlConfig);
                LimiterStrategy limiterStrategy = getLimiterStrategyByCode(flowControlConfig.getLimiterType());
                LimiterPolicy limiterPolicy = getLimiterPolicyByCode(flowControlConfig.getLimiterType(), flowControlConfig);
                filterRes = limiterStrategy.access(key, limiterPolicy);
                if (!filterRes) {
                    log.warn("Limiter Id:[{}],key :[{}]已达流量上限值:{},被限制请求!", flowControlConfig.getId(), key, flowControlConfig.getValue());
                    // todo 接入消息告警
                }
            } catch (Exception e) {
                log.error("[{}] 限流器内部出现异常! 入参:{}", this.getClass().getSimpleName(), JSONObject.toJSON(flowControlConfig));
                e.printStackTrace();
            }
            return !filterRes;
        }
    复制代码
    

    令牌桶限流器算法的对象

    package com.teavamc.rpcgateway.core.flow.limiter.policy;
    import com.google.common.collect.Lists;
    import java.util.List;
    
    
    /**
     * 令牌桶限流器的执行对象
     *
     * @Package com.teavamc.rpcgateway.core.limiter.policy
     * @date 2021/1/28 上午11:09
     */
    public class TokenBucketLimiterPolicy extends AbstractLimiterPolicy {
    
    
        /**
         * 限流时间间隔
         * (重置桶内令牌的时间间隔)
         */
        private final long resetBucketInterval;
        /**
         * 最大令牌数量
         */
        private final long bucketMaxTokens;
    
    
        /**
         * 初始可存储数量
         */
        private final long initTokens;
    
    
        /**
         * 每个令牌产生的时间
         */
        private final long intervalPerPermit;
    
    
        /**
         * 令牌桶对象的构造器
         * @param bucketMaxTokens 桶的令牌上限
         * @param resetBucketInterval 限流时间间隔
         * @param maxBurstTime 最大的突发流量的持续时间(通过计算)
         */
        public TokenBucketLimiterPolicy(long bucketMaxTokens, long resetBucketInterval, long maxBurstTime) {
            // 最大令牌数
            this.bucketMaxTokens = bucketMaxTokens;
            // 限流时间间隔
            this.resetBucketInterval = resetBucketInterval;
            // 令牌的产生间隔 = 限流时间 / 最大令牌数
            intervalPerPermit = resetBucketInterval / bucketMaxTokens;
            // 初始令牌数 = 最大的突发流量的持续时间 / 令牌产生间隔
            // 用 最大的突发流量的持续时间 计算的结果更加合理,并不是每次初始化都要将桶装满
            initTokens = Math.min(maxBurstTime / intervalPerPermit, bucketMaxTokens);
        }
    
    
        public long getResetBucketInterval() {
            return resetBucketInterval;
        }
    
    
        public long getBucketMaxTokens() {
            return bucketMaxTokens;
        }
    
    
        public long getInitTokens() {
            return initTokens;
        }
    
    
        public long getIntervalPerPermit() {
            return intervalPerPermit;
        }
    
    
        @Override
        public String[] toParams() {
            List list = Lists.newArrayList();
            list.add(String.valueOf(getIntervalPerPermit()));
            list.add(String.valueOf(System.currentTimeMillis()));
            list.add(String.valueOf(getInitTokens()));
            list.add(String.valueOf(getBucketMaxTokens()));
            list.add(String.valueOf(getResetBucketInterval()));
            return list.toArray(new String[]{});
        }
    
    
    }
    
    
    复制代码
    

    这个代码已经写得很明白了,东西也不多。但是构造器这里还是要理解一下,特别是maxBurstTime 这个字段,记录这个 api 经历的最大突发流量的时间。

    Lua 脚本的解析

    令牌桶的实现是通过 lua 来完成的,所以 lua 是核心逻辑。这是我这边使用的令牌桶方案,都加了注解,如果看不懂就多看几遍,还是看不明白就看最后我的流程图。

    --[[
      1. key - 令牌桶的 key
      2. intervalPerTokens - 生成令牌的间隔(ms)
      3. curTime - 当前时间
      4. initTokens - 令牌桶初始化的令牌数
      5. bucketMaxTokens - 令牌桶的上限
      6. resetBucketInterval - 重置桶内令牌的时间间隔
      7. currentTokens - 当前桶内令牌数
      8. bucket - 当前 key 的令牌桶对象
    ]] --
    
    
    local key = KEYS[1]
    local intervalPerTokens = tonumber(ARGV[1])
    local curTime = tonumber(ARGV[2])
    local initTokens = tonumber(ARGV[3])
    local bucketMaxTokens = tonumber(ARGV[4])
    local resetBucketInterval = tonumber(ARGV[5])
    
    
    local bucket = redis.call('hgetall', key)
    local currentTokens
    
    
    -- 若当前桶未初始化,先初始化令牌桶
    if table.maxn(bucket) == 0 then
        -- 初始桶内令牌
        currentTokens = initTokens
        -- 设置桶最近的填充时间是当前
        redis.call('hset', key, 'lastRefillTime', curTime)
        -- 初始化令牌桶的过期时间, 设置为间隔的 1.5 倍
        redis.call('pexpire', key, resetBucketInterval * 1.5)
    
    
    -- 若桶已初始化,开始计算桶内令牌
    -- 为什么等于 4 ? 因为有两对 field, 加起来长度是 4
    -- { "lastRefillTime(上一次更新时间)","curTime(更新时间值)","tokensRemaining(当前保留的令牌)","令牌数" }
    elseif table.maxn(bucket) == 4 then
    
    
        -- 上次填充时间
        local lastRefillTime = tonumber(bucket[2])
        -- 剩余的令牌数
        local tokensRemaining = tonumber(bucket[4])
    
    
        -- 当前时间大于上次填充时间
        if curTime > lastRefillTime then
    
    
            -- 拿到当前时间与上次填充时间的时间间隔
            -- 举例理解: curTime = 2620 , lastRefillTime = 2000, intervalSinceLast = 620
            local intervalSinceLast = curTime - lastRefillTime
    
    
            -- 如果当前时间间隔 大于 令牌的生成间隔
            -- 举例理解: intervalSinceLast = 620, resetBucketInterval = 1000
            if intervalSinceLast > resetBucketInterval then
    
    
                -- 将当前令牌填充满
                currentTokens = initTokens
    
    
                -- 更新重新填充时间
                redis.call('hset', key, 'lastRefillTime', curTime)
                
            -- 如果当前时间间隔 小于 令牌的生成间隔
            else
    
    
                -- 可授予的令牌 = 向下取整数( 上次填充时间与当前时间的时间间隔 / 两个令牌许可之间的时间间隔 )
                -- 举例理解 : intervalPerTokens = 200 ms , 令牌间隔时间为 200ms
                --           intervalSinceLast = 620 ms , 当前距离上一个填充时间差为 620ms
                --           grantedTokens = 620/200 = 3.1 = 3
                local grantedTokens = math.floor(intervalSinceLast / intervalPerTokens)
    
    
                -- 可授予的令牌 > 0 时
                -- 举例理解 : grantedTokens = 620/200 = 3.1 = 3
                if grantedTokens > 0 then
    
    
                    -- 生成的令牌 = 上次填充时间与当前时间的时间间隔 % 两个令牌许可之间的时间间隔
                    -- 举例理解 : padMillis = 620%200 = 20
                    --           curTime = 2620
                    --           curTime - padMillis = 2600
                    local padMillis = math.fmod(intervalSinceLast, intervalPerTokens)
    
    
                    -- 将当前令牌桶更新到上一次生成时间
                    redis.call('hset', key, 'lastRefillTime', curTime - padMillis)
                end
    
    
                -- 更新当前令牌桶中的令牌数
                -- Math.min(根据时间生成的令牌数 + 剩下的令牌数, 桶的限制) => 超出桶最大令牌的就丢弃
                currentTokens = math.min(grantedTokens + tokensRemaining, bucketMaxTokens)
            end
        else
            -- 如果当前时间小于或等于上次更新的时间, 说明刚刚初始化, 当前令牌数量等于桶内令牌数
            -- 不需要重新填充
            currentTokens = tokensRemaining
        end
    end
    
    
    -- 如果当前桶内令牌小于 0,抛出异常
    assert(currentTokens >= 0)
    
    
    -- 如果当前令牌 == 0 ,更新桶内令牌, 返回 0
    if currentTokens == 0 then
        redis.call('hset', key, 'tokensRemaining', currentTokens)
        return 0
    else
        -- 如果当前令牌 大于 0, 更新当前桶内的令牌 -1 , 再返回当前桶内令牌数
        redis.call('hset', key, 'tokensRemaining', currentTokens - 1)
        return currentTokens
    end
    复制代码
    

    其实这个脚本很简单,一个 key 拥有一个令牌桶,令牌桶是通过 Redis 中的 Hash 数据类型进行储存的。每个令牌桶拥有两个 field,分别是上一次填充时间lastRefillTime与当前桶内令牌数量tokensRemaining

    从脚本逻辑上来说,就分成了三个步骤,分别是:

    • 确认 key 的令牌桶是否存在,如果不存在就初始化。
    • 计算并更新当前令牌桶内的令牌数量:
    • 如果当前距离上次填充令牌的时间间隔超出重置时间,就重置令牌桶。
    • 计算距离上次填充的时间间隔是否超过了生产令牌的间隔时间,若大于间隔就计算生产了多少令牌与上次产生令牌的时间。
    • 若距离上次填充至今没有产生令牌就直接用。
    • 明确了当前桶内的令牌数之后,就判断是否放行:
    • 令牌等于 0,返回 0,不放行。
    • 令牌大于0,减少一个当前的桶内令牌,放行。

    限流器的模拟使用

    开启一个接口,模拟对接口并发调用。

    @PostMapping(value = "/test")
        public void testFlowControl(@RequestBody FlowControlConfig controlConfig) {
            Long apiId = controlConfig.getId();
            log.info("接收到 ApiId :{} 的请求", apiId);
            apiRequestCount.put(apiId, apiRequestCount.getOrDefault(apiId, 0) + 1);
            // 执行限流
            boolean res = flowControl.doFilter(controlConfig);
            if (res) {
                apiRequestFailedCount.put(apiId, apiRequestFailedCount.getOrDefault(apiId, 0) + 1);
            } else {
                apiRequestSuccessCount.put(apiId, apiRequestSuccessCount.getOrDefault(apiId, 0) + 1);
            }
            // 处理结果
            int totalCnt = apiRequestCount.get(apiId);
            int successCnt = apiRequestSuccessCount.get(apiId) == null ? 0 : apiRequestSuccessCount.get(apiId);
            int failedCnt = apiRequestFailedCount.get(apiId) == null ? 0 : apiRequestFailedCount.get(apiId);
            log.info(" ApiId :{} 的请求是否被限流:{} | 共请求{}次,放行{}次,限流{}次", apiId, res, totalCnt, successCnt, failedCnt);
        }
    复制代码
    

    后续业务拓展需要考虑的点

    • 弹性限流怎么做?平滑限流怎么做?
    • 关于api网关的调用的耗时比的思考?
    • 网关的性能计算,怎么计算 qps,怎么计算怎么抗多少?
    • 怎么合理估算API 的性能,并设置合适的限流大小?
    • 怎么根据业务场景选择合适的限流方案?

    redis淘汰策略

    常见的问题

    1、生产上你们的 redis 设置的内存多少?

    2、如果配置、修改 redis 内存的大小?

    3、如果内存满了你怎么办?

    4、redis 清理内存的方式?定期删除和惰性删除了解过吗?

    5、redis 缓存淘汰策略

    6、redis 的 lru 了解过吗?是否可以手写一个 lru 算法?

    redis 内存满了怎么办?

    redis 默认内存多少? 在哪里查看?如何设置和修改?

    1、查看 redis 最大占用内存?

    2、redis 默认内存多少可以用?

    如果不设置最大内存大小或者设置对打内存大小 0 , 在 64 位操作系统下不限制内存大小,在 32位操作系统下最多使用 3G 内存

    3、一般生产上你如何配置?

    一般推荐 reids 设置内存为最大物理内存的 3/4

    4、如何修改 redis 内存设置?

    通过修改文件配置

    通过命令修改

    5、什么命令查看 redis 内存使用情况?

    info memory

    如果要是 redis 的内存满会怎么样?如果redis 内存使用超过了最大设置会怎么样?

    1、如果要是 redis 的内存满会怎么样?如果redis 内存使用超过了最大设置会怎么样?

    如果 redis 内存被打满了,会提示 “(error) OOM command not allowed when used memory > 'maxmemory'.”

    结论

    • 设置了 maxmemory 的选项,加入 redis 内存使用达到了上限
    • 没有加上过期时间就会导致数据写满 maxmemory 为了避免这个问题,下面我们将在内存淘汰策略中详细阐述

    redis 缓存淘汰策略?

    1、往 redis 里面写了数据但是为什么会没了?

    redis 过期键的删除策略

    • 如果一个键是过期的,那它到了过期时间之后是不是马上就从内存中被删除了呢?
    • 肯定不是
    • 如果不是,那过期之后到底什么时候被删除?? 是一个什么操作

    三种不同的删除策略

    1、定时删除

    Redis 不可能时时刻刻遍历所有被设置了生存时间的key, 来检查数据是否已经到达过期时间,然后对他进行删除。

    立即删除能把整内存中数据最大的新鲜度,因为它保证过期键值会在过期后马上被删除,其所占的内存也会随之释放,但是立即删除对 CPU 是最不友好的。因为删除操作会占用 CPU 的时间,如果刚好碰到 CPU 很忙的时候,比如正在做交集或者排序等计算的时候,这个时候会给 CPU 造成额外的压力,让 CPU 心累,有时候需要删除。忙死。。。。

    这个时候会产生大量的性能消耗,同时也会影响数据的读取操作。

    总结:对 CPU 不友好,用处理器吸能换存储空间(拿时间换空间)。

    2、惰性删除

    数据到达过期时间,不做处理,等下次访问该数据时,

    如果未过期,返回数据:

    发现已过期,删除,返回不存在

    惰性删除策略的缺点是,它对内存最不友好的。

    如果一个键已经过期,而这个键仍保留在数据库中, 那么只要这个过期键不被删除,它所占用的内存就不会释放。

    在使用惰性删除策略时,如果数据库有非常多的过期键,而这这些过期键恰好🈶️没有被访问的话,那么他们也许永远不会被删除(除非用户手动执行 flushdb), 我们甚至可以将这种情况看作是一种内存泄漏 - 无用的垃圾数据占用了大量的内存,而服务器却不会自己去释放他们,这对于运行状态非常依赖与内存的 Redis 服务器来说,肯定不是一个好消息。

    总结:对 memory 不友好,用存储空间换处理器性能(拿空间换时间)

    3、上面两种方案都走极端

    定期删除

    定期删除策略是前面两种策略的择中

    定期删除策略每间隔一段时间执行一次删除过期键操作,并且通过限制删除操作执行的时间和频率来减少删除操作对 CPU 时间的影响。

    周期性轮询 redis 库中时效性数据,采用随机抽泣的策略,利用过期数据占比的方式控制删除频度

    特点1:CPU 性能占用设置有峰值, 检测频度可自己自定义设置。

    特点2: 内存压力不是很大,长期占用内存的冷数据会被持续清理

    总结:周期性抽查存储空间(随机抽查,重点抽查)

    举个例子:

    redis 默认每100ms 检查,是否有过期的 key, 如果有 key 则删除。注意: redis 不是间隔 100ms 将所有的 key 检查一次而是随机抽取进行检查(如果每间隔 100ms , 全部 key 进行检查,redis 直接进入 icu ). 如果只采用定期删除策略,会导致很多 key 到时间没有删除。

    定期删除策略的难点是确定删除操作执行的时长和频率。如果删除操作执行得太频繁,或者执行操的时间太长,定期删除策略就会退化成定时删除策略,以至于 CPU 时间过多的消耗在删除过期键上,如果删除操作执行得太少,或者珍惜i给你的时间太短,定期删除策略又会和惰性删除策略一样,出现浪费内存的情况。因为,如果采用定期删除策略的话,服务器必须根据情况,合理的删除操作执行时间长短和执行频率。

    总结: 定期抽样 key , 判断是否过期

    依旧有漏网之鱼

    上述步骤都通过了,还有漏洞吗?

    1、定期删除,从来没有被抽查到

    2、惰性删除,也从来没有被点中过

    上述2步骤 ==> 大量过期 key 堆积唉内存中,导致 redis 内存空间紧张或者很快耗尽。

    必须要要有一个更好的兜底方案 。。。。。

    内存淘汰策略

    redis.conf

    redis 的 8 种淘汰策略

    官方配置文件中的原文:

    volatile-lru -> Evict using approximated LRU among the keys with an expire set.
    allkeys-lru -> Evict any key using approximated LRU.
    volatile-lfu -> Evict using approximated LFU among the keys with an expire set.
    allkeys-lfu -> Evict any key using approximated LFU.
    volatile-random -> Remove a random key among the ones with an expire set.
    allkeys-random -> Remove a random key, any key.
    volatile-ttl -> Remove the key with the nearest expire time (minor TTL)
    noeviction -> Don't evict anything, just return an error on write operations.
    复制代码
    

    默认配置: noeviction

    中文总结

    1、noevication : 不会驱逐任何 key (默认)

    2、alkeys-lru: 对所有的 key 使用 lru 算法进行删除l

    3、volatile-lru: 对所有的设置了过期时间的 key 进行 lru 算法进行删除

    4、allkeys-random: 对所有 key 随机删除

    5、volatile-random: 对所有设置了过期时间的 key 随机删除

    6、volatile-ttl :马上删除要过期的 key

    7、allkeys-lfu: 对所有 key 进行 lfu 算法进行删除

    8、volatile-lfu: 对所有设置了过期时间的 key 使用 lfu 算法进行删除

    总结:

    • 2 * 4 = 8
    • 两个维度: 过期键中筛选;所有键中筛选
    • 4个方面: LRU ,LFU , random , ttl
    • 8 个选项

    你平时用那种?

    一般平时用 : allkeys-lru

    如何修改

    1、命令方式

    2、配置文件如何修改

    1、命令方式

    2、配置文件

    如何修改

    1、命令方式

    2、配置文件

    redis缓存问题

    定义

    缓存雪崩是指在短时间内,有大量缓存同时过期,导致大量的请求直接查询数据库,从而对数据库造成了巨大的压力,严重情况下可能会导致数据库宕机的情况叫做缓存雪崩。

    正常情况下执行过程:

    缓存雪崩下执行过程:

    可以看到,当缓存失效时,大量请求直接绕过 Redis 去请求数据库,导致会对数据库造成很大压力。

    解决

    加锁排队

    加锁排队可以起到缓冲的作用,防止大量的请求同时操作数据库,但它的缺点是增加了系统的响应时间,降低了系统的吞吐量,牺牲了一部分用户体验。

    思路:当缓存未查询到时,对要请求的 key 进行加锁,只允许一个线程去数据库中查,其他线程等候排队,这里的加锁逻辑就类似于单例模式的双重校验锁。

    代码实现:

    // 缓存 key
    String cacheKey = "userlist";
    // 查询缓存
    String data = jedis.get(cacheKey);
    if (StringUtils.isNotBlank(data)) {
        // 查询到数据,直接返回结果
        return data;
    } else {
        // 先排队查询数据库,再放入缓存
        synchronized (cacheKey) {
            data = jedis.get(cacheKey);
            if (!StringUtils.isNotBlank(data)) { // 双重判断
                // 查询数据库
                data = findUserInfo();
                // 放入缓存
                jedis.set(cacheKey, data);
            }
            return data;
        }
    }
    复制代码
    

    注意:如果是分布式架构,也就是服务集群,不能采用本地锁,必须得使用 Redis 的分布式锁。

    随机化过期时间

    为了避免缓存同时过期,可在设置缓存时添加随机时间,这样就可以极大的避免大量的缓存同时失效。

    代码实现:

    // 缓存原本的失效时间
    int exTime = 10 * 60;
    // 随机数生成类
    Random random = new Random();
    // 缓存设置
    jedis.setex(cacheKey, exTime + random.nextInt(1000) , value);
    复制代码
    

    设置二级缓存

    二级缓存指的是除了 Redis 本身的缓存,再设置一层缓存,当 Redis 失效之后,先去查询二级缓存。例如可以设置一个本地缓存,在 Redis 缓存失效的时候先去查询本地缓存而非查询数据库。本地缓存可以使用 GoogleGuava Cache 进行设置,并有容量驱逐、时间驱逐策略,很优秀的一个缓存工具类。

    Guava Cache 中文官方文档

    其实大部分情况下我们在项目中使用都是先访问本地缓存,然后再访问分布式缓存(Redis),因为访问本地缓存是最快的,没有网络开销,但是需要在一定的时间内进行更新,为了和分布式缓存中的数据保持一致。

    缓存穿透

    定义

    缓存穿透是指查询数据库和缓存都无数据,因为数据库查询无数据,出于容错考虑,不会将结果保存到缓存中,因此每次请求都会去查询数据库,这种情况就叫做缓存穿透。

    解决

    使用过滤器

    我们可以使用布隆过滤器来减少对数据库的请求,布隆过滤器的原理是将数据库的数据哈希到 bitmap 中,每次查询之前,先使用布隆过滤器过滤掉一定不存在的无效请求,从而避免了无效请求给数据库带来的查询压力。

    Redis 布隆过滤器

    缓存空结果

    我们可以把每次从数据库查询的数据都保存到缓存中,为了提高前台用户的使用体验 (解决长时间内查询不到任何信息的情况),我们可以将空结果的缓存时间设置得短一些,例如 3~5 分钟。

    缓存击穿

    定义

    缓存击穿指的是某个热点缓存,在某一时刻恰好失效了,然后此时刚好有大量的并发请求,此时这些请求将会给数据库造成巨大的压力,这种情况就叫做缓存击穿。

    解决

    加锁排队

    和缓存雪崩的加锁处理方式一致,再查数据库时进行加锁,缓冲大量请求。

    设置永不过期

    对于某些热点缓存,我们可以设置永不过期,这样就能保证缓存的稳定性,但需要注意在数据更改之后,要及时更新此热点缓存,不然就会造成查询结果的误差。

    缓存预热

    缓存预热并不是一个问题,而是使用缓存时的一个优化方案,它可以提高前台用户的使用体验。

    缓存预热指的是在系统启动的时候,先把查询结果预存到缓存中,以便用户后面查询时可以直接从缓存中读取,以节约用户的等待时间。

    缓存预热的实现思路有以下三种:

  • 把需要缓存的方法写在系统初始化的方法中,这样系统在启动的时候就会自动的加载数据并缓存数据。
  • 把需要缓存的方法挂载到某个页面或后端接口上,手动触发缓存预热。
  • 设置定时任务,定时自动进行缓存预热。
  • 布隆过滤器

    布隆过滤器是一个神奇的数据结构,可以用来判断一个元素是否在一个集合中。很常用的一个功能是用来去重。在爬虫中常见的一个需求:目标网站 URL 千千万,怎么判断某个 URL 爬虫是否宠幸过?简单点可以爬虫每采集过一个 URL,就把这个 URL 存入数据库中,每次一个新的 URL 过来就到数据库查询下是否访问过。

    select id from table where url = 'https://jaychen.cc'
    

    但是随着爬虫爬过的 URL 越来越多,每次请求前都要访问数据库一次,并且对于这种字符串的 SQL 查询效率并不高。除了数据库之外,使用 Redis 的 set 结构也可以满足这个需求,并且性能优于数据库。但是 Redis 也存在一个问题:耗费过多的内存。这个时候布隆过滤器就很横的出场了:这个问题让我来。

    相比于数据库和 Redis,使用布隆过滤器可以很好的避免性能和内存占用的问题。

    布隆过滤器本质是一个位数组,位数组就是数组的每个元素都只占用 1 bit 。每个元素只能是 0 或者 1。这样申请一个 10000 个元素的位数组只占用 10000 / 8 = 1250 B 的空间。布隆过滤器除了一个位数组,还有 K 个哈希函数。当一个元素加入布隆过滤器中的时候,会进行如下操作:

    • 使用 K 个哈希函数对元素值进行 K 次计算,得到 K 个哈希值。
    • 根据得到的哈希值,在位数组中把对应下标的值置为 1。

    举个🌰,假设布隆过滤器有 3 个哈希函数:f1, f2, f3 和一个位数组 arr。现在要把 https://jaychen.cc 插入布隆过滤器中:

    • 对值进行三次哈希计算,得到三个值 n1, n2, n3。
    • 把位数组中三个元素 arr[n1], arr[n2], arr[3] 置为 1。

    当要判断一个值是否在布隆过滤器中,对元素再次进行哈希计算,得到值之后判断位数组中的每个元素是否都为 1,如果值都为 1,那么说明这个值在布隆过滤器中,如果存在一个值不为 1,说明该元素不在布隆过滤器中。

    Redis 中的布隆过滤器

    redis 在 4.0 的版本中加入了 module 功能,布隆过滤器可以通过 module 的形式添加到 redis 中,所以使用 redis 4.0 以上的版本可以通过加载 module 来使用 redis 中的布隆过滤器。但是这不是最简单的方式,使用 docker 可以直接在 redis 中体验布隆过滤器。

    > docker run -d -p 6379:6379 --name bloomfilter redislabs/rebloom
    
    > docker exec -it bloomfilter redis-cli
    

    edis 布隆过滤器主要就两个命令:

    • bf.add 添加元素到布隆过滤器中:bf.add urls https://jaychen.cc
    • bf.exists 判断某个元素是否在过滤器中:bf.exists urls https://jaychen.cc

    上面说过布隆过滤器存在误判的情况,在 redis 中有两个值决定布隆过滤器的准确率:

    • error_rate:允许布隆过滤器的错误率,这个值越低过滤器的位数组的大小越大,占用空间也就越大。
    • initial_size:布隆过滤器可以储存的元素个数,当实际存储的元素个数超过这个值之后,过滤器的准确率会下降。

    redis 中有一个命令可以来设置这两个值:

    bf.reserve urls 0.01 100
    复制代码
    

    三个参数的含义:

    • 第一个值是过滤器的名字。
    • 第二个值为 error_rate 的值。
    • 第三个值为 initial_size 的值。

    使用这个命令要注意一点:执行这个命令之前过滤器的名字应该不存在,如果执行之前就存在会报错:(error) ERR item exists

    相关文章

    服务器端口转发,带你了解服务器端口转发
    服务器开放端口,服务器开放端口的步骤
    产品推荐:7月受欢迎AI容器镜像来了,有Qwen系列大模型镜像
    如何使用 WinGet 下载 Microsoft Store 应用
    百度搜索:蓝易云 – 熟悉ubuntu apt-get命令详解
    百度搜索:蓝易云 – 域名解析成功但ping不通解决方案

    发布评论