一、基于redisson实现分布式锁使用
Redisson是一个使用Java编写的开源库,它提供了对Redis数据库的访问和操作的封装,并在此基础上提供了各种分布式功能,包括分布式锁。
Redisson的分布式锁是基于Redis的原子性操作来实现的,它提供了简单且易于使用的API,可以在分布式环境中实现高效的分布式锁管理。
1.引入依赖
引入redis和redisson相关依赖:
org.springframework.boot
spring-boot-starter-data-redis
org.redisson
redisson-spring-boot-starter
2.编写配置
编写声明RedissonClient配置,server类型可以选ClusterServers,MasterSlaveServers,ReplicatedServers,SentinelServers和SingleServer,此处使用的server类型选单体redis:
@Configuration
public class RedissonConfiguration {
@Value("${spring.redis.host}")
private String host;
@Value("${spring.redis.port}")
private String port;
@Value("${spring.redis.password:}")
private String password;
private int database = 0;
@Bean
public RedissonClient redisson() {
String address = "redis://" + host + ":" + port;
Config config = new Config();
config.useSingleServer()
.setAddress(address)
.setPassword(password)
.setDatabase(database);
return Redisson.create(config);
}
}
3.使用分布式锁
注入RedissonClient然后获取锁,加锁后进行独占业务操作,最后释放锁。
@Service
@Slf4j
public class TestRLock {
@Resource
private RedissonClient redissonClient;
public void doSomething(String orderId) {
RLock lock = redissonClient.getLock("place_order:" + orderId);
try {
if(lock.tryLock(5,10, TimeUnit.SECONDS)) {
this.doBuzzExclusive(orderId);
}
} catch (Exception e) {
log.error("occur error;orderId={}",orderId,e);
} finally {
//锁被持有,并且被当前线程持有
if (lock.isLocked() && lock.isHeldByCurrentThread()) {
lock.unlock();
}
}
}
/**
* 模拟独占处理
* @param orderId
*/
private void doBuzzExclusive(String orderId) {
// TODO: do business exclusive
}
}
这样就实现分布式锁对竞态资源的操作控制。
二、redisson分布式锁原理
1.建立连接
在Redisson中,Netty被用作底层的网络通信框架。它提供了高性能、异步非阻塞的网络通信能力,使得Redisson可以与Redis服务器进行快速、可靠的通信。
在使用Redisson创建RedissonClient实例时,它会自动初始化并启动Netty客户端,用于与Redis服务器建立连接。
从前边的分布式锁使用过程可以看出,RLock是由RedissonClient创建,那么与redis的连接交互也是由RedissonClient来实现,我们从创建RedissonClient过程看一下redisson如何与redis建立连接的。
public static RedissonClient create(Config config) {
return new Redisson(config);
}
然后调用Redisson构造函数创建:
protected Redisson(Config config) {
this.config = config;
Config configCopy = new Config(config);
connectionManager = ConfigSupport.createConnectionManager(configCopy);
RedissonObjectBuilder objectBuilder = null;
if (config.isReferenceEnabled()) {
objectBuilder = new RedissonObjectBuilder(this);
}
commandExecutor = new CommandSyncService(connectionManager, objectBuilder);
evictionScheduler = new EvictionScheduler(commandExecutor);
writeBehindService = new WriteBehindService(commandExecutor);
}
这里会复制一份配置出来,然后创建连接管理器、命令执行器、定期定出调度、以及异步写服务。
此处主要关注命令执行器和连接管理器,此处用的是同步命令执行器,当然也有其他实现比如CommandBatchService批量执行器。
然后再看下创建连接管理器:
public static ConnectionManager createConnectionManager(Config configCopy) {
UUID id = UUID.randomUUID();
if (configCopy.getMasterSlaveServersConfig() != null) {
validate(configCopy.getMasterSlaveServersConfig());
return new MasterSlaveConnectionManager(configCopy.getMasterSlaveServersConfig(), configCopy, id);
} else if (configCopy.getSingleServerConfig() != null) {
validate(configCopy.getSingleServerConfig());
return new SingleConnectionManager(configCopy.getSingleServerConfig(), configCopy, id);
} else if (configCopy.getSentinelServersConfig() != null) {
validate(configCopy.getSentinelServersConfig());
return new SentinelConnectionManager(configCopy.getSentinelServersConfig(), configCopy, id);
} else if (configCopy.getClusterServersConfig() != null) {
validate(configCopy.getClusterServersConfig());
return new ClusterConnectionManager(configCopy.getClusterServersConfig(), configCopy, id);
} else if (configCopy.getReplicatedServersConfig() != null) {
validate(configCopy.getReplicatedServersConfig());
return new ReplicatedConnectionManager(configCopy.getReplicatedServersConfig(), configCopy, id);
} else if (configCopy.getConnectionManager() != null) {
return configCopy.getConnectionManager();
}else {
throw new IllegalArgumentException("server(s) address(es) not defined!");
}
}
前边使用的是SingleServer,看一下SingleConnectionManager创建流程:
public SingleConnectionManager(SingleServerConfig cfg, Config config, UUID id) {
super(create(cfg), config, id);
}
SingleConnectionManager继承了MasterSlaveConnectionManager,会调用父类构造器:
public MasterSlaveConnectionManager(MasterSlaveServersConfig cfg, Config config, UUID id) {
this(config, id);
this.config = cfg;
if (cfg.getSlaveAddresses().isEmpty()
&& (cfg.getReadMode() == ReadMode.SLAVE || cfg.getReadMode() == ReadMode.MASTER_SLAVE)) {
throw new IllegalArgumentException("Slaves aren't defined. readMode can't be SLAVE or MASTER_SLAVE");
}
initTimer(cfg);
initSingleEntry();
}
initTimer会创建空闲连接监听管理以及发布订阅管理器,然后调用initSingleEntry初始化单机客户端。
protected void initSingleEntry() {
try {
if (config.checkSkipSlavesInit()) {
masterSlaveEntry = new SingleEntry(this, config);
} else {
masterSlaveEntry = new MasterSlaveEntry(this, config);
}
CompletableFuture masterFuture = masterSlaveEntry.setupMasterEntry(new RedisURI(config.getMasterAddress()));
masterFuture.join();
//省略...
startDNSMonitoring(masterFuture.getNow(null));
} catch (Exception e) {
//省略...
}
}
创建SingleEntry,然后调用setupMasterEntry方法设置主节点连接,并且会调用startDNSMonitoring方法开启线程监听ip是否发生变成,如果变成会重新连接。继续看setupMasterEntry方法:
public CompletableFuture setupMasterEntry(RedisURI address, String sslHostname) {
RedisClient client = connectionManager.createClient(NodeType.MASTER, address, sslHostname);
return setupMasterEntry(client);
}
此处会先创建redis客户端,然后调用setupMasterEntry方法设置主节点连接;先看一下创建RedisClient:
private RedisClient(RedisClientConfig config) {
RedisClientConfig copy = new RedisClientConfig(config);
//省略...
channels = new DefaultChannelGroup(copy.getGroup().next());
bootstrap = createBootstrap(copy, Type.PLAIN);
pubSubBootstrap = createBootstrap(copy, Type.PUBSUB);
//省略...
}
这里基本上都是netty启动器的相关设置和前置准备,可以看一下创建netty客户端启动器的操作:
private Bootstrap createBootstrap(RedisClientConfig config, Type type) {
Bootstrap bootstrap = new Bootstrap()
.resolver(config.getResolverGroup())
.channel(config.getSocketChannelClass())
.group(config.getGroup());
bootstrap.handler(new RedisChannelInitializer(bootstrap, config, this, channels, type));
bootstrap.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, config.getConnectTimeout());
bootstrap.option(ChannelOption.SO_KEEPALIVE, config.isKeepAlive());
bootstrap.option(ChannelOption.TCP_NODELAY, config.isTcpNoDelay());
config.getNettyHook().afterBoostrapInitialization(bootstrap);
return bootstrap;
}
然后继续看setupMasterEntry做了什么事情.
private CompletableFuture setupMasterEntry(RedisClient client) {
CompletableFuture addrFuture = client.resolveAddr();
return addrFuture.thenCompose(res -> {
masterEntry = new ClientConnectionsEntry(
client,
config.getMasterConnectionMinimumIdleSize(),
config.getMasterConnectionPoolSize(),
config.getSubscriptionConnectionMinimumIdleSize(),
config.getSubscriptionConnectionPoolSize(),
connectionManager,
NodeType.MASTER);
//省略...
CompletableFuture writeFuture = writeConnectionPool.add(masterEntry);
//省略...
return CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]));
}).whenComplete((r, e) -> {
if (e != null) {
client.shutdownAsync();
}
}).thenApply(r -> client);
}
创建连接条目,并添加到连接池中。继续看添加到连接池操作:
public CompletableFuture add(ClientConnectionsEntry entry) {
CompletableFuture promise = initConnections(entry, true);
return promise.thenAccept(r -> {
entries.add(entry);
});
}
这里做了初始化连接操作,然后添加到连接池的队列中,接着看一下initConnections初始化连接:
private CompletableFuture initConnections(ClientConnectionsEntry entry, boolean checkFreezed) {
//省略...
int startAmount = Math.min(5, minimumIdleSize);
AtomicInteger requests = new AtomicInteger(startAmount);
for (int i = 0; i < startAmount; i++) {
createConnection(checkFreezed, requests, entry, initPromise, minimumIdleSize, initializedConnections);
}
return initPromise;
}
省略掉中间一些调用链,最终会调用到RedisClient的connectAsync方法:
public RFuture connectAsync() {
CompletableFuture addrFuture = resolveAddr();
CompletableFuture f = addrFuture.thenCompose(res -> {
CompletableFuture r = new CompletableFuture();
ChannelFuture channelFuture = bootstrap.connect(res);
channelFuture.addListener(new ChannelFutureListener() {
//省略...
});
return r;
});
return new CompletableFutureWrapper(f);
}
此处就用使用前面创建的Bootstrap进行连接操作,当然这里是初始化连接到连接池,如果并发比较大,连接池中初始连接数不够用,会在发起请求的时候创建新的连接。
2.加锁
加锁会先调用RedissonClient�创建锁对象。
public RLock getLock(String name) {
return new RedissonLock(commandExecutor, name);
}
然后创建RedissonLock:
public RedissonLock(CommandAsyncExecutor commandExecutor, String name) {
super(commandExecutor, name);
this.commandExecutor = commandExecutor;
this.internalLockLeaseTime = commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout();
this.pubSub = commandExecutor.getConnectionManager().getSubscribeService().getLockPubSub();
}
调用父类构造函数,指定执行器、锁释放时间以及发布订阅组件。
然后继续看加锁逻辑,这里加锁我们使用tryLock并指定了等待时间、释放时间:
public boolean tryLock(long waitTime, long leaseTime, TimeUnit unit) throws InterruptedException {
long time = unit.toMillis(waitTime);
long current = System.currentTimeMillis();
long threadId = Thread.currentThread().getId();
// 1.尝试获取锁,如果ttl返回null,代表加锁成功
Long ttl = tryAcquire(waitTime, leaseTime, unit, threadId);
// lock acquired
if (ttl == null) {
return true;
}
time -= System.currentTimeMillis() - current;
// 申请锁的耗时如果大于等于最大等待时间,则申请锁失败.
if (time {
if (ex == null) {
unsubscribe(res, threadId);
}
});
}
acquireFailed(waitTime, unit, threadId);
return false;
}
try {
time -= System.currentTimeMillis() - current;
// 计算获取锁的总耗时,如果大于等于最大等待时间,则获取锁失败.
if (time {
// lock acquired
if (ttlRemaining == null) {
if (leaseTime > 0) {
internalLockLeaseTime = unit.toMillis(leaseTime);
} else {
scheduleExpirationRenewal(threadId);
}
}
return ttlRemaining;
});
return new CompletableFutureWrapper(f);
}
如果传入锁释放时间且大于零,使用用户传入的释放时间,否则使用默认的释放时间30秒,然后调用tryLockInnerAsync获取锁并返回中心化节点数据的ttl时间。
如果用户传入了leaseTime就不会开启看门狗机制实现自动续期,如果没有传入则开启看门口续期机制。
继续看tryLockInnerAsync方法实现:
RFuture tryLockInnerAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand command) {
return evalWriteAsync(getRawName(), LongCodec.INSTANCE, command,
"if (redis.call('exists', KEYS[1]) == 0) then " +
"redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
"redis.call('pexpire', KEYS[1], ARGV[1]); " +
"return nil; " +
"end; " +
"if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
"redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
"redis.call('pexpire', KEYS[1], ARGV[1]); " +
"return nil; " +
"end; " +
"return redis.call('pttl', KEYS[1]);",
Collections.singletonList(getRawName()), unit.toMillis(leaseTime), getLockName(threadId));
}
为了保证操作的原子性,这里使用了lua脚本来操作redis,执行脚本时key是加锁的名称,ARGV分别是释放时间和线程信息。从脚本内容可以看出,锁在redis中的数据结构是hash,外层key存储的是锁的名称,内部field和value存储的是加锁客户端线程信息。脚本含义是:
- 如果hash不存在,则直接放入加锁客户端信息并设置失效时间返回
- 如果hash中存在加锁客户端的信息,则value加1实现重入逻辑,并设置过期时间返回
- 否则竞争加锁失败,返回锁对应hash的过期时间
然后调用evalWriteAsync�执行lua脚本:
protected RFuture evalWriteAsync(String key, Codec codec, RedisCommand evalCommandType, String script, List keys, Object... params) {
MasterSlaveEntry entry = commandExecutor.getConnectionManager().getEntry(getRawName());
int availableSlaves = entry.getAvailableSlaves();
CommandBatchService executorService = createCommandBatchService(availableSlaves);
RFuture result = executorService.evalWriteAsync(key, codec, evalCommandType, script, keys, params);
if (commandExecutor instanceof CommandBatchService) {
return result;
}
RFuture