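Code: springboot-dubbo
Chapter 9: Distributed Locks
9-1 Introducing distributed locks
When several systems share data and update it concurrently without a lock, the data ends up inconsistent.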

This is what motivates a ZooKeeper-based distributed lock.
Goal: eventual consistency of the data (in the database).
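The lost-update problem can be reproduced inside a single JVM. The sketch below is only an illustration under that assumption: `SharedCounterDemo` and its `run` method are hypothetical names, worker threads stand in for the separate systems, and a local `ReentrantLock` stands in for the distributed lock.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical single-JVM stand-in for several systems updating one shared value.
class SharedCounterDemo {
    static int count = 0;                                   // shared, unprotected state
    static final ReentrantLock LOCK = new ReentrantLock();  // local stand-in for the distributed lock

    // Runs `requests` increments on `threads` workers and returns the final count.
    static int run(int requests, int threads, boolean useLock) {
        count = 0;
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        CountDownLatch done = new CountDownLatch(requests);
        for (int i = 0; i < requests; i++) {
            pool.execute(() -> {
                try {
                    if (useLock) {
                        LOCK.lock();
                        try {
                            count = count + 1;
                        } finally {
                            LOCK.unlock();
                        }
                    } else {
                        count = count + 1;  // read-modify-write: concurrent updates can be lost
                    }
                } finally {
                    done.countDown();
                }
            });
        }
        try {
            done.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting for workers", e);
        } finally {
            pool.shutdown();
        }
        return count;
    }

    public static void main(String[] args) {
        System.out.println("without lock: " + run(100_000, 50, false)); // may be below 100000
        System.out.println("with lock:    " + run(100_000, 50, true));
    }
}
```

Without the lock the final value varies from run to run and can fall short of the number of requests. With the lock every increment is applied.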
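9-2 Integrating Curator with Spring Boot
Before: create the client with the Curator factory class, then call .start() on it.
With Spring: in a web application, Curator starts together with the Spring container.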
\configure
    import org.apache.curator.RetryPolicy;
    import org.apache.curator.framework.CuratorFramework;
    import org.apache.curator.framework.CuratorFrameworkFactory;
    import org.apache.curator.retry.RetryNTimes;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;

    @Configuration
    public class CrutorConf {

        private String zkServerPath = "10.211.55.6:2181";
        private int retryN = 10;     // maximum number of retries
        private int retryT = 5000;   // sleep between retries, in ms

        public RetryPolicy getRetryPolicy() {
            return new RetryNTimes(retryN, retryT);
        }

        // Not a @Bean: every call builds and starts a new client.
        public CuratorFramework getClient() {
            CuratorFramework client = CuratorFrameworkFactory.builder()
                    .connectString(zkServerPath)
                    .sessionTimeoutMs(10000)
                    .connectionTimeoutMs(5000)
                    .retryPolicy(this.getRetryPolicy())
                    .build();
            client.start();
            return client;
        }

        @Bean
        public ZKCrutor zKCrutor() {
            ZKCrutor zKCrutor = new ZKCrutor(this.getClient());
            zKCrutor.init();
            return zKCrutor;
        }
    }
\utils
    import org.apache.curator.framework.CuratorFramework;

    public class ZKCrutor {

        private CuratorFramework client = null;
        private String namespace = "workspace";

        public ZKCrutor(CuratorFramework client) {
            this.client = client;
        }

        public void init() {
            // usingNamespace() returns a new facade; keep it, otherwise the namespace is never applied
            client = client.usingNamespace(namespace);
        }

        public boolean isAlive() {
            return client.isStarted();
        }
    }
\controller
    // zkCrutor is the ZKCrutor bean, assumed to be injected into this controller
    @RequestMapping("/testZKCrutor")
    String testZKCrutor() {
        boolean isAlive = zkCrutor.isAlive();
        return isAlive ? "connected" : "disconnected";
    }
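Improvement: give each session (browser) its own ZKCrutor instead of sharing one zkClient across the whole container (application). On a browser's first request, create a ZKCrutor and store it in the session; later requests from the same session get that same client back.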
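A minimal sketch of that per-session idea, with hypothetical names throughout: `SessionClient` stands in for ZKCrutor (a real one would wrap a CuratorFramework) and `SessionClientRegistry` stands in for the session store. In a servlet application the same effect would more likely come from `HttpSession` attributes; a map keyed by session id is used here only so the example has no dependencies.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical stand-in for ZKCrutor; a real one would wrap a CuratorFramework.
class SessionClient {
    final String sessionId;

    SessionClient(String sessionId) {
        this.sessionId = sessionId;
    }
}

// Hypothetical registry: one client per session id.
class SessionClientRegistry {
    private final Map<String, SessionClient> clients = new ConcurrentHashMap<>();

    // The first request of a session creates the client; later requests reuse it.
    SessionClient forSession(String sessionId) {
        return clients.computeIfAbsent(sessionId, SessionClient::new);
    }

    // Call when the session ends so the client can be closed and forgotten.
    SessionClient remove(String sessionId) {
        return clients.remove(sessionId);
    }

    public static void main(String[] args) {
        SessionClientRegistry registry = new SessionClientRegistry();
        SessionClient first = registry.forSession("session-A");
        SessionClient again = registry.forSession("session-A");
        System.out.println("same client reused: " + (first == again));
    }
}
```

One consequence of this design is that every open session holds its own ZooKeeper connection, so clients need to be closed when sessions expire.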
9-3 Developing the distributed lock
Distributed lock workflow

Developing the distributed lock
\utils
    import java.util.concurrent.CountDownLatch;

    import lombok.extern.slf4j.Slf4j;
    import org.apache.curator.framework.CuratorFramework;
    import org.apache.curator.framework.recipes.cache.PathChildrenCache;
    import org.apache.curator.framework.recipes.cache.PathChildrenCacheEvent;
    import org.apache.curator.framework.recipes.cache.PathChildrenCacheListener;
    import org.apache.zookeeper.CreateMode;
    import org.apache.zookeeper.ZooDefs;

    @Slf4j
    public class DistributedLock {

        CuratorFramework client = null;

        // Threads that fail to get the lock wait on this latch.
        // volatile: it is replaced by one thread and read by others.
        private static volatile CountDownLatch zkLocklatch = new CountDownLatch(1);

        private static final String ZK_LOCK_PROJECT = "mxx-locks";
        private static final String DISTRIBUTED_LOCK = "distributed_lock";

        public DistributedLock(CuratorFramework client) {
            this.client = client;
        }

        public void init() {
            client = client.usingNamespace("ZKLocks-Namespace");
            try {
                // Create the persistent parent node once.
                if (client.checkExists().forPath("/" + ZK_LOCK_PROJECT) == null) {
                    client.create()
                            .creatingParentsIfNeeded()
                            .withMode(CreateMode.PERSISTENT)
                            .withACL(ZooDefs.Ids.OPEN_ACL_UNSAFE)
                            .forPath("/" + ZK_LOCK_PROJECT);
                }
                addWatcherToLock("/" + ZK_LOCK_PROJECT);
            } catch (Exception e) {
                log.error("Client failed to connect to zookeeper, please retry...", e);
            }
        }

        public void getLock() {
            while (true) {
                try {
                    // Whoever creates the ephemeral node holds the lock.
                    client.create()
                            .creatingParentsIfNeeded()
                            .withMode(CreateMode.EPHEMERAL)
                            .withACL(ZooDefs.Ids.OPEN_ACL_UNSAFE)
                            .forPath("/" + ZK_LOCK_PROJECT + "/" + DISTRIBUTED_LOCK);
                    return;
                } catch (Exception e) {
                    // Any failure is treated as "lock is taken": wait for a release, then retry.
                    // Caveat: if the lock is released between the failed create and this await,
                    // the wakeup is missed and this thread waits for the next release.
                    try {
                        if (zkLocklatch.getCount() <= 0) {
                            zkLocklatch = new CountDownLatch(1);
                        }
                        zkLocklatch.await();
                    } catch (InterruptedException e1) {
                        Thread.currentThread().interrupt();
                        log.error("Interrupted while waiting for the lock", e1);
                        return;   // without this the loop would spin on the interrupted flag
                    }
                }
            }
        }

        public boolean releaseLock() {
            try {
                if (client.checkExists().forPath("/" + ZK_LOCK_PROJECT + "/" + DISTRIBUTED_LOCK) != null) {
                    client.delete().forPath("/" + ZK_LOCK_PROJECT + "/" + DISTRIBUTED_LOCK);
                }
            } catch (Exception e) {
                log.error("Failed to release the lock", e);
                return false;
            }
            return true;
        }

        public void addWatcherToLock(String path) throws Exception {
            final PathChildrenCache cache = new PathChildrenCache(client, path, true);
            cache.start(PathChildrenCache.StartMode.POST_INITIALIZED_EVENT);
            cache.getListenable().addListener(new PathChildrenCacheListener() {
                @Override
                public void childEvent(CuratorFramework client, PathChildrenCacheEvent event) throws Exception {
                    // The lock node was deleted: wake up the waiting threads.
                    if (event.getType().equals(PathChildrenCacheEvent.Type.CHILD_REMOVED)) {
                        String removedPath = event.getData().getPath();
                        if (removedPath.contains(DISTRIBUTED_LOCK)) {
                            zkLocklatch.countDown();
                        }
                    }
                }
            });
        }
    }
\configure
    @Bean
    public DistributedLock distributedLock() {
        // getClient() starts another client, separate from the one inside ZKCrutor
        DistributedLock distributedLock = new DistributedLock(this.getClient());
        distributedLock.init();
        return distributedLock;
    }
\controller
    @RequestMapping("/buy")
    String buy() {
        UserPojo.count = UserPojo.count + 1;   // unprotected read-modify-write
        return "ok";
    }

    @RequestMapping("/buyWithLock")
    String buyWithLock() {
        distributedLock.getLock();
        try {
            UserPojo.count = UserPojo.count + 1;
        } finally {
            // release even if the update throws, so other requests are not blocked
            distributedLock.releaseLock();
        }
        return "ok";
    }
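Scenario walkthrough (requests 1, 2 and 3):
Request 1 creates the node successfully (it now holds the lock).
Requests 2 and 3 come in at the same time, fail to create the node, and block on the latch.
Request 1 releases the lock (it deletes the node, which fires the listener; countDown brings the count to 0).
Requests 2 and 3 are woken together, go round the while(true) loop again, and retry.
One of them succeeds and the other fails.
Say 3 fails: it enters the catch block, resets the CountDownLatch, and waits until 2 is done and releases the lock.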
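The walkthrough above can be tried out without a ZooKeeper server. The following is a single-JVM simulation under stated assumptions, not the Curator-based class: `InMemoryLockSketch` is a hypothetical name, an `AtomicBoolean` plays the role of the ephemeral lock node (a successful compare-and-set corresponds to creating the node, clearing it to deleting the node), and the listener's countDown is folded into `releaseLock`. It also differs from the course code in one deliberate way: the latch is swapped by the releasing thread, and a waiter captures the latch before it tries to create the node, so a release that lands between the failed attempt and the await is not missed.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical in-memory model of the create-node / wait / delete-node flow.
class InMemoryLockSketch {
    // true means "the lock node exists"; stands in for the ephemeral znode.
    private final AtomicBoolean lockNodeExists = new AtomicBoolean(false);
    // Waiters block on the current latch; each release opens it and installs a new one.
    private volatile CountDownLatch latch = new CountDownLatch(1);

    void getLock() {
        while (true) {
            CountDownLatch current = latch;                 // capture before the attempt
            if (lockNodeExists.compareAndSet(false, true)) {
                return;                                     // "node created": lock acquired
            }
            try {
                current.await();                            // wait for a release, then retry
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException("interrupted while waiting for the lock", e);
            }
        }
    }

    void releaseLock() {
        lockNodeExists.set(false);                          // "delete the node"
        CountDownLatch old = latch;
        latch = new CountDownLatch(1);                      // latch for the next round of waiters
        old.countDown();                                    // "listener fires": wake the waiters
    }

    // Mirrors /buyWithLock: `requests` increments issued by `threads` concurrent workers.
    static int buyWithLock(int requests, int threads) {
        InMemoryLockSketch lock = new InMemoryLockSketch();
        int[] count = {0};
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        CountDownLatch done = new CountDownLatch(requests);
        for (int i = 0; i < requests; i++) {
            pool.execute(() -> {
                lock.getLock();
                try {
                    count[0] = count[0] + 1;
                } finally {
                    lock.releaseLock();
                    done.countDown();
                }
            });
        }
        try {
            done.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting for workers", e);
        } finally {
            pool.shutdown();
        }
        return count[0];
    }

    public static void main(String[] args) {
        System.out.println("count = " + buyWithLock(1000, 50));
    }
}
```

Because only the thread that wins the compare-and-set proceeds, every increment is applied and the count equals the number of requests.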
Testing the distributed lock
ab -n 1000 -c 50 http://localhost:8081/user/buy # count does not reach 1000
ab -n 1000 -c 50 http://localhost:8081/user/buyWithLock # count is 1000 every time
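Improvement: the Curator and distributed-lock code would normally live in the producer (the Dubbo service provider), leaving the consumer to do only the controller's work.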