【ZookeeperApach Curator 框架源码分析:初始化过程(一)【Ver 4.3.0

2023年 7月 12日 69.7k 0

介绍

Curator是netflix公司开源的一套zookeeper客户端,目前是Apache的顶级项目。和ZK的原生客户端相比,Curator的抽象层次要更高,同时简化了ZK的常用功能开发量,比如Curator自带连接重试、反复注册Watcher、NodeExistsException 异常处理等等。

根据官方的介绍,我们可以了解到它是一个用于分布式的Java客户端API工具。它基于high-level API,拥有它可以更简单易懂的指挥Zookeeper实现分布式安全应用程序开发。

Curator由一系列的模块构成,对于一般开发者而言,常用的是curator-framework和curator-recipes,以及广为熟知的 分布式锁。

Curator 当然也包括许多扩展,比如服务发现和基于Java 8异步DSL。

Apache Curator is a Java/JVM client library for [Apache ZooKeeper](https://zookeeper.apache.org/), a distributed coordination service.

Apache Curator includes a high-level API framework and utilities to make using Apache ZooKeeper much easier and more reliable. It also includes recipes for common use cases and extensions such as service discovery and a Java 8 asynchronous DSL.

用官方的介绍来说就是:guava之于java就像curator之于zookeeper

ZK 版本支持

Curator 目前最新的版本为 5.X 的版本,已经不支持 ZK 的 3.4.X 以及之前的版本,这里经过考虑最终选择了 ZK的 3.5.10 版本。

5.X 对于 Curator 做了不少破坏性的改动,不兼容的原因如下:

  • 旧的ListenerContainer类已经被移除,以避免Guava类泄漏。
  • ConnectionHandlingPolicy和相关类已被删除
  • Reaper和ChildReaper类/recipes已被删除。您应该改用 ZooKeeper 容器节点。
  • newPersistentEphemeralNode()和newPathChildrenCache()已从GroupMember中移除。
  • ServiceCacheBuilder executorService(CloseableExecutorService executorService)已从ServiceCacheBuilder中移除。
  • ServiceProviderBuilder executorService(CloseableExecutorService executorService)已从ServiceProviderBuilder中移除。
  • static boolean shouldRetry(int rc)已从RetryLoop中移除。
  • static boolean isRetryException(Throwable exception)已从RetryLoop中移除。

官网地址

Apache Curator

下载地址

Curator Maven 相关地址:mvnrepository.com/artifact/or…

Curator jar包下载地址:cwiki.apache.org/confluence/…

快速开始

ZK 集群部署

学习之前需要使用ZK搭建集群环境,方便Debug的时候调试代码。这部分搭建过程放到另一篇:

[【Zookeeper】基于3台linux虚拟机搭建zookeeper集群]

Maven依赖引入

下面是对应的Zookeeper和Curator的版本选择。

4.3.0  
3.5.10

		org.apache.curator
		curator-framework
		${curator.version}
		
			
				org.apache.zookeeper
				zookeeper
			
		
	

	
		org.apache.curator
		curator-recipes
		${curator.version}
		
			
				org.apache.zookeeper
				zookeeper
			
		
	

	
		org.apache.zookeeper
		zookeeper
		${zookeeper.version}
	

构建入门实例

Curator 最为核心和强大并且常用功能是分布式锁。在入门demo中可以看到整个 Curator 依靠 CuratorFrameworkFactory 构建,使用 Curator 进行分布式加锁解锁操作,只需要为所连接的每个ZooKeeper集群提供一个CuratorFramework对象。

CuratorFrameworkFactory.newClient(zookeeperConnectionString, retryPolicy)

上面的方法将会使用默认值创建与ZooKeeper集群的连接,唯一需要关注的是重试策略。

RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3)
CuratorFramework client = CuratorFrameworkFactory.newClient(zookeeperConnectionString, retryPolicy);
client.start();

根据参数值可以大致了解到,这里使用的策略是指数的方式递增间隔尝试重试时间,并且最终重试三次。

RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);  
CuratorFramework client =  
        CuratorFrameworkFactory.newClient("192.168.0.1;192.168.0.2;192.168.0.3", retryPolicy);  
client.start();  
// 此处就获取到 zk的一个连接实例。  
//.....

拥有了 CuratorFramework 实例之后,就可以直接通过 API 调用操作ZK。下面我们看一下重点以及使用最多的分布式锁的操作部分:

client.create().forPath("/my/path", myData)

这样的直接调用还有个好处是对于ZK的操作client实例如果碰到网络抖动等情况会自动重试。

可重入锁(公平锁)案例代码

下面是官网可重入锁的Demo使用代码。

InterProcessMutex lock = new InterProcessMutex(client, lockPath);
if ( lock.acquire(maxWait, waitUnit) ) 
{
    try 
    {
        // do some work inside of the critical section here
    }
    finally
    {
        lock.release();
    }
}

这里改造一下即可简单使用。

RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);  
CuratorFramework client =  
        CuratorFrameworkFactory.newClient("192.168.0.1,192.168.0.2,192.168.0.3", retryPolicy);  
