avatar

目录
基于zookeeper的应用:分布式锁

代码地址:springboot-dubbo

第9章 分布式锁

9-1 分布式锁的引入

  • 死锁与活锁的概念

  • 分布式锁的概念

多个系统共享数据,并发下,不加锁,会导致数据不一致

![屏幕快照 2018-11-27 上午10.57.37](20181127102931065/屏幕快照 2018-11-27 上午10.57.37.png)

所以引出zookeeper分布式锁

目的:数据最终一致性(数据库)

9-2 curator与springboot整合

之前:用curator工厂类创建客户端,然后.start启动

结合spring:在web应用下,curator随spring容器启动

\configure

java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
@Configuration
public class CrutorConf {

//zk地址
private String zkServerPath = "10.211.55.6:2181";
//重试策略-次数
private int retryN = 10;
//重试策略-时间间隔
private int retryT = 5000;


//连接zk 重试策略
public RetryPolicy getRetryPolicy(){
RetryPolicy retryPolicy = new RetryNTimes(retryN, retryT);
return retryPolicy;
}


//zk客户端:crutor
public CuratorFramework getClient(){
CuratorFramework client = CuratorFrameworkFactory.builder()
.connectString(zkServerPath)
.sessionTimeoutMs(10000) //会话超时时间
.connectionTimeoutMs(5000) //创建连接超时时间
.retryPolicy(this.getRetryPolicy())
.build();

client.start();
//返回已启动的client
return client;
}

/**
* 在spring启动时,初始化crutor,启动client,并交给spring容器
* bean name : 默认方法名"zKCrutor"
*/
@Bean
public ZKCrutor zKCrutor(){
ZKCrutor zKCrutor = new ZKCrutor(this.getClient());
zKCrutor.init();
return zKCrutor;
}
}

\utils

java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
public class ZKCrutor {
private CuratorFramework client = null;
//命名空间
private String namespace = "workspace";

public ZKCrutor(CuratorFramework client){
this.client = client;
}

//初始化操作
public void init(){
client.usingNamespace(namespace);
}


public boolean isAlive(){

return client.isStarted();
}
}

\controller

java
1
2
3
4
5
@RequestMapping("/testZKCrutor")
String testZKCrutor(){
boolean isAlive = zkCrutor.isAlive();
return isAlive?"连接":"断开";
}

改进:一个session(浏览器)对应一个zKcrutor,而不是整个容器(application)共用一个zkClient。浏览器第一次连接,就创建zKcrutor,然后放进session,之后同一个session访问就返回该zkClient。

9-3 开发分布式锁

分布式锁的流程

![屏幕快照 2018-11-27 下午2.46.58](20181127102931065/屏幕快照 2018-11-27 下午2.46.58.png)

开发分布式锁

\utils

