Zookeeper核心概念及Java客户端的使用

Zookeeper核心概念及Java客户端的使用

一、Zookeeper核心概念

1.什么是znode?

image

ZooKeeper操作和维护的为一个个数据节点,称为 znode,采用类似文件系统的层级树状结构进行管理。如果 znode 节点包含数据则存储为字节数组(byte array)。
创建 znode 时需要指定节点类型
znode 共有 4 种类型,分别为:持久(无序)、临时(无序)、持久有序和临时有序。

2.节点的类型

2大类、四种类型 持久、临时、持久有序、临时有序
PERSISTENT 持久类型,如果不手动删除 是一直存在的
PERSISTENT_SEQUENTIAL
EPHEMERAL 临时 客户端session失效就会随着删除节点 没有子节点
EPHEMERAL_SEQUENTIAL 有序 自增

Stat数据结构:

Stat中记录了这个 ZNode 的三个数据版本,分别是version(当前ZNode的版本)、cversion(当前ZNode子节点的版本)和 cversion(当前ZNode的ACL版本)。
Stat:状态信息、版本、权限相关

image

Session会话:
客户端来创建一个和zk服务端连接的句柄。
连接状态:CONNECTING\CONNECTED\CLOSED

3.什么是watcher?

Watcher(事件监听器),是Zookeeper中的一个很重要的特性。Zookeeper允许用户在指定节点上注册一些Watcher,并且在一些特定事件触发的时候,ZooKeeper服务端会将事件通知到感兴趣的客户端上去,该机制是Zookeeper实现分布式协调服务的重要特性。

image

4.什么是ACL?

ACL(Access Control List)
内置的 ACL schemes:
world:默认方式,相当于全世界都能访问
auth:代表已经认证通过的用户(cli中可以通过addauth digest user:pwd 来添加当前上下文中的授权用户)
digest:即用户名:密码这种方式认证,这也是业务系统中最常用的
ip:使用Ip地址认证

ACL支持权限:
CREATE: 能创建子节点
READ:能获取节点数据和列出其子节点
WRITE: 能设置节点数据
DELETE: 能删除子节点
ADMIN: 能设置权限
高性能
ZooKeeper 是高性能的。 在“读”多于“写”的应用程序中尤其地高性能,因为“写”会导致所有的服务器间同步状态。(“读”多于“写”是协调服务的典型场景。)
顺序访问
对于来自客户端的每个更新请求,ZooKeeper 都会分配一个全局唯一的递增编号,这个编号反应了所有事务操作的先后顺序,应用程序可以使用 ZooKeeper 这个特性来实现更高层次的同步原语。 这个编号也叫做时间戳——zxid(Zookeeper Transaction Id)

二、Java客户端的使用

1.Zkclient

ZkClient是由Datameer的工程师开发的开源客户端,对Zookeeper的原生API进行了包装,实现了超时重连、Watcher反复注册等功能。
github源代码地址:https://github.com/sgroschupf/zkclient

  • 添加maven依赖
1
2
3
4
5
6
7
8
9
10
<dependency>
<groupId>org.apache.zookeeper</groupId>
<artifactId>zookeeper</artifactId>
<version>3.4.9</version>
</dependency>
<dependency>
<groupId>com.101tec</groupId>
<artifactId>zkclient</artifactId>
<version>0.10</version>
</dependency>
  • zkclient使用
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
public ZkClient(String serverstring)
public ZkClient(String zkServers, int connectionTimeout)
public ZkClient(String zkServers, int sessionTimeout, int connectionTimeout)
public ZkClient(String zkServers, int sessionTimeout, int connectionTimeout, ZkSerializer zkSerializer)
public ZkClient(final String zkServers, final int sessionTimeout, final int connectionTimeout, final ZkSerializer zkSerializer, final long operationRetryTimeout)
public ZkClient(IZkConnection connection)
public ZkClient(IZkConnection connection, int connectionTimeout)
public ZkClient(IZkConnection zkConnection, int connectionTimeout, ZkSerializer zkSerializer)
public ZkClient(final IZkConnection zkConnection, final int connectionTimeout, final ZkSerializer zkSerializer, final long operationRetryTimeout)
创建节点
public void createPersistent(String path)
public void createPersistent(String path, boolean createParents)
public void createPersistent(String path, boolean createParents, List<ACL> acl)
public void createPersistent(String path, Object data)
public void createPersistent(String path, Object data, List<ACL> acl)
public String createPersistentSequential(String path, Object data)
public String createPersistentSequential(String path, Object data, List<ACL> acl)
public void createEphemeral(final String path)
public void createEphemeral(final String path, final List<ACL> acl)
public String create(final String path, Object data, final CreateMode mode)
public String create(final String path, Object data, final List<ACL> acl, final CreateMode mode)
public void createEphemeral(final String path, final Object data)
public void createEphemeral(final String path, final Object data, final List<ACL> acl)
public String createEphemeralSequential(final String path, final Object data)
public String createEphemeralSequential(final String path, final Object data, final List<ACL> acl)
删除节点:
public boolean delete(final String path)
public boolean delete(final String path, final int version)
public boolean deleteRecursive(String path)
读取列表:
public List<String> getChildren(String path)
获取节点内容:
public <T extends Object> T readData(String path)
public <T extends Object> T readData(String path, boolean returnNullIfPathNotExists)
public <T extends Object> T readData(String path, Stat stat)
更新内容:
public void writeData(String path, Object object)
public void writeData(final String path, Object datat, final int expectedVersion)
public Stat writeDataReturnStat(final String path, Object datat, final int expectedVersion)

