代码地址:note_zookeeper
第7章 Apache Curator客户端的使用
7-1 curator简介与客户端之间的异同点
常用的zk java客户端
- zk原生api
- zkclient
- Apache curator(常用)
原生api的不足
- 超时重连,不支持自动,需要手动操作
- Watch注册一次后会失效
- 不支持递归创建节点
Apache curator
- Apache的开源项目
- 解决watch的注册一次就失效
- Api更加简单易用
- 提供更多解决方案且实现简单:比如 分布式锁
- 提供常用的Zookeeper工具类
- 编程风格好
7-2 curator基本操作
1、会话连接与关闭(RetryPolicy:重试机制)
2、节点的增删改查(namespace)
3、watcher监听(usingWatcher、nodeCache、PathChildrenCache)
pom
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17
| <dependency> <groupId>org.apache.zookeeper</groupId> <artifactId>zookeeper</artifactId> <version>3.4.11</version> </dependency>
<dependency> <groupId>org.apache.curator</groupId> <artifactId>curator-framework</artifactId> <version>4.0.0</version> </dependency>
<dependency> <groupId>org.apache.curator</groupId> <artifactId>curator-recipes</artifactId> <version>4.0.0</version> </dependency>
|
示例代码
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263
| package machine.example.zookeeper.curator;
import java.util.List;
import org.apache.curator.RetryPolicy; import org.apache.curator.framework.CuratorFramework; import org.apache.curator.framework.CuratorFrameworkFactory; import org.apache.curator.framework.recipes.cache.*; import org.apache.curator.retry.ExponentialBackoffRetry; import org.apache.curator.retry.RetryNTimes; import org.apache.zookeeper.CreateMode; import org.apache.zookeeper.ZooDefs; import org.apache.zookeeper.data.Stat;
public class CuratorOperator {
public CuratorFramework client = null; public static final String zkServerPath = "10.211.55.6:2181";
public CuratorOperator() {
RetryPolicy retryPolicy = new RetryNTimes(3, 5000);
client = CuratorFrameworkFactory.builder() .connectString(zkServerPath) .sessionTimeoutMs(10000).retryPolicy(retryPolicy) .namespace("workspace") .build(); client.start(); }
public void closeZKClient() { if (client != null) { this.client.close(); } } public static void main(String[] args) throws Exception {
CuratorOperator cto = new CuratorOperator(); boolean isZkCuratorStarted = cto.client.isStarted(); System.out.println("当前客户的状态:" + (isZkCuratorStarted ? "连接中" : "已关闭"));
String nodePath = "/super/mxx";
String childNodePathCache = nodePath; final PathChildrenCache childrenCache = new PathChildrenCache(cto.client, childNodePathCache, true);
childrenCache.start(PathChildrenCache.StartMode.POST_INITIALIZED_EVENT);
childrenCache.getListenable().addListener(new PathChildrenCacheListener() { public void childEvent(CuratorFramework client, PathChildrenCacheEvent event) throws Exception { if(event.getType().equals(PathChildrenCacheEvent.Type.INITIALIZED)){ System.out.println("子节点初始化ok..."); }
else if(event.getType().equals(PathChildrenCacheEvent.Type.CHILD_ADDED)){ String path = event.getData().getPath(); if (path.equals(ADD_PATH)) { System.out.println("添加子节点:" + event.getData().getPath()); System.out.println("子节点数据:" + new String(event.getData().getData())); } else if (path.equals("/super/imooc/e")) { System.out.println("添加不正确..."); }
}else if(event.getType().equals(PathChildrenCacheEvent.Type.CHILD_REMOVED)){ System.out.println("删除子节点:" + event.getData().getPath()); }else if(event.getType().equals(PathChildrenCacheEvent.Type.CHILD_UPDATED)){ System.out.println("修改子节点路径:" + event.getData().getPath()); System.out.println("修改子节点数据:" + new String(event.getData().getData())); } } });
Thread.sleep(100000); cto.closeZKClient(); boolean isZkCuratorStarted2 = cto.client.isStarted(); System.out.println("当前客户的状态:" + (isZkCuratorStarted2 ? "连接中" : "已关闭"));
}
public final static String ADD_PATH = "/super/mxx/d";
}
|
7-3 zk-watcher实例:统一更新N台节点的配置文件

