avatar

目录
ZooKeeper专题(4):zk原生Java API的使用

代码地址:note_zookeeper

第6章 使用ZooKeeper原生Java API进行客户端开发

6-1 建立客户端与zk服务端的连接

zk与原生Java API的使用

  • 会话连接与恢复

  • 节点的增删改查

  • watch与acl相关操作

pom

Code
1
2
3
4
5
<!-- ZooKeeper client library; supplies the org.apache.zookeeper classes used by the Java examples in this article. -->
<dependency>
<groupId>org.apache.zookeeper</groupId>
<artifactId>zookeeper</artifactId>
<version>3.4.11</version>
</dependency>

会话连接

java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
/**
 * Minimal demo: opens a ZooKeeper session, logs the client state right after
 * construction and again after a short wait, then closes the session.
 * An instance of this class is registered as the session watcher.
 */
@Slf4j
public class ZKConnect implements Watcher {

    public static final String zkServerPath = "10.211.55.6:2181";
    // public static final String zkServerPath = "192.168.1.111:2181,192.168.1.111:2182,192.168.1.111:2183";
    public static final Integer timeout = 5000;

    /**
     * Connects to {@link #zkServerPath} and logs the connection state twice.
     *
     * @param args unused
     * @throws Exception if the client cannot be created, or the wait/close is interrupted
     */
    public static void main(String[] args) throws Exception {
        /*
         * Full constructor signature, for reference:
         * public ZooKeeper(String connectString, int sessionTimeout, Watcher watcher, long sessionId, byte[] sessionPasswd, boolean canBeReadOnly)
         *
         * sessionTimeout: session timeout; the session times out when heartbeats stop arriving
         * watcher: receives a notification whenever a matching event fires; pass null if not needed
         *
         * After a session is lost it can be re-acquired using sessionId and sessionPasswd:
         * sessionId: id of the session
         * sessionPasswd: password of the session
         */
        ZooKeeper zk = new ZooKeeper(zkServerPath, timeout, new ZKConnect());
        try {
            log.info("客户端开始连接zookeeper服务器...");
            log.info("连接状态:{}", zk.getState());

            // sleep is static: call it on Thread directly instead of allocating a throwaway Thread.
            // The pause gives the client time to finish connecting before the state is logged again.
            Thread.sleep(2000);

            log.info("连接状态:{}", zk.getState());
        } finally {
            // End the session explicitly instead of leaving it to expire on the server.
            zk.close();
        }
    }

    /**
     * Logs every watch notification delivered to this watcher.
     *
     * @param event the notification received from the client
     */
    @Override
    public void process(WatchedEvent event) {
        log.info("接受到watch通知:{}", event);
    }
}

6-2 zk会话重连机制

会话恢复

java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
/**
 * Demo: re-attach to an existing ZooKeeper session.
 *
 * <p>Opens a first client, reads its session id and password, then builds a second
 * client from those credentials and prints both session ids so they can be compared.
 */
@Slf4j
public class ZKConnectSessionWatcher implements Watcher {
    public static final String zkServerPath = "10.211.55.6:2181";
    public static final Integer timeout = 5000;

    /**
     * Runs the reconnect demo against {@link #zkServerPath}.
     *
     * @param args unused
     * @throws Exception if a client cannot be created, or a wait/close is interrupted
     */
    public static void main(String[] args) throws Exception {

        ZooKeeper zk = new ZooKeeper(zkServerPath, timeout, new ZKConnectSessionWatcher());
        try {
            log.info("客户端开始连接zookeeper服务器...");
            log.info("连接状态:{}", zk.getState());
            // sleep is static: call it on Thread directly instead of allocating a throwaway Thread.
            Thread.sleep(2000);
            log.info("连接状态:{}", zk.getState());

            long sessionId = zk.getSessionId();
            byte[] sessionPassword = zk.getSessionPasswd();
            String ssid = "0x" + Long.toHexString(sessionId);
            System.out.println(ssid);

            // Start the session reconnect.
            log.info("开始会话重连...");

            // Restore the previous session using its sessionId and sessionPassword.
            ZooKeeper zkSession = new ZooKeeper(zkServerPath,
                    timeout,
                    new ZKConnectSessionWatcher(),
                    sessionId,
                    sessionPassword);
            try {
                log.info("重新连接状态zkSession:{}", zkSession.getState());
                Thread.sleep(1000);
                log.info("重新连接状态zkSession:{}", zkSession.getState());

                long sessionId2 = zkSession.getSessionId();
                String ssid2 = "0x" + Long.toHexString(sessionId2);
                System.out.println(ssid2); // expected to match the id printed above
            } finally {
                // Closing the re-attached handle ends the shared session.
                zkSession.close();
            }
        } finally {
            // Release the first handle as well so it stops trying to use the session.
            zk.close();
        }
    }

