手动撸一个 Redis 分布式锁

2024年 2月 19日 44.2k 0

大家好呀,我是楼仔。

今天第一天开工,收拾心情,又要开始好好学习,好好工作了。

对于使用 Java 的小伙伴,其实我们完全不用手动撸一个分布式锁,直接使用 Redisson 就行。

但是因为这些封装好的组建,让我们越来越懒。

我们使用一些封装好的开源组建时,可以了解其中的原理,或者自己动手写一个,可以更好提升你的技术水平。

今天我就教大家用原生的 Redis,手动撸一个 Redis 分布式锁,很有意思。

01 问题引入

其实通过 Redis 实现分布式锁,经常会有面试官会问,很多同学都知道用 SetNx() 去获取锁,解决并发问题。

SetNx() 是什么?我简单解答一下。

Redis Setnx(SET if Not eXists) 命令在指定的 key 不存在时,为 key 设置指定的值。

对于下面 2 种问题,你知道如何解决么?

  • 如果获取锁的机器挂掉,如何处理?
  • 当锁超时时,A、B 两个线程同时获取锁,可能导致锁被同时获取,如何解决?

这个就是我们实现 Redis 分布式锁时,需要重点解决的 2 个问题。

02 理论知识

刚才说过,通过 SetNx() 去获取锁,可以解决并发问题。

当获取到锁,处理完业务逻辑后,会将锁释放。

图片图片

但当机器宕机,或者重启时,没有执行 Del() 删除锁操作,会导致锁一直没有释放。

所以,我们还需要记录锁的超时时间,判断锁是否超时。

图片图片

这里我们通过 GetKey() 获取锁的超时时间 A,通过和当前时间比较,判断锁是否超时。

如果锁未超时,直接返回,如果锁超时,重新设置锁的超时时间,成功获取锁。

还有其它问题么?当然!

因为在并发场景下,会存在 A、B 两个线程同时执行 SetNx(),导致两个线程同时获取到锁。

那如何解决呢?将 SetNx() 用 GetSet() 替换。

图片图片

GetSet() 是什么?我简单解答一下。

Redis Getset 命令用于设置指定 key 的值,并返回 key 的旧值。

这里不太好理解,我举个例子。

假如 A、B 两个线程,A 先执行,B 后执行:

  • 对于线程 A 和 B,通过 GetKey 获取的超时时间都是 T1 = 100;
  • 对于线程 A,将超时时间 Ta = 200 通过 GetSet() 设置,返回 T2 = 100,此时满足条件 “T1 == T2”,获取锁成功;
  • 对于线程 B,将超时时间 Tb = 201 通过 GetSet() 设置,由于锁超时时间已经被 A 重新设置,所以返回 T2 = 200,此时不满足条件 “T1 == T2”,获取锁失败。

可能有同学会继续问,之前设置的超时是 Ta = 200,现在变成了 Tb = 201,延长或缩短了锁的超时时间,不会有问题么?

其实在现实并发场景中,能走到这一步,基本是“同时”进来的,两者的时间差非常小,可以忽略此影响。

03 代码实战

这里给出 Go 代码,注释都写得非常详细,即使你不会 Go,读注释也能读懂。

// 获取分布式锁,需要考虑以下情况:
// 1. 机器A获取到锁,但是在未释放锁之前,机器挂掉或者重启,会导致其它机器全部hang住,这时需要根据锁的超时时间,判断该锁是否需要重置;
// 2. 当锁超时时,需要考虑两台机器同时去获取该锁,需要通过GETSET方法,让先执行该方法的机器获取锁,另外一台继续等待。
func GetDistributeLock(key string, expireTime int64) bool {

 currentTime := time.Now().Unix()
 expires := currentTime + expireTime
 redisAlias := "jointly"

 // 1.获取锁,并将value值设置为锁的超时时间
 redisRet, err := redis.SetNx(redisAlias, key, expires)
 if nil == err && utils.MustInt64(1) == redisRet {
  // 成功获取到锁
  return true
 }

 // 2.当获取到锁的机器突然重启&挂掉时,就需要判断锁的超时时间,如果锁超时,新的机器可以重新获取锁
 // 2.1 获取锁的超时时间
 currentLockTime, err := redis.GetKey(redisAlias, key)
 if err != nil {
  return false
 }

 // 2.2 当"锁的超时时间"大于等于"当前时间",证明锁未超时,直接返回
 if utils.MustInt64(currentLockTime) >= currentTime {
  return false
 }

 // 2.3 将最新的超时时间,更新到锁的value值,并返回旧的锁的超时时间
 oldLockTime, err := redis.GetSet(redisAlias, key, expires)
 if err != nil {
  return false
 }

 // 2.4 当锁的两个"旧的超时时间"相等时,证明之前没有其它机器进行GetSet操作,成功获取锁
 // 说明:这里存在并发情况,如果有A和B同时竞争,A会先GetSet,当B再去GetSet时,oldLockTime就等于A设置的超时时间
 if utils.MustString(oldLockTime) == currentLockTime {
  return true
 }
 return false
}

删除锁逻辑:

// 删除分布式锁
// @return bool true-删除成功;false-删除失败
func DelDistributeLock(key string) bool {
 redisAlias := "jointly"
 redisRet := redis.Del(redisAlias, key)
 if redisRet != nil {
  return false
 }
 return true
}

业务逻辑:

func DoProcess(processId int) {

 fmt.Printf("启动第%d个线程n", processId)

 redisKey := "redis_lock_key"
 for {
  // 获取分布式锁
  isGetLock := GetDistributeLock(redisKey, 10)
  if isGetLock {
   fmt.Printf("Get Redis Key Success, id:%dn", processId)
   time.Sleep(time.Second * 3)
   // 删除分布式锁
   DelDistributeLock(redisKey)
  } else {
   // 如果未获取到该锁,为了避免redis负载过高,先睡一会
   time.Sleep(time.Second * 1)
  }
 }
}

最后起个 10 个多线程,去执行这个 DoProcess():

func main() {
// 初始化资源
var group string = "group"
var name string = "name"
var host string

// 初始化资源
host = "http://ip:port"
_, err := xrpc.NewXRpcDefault(group, name, host)
if err != nil {
panic(fmt.Sprintf("initRpc when init rpc failed, err:%v", err))
}
redis.SetRedis("louzai", "redis_louzai")

// 开启10个线程,去抢Redis分布式锁
for i := 0; i

相关文章

Oracle如何使用授予和撤销权限的语法和示例
Awesome Project: 探索 MatrixOrigin 云原生分布式数据库
下载丨66页PDF,云和恩墨技术通讯(2024年7月刊)
社区版oceanbase安装
Oracle 导出CSV工具-sqluldr2
ETL数据集成丨快速将MySQL数据迁移至Doris数据库

发布评论