Zookeeper是一个分布式协调服务,可以用来实现分布式锁的功能。
分布式锁是一种控制多个分布式系统之间同步访问共享资源的机制。
Zookeeper实现分布式锁的原理如下:
首先,需要在 Zookeeper 中创建一个持久节点作为锁的根节点,例如 /lock。
然后,每个需要获取锁的客户端都在 /lock 节点下创建一个临时顺序节点,例如 /lock/seq-0000000001。这样可以利用 Zookeeper 的节点唯一性和顺序性特性。
接着,每个客户端都获取 /lock 节点下的所有子节点,并按照序号排序,判断自己创建的节点是否是最小的。如果是,说明获取到了锁,可以执行相关操作。
如果不是最小的,说明没有获取到锁,需要等待。此时,客户端可以监听自己前一个节点的变化(例如删除),一旦监听到事件发生,就重新判断自己是否是最小的节点。
最后,当客户端执行完操作后,需要释放锁,即删除自己创建的临时顺序节点。这样,后面等待的客户端就可以收到通知,继续尝试获取锁。
以上就是 Zookeeper 实现分布式锁的基本原理。
实现方式:
实际开发过程中,可以 curator 工具包封装的API帮助我们实现分布式锁。
org.apache.curator
curator-recipes
1、客户端想要获取锁,就在 Zookeeper 上创建一个临时的、有序的节点,这个节点相当于一把锁。
2、客户端查看 Zookeeper 上所有的节点,按照顺序排列,看看自己创建的节点是不是最小的。
3、判断是否获得锁,如果是读操作,只要自己前面没有写操作的节点,就可以获取锁,然后开始执行读逻辑。如果是写操作,只有自己是最小的节点,才可以获取锁,然后开始执行写逻辑。
4、如果没有获取到锁,就要等待。如果是读操作,就监听自己前面最近的一个写操作的节点。如果是写操作,就监听自己前面最近的一个节点。一旦监听到这个节点被删除了,就重新判断是否可以获取锁。
Curator 的几种锁方案 :
1、InterProcessMutex:分布式可重入排它锁2、InterProcessSemaphoreMutex:分布式排它锁3、InterProcessReadWriteLock:分布式读写锁下面例子模拟 50 个线程使用重入排它锁 InterProcessMutex 同时争抢锁:
实例:
public class InterprocessLock {
public static void main(String[] args) {
CuratorFramework zkClient = getZkClient();
String lockPath = "/lock";
//通过InterProcessMutex创建分布式锁 InterProcessMutex lock = new InterProcessMutex(zkClient, lockPath);
//模拟50个线程抢锁
for (int i = 0; i < 50; i++) {
new Thread(new TestThread(i, lock)).start();
}
}
static class TestThread implements Runnable {
private Integer threadFlag;
private InterProcessMutex lock;
public TestThread(Integer threadFlag, InterProcessMutex lock) {
this.threadFlag = threadFlag;
this.lock = lock;
}
@Override
public void run() {
try {
lock.acquire();
System.out.println("第"+threadFlag+"线程获取到了锁");
//等到1秒后释放锁
Thread.sleep(1000);
} catch (Exception e) {
e.printStackTrace();
}finally {
try {
lock.release();
} catch (Exception e) {
e.printStackTrace();
}
}
}
}
private static CuratorFramework getZkClient() {
String zkServerAddress = "127.0.0.1:2181";
ExponentialBackoffRetry retryPolicy = new ExponentialBackoffRetry(1000, 3, 5000);
CuratorFramework zkClient = CuratorFrameworkFactory.builder()
.connectString(zkServerAddress)
.sessionTimeoutMs(5000)
.connectionTimeoutMs(5000)
.retryPolicy(retryPolicy)
.build();
zkClient.start();
return zkClient;
}
}