纠正误区:这才是 SpringBoot Redis 分布式锁的正确实现方式

2024年 1月 24日 85.3k 0

我是码哥,可以叫我靓仔。

在说分布式锁之前,我们先说下为什么需要分布式锁。

在单机部署的时候,我们可以使用 Java 中提供的 JUC 锁机制避免多线程同时操作一个共享变量产生的安全问题。JUC 锁机制只能保证同一个 JVM 进程中的同一时刻只有一个线程操作共享资源。

一个应用部署多个节点,多个进程如果要修改同一个共享资源,为了避免操作乱序导致的并发安全问题,这个时候就需要引入分布式锁,分布式锁就是用来控制同一时刻,只有一个 JVM 进程中的一个线程可以访问被保护的资源。

分布式锁很重要,然而很多公司的系统可能还在跑着有缺陷的分布式锁方案,其中不乏一些大型公司。

所以,码哥今天分享一个正确 Redis 分布式锁代码实战,让你一飞冲天,该代码可直接用于生产,不是简单的 demo。

温馨提示:如果你只想看代码实战部分,可直接翻到 SpringBoot 实战章节。

错误的分布式锁

说正确方案之前,先来一个错误的,知道错在哪,才能意识到如何写正确。

在银行工作的小白老师,使用 Redis SET 指令实现加锁, 指令满足了当 key 不存在则设置 value,同时设置超时时间,并且满足原子语意。

SET lockKey 1 NX PX expireTime
  • lockKey 表示锁的资源,value 设置成 1。
  • NX:表示只有 lockKey 不存在的时候才能 SET 成功,从而保证只有一个客户端可以获得锁。
  • PX expireTime设置锁的超时时间,单位是毫秒;也可以使用 EX seconds以秒为单位设置超时时间。

至于解锁操作,小白老师果决的使用 DEL指令删除。一个分布式锁方案出来了,一气呵成,组员不明觉厉,纷纷竖起大拇指,伪代码如下。

//加锁成功
if(jedis.set(lockKey, 1, "NX", "EX", 10) == 1){
  try {
      do work //执行业务

  } finally {
    //释放锁
     jedis.del(key);
  }
}

然而,这是一个错误的分布式锁。问题在于解锁的操作有可能出现释放别人的锁的情况。

有可能出现释放别人的锁的情况。

  • 客户端 A 获取锁成功,设置超时时间 10 秒。
  • 客户端 A 执行业务逻辑,但是因为某些原因(网络问题、FullGC、代码垃圾性能差)执行很慢,时间超过 10 秒,锁因为超时自动释放了。
  • 客户端 B 加锁成功。
  • 客户端 A 执行 DEL 释放锁,相当于把客户端 B 的锁释放了。

原因很简单:客户端加锁时,没有设置一个唯一标识。释放锁的逻辑并不会检查这把锁的归属,直接删除。

残血版分布式锁

小白老师:“码哥,怎么解决释放别人的锁的情况呢?”

解决方法:客户端加锁时设置一个“唯一标识”,可以让 value 存储客户端的唯一标识,比如随机数、 UUID 等;释放锁时判断锁的唯一标识与客户端的标识是否匹配,匹配才能删除。

加锁

SET lockKey randomValue NX PX 3000

解锁

删除锁的时候判断唯一标识是否匹配伪代码如下。

if (jedis.get(lockKey).equals(randomValue)) {
    jedis.del(lockKey);
}

加锁、解锁的伪代码如下所示。