思路:新的配置动作会组成json串,set到zk节点,client监听到节点数据变化后,就下载更新新的配置文件
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98
| public class Client1 {
public CuratorFramework client = null; public static final String zkServerPath = "10.211.55.6:2181";
public Client1() { RetryPolicy retryPolicy = new RetryNTimes(3, 5000); client = CuratorFrameworkFactory.builder() .connectString(zkServerPath) .sessionTimeoutMs(10000).retryPolicy(retryPolicy) .namespace("workspace").build(); client.start(); } public void closeZKClient() { if (client != null) { this.client.close(); } }
public final static String CONFIG_NODE_PATH = "/super/imooc"; public final static String SUB_PATH = "/redis-config"; public static CountDownLatch countDown = new CountDownLatch(1); public static void main(String[] args) throws Exception { Client1 cto = new Client1(); System.out.println("client1 启动成功..."); final PathChildrenCache childrenCache = new PathChildrenCache(cto.client, CONFIG_NODE_PATH, true); childrenCache.start(StartMode.BUILD_INITIAL_CACHE); childrenCache.getListenable().addListener(new PathChildrenCacheListener() { public void childEvent(CuratorFramework client, PathChildrenCacheEvent event) throws Exception { if(event.getType().equals(PathChildrenCacheEvent.Type.CHILD_UPDATED)){ String configNodePath = event.getData().getPath(); if (configNodePath.equals(CONFIG_NODE_PATH + SUB_PATH)) { System.out.println("监听到配置发生变化,节点路径为:" + configNodePath); String jsonConfig = new String(event.getData().getData()); System.out.println("节点" + CONFIG_NODE_PATH + "的数据为: " + jsonConfig); RedisConfig redisConfig = null; if (StringUtils.isNotBlank(jsonConfig)) { redisConfig = JsonUtils.jsonToPojo(jsonConfig, RedisConfig.class); } if (redisConfig != null) { String type = redisConfig.getType(); String url = redisConfig.getUrl(); String remark = redisConfig.getRemark(); if (type.equals("add")) { System.out.println("监听到新增的配置,准备下载..."); Thread.sleep(500); System.out.println("开始下载新的配置文件,下载路径为<" + url + ">"); Thread.sleep(1000); System.out.println("下载成功,已经添加到项目中"); } else if (type.equals("update")) { System.out.println("监听到更新的配置,准备下载..."); Thread.sleep(500); System.out.println("开始下载配置文件,下载路径为<" + url + ">"); Thread.sleep(1000); System.out.println("下载成功..."); System.out.println("删除项目中原配置文件..."); Thread.sleep(100); System.out.println("拷贝配置文件到项目目录..."); } else if (type.equals("delete")) { System.out.println("监听到需要删除配置"); System.out.println("删除项目中原配置文件..."); } } } } } }); countDown.await(); cto.closeZKClient(); } }
|
7-4 acl权限
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68
| public class CuratorAcl {
public CuratorFramework client = null; public static final String zkServerPath = "10.211.55.6:2181";
public CuratorAcl() { RetryPolicy retryPolicy = new RetryNTimes(3, 5000); client = CuratorFrameworkFactory.builder().authorization("digest", "mxx:123456".getBytes()) .connectString(zkServerPath) .sessionTimeoutMs(10000).retryPolicy(retryPolicy) .namespace("workspace").build(); client.start(); } public void closeZKClient() { if (client != null) { this.client.close(); } } public static void main(String[] args) throws Exception { CuratorAcl cto = new CuratorAcl(); boolean isZkCuratorStarted = cto.client.isStarted(); System.out.println("当前客户的状态:" + (isZkCuratorStarted ? "连接中" : "已关闭"));
String nodePath = "/acl/mxx";
List<ACL> acls = new ArrayList<ACL>(); Id imooc1 = new Id("digest", AclUtils.getDigestUserPwd("imooc1:123456")); Id imooc2 = new Id("digest", AclUtils.getDigestUserPwd("imooc2:123456")); acls.add(new ACL(Perms.ALL, imooc1)); acls.add(new ACL(Perms.READ, imooc2)); acls.add(new ACL(Perms.DELETE | Perms.CREATE, imooc2)); byte[] data = "spiderman".getBytes(); cto.client.create() .creatingParentsIfNeeded() .withMode(CreateMode.PERSISTENT) .withACL(acls, true) .forPath(nodePath, data);
cto.closeZKClient(); boolean isZkCuratorStarted2 = cto.client.isStarted(); System.out.println("当前客户的状态:" + (isZkCuratorStarted2 ? "连接中" : "已关闭")); } }
|