client.start();  
// 此处就获取到 zk的一个连接实例。  
//.....  
client.create().forPath("/my/path", "Test".getBytes());  
InterProcessMutex lock = new InterProcessMutex(client, "/test/myLock");  
lock.acquire();  
try {  
    // do some work inside of the critical section here  
    Thread.sleep(3000);  
} finally {  
    lock.release();  
}

初始化过程流程图

初始化过程流程图全图如下。下面将会一步步拆解这幅图是如何拼凑的。

Curator 源码分析.drawio.png

drawio 源文件和图片地址如下:
链接:pan.baidu.com/s/18PoMjkp1…
提取码:4bug

初始化源码分析

直奔源码分析部分,本文主要介绍和Curator初始化、内部的通知机制以及会话管理部分。

CuratorFramework 初始化过程

初始化过程流程图

CuratorFramework 初始化过程下面截图这一部分,红色部分为个人认为相对比较重要的对象和变量。

image.png

CuratorFrameworkFactory.newClient() 代码分析

Curator 当中默认使用公平锁的策略去获取锁,多个客户端会按照排队的顺序挨个获取锁,下面我们通过代码进行验证。

RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);  
CuratorFramework client =  
        CuratorFrameworkFactory.newClient("192.168.19.100:2181,192.168.19.101:2181,192.168.19.102:2181", retryPolicy);

在获取分布式锁之前我们需要先连接ZK集群,整个过程通过两行代码完成,首先需要确定连接ZK的重试策略,接着通过CuratorFrameworkFactory构建Curator 实例即可,Curator 内部根据ZK原生客户端做了一层封装,开发者使用过程中不需要关注。

RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);  
CuratorFramework client =  CuratorFrameworkFactory.newClient("192.168.0.1,192.168.0.2,192.168.0.3", retryPolicy);

上面是简单的模板代码。ExponentialBackoffRetry 构建重试策略为按照指数增长重试时间,比如第一次1秒,第二次2秒,第三次4秒,第四次8秒.....

接着是利用CuratorFrameworkFactory构建实例。

return newClient(connectString, DEFAULT_SESSION_TIMEOUT_MS, DEFAULT_CONNECTION_TIMEOUT_MS, retryPolicy);

这里强调一下两个常量 DEFAULT_SESSION_TIMEOUT_MS (默认的会话超时时间)、DEFAULT_CONNECTION_TIMEOUT_MS(默认的连接超时时间),作用是传入指定的重试策略默认参数。

private static final int DEFAULT_SESSION_TIMEOUT_MS
    = Integer.getInteger("curator-default-session-timeout", 60 * 1000)
private static final int DEFAULT_CONNECTION_TIMEOUT_MS = Integer.getInteger("curator-default-connection-timeout", 15 * 1000);

我们进一步进入构造方法,这里用了建造者模式。

return builder().  
    connectString(connectString).  
    sessionTimeoutMs(sessionTimeoutMs).  
    connectionTimeoutMs(connectionTimeoutMs).  
    retryPolicy(retryPolicy).  
    build();

实际上调用的是CuratorFrameworkImpl实例。这里把CuratorFrameworkFactory的this引用逸出给CuratorFrameworkImpl对象。

return new CuratorFrameworkImpl(this);

CuratorFrameworkImpl 构造方法的内容比较多,这里主要说一下CuratorZookeeperClient这个对象,相当于ZK原生客户端的封装对象。

其他组件内容和Curator 的各种通知管理和会话管理等等功能有关。


public CuratorFrameworkImpl(CuratorFrameworkFactory.Builder builder)
{
ZookeeperFactory localZookeeperFactory = makeZookeeperFactory(builder.getZookeeperFactory());
this.client = new CuratorZookeeperClient
(
localZookeeperFactory,
builder.getEnsembleProvider(),
builder.getSessionTimeoutMs(),
builder.getConnectionTimeoutMs(),
builder.getWaitForShutdownTimeoutMs(),
new Watcher()
{
@Override
public void process(WatchedEvent watchedEvent)
{
CuratorEvent event = new CuratorEventImpl(CuratorFrameworkImpl.this, CuratorEventType.WATCHED, watchedEvent.getState().getIntValue(), unfixForNamespace(watchedEvent.getPath()), null, null, null, null, null, watchedEvent, null, null);
processEvent(event);
}
},
builder.getRetryPolicy(),
builder.canBeReadOnly(),
builder.getConnectionHandlingPolicy()
);
//用于判断连接断开和连接超时的状态,设置curator的连接状态,并通过connectionStateManager触发连接事件状态通知
internalConnectionHandler = new StandardInternalConnectionHandler();

//接收事件的通知。后台线程操作事件和连接状态事件会触发
listeners = new ListenerContainer();

//当后台线程发生异常或者handler发生异常的时候会触发
unhandledErrorListeners = new ListenerContainer();
//后台线程执行的操作队列
backgroundOperations = new DelayQueue

相关文章

JavaScript2024新功能:Object.groupBy、正则表达式v标志
PHP trim 函数对多字节字符的使用和限制
新函数 json_validate() 、randomizer 类扩展…20 个PHP 8.3 新特性全面解析
使用HTMX为WordPress增效:如何在不使用复杂框架的情况下增强平台功能
为React 19做准备:WordPress 6.6用户指南
如何删除WordPress中的所有评论

发布评论