avatar

目录
ZooKeeper专题(5):Apache Curator

代码地址:note_zookeeper

第7章 Apache Curator客户端的使用

7-1 curator简介与客户端之间的异同点

常用的zk java客户端

  • zk原生api
  • zkclient
  • Apache curator(常用)

原生api的不足

  • 超时重连,不支持自动,需要手动操作
  • Watch注册一次后会失效
  • 不支持递归创建节点

Apache curator

  • Apache的开源项目
  • 解决watch的注册一次就失效
  • Api更加简单易用
  • 提供更多解决方案且实现简单:比如 分布式锁
  • 提供常用的Zookeeper工具类
  • 编程风格好

7-2 curator基本操作

1、会话连接与关闭(RetryPolicy:重试机制)

2、节点的增删改查(namespace)

3、watcher监听(usingWatcher、nodeCache、PathChildrenCache)

pom

xml
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
<dependency>
<groupId>org.apache.zookeeper</groupId>
<artifactId>zookeeper</artifactId>
<version>3.4.11</version>
</dependency>

<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-framework</artifactId>
<version>4.0.0</version>
</dependency>

<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-recipes</artifactId>
<version>4.0.0</version>
</dependency>

示例代码

java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
package machine.example.zookeeper.curator;

import java.util.List;

import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.cache.*;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.curator.retry.RetryNTimes;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.data.Stat;

