Springboot+Redisson封装分布式锁Starter

2023年 8月 27日 31.9k 0

1、为什么要使用分布式锁?

在分布式,微服务环境中,我们的服务被拆分为很多个,并且每一个服务可能存在多个实例,部署在不同的服务器上。

此时JVM中的synchronized和lock锁,将只能对自己所在服务的JVM加锁,而跨机器,跨JMV的场景,仍然需要锁的场景就需要使用到分布式锁了。

2、为什么要使用Redis实现分布式锁?

因为Redis的性能很好,并且Redis是单线程的,天生线程安全。

并且Redis的key过期效果与Zookeeper的临时节点的效果相似,都能实现锁超时自动释放的功能。

而且Redis还可以使用lua脚本来保证redis多条命令实现整体的原子性,Redisson就是使用lua脚本的原子性来实现分布式锁的。

3、我们如何基于Redisson封装分布式锁?

1)、基于RedissonClient实现手动加锁

2)、基于AOP+Redisson封装注解版的分布式锁

3)、将分布式锁功能封装成一个starter, 引入jar包即可实现分布式锁

4、代码实现

4.1、整合封装Redisson

我们前面封装了基于Redis扩展了SpringCache,封装了redis-cache-spring-boot-starter。

我们的分布式锁基于这个模块实现,下面引入依赖。

引入依赖



    
        itdl-parent
        com.itdl
        1.0
    
    4.0.0

    redis-lock-spring-boot-starter
    Redis实现分布式锁的自定义starter封装模块

    
        ${java.version}
        ${java.version}
    

    
        
        
            com.itdl
            redis-cache-spring-boot-starter
        

        
        
            org.redisson
            redisson
        

        
        
            org.springframework.boot
            spring-boot-starter-aop
        
    

编写RedisLockConfig配置RedissonClient

/**
 * Redis实现分布式锁的配置(使用Redisson)
 */
@Configuration  // 标识为一个配置项,注入Spring容器
@AutoConfigureBefore({CustomRedisConfig.class, CacheNullValuesHandle.class})
@ConditionalOnProperty(value = "redis.enable", havingValue = "true")    // 开启redis.enable=true时生效
@Slf4j
public class RedisLockConfig {

    private volatile boolean isCluster = false;

    private volatile String redisHostsStr = "";

    @Bean
    @ConditionalOnMissingBean
    public RedissonClient redissonClient(CustomRedisProperties redisProperties){
        // 构建配置
        Config config =  buildConfig(redisProperties);

        RedissonClient redissonClient = Redisson.create(config);
        log.info("==============创建redisClient{}版成功:{}==================", isCluster ? "集群": "单机", redisHostsStr);
        return redissonClient;
    }

    private Config buildConfig(CustomRedisProperties redisProperties) {
        final Config config = new Config();
        // 根据逗号切割host列表
        Set hosts = org.springframework.util.StringUtils.commaDelimitedListToSet(redisProperties.getHost());
        if (CollectionUtils.isEmpty(hosts)){
            throw new RuntimeException("redis host address cannot be empty");
        }
        // 只有一个host, 表示是单机host
        if (hosts.size() == 1){
            String hostPort = hosts.stream().findFirst().get();
            redisHostsStr = "redis://" + hostPort.trim();
            config.useSingleServer()
                    .setAddress(redisHostsStr)
                    .setDatabase(redisProperties.getDatabase())
                    .setPassword(StringUtils.isBlank(redisProperties.getPassword()) ? null : redisProperties.getPassword())
            ;
            isCluster = false;
        }else {
            // 集群处理
            String[] redisHosts = new String[hosts.size()];
            int i = 0;
            for (String host : hosts) {
                String[] split = host.split(":");
                if (split.length != 2){
                    throw new RuntimeException("host or port err");
                }
                redisHosts[i] = "redis://" + host.trim();
                i++;
            }
            redisHostsStr = String.join(",", redisHosts);
            // 配置集群
            config.useClusterServers()
                    .addNodeAddress(redisHosts)
                    .setPassword(StringUtils.isBlank(redisProperties.getPassword()) ? null : redisProperties.getPassword())
                    // 解决Not all slots covered! Only 10922 slots are available
                    .setCheckSlotsCoverage(false);
            isCluster = true;
        }

        return config;
    }
}

我们配置时需要优先配置好redis-cache-spring-boot-starter,使用@AutoConfigureBefore({CustomRedisConfig.class, CacheNullValuesHandle.class})

直接使用,不再重复造轮子,然后我们根据自定义属性配置文件CustomRedisProperties来创建RedissonClient的Bean。

编写META-INF/spring.factories进行自动配置

org.springframework.boot.autoconfigure.EnableAutoConfiguration=
com.itdl.lock.config.RedisLockConfig

