0%

zookeeper的API

zookeeper的API

首先引入zookeeper依赖

1
2
3
4
5
<dependency>
<groupId>org.apache.zookeeper</groupId>
<artifactId>zookeeper</artifactId>
<version>3.6.1</version>
</dependency>

创建zookeeper连接

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
// 连接地址
private final String connect = "localhost:2181";

// 单位是毫秒
private final int sessionTimeOut = 2000;

private ZooKeeper zkClient;

@Before
public void init() throws IOException {
// connectString 连接地址,如果是集群的话则使用逗号拼接 如 "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002"

// sessionTimeout session过期时间,单位毫秒
// watcher 监听事件
zkClient = new ZooKeeper(connect, sessionTimeOut, watchedEvent -> {

});
}

创建节点

1
2
3
4
5
6
7
8
9
10
11
12
// 创建节点
@Test
public void createNode() throws KeeperException, InterruptedException {

// path the path for the node 路径
// data the initial data for the node 数据
// acl the acl for the node 访问权限控制列表
// createMode specifying whether the node to be created is ephemeral
// * and/or sequential 创建模式,是临时节点/持久化节点 或者是否有序,如果是临时节点,该节点不允许有孩子节点
String path = zkClient.create("/zk_client_test","第一次创建节点".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
System.out.println(path);
}

获取节点数据

1
2
3
4
5
6
7
8
9
10
11
12
// 获取节点数据
@Test
public void getNodeData() throws KeeperException, InterruptedException {

Stat stat = new Stat();
//path the given path 路径
//watch whether need to watch this node 是否监听该节点
//stat the stat of the node 获取状态信息
byte[] data = zkClient.getData("/zk_client_test",false,stat);
System.out.println(new String(data));
System.out.println(stat);
}

设置节点数据

1
2
3
4
5
6
7
8
9
// 设置节点数据
@Test
public void setNodeData() throws KeeperException, InterruptedException {
// path 路径
// data the data to set 数据
// version 所期待的版本,相当于乐观锁,如果是-1,则匹配所有版本
Stat stat = zkClient.setData("/zk_client_test","修改节点数据".getBytes(), 0);
System.out.println(stat);
}

获取子节点

1
2
3
4
5
6
7
8
9
// 获取子节点
@Test
public void getChildNodes() throws KeeperException, InterruptedException {
// 第二个参数为是否监听节点变化
List<String> nodes = zkClient.getChildren("/zookeeper",false);
for(String node : nodes){
System.out.println(node);
}
}

删除节点

1
2
3
4
5
6
7
// 删除节点
@Test
public void deleteNode() throws KeeperException, InterruptedException {
// path 路径
// version 所期待的版本,相当于乐观锁,如果是-1,则匹配所有版本
zkClient.delete("/zk_client_test",-1);
}

其他客户端

zookeeper原生的client并不好用,除此之外还有ZkClient和Curator两个开源的客户端产品

ZkClient

ZkClient是在原生的zookeeper API接口上进行了包装,实现了诸如session超时重连、Watcher反复注册等功能

1
2
3
4
5
6
<!-- zkClient 客户端 -->
<dependency>
<groupId>com.101tec</groupId>
<artifactId>zkclient</artifactId>
<version>0.11</version>
</dependency>
创建zookeeper连接
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
// 连接地址
private final String connect = "localhost:2181";

// 单位是毫秒
private final int sessionTimeOut = 2000;
private final int connectionTimeout = 2000;

private ZkClient zkClient;

@Before
public void init() {
// zkServers 连接地址,如果是集群的话则使用逗号拼接 如 "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002"
// sessionTimeOut session过期时间,单位毫秒
// connectionTimeout 连接创建超时时间,单位毫秒
zkClient = new ZkClient(connect, sessionTimeOut,connectionTimeout,new StringZkSerializer());
}

这里自定义了一个序列化器

1
2
3
4
5
6
7
8
9
10
11
12
static class StringZkSerializer implements ZkSerializer {

@Override
public byte[] serialize(Object data) throws ZkMarshallingError {
return String.valueOf(data).getBytes(StandardCharsets.UTF_8);
}

@Override
public Object deserialize(byte[] bytes) throws ZkMarshallingError {
return new String(bytes,StandardCharsets.UTF_8);
}
}
创建节点
1
2
3
4
5
6
7
8
9
10
11
12
13
14
public void createNode() {

// path 路径
// data 数据

// 创建模式,持久节点
zkClient.createPersistent("/zkclient_test","第一次创建节点");


// createParents为true 递归创建父节点
zkClient.createPersistent("/p/c",true);


}
设置节点数据
1
2
3
4
5
6
7
public void setNodeData() {
// path 路径
// data the data to set 数据
// version 所期待的版本,相当于乐观锁,如果是-1,则匹配所有版本
zkClient.writeData("/zkclient_test","修改节点数据",-1);

}
获取节点数据
1
2
3
4
5
6
7
8
9
public void getNodeData() {

Stat stat = new Stat();
//path the given path 路径
//stat the stat of the node 获取状态信息
String data = zkClient.readData("/zkclient_test",stat);
System.out.println(data);
System.out.println(stat);
}
删除节点
1
2
3
4
public void deleteNode() {
// 使用deleteRecursive可以逐层删除节点
zkClient.deleteRecursive("/p/c");
}
监听
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
public void getChildNodes() throws InterruptedException {
// 第二个参数为监听器
// zkClient的监听器不是一次性的,会一直有效
zkClient.subscribeChildChanges("/test_watch", new IZkChildListener() {
@Override
public void handleChildChange(String parentPath, List<String> currentChilds) throws Exception {
System.out.println(parentPath+"中子节点发生变化,当前子节点列表为"+currentChilds);
}
});

// 监听打印内容 /test_watch中子节点发生变化,当前子节点列表为[]
zkClient.createPersistent("/test_watch");
Thread.sleep(1000);
System.out.println(zkClient.getChildren("/test_watch"));
// 监听打印内容 /test_watch中子节点发生变化,当前子节点列表为[c1]
zkClient.createPersistent("/test_watch/c1");
Thread.sleep(1000);
// 监听打印内容 /test_watch中子节点发生变化,当前子节点列表为[]
zkClient.delete("/test_watch/c1");
Thread.sleep(1000);
// 监听打印内容 /test_watch中子节点发生变化,当前子节点列表为null
zkClient.delete("/test_watch");
Thread.sleep(3000);


}

Curator

Curator解决了很多Zookeeper客户端底层的细节开发工作,实现了连接重连、Watcher反复注册和NotExistsException等功能,是使用最广泛的zookeeper客户端之一

创建zookeeper连接
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
// 连接地址
private final String connect = "localhost:2181";

// 单位是毫秒
private final int sessionTimeOut = 2000;
private final int connectionTimeout = 2000;

private CuratorFramework zkClient;

@Before
public void init() {
// baseSleepTimeMs 初始sleep时间
// maxRetries 最大重试次数

// ExponentialBackoffRetry重试策略是,给定一个初始sleep时间,在这个基础上结合重试次数,计算当前需要sleep的时间
// 当前sleep时间 = baseSleepTimeMs * Math.max(1, random.nextInt(1 << (retryCount + 1)))
ExponentialBackoffRetry retry = new ExponentialBackoffRetry(1000, 3);
// connectString 连接地址,如果是集群的话则使用逗号拼接 如 "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002"
// sessionTimeOut session过期时间,单位毫秒
// connectionTimeout 连接创建超时时间,单位毫秒
zkClient = CuratorFrameworkFactory
.builder()
.connectString(connect)
.sessionTimeoutMs(sessionTimeOut)
.connectionTimeoutMs(connectionTimeout)
.retryPolicy(retry)
.namespace("base") // 用于进行命名空间隔离,也就是根目录为设置的命名空间名称
.build();
// 只有start之后才会执行客户端请求
zkClient.start();
}
创建节点
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
public void createNode() throws Exception {

// path 路径
// data 数据

// 创建模式,默认是持久节点
zkClient.create()
.withMode(CreateMode.PERSISTENT) // 节点类型
.forPath("/curator_test","第一次创建节点".getBytes());

// 递归创建
zkClient.create().creatingParentsIfNeeded()
.withMode(CreateMode.PERSISTENT) // 节点类型
.forPath("/cup/ch","第一次创建节点".getBytes());

}
设置节点数据
1
2
3
4
5
6
7
8
9
public void setNodeData() throws Exception {
// path 路径
// data the data to set 数据
// version 所期待的版本,相当于乐观锁,如果是-1,则匹配所有版本
zkClient.setData()
.withVersion(-1)
.forPath("/curator_test","修改节点数据".getBytes());

}
获取节点数据
1
2
3
4
5
6
7
8
9
10
11
public void getNodeData() throws Exception {

Stat stat = new Stat();
//path the given path 路径
//stat the stat of the node 获取状态信息
byte[] data = zkClient.getData()
.storingStatIn(stat) // 获取stat
.forPath("/curator_test");
System.out.println(new String(data));
System.out.println(stat);
}
删除节点
1
2
3
4
5
6
7
8
9
public void deleteNode() throws Exception {
zkClient.delete()
.forPath("/curator_test");

// 使用deletingChildrenIfNeeded可以逐层删除节点
zkClient.delete()
.deletingChildrenIfNeeded()
.forPath("/cup");
}
监听
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
public void getChildNodes() throws Exception {

CuratorCache cache = CuratorCache.build(zkClient, "/test_watch", CuratorCache.Options.SINGLE_NODE_CACHE);

CuratorCacheListener listener = CuratorCacheListener.builder()
.forCreatesAndChanges(
// 添加或修改缓存中的数据时调用
new CuratorCacheListenerBuilder.ChangeListener() {
@Override
public void event(ChildData oldNode, ChildData node) {
System.out.printf("[forCreatesAndChanges] : Node changed: Old: [%s] New: [%s]\n",
oldNode, node);
}
})
.forTreeCache(zkClient, new TreeCacheListener() {
@Override
public void childEvent(CuratorFramework client, TreeCacheEvent event) throws Exception {
ChildData eventData = event.getData();
if(event.getType() == TreeCacheEvent.Type.NODE_ADDED){
System.out.printf("[forTreeCache] : NODE_ADDED path: [%s] data:[%s]\n", eventData.getPath(),
new String(eventData.getData())
);

} else if(event.getType() == TreeCacheEvent.Type.NODE_UPDATED){
System.out.printf("[forTreeCache] : NODE_UPDATED path: [%s] data:[%s]\n", eventData.getPath(),
new String(eventData.getData())
);
} else if(event.getType() == TreeCacheEvent.Type.NODE_REMOVED){
System.out.printf("[forTreeCache] : NODE_REMOVED path: [%s] \n", eventData.getPath()
);
}
}

// @Override
// public void childEvent(CuratorFramework client, PathChildrenCacheEvent event) throws Exception {
// ChildData eventData = event.getData();
// if(event.getType() == PathChildrenCacheEvent.Type.CHILD_ADDED){
// System.out.printf("[forPathChildrenCache] : CHILD_ADDED path: [%s] data:[%s]\n", eventData.getPath(),
// new String(eventData.getData())
// );
//
// } else if(event.getType() == PathChildrenCacheEvent.Type.CHILD_UPDATED){
// System.out.printf("[forPathChildrenCache] : CHILD_UPDATED path: [%s] data:[%s]\n", eventData.getPath(),
// new String(eventData.getData())
// );
// } else if(event.getType() == PathChildrenCacheEvent.Type.CHILD_REMOVED){
// System.out.printf("[forPathChildrenCache] : CHILD_REMOVED path: [%s] \n", eventData.getPath()
// );
// } else {
// System.out.println(event);
// }
// }
})
.forDeletes(childData -> System.out.printf("[forDeletes] : Node deleted: data: [%s]\n", childData))
.build();
// 给CuratorCache实例添加监听器
cache.listenable().addListener(listener);
// 启动
cache.start();


// 监听打印内容 [forCreatesAndChanges] : Node changed: Old: [null] New: [ChildData{path='/test_watch', stat=975,975,1690535494382,1690535494382,0,0,0,0,9,0,975
//, data=[49, 50, 55, 46, 48, 46, 48, 46, 49]}]
zkClient.create().forPath("/test_watch");
Thread.sleep(1000);
// 监听打印内容 [forTreeCache] : NODE_ADDED path: [/test_watch] data:[127.0.0.1]
// [forCreatesAndChanges] : Node changed: Old: [ChildData{path='/test_watch', stat=975,975,1690535494382,1690535494382,0,0,0,0,9,0,975
//, data=[49, 50, 55, 46, 48, 46, 48, 46, 49]}] New: [ChildData{path='/test_watch', stat=975,977,1690535494382,1690535496400,1,1,0,0,2,1,976
//, data=[49, 49]}]
zkClient.create().creatingParentsIfNeeded().forPath("/test_watch/c1");
Thread.sleep(1000);
//监听打印内容 [forTreeCache] : NODE_UPDATED path: [/test_watch] data:[11]
zkClient.setData().forPath("/test_watch","11".getBytes());
Thread.sleep(1000);
// 监听打印内容 [forTreeCache] : NODE_REMOVED path: [/test_watch]
zkClient.delete().forPath("/test_watch/c1");
Thread.sleep(1000);
// 监听打印内容 [forDeletes] : Node deleted: data: [ChildData{path='/test_watch', stat=975,977,1690535494382,1690535496400,1,1,0,0,2,1,976
//, data=[49, 49]}]
zkClient.delete().forPath("/test_watch");
Thread.sleep(3000);


}

欢迎关注我的其它发布渠道