public class CuratorOperator {

public CuratorFramework client = null;
public static final String zkServerPath = "10.211.55.6:2181";

/**
* 实例化zk客户端
*/
public CuratorOperator() {

// RetryPolicy:重试机制
/**
* 1、
* 同步创建zk示例,原生api是异步的
*
* curator链接zookeeper的策略:ExponentialBackoffRetry
* baseSleepTimeMs:初始sleep的时间
* maxRetries:最大重试次数
* maxSleepMs:最大重试时间
*/
// RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 5);

/**
* 2、
* 推荐使用
*
* curator链接zookeeper的策略:RetryNTimes
* n:重试的次数
* sleepMsBetweenRetries:每次重试间隔的时间
*/
RetryPolicy retryPolicy = new RetryNTimes(3, 5000);

/**
* 3、
* curator链接zookeeper的策略:RetryOneTime
* sleepMsBetweenRetry:每次重试间隔的时间
*/
// RetryPolicy retryPolicy2 = new RetryOneTime(3000);

/**
* 4、
* 永远重试,不推荐使用
*/
// RetryPolicy retryPolicy3 = new RetryForever(retryIntervalMs)

/**
* 5、
* curator链接zookeeper的策略:RetryUntilElapsed
* maxElapsedTimeMs:最大重试时间
* sleepMsBetweenRetries:每次重试间隔
* 重试时间超过maxElapsedTimeMs后,就不再重试
*/
// RetryPolicy retryPolicy4 = new RetryUntilElapsed(2000, 3000);

//编程风格良好
client = CuratorFrameworkFactory.builder()
.connectString(zkServerPath)
.sessionTimeoutMs(10000).retryPolicy(retryPolicy)
.namespace("workspace")
.build();
client.start();
}

/**
*
* @Description: 关闭zk客户端连接
*/
public void closeZKClient() {
if (client != null) {
this.client.close();
}
}

public static void main(String[] args) throws Exception {

//1、会话连接与关闭(RetryPolicy:重试机制)

CuratorOperator cto = new CuratorOperator();
boolean isZkCuratorStarted = cto.client.isStarted();
System.out.println("当前客户的状态:" + (isZkCuratorStarted ? "连接中" : "已关闭"));


//2、节点的增删改查(namespace)

// 创建节点
String nodePath = "/super/mxx"; // /workspace/super/mxx
/*
byte[] data = "superme".getBytes();
cto.client.create().creatingParentsIfNeeded() //递归创建节点
.withMode(CreateMode.PERSISTENT)
.withACL(ZooDefs.Ids.OPEN_ACL_UNSAFE)
.forPath(nodePath, data);
*/

// 更新节点数据
/*
byte[] newData = "batman".getBytes();
cto.client.setData().withVersion(0).forPath(nodePath, newData);
*/

// 删除节点
/*
cto.client.delete()
.guaranteed() // 可选,如果删除失败,那么在后端还是继续会删除,直到成功(网络抖动,操作在服务器成功,但在返回客户端时连接中断,response会顺利返回客户端)
.deletingChildrenIfNeeded() // 可选,如果有子节点,就删除(若路径为/super/mxx,则删除的是mxx及mxx的子节点)
.withVersion(0)
.forPath(nodePath);
*/

// 读取节点数据
/*
Stat stat = new Stat();
byte[] data = cto.client.getData()
.storingStatIn(stat) //可选,会填充状态信息到stat
.forPath(nodePath);
System.out.println("节点" + nodePath + "的数据为: " + new String(data));
System.out.println("该节点的版本号为: " + stat.getVersion());
*/


// 查询子节点
/*
List<String> childNodes = cto.client.getChildren()
.forPath(nodePath);
System.out.println("开始打印子节点:");
for (String s : childNodes) {
System.out.println(s);
}
*/
// 判断节点是否存在,如果不存在则为空,存在则返回Stat
/*
Stat statExist = cto.client.checkExists().forPath(nodePath + "/abc");
System.out.println(statExist);
*/



//3、watcher监听

/**
* 3.1.1
* 单次触发(usingWatcher)
* 当使用usingWatcher的时候,监听只会触发一次,监听完毕后就销毁
* MyCuratorWatcher:实现Curator提供的api
* MyWatcher:实现原生api
*/
/*
cto.client.getData().usingWatcher(new MyCuratorWatcher()).forPath(nodePath);
// cto.client.getData().usingWatcher(new MyWatcher()).forPath(nodePath);
*/


/**
* 3.1.2
* 重复触发(NodeCache)
*
*/
/*
final NodeCache nodeCache = new NodeCache(cto.client, nodePath);

// buildInitial : 初始化的时候获取node的值并且缓存
nodeCache.start(true);
if (nodeCache.getCurrentData() != null) {
System.out.println("节点初始化数据为:" + new String(nodeCache.getCurrentData().getData()));
} else {
System.out.println("节点初始化数据为空...");
}
//节点修改会触发Listener,删除操作会触发 "空指针异常"
nodeCache.getListenable().addListener(new NodeCacheListener() {
public void nodeChanged() throws Exception {
if (nodeCache.getCurrentData() == null) {
System.out.println("空");
return;
}
String data = new String(nodeCache.getCurrentData().getData());
System.out.println("节点路径:" + nodeCache.getCurrentData().getPath() + "数据:" + data);
}
});
*/

/**
* 3.2
* 为子节点添加watcher
*
* NodeCache只能监听到数据变动,不能监听具体的增删改操作
* 所以使用PathChildrenCache
* PathChildrenCache: 监听数据节点的增删改,会触发事件
*/
String childNodePathCache = nodePath;
// cacheData: 设置缓存节点的数据状态
final PathChildrenCache childrenCache = new PathChildrenCache(cto.client, childNodePathCache, true);
/**
* StartMode: 初始化方式
* POST_INITIALIZED_EVENT:异步初始化,初始化之后会触发事件
* NORMAL:异步初始化
* BUILD_INITIAL_CACHE:同步初始化
*/
childrenCache.start(PathChildrenCache.StartMode.POST_INITIALIZED_EVENT);

//打印子节点
/*
List<ChildData> childDataList = childrenCache.getCurrentData();
System.out.println("当前数据节点的子节点数据列表:");
for (ChildData cd : childDataList) {
String childData = new String(cd.getData());
System.out.println(childData);
}
*/

//异步初始化回调
childrenCache.getListenable().addListener(new PathChildrenCacheListener() {
public void childEvent(CuratorFramework client, PathChildrenCacheEvent event) throws Exception {
//INITIALIZED:子节点初始化事件
if(event.getType().equals(PathChildrenCacheEvent.Type.INITIALIZED)){
System.out.println("子节点初始化ok...");
}

else if(event.getType().equals(PathChildrenCacheEvent.Type.CHILD_ADDED)){
String path = event.getData().getPath();
if (path.equals(ADD_PATH)) {
System.out.println("添加子节点:" + event.getData().getPath());
System.out.println("子节点数据:" + new String(event.getData().getData()));
} else if (path.equals("/super/imooc/e")) {
System.out.println("添加不正确...");
}

}else if(event.getType().equals(PathChildrenCacheEvent.Type.CHILD_REMOVED)){
System.out.println("删除子节点:" + event.getData().getPath());
}else if(event.getType().equals(PathChildrenCacheEvent.Type.CHILD_UPDATED)){
System.out.println("修改子节点路径:" + event.getData().getPath());
System.out.println("修改子节点数据:" + new String(event.getData().getData()));
}
}
});



//关闭
// Thread.sleep(3000);
Thread.sleep(100000);
cto.closeZKClient();
boolean isZkCuratorStarted2 = cto.client.isStarted();
System.out.println("当前客户的状态:" + (isZkCuratorStarted2 ? "连接中" : "已关闭"));

}

public final static String ADD_PATH = "/super/mxx/d";

}

