redisson分布式锁实现原理

2023年 7月 25日 51.3k 0

一、基于redisson实现分布式锁使用

Redisson是一个使用Java编写的开源库,它提供了对Redis数据库的访问和操作的封装,并在此基础上提供了各种分布式功能,包括分布式锁。
Redisson的分布式锁是基于Redis的原子性操作来实现的,它提供了简单且易于使用的API,可以在分布式环境中实现高效的分布式锁管理。

1.引入依赖

引入redis和redisson相关依赖:


    org.springframework.boot
    spring-boot-starter-data-redis


    org.redisson
    redisson-spring-boot-starter

2.编写配置

编写声明RedissonClient配置,server类型可以选ClusterServers,MasterSlaveServers,ReplicatedServers,SentinelServers和SingleServer,此处使用的server类型选单体redis:

@Configuration
public class RedissonConfiguration {
    @Value("${spring.redis.host}")
    private String host;
    @Value("${spring.redis.port}")
    private String port;
    @Value("${spring.redis.password:}")
    private String password;
    private int database = 0;
    @Bean
    public RedissonClient redisson() {
        String address = "redis://" + host + ":" + port;
        Config config = new Config();
        config.useSingleServer()
                .setAddress(address)
                .setPassword(password)
                .setDatabase(database);
        return Redisson.create(config);
    }
}

3.使用分布式锁

注入RedissonClient然后获取锁,加锁后进行独占业务操作,最后释放锁。

@Service
@Slf4j
public class TestRLock {
    @Resource
    private RedissonClient redissonClient;

    public void doSomething(String orderId) {
        RLock lock = redissonClient.getLock("place_order:" + orderId);
        try {
            if(lock.tryLock(5,10, TimeUnit.SECONDS)) {
                this.doBuzzExclusive(orderId);
            }
        } catch (Exception e) {
            log.error("occur error;orderId={}",orderId,e);
        } finally {
            //锁被持有,并且被当前线程持有
            if (lock.isLocked() && lock.isHeldByCurrentThread()) {
                lock.unlock();
            } 
        }
    }
    /**
     * 模拟独占处理
     * @param orderId
     */
    private void doBuzzExclusive(String orderId) {
        // TODO: do business exclusive
    }
}

这样就实现分布式锁对竞态资源的操作控制。

二、redisson分布式锁原理

1.建立连接

在Redisson中,Netty被用作底层的网络通信框架。它提供了高性能、异步非阻塞的网络通信能力,使得Redisson可以与Redis服务器进行快速、可靠的通信。
在使用Redisson创建RedissonClient实例时,它会自动初始化并启动Netty客户端,用于与Redis服务器建立连接。
从前边的分布式锁使用过程可以看出,RLock是由RedissonClient创建,那么与redis的连接交互也是由RedissonClient来实现,我们从创建RedissonClient过程看一下redisson如何与redis建立连接的。

public static RedissonClient create(Config config) {
    return new Redisson(config);
}

然后调用Redisson构造函数创建:

protected Redisson(Config config) {
    this.config = config;
    Config configCopy = new Config(config);

    connectionManager = ConfigSupport.createConnectionManager(configCopy);
    RedissonObjectBuilder objectBuilder = null;
    if (config.isReferenceEnabled()) {
        objectBuilder = new RedissonObjectBuilder(this);
    }
    commandExecutor = new CommandSyncService(connectionManager, objectBuilder);
    evictionScheduler = new EvictionScheduler(commandExecutor);
    writeBehindService = new WriteBehindService(commandExecutor);
}

这里会复制一份配置出来,然后创建连接管理器、命令执行器、定期定出调度、以及异步写服务。
此处主要关注命令执行器和连接管理器,此处用的是同步命令执行器,当然也有其他实现比如CommandBatchService批量执行器。
CommandSyncService.png
然后再看下创建连接管理器:

public static ConnectionManager createConnectionManager(Config configCopy) {
    UUID id = UUID.randomUUID();
    if (configCopy.getMasterSlaveServersConfig() != null) {
        validate(configCopy.getMasterSlaveServersConfig());
        return new MasterSlaveConnectionManager(configCopy.getMasterSlaveServersConfig(), configCopy, id);
    } else if (configCopy.getSingleServerConfig() != null) {
        validate(configCopy.getSingleServerConfig());
        return new SingleConnectionManager(configCopy.getSingleServerConfig(), configCopy, id);
    } else if (configCopy.getSentinelServersConfig() != null) {
        validate(configCopy.getSentinelServersConfig());
        return new SentinelConnectionManager(configCopy.getSentinelServersConfig(), configCopy, id);
    } else if (configCopy.getClusterServersConfig() != null) {
        validate(configCopy.getClusterServersConfig());
        return new ClusterConnectionManager(configCopy.getClusterServersConfig(), configCopy, id);
    } else if (configCopy.getReplicatedServersConfig() != null) {
        validate(configCopy.getReplicatedServersConfig());
        return new ReplicatedConnectionManager(configCopy.getReplicatedServersConfig(), configCopy, id);
    } else if (configCopy.getConnectionManager() != null) {
        return configCopy.getConnectionManager();
    }else {
        throw new IllegalArgumentException("server(s) address(es) not defined!");
    }
}