在测试模块缓存service添加分布式锁

@Cacheable(cacheNames = "demo2#3", key = "#id")
public TestEntity getById2(Long id){
    // 创建分布式锁
    RLock lock = redissonClient.getLock("demo2_lock");
    // 加锁    
    lock.lock(10, TimeUnit.SECONDS);
    if (id > 1000){
        log.info("id={}没有查询到数据,返回空值", id);
        return null;
    }
    TestEntity testEntity = new TestEntity(new Random().nextLong(), UUID.randomUUID().toString(), new Random().nextInt(20) + 10);
    log.info("模拟查询数据库:{}", testEntity);
    // 释放锁
    lock.unlock();
    return testEntity;
}

我们这里的@Cacheable没有加sync=true, 此时并发请求会存在线程安全问题,但是我们在方法体局部添加了分布式锁,因此我们的程序会按照顺序执行。

如果我们的参数被定死了,最终请求会被先存储到缓存,所以后续的查询就会走缓存,这能很好的测试分布式锁的效果。

编写测试程序

@SpringBootTest
public class TestRedisLockRunner6 {

    @Autowired
    private MyTestService myTestService;

    // 创建一个固定线程池
    private ExecutorService executorService = Executors.newFixedThreadPool(16);

    /**
     * 多线程访问请求,测试切面的线程安全性
     */
    @Test
    public void testMultiMyTestService() throws InterruptedException {
        for (int i = 0; i  {
                // 每次查询同一个参数
                TestEntity t1 = myTestService.getById2(1L);
            });
        }

        // 主线程休息10秒种
        Thread.sleep(10000);
    }

}

我们可以看到,结果并没有符合我们预期,但是又部分符合我们预期,为什么呢?

因为我们的@Cacheable是存在线程安全问题的,因为它先查询缓存这个操作存在并发问题,查询时就同时有N个请求进入@Cacheable, 并且都查询没有缓存。

然后同时执行方法体,但方法体加了分布式锁,所以排队进行处理,因此序号有序。

但打印数量不足总数,是因为这一批次没有全部到达@Cacheable,而是执行完毕之后才将缓存回填,所以后续的请求就是走缓存了。

解决方案:我们加上sync=true之后就能实现,只查询一次数据库,就可以回填缓存了。如果我们去掉@Cacheable注解,则会每一次都查询数据库,但是时按照顺序执行的。

加上sync=true测试

效果达到了我们的预期,继续看一下去掉@Cacheable注解的情况。

去掉@Cacheable注解测试

我们的分布式锁功能是没有问题的,但是每次我们都需要执行getLock(), lock.lock(), lock.unlock(),是不是很麻烦,能不能一个注解搞定?

当然是可以的。

4.2、封装注解版分布式锁

编写@RedisLock注解

/**
 * 自定义Redis分布式锁
 */
@Target({ElementType.METHOD})
@Retention(RetentionPolicy.RUNTIME)
@Documented
public @interface RedisLock {

    /**分布式锁的名称,支持el表达式*/
    String lockName() default "";

    /**锁类型 默认为可重入锁*/
    LockType lockType() default REENTRANT_LOCK;

    /**获取锁等待时间,默认60秒*/
    long waitTime() default 30000L;

    /** 锁自动释放时间,默认60秒*/
    long leaseTime() default 60000L;

    /**
     * 被加锁方法执行完是否立即释放锁
     */
    boolean immediatelyUnLock() default true;

    /** 时间单位, 默认毫秒*/
    TimeUnit timeUnit() default TimeUnit.MILLISECONDS;
}

编写分布式锁切面RedisLockAop

/**
 * Redis分布式锁的切面逻辑实现
 */
@ConditionalOnProperty(value = "redis.enable", havingValue = "true")    // 开启redis.enable=true时生效
@AutoConfigureBefore(RedisLockConfig.class)
@Aspect
@Configuration
@Slf4j
public class RedisLockAop {

    @Resource
    private RedissonClient redissonClient;

    /**
     * 切点
     */
    @Pointcut("@annotation(com.itdl.lock.anno.RedisLock)")
    public void pointcut(){

    }
    