7-3 zk-watcher实例:统一更新N台节点的配置文件

![屏幕快照 2018-11-22 下午4.48.43](20181120104828229/屏幕快照 2018-11-22 下午4.48.43.png)

思路:新的配置动作会组成json串,set到zk节点,client监听到节点数据变化后,就下载更新新的配置文件

java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
public class Client1 {

public CuratorFramework client = null;
public static final String zkServerPath = "10.211.55.6:2181";

public Client1() {
RetryPolicy retryPolicy = new RetryNTimes(3, 5000);
client = CuratorFrameworkFactory.builder()
.connectString(zkServerPath)
.sessionTimeoutMs(10000).retryPolicy(retryPolicy)
.namespace("workspace").build();
client.start();
}

public void closeZKClient() {
if (client != null) {
this.client.close();
}
}

// public final static String CONFIG_NODE = "/super/imooc/redis-config";
public final static String CONFIG_NODE_PATH = "/super/imooc";
public final static String SUB_PATH = "/redis-config";
public static CountDownLatch countDown = new CountDownLatch(1);

public static void main(String[] args) throws Exception {
Client1 cto = new Client1();
System.out.println("client1 启动成功...");

final PathChildrenCache childrenCache = new PathChildrenCache(cto.client, CONFIG_NODE_PATH, true);
childrenCache.start(StartMode.BUILD_INITIAL_CACHE);

// 添加监听事件
//set /workspace/super/imooc/redis-config {"type":"add","url":"ftp://192.168.10.123/config/redis.xml","remark":"add"}
childrenCache.getListenable().addListener(new PathChildrenCacheListener() {
public void childEvent(CuratorFramework client, PathChildrenCacheEvent event) throws Exception {
// 监听节点变化
if(event.getType().equals(PathChildrenCacheEvent.Type.CHILD_UPDATED)){
String configNodePath = event.getData().getPath();
if (configNodePath.equals(CONFIG_NODE_PATH + SUB_PATH)) {
System.out.println("监听到配置发生变化,节点路径为:" + configNodePath);

// 读取节点数据
String jsonConfig = new String(event.getData().getData());
System.out.println("节点" + CONFIG_NODE_PATH + "的数据为: " + jsonConfig);

// 从json转换配置
RedisConfig redisConfig = null;
if (StringUtils.isNotBlank(jsonConfig)) {
redisConfig = JsonUtils.jsonToPojo(jsonConfig, RedisConfig.class);
}

// 配置不为空则进行相应操作
if (redisConfig != null) {
String type = redisConfig.getType();
String url = redisConfig.getUrl();
String remark = redisConfig.getRemark();
// 判断事件
if (type.equals("add")) {
System.out.println("监听到新增的配置,准备下载...");
// ... 连接ftp服务器,根据url找到相应的配置
Thread.sleep(500);
System.out.println("开始下载新的配置文件,下载路径为<" + url + ">");
// ... 下载配置到你指定的目录
Thread.sleep(1000);
System.out.println("下载成功,已经添加到项目中");
// ... 拷贝文件到项目目录
} else if (type.equals("update")) {
System.out.println("监听到更新的配置,准备下载...");
// ... 连接ftp服务器,根据url找到相应的配置
Thread.sleep(500);
System.out.println("开始下载配置文件,下载路径为<" + url + ">");
// ... 下载配置到你指定的目录
Thread.sleep(1000);
System.out.println("下载成功...");
System.out.println("删除项目中原配置文件...");
Thread.sleep(100);
// ... 删除原文件
System.out.println("拷贝配置文件到项目目录...");
// ... 拷贝文件到项目目录
} else if (type.equals("delete")) {
System.out.println("监听到需要删除配置");
System.out.println("删除项目中原配置文件...");
}

// TODO 视情况统一重启服务
}
}
}
}
});

countDown.await();

cto.closeZKClient();
}

}