try (Jedis jedis = pool.getResource()) {
  //加锁成功
  if(jedis.set(lockKey, randomValue, "NX", "PX", 3000) == 1){
    do work //执行业务
  }
} finally {
  //判断是不是当前线程加的锁,是才释放
  if (randomValue.equals(jedis.get(keylockKey {
     jedis.del(lockKey); //释放锁
  }
}

到这里,很多公司可能都是使用这个方式来实现分布式锁。

小白:“码哥,还有问题。判断锁的唯一标识是否与当前客户端匹配和删除操作不是原子操作。”

聪明。这个方案还存在原子性问题,存在其他客户端把锁给释放的问题。

  • 客户端 A 执行唯一标识匹配成功,还来不及执行DEL释放锁操作,锁过期被释放。
  • 客户端 B 获取锁成功,value 设置了自己的客户端唯一标识。
  • 客户端 A 继续执行 DEL删除锁操作,相当于把客户端 B 的锁给删了。

青铜版分布式锁

虽然叫青铜版,这也是我们最常用的分布式锁方案之一了,这个版本没有太大的硬伤,并且比较简单。

小白老师:“码哥,这如何是好,如何解决解锁不是原子操作的问题?分布式锁这么多门道,是我肤浅了。”

解决方案很简单,解锁的逻辑我们可以通过 Lua 脚本来实现判断和删除的过程。

  • KEYS[1]是 lockKey。
  • ARGV[1] 表示客户端的唯一标识 requestId。

返回 nil 表示锁不存在,已经被删除了。只有返回值是 1 才表示加锁成功。

// key 不存在,返回 null
if (redis.call('exists', KEYS[1]) == 0) then
   return nil;
end;
// 获取 KEY[1] 中的 value 与 ARGV[1] 匹配,匹配则 del,返回 1。不匹配 return 0 解锁失败
if redis.call("get",KEYS[1]) == ARGV[1] then
    return redis.call("del",KEYS[1]);
else
    return 0;
end;

使用上面的脚本,每个锁都用一个随机值作为唯一标识,当删除锁的客户端的“唯一标识”与锁的 value 匹配的时候,才能执行删除操作。这个方案已经相对完美,我们用的最多的可能就是这个方案了。

理论知识学完了,上实战。

Spring Boot 环境准备

接下来码哥,给你一个基于 Spring Boot 并且能用于生产实战的代码。在上实战代码之前,先把 Spring Boot 集成 Redis 的环境搞定。

添加依赖

代码基于 Spring Boot 2.7.18 ,使用 lettuce 客户端来操作 Redis。添加 spring-boot-starter-data-redis依赖。


    
        
            
            org.springframework.boot
            spring-boot-dependencies
            2.7.18
            pom
            import
        
        
            org.projectlombok
            lombok
            1.18.28
            true
        
    



    
    
        org.springframework.boot
        spring-boot-starter-data-redis
    

    
    
        com.fasterxml.jackson.core
        jackson-databind
    
    
        org.projectlombok
        lombok
        true
    
    
        org.springframework.boot
        spring-boot-starter-test
        test
    

SpringBoot 配置

先配置 yaml。

server:
  servlet:
    context-path: /redis
  port: 9011
spring:
  application:
    name: redis
  redis:
    host: 127.0.0.1
    port: 6379
    password: magebyte
    timeout: 6000
    client-type: lettuce
    lettuce:
      pool:
        max-active: 300
        max-idle: 100
        max-wait: 1000ms
        min-idle: 5

RedisTemplate 默认序列化方式不具备可读性,我们改下配置,使用 JSON 序列化。注意了,这一步是附加操作,与分布式锁没有关系,是码哥顺带给你的彩蛋。

@Configuration
public class RedisConfig {

    @Bean
    public RedisTemplate redisTemplate(RedisConnectionFactory connectionFactory) {
        RedisTemplate redisTemplate = new RedisTemplate();
        redisTemplate.setConnectionFactory(connectionFactory);

        StringRedisSerializer stringRedisSerializer = new StringRedisSerializer();

        redisTemplate.setKeySerializer(stringRedisSerializer); // key的序列化类型

        Jackson2JsonRedisSerializer jackson2JsonRedisSerializer = new Jackson2JsonRedisSerializer(Object.class);
        ObjectMapper objectMapper = new ObjectMapper();
        objectMapper.setVisibility(PropertyAccessor.ALL, JsonAutoDetect.Visibility.ANY);
        // 方法过期,改为下面代码
//        objectMapper.enableDefaultTyping(ObjectMapper.DefaultTyping.NON_FINAL);
        objectMapper.activateDefaultTyping(LaissezFaireSubTypeValidator.instance,
                ObjectMapper.DefaultTyping.NON_FINAL, JsonTypeInfo.As.PROPERTY);
        jackson2JsonRedisSerializer.setObjectMapper(objectMapper);
        jackson2JsonRedisSerializer.setObjectMapper(objectMapper);

        redisTemplate.setValueSerializer(jackson2JsonRedisSerializer); // value的序列化类型
        redisTemplate.setHashKeySerializer(stringRedisSerializer);
        redisTemplate.setHashValueSerializer(jackson2JsonRedisSerializer);
        redisTemplate.afterPropertiesSet();
        return redisTemplate;
    }
}

把分布式锁接口定义出来,所谓面向接口和对象编程,代码如有神。顺带用英文显摆下什么叫做专业。

/**
 * 分布式锁
 */
public interface Lock {

    /**
     * Tries to acquire the lock with defined leaseTime.
     * Waits up to defined waitTime if necessary until the lock became available.
     * 

* Lock will be released automatically after defined leaseTime interval. * * @param waitTime the maximum time to acquire the lock * @param leaseTime lease time * @param unit time unit * @return true if lock is successfully acquired, * otherwise false if lock is already set. * @throws InterruptedException - if the thread is interrupted */ boolean tryLock(long waitTime, long leaseTime, TimeUnit unit) throws InterruptedException; /** * Acquires the lock with defined leaseTime. * Waits if necessary until lock became available. *

* Lock will be released automatically after defined leaseTime interval. * * @param leaseTime the maximum time to hold the lock after it's acquisition, * if it hasn't already been released by invoking unlock. * If leaseTime is -1, hold the lock until explicitly unlocked. * @param unit the time unit */ void lock(long leaseTime, TimeUnit unit); /** * Releases the lock. * *

Implementation Considerations * *

A {@code Lock} implementation will usually impose * restrictions on which thread can release a lock (typically only the * holder of the lock can release it) and may throw * an (unchecked) exception if the restriction is violated. * Any restrictions and the exception * type must be documented by that {@code Lock} implementation. */ void unlock(); }

青铜分布式锁实战

DistributedLock 实现 Lock 接口,构造方法实现 resourceName 和 StringRedisTemplate 的属性设置。客户端唯一标识使用uuid:threadId 组成。

DistributedLock

public class DistributedLock implements Lock {

    /**
     * 标识 id
     */
    private final String id = UUID.randomUUID().toString();

    /**
     * 资源名称
     */
    private final String resourceName;

    private final List keys = new ArrayList(1);


    /**
     * redis 客户端
     */
    private final StringRedisTemplate redisTemplate;

    public DistributedLock(String resourceName, StringRedisTemplate redisTemplate) {
        this.resourceName = resourceName;
        this.redisTemplate = redisTemplate;
        keys.add(resourceName);
    }

    private String getRequestId(long threadId) {
        return id + ":" + threadId;
    }
}

加锁 tryLock、lock

tryLock 以阻塞等待 waitTime 时间的方式来尝试获取锁。获取成功则返回 true,反之 false。tryAcquire 方法相当于执行了 Redis 的SET resourceName uuid:threadID NX PX {leaseTime} 指令。

与 tryLock不同的是, lock 一直尝试自旋阻塞等待获取分布式锁,直到获取成功为止。而 tryLock 只会阻塞等待 waitTime 时间。

此外,为了让程序更加健壮,码哥实现了阻塞等待获取分布式锁,让你用的更加开心,面试不慌加薪不难。如果你不需要自旋阻塞等待获取锁,那把 while 代码块删除即可。

@Override
public boolean tryLock(long waitTime, long leaseTime, TimeUnit unit) throws InterruptedException {
long time = unit.toMillis(waitTime);
long current = System.currentTimeMillis();
long threadId = Thread.currentThread().getId();
// 获取锁
Boolean isAcquire = tryAcquire(leaseTime, unit, threadId);
// lock acquired
if (Boolean.TRUE.equals(isAcquire)) {
return true;
}

time -= System.currentTimeMillis() - current;
// 等待时间用完,获取锁失败
if (time

相关文章

Oracle如何使用授予和撤销权限的语法和示例
Awesome Project: 探索 MatrixOrigin 云原生分布式数据库
下载丨66页PDF,云和恩墨技术通讯(2024年7月刊)
社区版oceanbase安装
Oracle 导出CSV工具-sqluldr2
ETL数据集成丨快速将MySQL数据迁移至Doris数据库

发布评论