监测节点是否存在
protected boolean exists(final String path, final boolean watch)

注册监听
接口类 注册监听方法 解除监听方法
IZkChildListener ZkClient的subscribeChildChanges方法 ZkClient的unsubscribeChildChanges方法
IZkDataListener ZkClient的subscribeDataChanges方法 ZkClient的subscribeChildChanges方法
IZkStateListener ZkClient的subscribeStateChanges方法 ZkClient的unsubscribeStateChanges方法
其中ZkClient还提供了一个unsubscribeAll方法,来解除所有监听。
2.Curator

Curator 是 Netflix 公司开源的一个 Zookeeper 客户端,与 Zookeeper 提供的原生客户端
相比,Curator 的抽象层次更高,简化了 Zookeeper 客户端的开发量。现在已是 apache 的
顶级开源框架,Fluent 编程风格的实现。
官网:http://curator.apache.org

  • 添加maven依赖
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
<dependency>
<groupId>org.apache.curator</groupId> <!--基础框架-->
<artifactId>curator-framework</artifactId>
<version></version>
</dependency>

<dependency>
<groupId>org.apache.curator</groupId> <!--功能 jar 分布式锁、队列等-->
<artifactId>curator-recipes</artifactId>
<version></version>
</dependency>

<dependency>
<groupId>org.apache.curator</groupId> <!--客户端重试策略-->
<artifactId>curator-client</artifactId>
<version></version>
</dependency>
  • 使用:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
Curator 框架提供了一种流式接口,通过 builder 串起来,传递参数都是调方法。
Curator 框架通过 CuratorFrameworkFactory 以工厂模式和 builder 模式创建
CuratorFramework 实 例。 CuratorFramework 实例都是线程安全的,你应该在你的应用中
共享同一个.
工厂方法 newClient()提供了一个简单方式创建实例。 而 Builder 提供了更多的参数控制。
一旦你创建了一个 CuratorFramework 实例,你必须调用它的 start()启动,在应用退出时调
用 close()方法关闭.
1.创建 Curator 连接实例
String address = "localhost:2181";
CuratorFramework client = CuratorFrameworkFactory.newClient(address, new
ExponentialBackoffRetry(1000, 3));//重试机制
client.start();

注意:一个 Zookeeper 集群只需要构造一个 CuratorFramework 实例对象即可。
CuratorFramework 使用之前必须先调用client.start();

CuratorFramework 提供的方法:
方法名 描述
create 开始创建操作, 可以调用额外的方法(比如方式 mode 或者后台执行 background) 并在最后调用 forPath()指定要操作的 ZNode
Delete 开始删除操作. 可以调用额外的方法(版本或者后台处理 version orbackground)并在最后调用 forPath()指定要操作的 ZNode
checkExists 开始检查 ZNode 是否存在的操作. 可以调用额外的方法(监控或者后台处理)并在最后调用 forPath()指定要操作的 ZNode
getData 开始获得 ZNode 节点数据的操作. 可以调用额外的方法(监控、后台处理或者获取状态 watch, background or get stat) 并在最后调用 forPath()指定要操作的 ZNode
setData 开始设置 ZNode 节点数据的操作. 可以调用额外的方法(版本或者后台处理) 并在最后调用 forPath()指定要操作的 ZNode
getChildren 开始获得 ZNode 的子节点列表。 以调用额外的方法(监控、后台处理或者获取状态 watch, background or get stat) 并在最后调用 forPath()指定要操作的 ZNode
inTransaction 开始是原子 ZooKeeper 事务. 可以复合 create, setData, check, and/or
delete 等操作然后调用 commit()作为一个原子操作提交

事件类型以及事件的方法如下:

EventType EventMethods
CREATE getResultCode() and getPath()
DELETE getResultCode() and getPath()
EXISTS getResultCode(), getPath() and getStat()
GETDATA getResultCode(), getPath(), getStat() andgetData()
SETDATA getResultCode(), getPath() and getStat()
CHILDREN getResultCode(), getPath(), getStat(),getChildren()
WATCHED getWatchedEvent

监听器

Curator 提供了三种 Watcher(Cache)来监听结点的变化:
Path Cache:监视一个路径下子结点的创建、删除,以及结点数据的更新。产生的
事件会传递给注册的 PathChildrenCacheListener。
Node Cache:监视一个结点的创建、更新、删除,并将结点的数据缓存在本地。
Tree Cache:Path Cache 和 Node Cache 的“合体”,监视路径下的创建、更新、删
除事件,并缓存路径下所有子结点的数据。

重试机制
Curator 内部实现的几种重试策略:
1.ExponentialBackoffRetry:重试指定的次数, 且每一次重试之间停顿的时间逐渐增加.
2.RetryNTimes:指定最大重试次数的重试策略
3.RetryOneTime:仅重试一次
4.RetryUntilElapsed:一直重试直到达到规定的时间

代码示例:https://github.com/cnsyear/tuling-zookeeper.git

-------------已经触及底线 感谢您的阅读-------------
0%