前边使用的是SingleServer,看一下SingleConnectionManager创建流程:

public SingleConnectionManager(SingleServerConfig cfg, Config config, UUID id) {
    super(create(cfg), config, id);
}

SingleConnectionManager.png
SingleConnectionManager继承了MasterSlaveConnectionManager,会调用父类构造器:

public MasterSlaveConnectionManager(MasterSlaveServersConfig cfg, Config config, UUID id) {
    this(config, id);
    this.config = cfg;

    if (cfg.getSlaveAddresses().isEmpty()
            && (cfg.getReadMode() == ReadMode.SLAVE || cfg.getReadMode() == ReadMode.MASTER_SLAVE)) {
        throw new IllegalArgumentException("Slaves aren't defined. readMode can't be SLAVE or MASTER_SLAVE");
    }

    initTimer(cfg);
    initSingleEntry();
}

initTimer会创建空闲连接监听管理以及发布订阅管理器,然后调用initSingleEntry初始化单机客户端。

protected void initSingleEntry() {
    try {
        if (config.checkSkipSlavesInit()) {
            masterSlaveEntry = new SingleEntry(this, config);
        } else {
            masterSlaveEntry = new MasterSlaveEntry(this, config);
        }
        CompletableFuture masterFuture = masterSlaveEntry.setupMasterEntry(new RedisURI(config.getMasterAddress()));
        masterFuture.join();

        //省略...
        startDNSMonitoring(masterFuture.getNow(null));
    } catch (Exception e) {
        //省略...
    }
}

创建SingleEntry,然后调用setupMasterEntry方法设置主节点连接,并且会调用startDNSMonitoring方法开启线程监听ip是否发生变成,如果变成会重新连接。继续看setupMasterEntry方法:

public CompletableFuture setupMasterEntry(RedisURI address, String sslHostname) {
    RedisClient client = connectionManager.createClient(NodeType.MASTER, address, sslHostname);
    return setupMasterEntry(client);
}

此处会先创建redis客户端,然后调用setupMasterEntry方法设置主节点连接;先看一下创建RedisClient:

private RedisClient(RedisClientConfig config) {
    RedisClientConfig copy = new RedisClientConfig(config);
    //省略...
    channels = new DefaultChannelGroup(copy.getGroup().next());
    bootstrap = createBootstrap(copy, Type.PLAIN);
    pubSubBootstrap = createBootstrap(copy, Type.PUBSUB);
    //省略...
}

这里基本上都是netty启动器的相关设置和前置准备,可以看一下创建netty客户端启动器的操作:

private Bootstrap createBootstrap(RedisClientConfig config, Type type) {
    Bootstrap bootstrap = new Bootstrap()
                    .resolver(config.getResolverGroup())
                    .channel(config.getSocketChannelClass())
                    .group(config.getGroup());

    bootstrap.handler(new RedisChannelInitializer(bootstrap, config, this, channels, type));
    bootstrap.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, config.getConnectTimeout());
    bootstrap.option(ChannelOption.SO_KEEPALIVE, config.isKeepAlive());
    bootstrap.option(ChannelOption.TCP_NODELAY, config.isTcpNoDelay());
    config.getNettyHook().afterBoostrapInitialization(bootstrap);
    return bootstrap;
}

然后继续看setupMasterEntry做了什么事情.

private CompletableFuture setupMasterEntry(RedisClient client) {
    CompletableFuture addrFuture = client.resolveAddr();
    return addrFuture.thenCompose(res -> {
        masterEntry = new ClientConnectionsEntry(
                client,
                config.getMasterConnectionMinimumIdleSize(),
                config.getMasterConnectionPoolSize(),
                config.getSubscriptionConnectionMinimumIdleSize(),
                config.getSubscriptionConnectionPoolSize(),
                connectionManager,
                NodeType.MASTER);
        //省略...
        CompletableFuture writeFuture = writeConnectionPool.add(masterEntry);
        //省略...
        return CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]));
    }).whenComplete((r, e) -> {
        if (e != null) {
            client.shutdownAsync();
        }
    }).thenApply(r -> client);
}

创建连接条目,并添加到连接池中。继续看添加到连接池操作:

public CompletableFuture add(ClientConnectionsEntry entry) {
    CompletableFuture promise = initConnections(entry, true);
    return promise.thenAccept(r -> {
        entries.add(entry);
    });
}

这里做了初始化连接操作,然后添加到连接池的队列中,接着看一下initConnections初始化连接:

private CompletableFuture initConnections(ClientConnectionsEntry entry, boolean checkFreezed) {
    //省略...
    int startAmount = Math.min(5, minimumIdleSize);
    AtomicInteger requests = new AtomicInteger(startAmount);
    for (int i = 0; i < startAmount; i++) {
        createConnection(checkFreezed, requests, entry, initPromise, minimumIdleSize, initializedConnections);
    }
    return initPromise;
}

省略掉中间一些调用链,最终会调用到RedisClient的connectAsync方法:

public RFuture connectAsync() {
    CompletableFuture addrFuture = resolveAddr();
    CompletableFuture f = addrFuture.thenCompose(res -> {
        CompletableFuture r = new CompletableFuture();
        ChannelFuture channelFuture = bootstrap.connect(res);
        channelFuture.addListener(new ChannelFutureListener() {
        	//省略...
        });
        return r;
    });
    return new CompletableFutureWrapper(f);
}

此处就用使用前面创建的Bootstrap进行连接操作,当然这里是初始化连接到连接池,如果并发比较大,连接池中初始连接数不够用,会在发起请求的时候创建新的连接。

2.加锁

加锁会先调用RedissonClient�创建锁对象。

public RLock getLock(String name) {
    return new RedissonLock(commandExecutor, name);
}

然后创建RedissonLock:

public RedissonLock(CommandAsyncExecutor commandExecutor, String name) {
    super(commandExecutor, name);
    this.commandExecutor = commandExecutor;
    this.internalLockLeaseTime = commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout();
    this.pubSub = commandExecutor.getConnectionManager().getSubscribeService().getLockPubSub();
}

RedissonLock.png
调用父类构造函数,指定执行器、锁释放时间以及发布订阅组件。
然后继续看加锁逻辑,这里加锁我们使用tryLock并指定了等待时间、释放时间:

public boolean tryLock(long waitTime, long leaseTime, TimeUnit unit) throws InterruptedException {
    long time = unit.toMillis(waitTime);
    long current = System.currentTimeMillis();
    long threadId = Thread.currentThread().getId();
    // 1.尝试获取锁,如果ttl返回null,代表加锁成功
    Long ttl = tryAcquire(waitTime, leaseTime, unit, threadId);
    // lock acquired
    if (ttl == null) {
        return true;
    }
    time -= System.currentTimeMillis() - current;
    // 申请锁的耗时如果大于等于最大等待时间,则申请锁失败.
    if (time  {
                if (ex == null) {
                    unsubscribe(res, threadId);
                }
            });
        }
        acquireFailed(waitTime, unit, threadId);
        return false;
    }
    try {
        time -= System.currentTimeMillis() - current;
        // 计算获取锁的总耗时,如果大于等于最大等待时间,则获取锁失败.
        if (time  {
        // lock acquired
        if (ttlRemaining == null) {
            if (leaseTime > 0) {
                internalLockLeaseTime = unit.toMillis(leaseTime);
            } else {
                scheduleExpirationRenewal(threadId);
            }
        }
        return ttlRemaining;
    });
    return new CompletableFutureWrapper(f);
}

如果传入锁释放时间且大于零,使用用户传入的释放时间,否则使用默认的释放时间30秒,然后调用tryLockInnerAsync获取锁并返回中心化节点数据的ttl时间。
如果用户传入了leaseTime就不会开启看门狗机制实现自动续期,如果没有传入则开启看门口续期机制。
继续看tryLockInnerAsync方法实现:

 RFuture tryLockInnerAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand command) {
    return evalWriteAsync(getRawName(), LongCodec.INSTANCE, command,
            "if (redis.call('exists', KEYS[1]) == 0) then " +
                    "redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
                    "redis.call('pexpire', KEYS[1], ARGV[1]); " +
                    "return nil; " +
                    "end; " +
                    "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
                    "redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
                    "redis.call('pexpire', KEYS[1], ARGV[1]); " +
                    "return nil; " +
                    "end; " +
                    "return redis.call('pttl', KEYS[1]);",
            Collections.singletonList(getRawName()), unit.toMillis(leaseTime), getLockName(threadId));
}

为了保证操作的原子性,这里使用了lua脚本来操作redis,执行脚本时key是加锁的名称,ARGV分别是释放时间和线程信息。从脚本内容可以看出,锁在redis中的数据结构是hash,外层key存储的是锁的名称,内部field和value存储的是加锁客户端线程信息。脚本含义是:

  • 如果hash不存在,则直接放入加锁客户端信息并设置失效时间返回
  • 如果hash中存在加锁客户端的信息,则value加1实现重入逻辑,并设置过期时间返回
  • 否则竞争加锁失败,返回锁对应hash的过期时间

然后调用evalWriteAsync�执行lua脚本:

protected  RFuture evalWriteAsync(String key, Codec codec, RedisCommand evalCommandType, String script, List keys, Object... params) {
MasterSlaveEntry entry = commandExecutor.getConnectionManager().getEntry(getRawName());
int availableSlaves = entry.getAvailableSlaves();
CommandBatchService executorService = createCommandBatchService(availableSlaves);
RFuture result = executorService.evalWriteAsync(key, codec, evalCommandType, script, keys, params);
if (commandExecutor instanceof CommandBatchService) {
return result;
}
RFuture

相关文章

JavaScript2024新功能:Object.groupBy、正则表达式v标志
PHP trim 函数对多字节字符的使用和限制
新函数 json_validate() 、randomizer 类扩展…20 个PHP 8.3 新特性全面解析
使用HTMX为WordPress增效:如何在不使用复杂框架的情况下增强平台功能
为React 19做准备:WordPress 6.6用户指南
如何删除WordPress中的所有评论

发布评论