7-4 acl权限

java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
public class CuratorAcl {

public CuratorFramework client = null;
public static final String zkServerPath = "10.211.55.6:2181";

public CuratorAcl() {
RetryPolicy retryPolicy = new RetryNTimes(3, 5000);
client = CuratorFrameworkFactory.builder().authorization("digest", "mxx:123456".getBytes())
.connectString(zkServerPath)
.sessionTimeoutMs(10000).retryPolicy(retryPolicy)
.namespace("workspace").build();
client.start();
}

public void closeZKClient() {
if (client != null) {
this.client.close();
}
}

public static void main(String[] args) throws Exception {
// 实例化
CuratorAcl cto = new CuratorAcl();
boolean isZkCuratorStarted = cto.client.isStarted();
System.out.println("当前客户的状态:" + (isZkCuratorStarted ? "连接中" : "已关闭"));

// String nodePath = "/acl/father/child/sub";
String nodePath = "/acl/mxx";

//权限list
List<ACL> acls = new ArrayList<ACL>();
Id imooc1 = new Id("digest", AclUtils.getDigestUserPwd("imooc1:123456"));
Id imooc2 = new Id("digest", AclUtils.getDigestUserPwd("imooc2:123456"));
acls.add(new ACL(Perms.ALL, imooc1));
acls.add(new ACL(Perms.READ, imooc2));
acls.add(new ACL(Perms.DELETE | Perms.CREATE, imooc2));

// 1、创建节点,给权限
byte[] data = "spiderman".getBytes();
cto.client.create()
.creatingParentsIfNeeded()
.withMode(CreateMode.PERSISTENT)
.withACL(acls, true) //true(不常用):递归赋权限(father、child、sub都赋了权限)
.forPath(nodePath, data);

// 2、修改已有节点的权限
// cto.client.setACL().withACL(acls).forPath("/");

// 更新节点数据
// byte[] newData = "batman".getBytes();
// cto.client.setData().withVersion(0).forPath(nodePath, newData);

// 删除节点
// cto.client.delete().guaranteed().deletingChildrenIfNeeded().withVersion(0).forPath(nodePath);

// 读取节点数据
// Stat stat = new Stat();
// byte[] data = cto.client.getData().storingStatIn(stat).forPath(nodePath);
// System.out.println("节点" + nodePath + "的数据为: " + new String(data));
// System.out.println("该节点的版本号为: " + stat.getVersion());


cto.closeZKClient();
boolean isZkCuratorStarted2 = cto.client.isStarted();
System.out.println("当前客户的状态:" + (isZkCuratorStarted2 ? "连接中" : "已关闭"));
}

}
文章作者: Machine
文章链接: https://machine4869.gitee.io/2018/11/20/20181120104828229/
版权声明: 本博客所有文章除特别声明外,均采用 CC BY-NC-SA 4.0 许可协议。转载请注明来自 哑舍
打赏
  • 微信
    微信
  • 支付宝
    支付宝

评论