    /**
     * Logs every watch notification delivered to this watcher.
     *
     * @param event the notification received from the client
     */
    @Override
    public void process(WatchedEvent event) {
        log.info("接受到watch通知:{}", event);
    }
}

6-3 同步/异步 创建/修改/删除zk节点

同步/异步 创建/修改/删除zk节点

java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
/**
 * Demo of creating, updating and deleting znodes with the native ZooKeeper client,
 * using either the synchronous or the asynchronous (callback) API variants.
 */
public class ZKNodeOperator implements Watcher {

    private ZooKeeper zookeeper = null;

    public static final String zkServerPath = "10.211.55.6:2181";
    public static final Integer timeout = 5000;

    /** Creates an operator without a client; used as the watcher instance. */
    public ZKNodeOperator() {}

    /**
     * Creates an operator and opens a client to the given server(s).
     * If the client cannot be created the error is printed and
     * {@link #getZookeeper()} keeps returning {@code null}.
     *
     * @param connectString host:port list of the ZooKeeper server(s)
     */
    public ZKNodeOperator(String connectString) {
        try {
            zookeeper = new ZooKeeper(connectString, timeout, new ZKNodeOperator());
        } catch (IOException e) {
            // The assignment never happened when the constructor throws,
            // so there is no client to close here.
            e.printStackTrace();
        }
    }

    public ZooKeeper getZookeeper() {
        return zookeeper;
    }

    public void setZookeeper(ZooKeeper zookeeper) {
        this.zookeeper = zookeeper;
    }

    /** Watch notifications are ignored in this demo. */
    @Override
    public void process(WatchedEvent event) {
    }

    /**
     * Submits an asynchronous request to create a persistent znode, then waits
     * two seconds so the callback has a chance to run. Failures are printed, not thrown.
     *
     * @param path path of the node to create
     * @param data payload stored in the node
     * @param acls access-control list applied to the node
     */
    public void createZKNode(String path, byte[] data, List<ACL> acls) {
        try {
            /*
             * Neither the synchronous nor the asynchronous create supports recursive
             * creation of child nodes; the asynchronous variant takes a callback.
             * Parameters:
             * path: path to create
             * data: byte[] payload to store
             * acl: access-control policy
             *   Ids.OPEN_ACL_UNSAFE --> world:anyone:cdrwa
             *   CREATOR_ALL_ACL --> auth:user:password:cdrwa
             * createMode: node type, an enum
             *   PERSISTENT: persistent node
             *   PERSISTENT_SEQUENTIAL: persistent sequential node
             *   EPHEMERAL: ephemeral node
             *   EPHEMERAL_SEQUENTIAL: ephemeral sequential node
             */

            // Synchronous alternative:
            // String result = zookeeper.create(path, data, acls, CreateMode.PERSISTENT);
            // System.out.println("创建节点:\t" + result + "\t成功...");

            // Asynchronous create: the outcome is reported to CreateCallBack.
            String ctx = "{'create':'success'}";
            zookeeper.create(path, data, acls, CreateMode.PERSISTENT, new CreateCallBack(), ctx);

            // The request has only been queued at this point, so report a submission, not a success.
            System.out.println("已提交异步创建节点请求:\t" + path);
            Thread.sleep(2000);
        } catch (InterruptedException e) {
            // Keep the interrupt visible to callers instead of silently clearing it.
            Thread.currentThread().interrupt();
            e.printStackTrace();
        } catch (RuntimeException e) {
            // Best-effort demo: print and continue (e.g. when no client could be created).
            e.printStackTrace();
        }
    }