    /**
     * 环绕通知 注解针对的是方法,这里切点也获取方法进行处理就可以了
     */
    @Around("pointcut()")
    public Object around(ProceedingJoinPoint joinPoint) throws Throwable {
        // 获取方法
        Method method = ((MethodSignature) joinPoint.getSignature()).getMethod();

        // 获取方法上的分布式锁注解
        RedisLock redisLock = method.getDeclaredAnnotation(RedisLock.class);

        // 获取注解的参数
        // 锁名称
        String lockName = redisLock.lockName();
        // 锁类型
        LockType lockType = redisLock.lockType();

        // 获取RedissonClient的Lock
        RLock lock = getRLock(lockName, lockType, redisLock);

        //获取到锁后, 开始执行方法,执行完毕后释放锁
        try {
            log.debug("=========>>>获取锁成功, 即将执行业务逻辑:{}", lockName);
            Object proceed = joinPoint.proceed();
            // 释放锁
            if (redisLock.immediatelyUnLock()) {
                //是否立即释放锁
                lock.unlock();
            }
            log.debug("=========>>>获取锁成功且执行业务逻辑成功:{}", lockName);
            return proceed;
        } catch (Exception e) {
            log.error("=========>>>获取锁成功但执行业务逻辑失败:{}", lockName);
            e.printStackTrace();
            throw new RedisLockException(LockErrCode.EXEC_BUSINESS_ERR);
        }finally {
            // 查询当前线程是否保持此锁定 被锁定则解锁
            lock.unlock();
            log.debug("=========>>>释放锁成功:{}", lockName);
        }
    }

    /**
     * 根据锁名称和类型创建锁
     * @param lockName 锁名称
     * @param lockType 锁类型
     * @return 锁
     */
    private RLock getRLock(String lockName, LockType lockType, RedisLock redisLock) throws InterruptedException {
        RLock lock;
        switch (lockType){
            case FAIR_LOCK:
                lock = redissonClient.getFairLock(lockName);
                break;
            case READ_LOCK:
                lock = redissonClient.getReadWriteLock(lockName).readLock();
                break;
            case WRITE_LOCK:
                lock = redissonClient.getReadWriteLock(lockName).writeLock();
                break;
            default:
                // 默认加可重入锁,也就是普通的分布式锁
                lock = redissonClient.getLock(lockName);
                break;
        }
        // 首先尝试获取锁,如果在规定时间内没有获取到锁,则调用lock等待锁,直到获取锁为止
        if (lock.tryLock()) {
            lock.tryLock(redisLock.waitTime(), redisLock.leaseTime(), redisLock.timeUnit());
        }else {
            // 如果leaseTime>0,规定时间内获取锁,超时则自动释放锁
            long leaseTime = redisLock.leaseTime();
            if (leaseTime > 0) {
                lock.lock(redisLock.leaseTime(), redisLock.timeUnit());
            } else {
                // 自动释放锁时间设置为0或者负数,则加锁不设置超时时间
                lock.lock();
            }
        }
        return lock;
    }
}

话不多说,封装的逻辑已经在注释中写的很清晰了。

将切面也放入自动配置spring.factories中

org.springframework.boot.autoconfigure.EnableAutoConfiguration=
com.itdl.lock.config.RedisLockConfig,
com.itdl.lock.anno.RedisLockAop

测试注解版分布式锁

@RedisLock(lockName = "demo4_lock")
public TestEntity getById4(Long id) throws InterruptedException {
    index++;
    log.info("current index is : {}", index);
    Thread.sleep(new Random().nextInt(10) * 100);
    TestEntity testEntity = new TestEntity(new Random().nextLong(), UUID.randomUUID().toString(), new Random().nextInt(20) + 10);
    log.info("模拟查询数据库:{}", testEntity);
    return testEntity;
}

可以看到,我们就是一个注解分布式锁的效果,而分布式锁与缓存注解通常不会一起使用,因为一般会在存在事务问题的地方我们会使用锁,在多个JMV操作同一条数据做写操作时需要加分布式锁。

编写测试程序

@SpringBootTest
public class TestRedisLockRunner6 {

    @Autowired
    private MyTestService myTestService;

    // 创建一个固定线程池
    private ExecutorService executorService = Executors.newFixedThreadPool(16);

    /**
     * 多线程访问请求,测试切面的线程安全性
     */
    @Test
    public void testMultiMyTestService() throws InterruptedException {
        for (int i = 0; i  {
                try {
                    TestEntity t1 = myTestService.getById4(1L);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            });
        }

        // 主线程休息10秒种
        Thread.sleep(60000);
    }

}

测试结果

5、小结

我们将分布式锁基于缓存扩展了一版,也就是说本starter即有分布式缓存功能,又有分布式锁功能。

而注解版的分布式锁能够解决大多数场景的并核问题,小粒度的Lock锁方式补全其他场景。

将两者封装成为一个starter,我们就可以很方便的使用分布式锁功能,引入相关包即可,开箱即用。

相关文章

JavaScript2024新功能:Object.groupBy、正则表达式v标志
PHP trim 函数对多字节字符的使用和限制
新函数 json_validate() 、randomizer 类扩展…20 个PHP 8.3 新特性全面解析
使用HTMX为WordPress增效:如何在不使用复杂框架的情况下增强平台功能
为React 19做准备:WordPress 6.6用户指南
如何删除WordPress中的所有评论

发布评论