java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
@Slf4j
public class DistributedLock {

CuratorFramework client = null;

//挂起当前请求,等待上一个分布式锁释放
private static CountDownLatch zkLocklatch = new CountDownLatch(1);

//分布式锁 总节点名称
private static final String ZK_LOCK_PROJECT = "mxx-locks";
//分布式 锁节点
private static final String DISTRIBUTED_LOCK = "distributed_lock";
public DistributedLock(CuratorFramework client){
this.client = client;
}

public void init(){
client = client.usingNamespace("ZKLocks-Namespace");
/**
* 创建节点
* ZKLocks-Namespace
* |
* —— mxx-locks
* |
* —— distributed_lock
*/
try {
if(client.checkExists().forPath("/"+ ZK_LOCK_PROJECT) == null){
client.create()
.creatingParentsIfNeeded()
.withMode(CreateMode.PERSISTENT)
.withACL(ZooDefs.Ids.OPEN_ACL_UNSAFE)
.forPath("/"+ ZK_LOCK_PROJECT);
}

addWatcherToLock("/"+ ZK_LOCK_PROJECT);
}catch (Exception e){
log.error("客户端连接zookeeper错误,请重试...");
}
}

public void getLock(){
//当上一个锁释放,且当前锁获取成功,才跳出循环
while (true){
try{
//创建当前业务节点
client.create()
.creatingParentsIfNeeded()
//使用临时节点,会话消失,节点就消失
.withMode(CreateMode.EPHEMERAL)
.withACL(ZooDefs.Ids.OPEN_ACL_UNSAFE)
.forPath("/"+ ZK_LOCK_PROJECT+"/"+DISTRIBUTED_LOCK);

//log.info("获取分布式锁成功...");
//跳出循环
return;
}catch (Exception e){
//如果以上路径存在(改锁被占用),会抛出异常
//log.info("获取分布式锁失败...");

try {
//==0表示资源已被用掉(锁已经释放),重置同步资源值
if(zkLocklatch.getCount()<=0){
zkLocklatch = new CountDownLatch(1);
}

//挂起当前线程,等待锁释放
zkLocklatch.await();
} catch (InterruptedException e1) {
e1.printStackTrace();
}

}
}
}

public boolean releaseLock(){

//删除当前业务节点
try {
if(client.checkExists().forPath("/"+ ZK_LOCK_PROJECT+"/"+DISTRIBUTED_LOCK)!=null){
client.delete().forPath("/"+ ZK_LOCK_PROJECT+"/"+DISTRIBUTED_LOCK);
}
}catch (Exception e){
e.printStackTrace();
return false;
}
//log.info("分布式锁释放完毕");
return true;
}

public void addWatcherToLock(String path) throws Exception{

//监听 "/mxx-locks"
final PathChildrenCache chche = new PathChildrenCache(client,path,true);
chche.start(PathChildrenCache.StartMode.POST_INITIALIZED_EVENT);
chche.getListenable().addListener(new PathChildrenCacheListener() {
@Override
public void childEvent(CuratorFramework client, PathChildrenCacheEvent event) throws Exception {

//删除业务节点
if(event.getType().equals(PathChildrenCacheEvent.Type.CHILD_REMOVED)){
String path = event.getData().getPath();
//log.info("上一个会话已释放锁,节点路径为:"+path);
if(path.contains(DISTRIBUTED_LOCK)){
//log.info("释放计数器,唤醒等待线程");
zkLocklatch.countDown();
}
}
}
});
}
}

\configure

java
1
2
3
4
5
6
7
8
//分布式锁
@Bean
public DistributedLock distributedLock(){
DistributedLock distributedLock = new DistributedLock(this.getClient());
distributedLock.init();

return distributedLock;
}

\controller

java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
@RequestMapping("/buy")
String buy(){
//count是全局常量,初始值为0
UserPojo.count = UserPojo.count+1;

return "ok";
}

@RequestMapping("/buyWithLock")
String buyWithLock(){
distributedLock.getLock();

UserPojo.count = UserPojo.count+1;

distributedLock.releaseLock();

return "ok";
}

情景推理:

java
1
2
3
4
5
6
7
8
9
10
11
1成功创建(锁了)

23同时进入失败,挂起

1释放锁(删除节点,触发监听,countdown使得count=0

23同时被重新启动,重新进入死循环,尝试获取锁

23有一个成功了,有一个失败了

比如3失败,他进入catch,重置CountDownLatch,然后等着,直到2用完释放

测试分布式锁

ab -n 1000 -c 50 http://localhost:8081/user/buy # count无法达到1000

ab -n 1000 -c 50 http://localhost:8081/user/buyWithLock #count每次都为1000

改进:crutor/分布式锁的代码,正常情况下该放在 produce里面,consumer只完成controller的任务。

文章作者: Machine
文章链接: https://machine4869.gitee.io/2018/11/27/20181127102931065/
版权声明: 本博客所有文章除特别声明外,均采用 CC BY-NC-SA 4.0 许可协议。转载请注明来自 哑舍
打赏
  • 微信
    微信
  • 支付宝
    支付宝

评论