    /**
     * Creates {@code /test-delete-node} and then deletes it asynchronously.
     *
     * @param args unused
     * @throws Exception if a wait or the final close is interrupted
     */
    public static void main(String[] args) throws Exception {
        ZKNodeOperator zkServer = new ZKNodeOperator(zkServerPath);
        try {
            // 1. Create a znode (sync/async; ephemeral/persistent)
            // zkServer.createZKNode("/testnode", "testnode".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE);

            /*
             * Parameters:
             * path: node path
             * data: data
             * version: data version of the node
             */

            // 2. Update node data (sync/async)
            // Stat status = zkServer.getZookeeper().setData("/testnode", "xyz".getBytes(), 0);
            // System.out.println(status.getVersion());

            // 3. Delete a node (sync/async)
            zkServer.createZKNode("/test-delete-node", "123".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE);
            // zkServer.getZookeeper().delete("/test-delete-node", 0);

            String ctx = "{'delete':'success'}";
            zkServer.getZookeeper().delete("/test-delete-node", 0, new DeleteCallBack(), ctx);
            // Give the delete callback time to run before the client is closed.
            Thread.sleep(2000);
        } finally {
            if (zkServer.getZookeeper() != null) {
                zkServer.getZookeeper().close();
            }
        }
    }

}

CreateCallBack

java
1
2
3
4
5
6
7
8
9
10
11
import org.apache.zookeeper.AsyncCallback.StringCallback;

public class CreateCallBack implements StringCallback {

@Override
public void processResult(int rc, String path, Object ctx, String name) {
System.out.println("创建节点: " + path);
System.out.println((String)ctx);
}

}

DeleteCallBack

java
1
2
3
4
5
6
7
8
9
10
import org.apache.zookeeper.AsyncCallback.VoidCallback;

public class DeleteCallBack implements VoidCallback {
@Override
public void processResult(int rc, String path, Object ctx) {
System.out.println("删除节点" + path);
System.out.println((String)ctx);
}

}

6-4 获取zk节点数据

使用CountDownLatch做节点查询

  • 获取(父节点)节点数据
  • 获取子节点数据
  • 判断节点是否存在

CountDownLatch知识点

获取节点数据

java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
/**
 * Demo: read the data of a znode and wait for one data-change notification.
 */
public class ZKGetNodeData implements Watcher {

//...

    // Released after the first NodeDataChanged event has been handled, so the watch is awaited only once.
    private static CountDownLatch countDown = new CountDownLatch(1);

    /**
     * Reads {@code /names} with a watch registered, prints the value and blocks
     * until {@link #process(WatchedEvent)} has handled a data change.
     *
     * @param args unused
     * @throws Exception if the read fails or a wait/close is interrupted
     */
    public static void main(String[] args) throws Exception {

        ZKGetNodeData zkServer = new ZKGetNodeData(zkServerPath);
        try {
            /*
             * Parameters:
             * path: node path
             * watch: true or false; true registers a watch event
             * stat: node status
             */
            byte[] resByte = zkServer.getZookeeper().getData("/names", true, stat);
            String result = new String(resByte);
            System.out.println("当前值:" + result);

            // Wait until the watch callback (process) has finished.
            countDown.await();
        } finally {
            closeClient(zkServer);
        }
    }

    /**
     * On a data change, re-reads {@code /names} through a short-lived client,
     * prints the new value and version, and releases the waiting main thread.
     * Other event types are ignored.
     *
     * @param event the notification received from the client
     */
    @Override
    public void process(WatchedEvent event) {
        if (event.getType() != EventType.NodeDataChanged) {
            // NodeCreated, NodeChildrenChanged, NodeDeleted and the rest need no handling here.
            return;
        }
        ZKGetNodeData zkServer = null;
        try {
            zkServer = new ZKGetNodeData(zkServerPath);
            byte[] resByte = zkServer.getZookeeper().getData("/names", false, stat);
            String result = new String(resByte);
            System.out.println("更改后的值:" + result);
            System.out.println("版本号变化dversion:" + stat.getVersion());
        } catch (KeeperException e) {
            e.printStackTrace();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            e.printStackTrace();
        } finally {
            // Close the per-event client so it does not leak.
            closeClient(zkServer);
            // Always release main, even when the re-read failed; otherwise it would wait forever.
            countDown.countDown();
        }
    }

    /** Closes the client held by {@code holder}, if there is one. */
    private static void closeClient(ZKGetNodeData holder) {
        if (holder == null || holder.getZookeeper() == null) {
            return;
        }
        try {
            holder.getZookeeper().close();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}

6-5 获取zk子节点列表

获取子节点数据

java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
/**
 * Demo: list the children of a znode and wait for one children-change notification.
 */
public class ZKGetChildrenList implements Watcher {

//...


    // Released after the first NodeChildrenChanged event has been handled.
    private static CountDownLatch countDown = new CountDownLatch(1);

    /**
     * Requests the children of {@code /names} asynchronously with a watch registered,
     * then blocks until {@link #process(WatchedEvent)} has handled a children change.
     *
     * @param args unused
     * @throws Exception if a wait or close is interrupted
     */
    public static void main(String[] args) throws Exception {

        ZKGetChildrenList zkServer = new ZKGetChildrenList(zkServerPath);
        try {
            /*
             * Parameters:
             * path: parent node path
             * watch: true or false; true registers a watch event
             */

            // Synchronous call
            // List<String> strChildList = zkServer.getZookeeper().getChildren("/names", true);
            // for (String s : strChildList) {
            // System.out.println(s);
            // }

            // Asynchronous call
            String ctx = "{'callback':'ChildrenCallback'}";
            // zkServer.getZookeeper().getChildren("/names", true, new ChildrenCallBack(), ctx);
            zkServer.getZookeeper().getChildren("/names", true, new Children2CallBack(), ctx);

            countDown.await();
        } finally {
            closeClient(zkServer);
        }
    }

    /**
     * On a children change, re-lists the children of the affected path through a
     * short-lived client and releases the waiting main thread. Other node events
     * are only printed.
     *
     * @param event the notification received from the client
     */
    @Override
    public void process(WatchedEvent event) {
        if (event.getType() == EventType.NodeChildrenChanged) {
            System.out.println("NodeChildrenChanged");
            ZKGetChildrenList zkServer = null;
            try {
                zkServer = new ZKGetChildrenList(zkServerPath);
                List<String> strChildList = zkServer.getZookeeper().getChildren(event.getPath(), false);
                for (String s : strChildList) {
                    System.out.println(s);
                }
            } catch (KeeperException e) {
                e.printStackTrace();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                e.printStackTrace();
            } finally {
                // Close the per-event client so it does not leak.
                closeClient(zkServer);
                // Always release main, even when the listing failed; otherwise it would wait forever.
                countDown.countDown();
            }
        } else if (event.getType() == EventType.NodeCreated) {
            System.out.println("NodeCreated");
        } else if (event.getType() == EventType.NodeDataChanged) {
            System.out.println("NodeDataChanged");
        } else if (event.getType() == EventType.NodeDeleted) {
            System.out.println("NodeDeleted");
        }
    }

    /** Closes the client held by {@code holder}, if there is one. */
    private static void closeClient(ZKGetChildrenList holder) {
        if (holder == null || holder.getZookeeper() == null) {
            return;
        }
        try {
            holder.getZookeeper().close();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}

ChildrenCallBack

java
1
2
3
4
5
6
7
8
9
10
11
12
/**
 * Callback for the asynchronous getChildren call: prints each child name,
 * then the queried path and the caller's context.
 */
public class ChildrenCallBack implements ChildrenCallback {

    /**
     * @param rc       result code of the call
     * @param path     path that was passed to the call
     * @param ctx      context object that was passed to the call
     * @param children child names; may be {@code null} when the call did not succeed
     */
    @Override
    public void processResult(int rc, String path, Object ctx, List<String> children) {
        // A failed call delivers no list; skip the loop instead of throwing on the event thread.
        if (children != null) {
            for (String s : children) {
                System.out.println(s);
            }
        }
        System.out.println("ChildrenCallback:" + path);
        System.out.println((String) ctx);
    }

}

Children2CallBack

java
1
2
3
4
5
6
7
8
9
10
11
12
13
/**
 * Callback for the asynchronous getChildren call that also receives the node's Stat:
 * prints each child name, the queried path, the caller's context and the stat.
 */
public class Children2CallBack implements Children2Callback {

    /**
     * @param rc       result code of the call
     * @param path     path that was passed to the call
     * @param ctx      context object that was passed to the call
     * @param children child names; may be {@code null} when the call did not succeed
     * @param stat     status of the queried node; may be {@code null} when the call did not succeed
     */
    @Override
    public void processResult(int rc, String path, Object ctx, List<String> children, Stat stat) {
        // A failed call delivers no list; skip the loop instead of throwing on the event thread.
        if (children != null) {
            for (String s : children) {
                System.out.println(s);
            }
        }
        System.out.println("ChildrenCallback:" + path);
        System.out.println((String) ctx);
        // Children2Callback additionally supplies a stat; valueOf tolerates a missing one.
        System.out.println(String.valueOf(stat));
    }
}

6-6 判断zk节点是否存在

java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
/**
 * Demo: check whether a znode exists and wait for the next create, data-change
 * or delete notification on that path.
 */
public class ZKNodeExist implements Watcher {

//...

    private static CountDownLatch countDown = new CountDownLatch(1);

    /**
     * Queries {@code /names-fake} with a watch registered, reports the outcome and
     * blocks until {@link #process(WatchedEvent)} releases the latch.
     *
     * @param args unused
     * @throws Exception if the query fails or the wait is interrupted
     */
    public static void main(String[] args) throws Exception {

        ZKNodeExist zkServer = new ZKNodeExist(zkServerPath);

        /*
         * Parameters:
         * path: node path
         * watch: whether to register a watch
         */
        Stat nodeStat = zkServer.getZookeeper().exists("/names-fake", true);
        String report = (nodeStat == null)
                ? "该节点不存在..."
                : "查询的节点版本为dataVersion:" + nodeStat.getVersion();
        System.out.println(report);

        countDown.await();
    }

    /**
     * Prints a message for create, data-change and delete events and releases the
     * latch; every other event type is ignored.
     *
     * @param event the notification received from the client
     */
    @Override
    public void process(WatchedEvent event) {
        EventType kind = event.getType();

        // The watch also fires for creation, even though the node did not exist when queried.
        String message = null;
        if (kind == EventType.NodeCreated) {
            message = "节点创建";
        } else if (kind == EventType.NodeDataChanged) {
            message = "节点数据改变";
        } else if (kind == EventType.NodeDeleted) {
            message = "节点删除";
        }

        if (message == null) {
            return;
        }
        System.out.println(message);
        countDown.countDown();
    }
}

6-7 acl - 默认匿名权限/自定义用户权限/ip权限

java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
/**
 * Demo of znode ACLs: the default anonymous permission, custom digest users,
 * and IP-based permission. Only the IP variant is active; the other variants
 * are kept as commented-out samples.
 */
public class ZKNodeAcl implements Watcher {

//...

    /**
     * Creates {@code /acl-node/iptest} with an ACL granting all permissions to 127.0.0.1.
     *
     * @param args unused
     * @throws Exception declared for the commented-out samples that call the client directly
     */
    public static void main(String[] args) throws Exception {

        ZKNodeAcl zkServer = new ZKNodeAcl(zkServerPath);

        /*
         * ====================== create node: start ======================
         */

        // 1. Default anonymous permission
        // ACL that lets anyone access the node ('world,'anyone': cdrwa)
        // zkServer.createZKNode("/acl-node", "test".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE);


        // 2. Custom user permissions
        // Access restricted to authenticated custom users
        // List<ACL> acls = new ArrayList<ACL>();
        // Id usr1 = new Id("digest", AclUtils.getDigestUserPwd("usr1:123456"));
        // Id usr2 = new Id("digest", AclUtils.getDigestUserPwd("usr2:123456"));
        // acls.add(new ACL(Perms.ALL, usr1));
        // acls.add(new ACL(Perms.READ, usr2));
        // acls.add(new ACL(Perms.DELETE | Perms.CREATE, usr2));
        // zkServer.createZKNode("/acl-node/testdigest", "testdigest".getBytes(), acls);

        // Permission test
        // A registered user must call addAuthInfo before operating on the node; compare the CLI command addauth
        // zkServer.getZookeeper().addAuthInfo("digest", "usr1:123456".getBytes());
        // Without that registration there is no permission and the call fails with: NoAuth for /acl-node/testdigest/test
        // zkServer.createZKNode("/acl-node/testdigest/childtest", "childtest".getBytes(), ZooDefs.Ids.CREATOR_ALL_ACL);
        // Stat stat = new Stat();
        // byte[] data = zkServer.getZookeeper().getData("/acl-node/testdigest", false, stat);
        // System.out.println(new String(data));
        // Modifying as usr2 fails
        // zkServer.getZookeeper().setData("/acl-node/testdigest", "now".getBytes(), 0);


        // 3. IP permission
        // ACL using the ip scheme
        ACL localhostFullAccess = new ACL(Perms.ALL, new Id("ip", "127.0.0.1"));
        List<ACL> ipAcls = new ArrayList<>();
        ipAcls.add(localhostFullAccess);
        zkServer.createZKNode("/acl-node/iptest", "iptest".getBytes(), ipAcls);

        // Verify whether the ip has permission

    }

}
文章作者: Machine
文章链接: https://machine4869.gitee.io/2018/11/16/20181116180322040/
版权声明: 本博客所有文章除特别声明外,均采用 CC BY-NC-SA 4.0 许可协议。转载请注明来自 哑舍
打赏
  • 微信
    微信
  • 支付宝
    支付宝

评论