avatar

目录
并发编程与线程安全(4):J.U.C之AQS

[TOC]

第7章 J.U.C之AQS

7-1 简介

AbstractQueuedSynchronizer - AQS

数据结构

![屏幕快照 2018-10-27 下午11.52.26](20181027234153307/屏幕快照 2018-10-27 下午11.52.26.png)

  • 底层是双向链表,队列的一种实现
  • sync queue:同步队列,head节点主要负责后面的调度
  • Condition queue:单向链表,不是必须的的,也可以有多个

设计原理

  • 使用Node实现FIFO队列,可以用于构建锁或者其他同步装置的基础框架
  • 利用了一个int类型标示状态,有一个state的成员变量,表示获取锁的线程数(0没有线程获取锁,1有线程获取锁,大于1表示重入锁的数量),和一个同步组件ReentrantLock,
  • 使用方法是继承,基于模板方法
  • 子类通过继承并通过实现它的方法管理其状态{acquire和release}的方法操作状态
  • 可以实现排它锁和共享锁的模式(独占、共享)

具体实现的思路

  1. 首先 AQS内部维护了一个CLH队列,来管理锁
  2. 线程尝试获取锁,如果获取失败,则将等待信息等包装成一个Node结点,加入到同步队列Sync queue里
  3. 不断重新尝试获取锁(当前结点为head的直接后继才会 尝试),如果获取失败,则会阻塞自己,直到被唤醒
  4. 当持有锁的线程释放锁的时候,会唤醒队列中的后继线程

AQS同步组件

  • CountDownLatch
  • Semaphore
  • CyclicBarrier
  • ReentrantLock
  • Condition
  • FutureTask

7-2 CountDownLatch

同步阻塞类,可以完成阻塞线程的功能

![屏幕快照 2018-10-28 下午6.49.46](20181027234153307/屏幕快照 2018-10-28 下午6.49.46.png)

使用场景

程序执行需要等待某个条件完成后,才能进行后面的操作。比如父任务等待所有子任务都完成的时候,在继续往下进行

基本用法

java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
package com.machine.concurrency.example.aqs;

import lombok.extern.slf4j.Slf4j;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/**
* 等200个线程都执行完,再执行下一步
*/
@Slf4j
public class CountDownLatchExample1 {

private final static int threadCount = 200;

public static void main(String[] args) throws Exception {

ExecutorService exec = Executors.newCachedThreadPool();

final CountDownLatch countDownLatch = new CountDownLatch(threadCount);

for (int i = 0; i < threadCount; i++) {
final int threadNum = i;
exec.execute(() -> {
try {
test(threadNum);
} catch (Exception e) {
log.error("exception", e);
} finally {
//每个线程执行完都-1
countDownLatch.countDown();
}
});
}

//等待,直到值变为0
countDownLatch.await();
log.info("finish");
exec.shutdown();
}

private static void test(int threadNum) throws Exception {
Thread.sleep(100);
log.info("{}", threadNum);
Thread.sleep(100);
}
}

场景:比如有多个线程完成一个任务,但是这个任务只想给他一个指定的时间,超过这个任务就不继续等待了。完成多少算多少

java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
package com.machine.concurrency.example.aqs;

import lombok.extern.slf4j.Slf4j;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

@Slf4j
public class CountDownLatchExample2 {

private final static int threadCount = 200;

public static void main(String[] args) throws Exception {

ExecutorService exec = Executors.newCachedThreadPool();

final CountDownLatch countDownLatch = new CountDownLatch(threadCount);

for (int i = 0; i < threadCount; i++) {
final int threadNum = i;
exec.execute(() -> {
try {
test(threadNum);
} catch (Exception e) {
log.error("exception", e);
} finally {
countDownLatch.countDown();
}
});
}
// 等待指定的时间,超过该时间就执行log.info("finish");,其余线程还会继续执行
// 等待指定的时间 参数1:等待时间 参数2:时间单位
countDownLatch.await(10, TimeUnit.MILLISECONDS);
log.info("finish");
exec.shutdown();
}

private static void test(int threadNum) throws Exception {
Thread.sleep(100);
log.info("{}", threadNum);
}
}

7-3 Semaphore

使用场景

1、仅能提供有限访问的资源(控制并发数):比如数据库的连接数最大只有20,而上层的并发数远远大于20,这时候如果不做限制,可能会由于无法获取连接而导致并发异常,这时候可以使用Semaphore来进行控制,当信号量设置为1的时候,就和单线程很相似了

java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
package com.machine.concurrency.example.aqs;

import lombok.extern.slf4j.Slf4j;

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;

@Slf4j
public class SemaphoreExample1 {

private final static int threadCount = 20;

public static void main(String[] args) throws Exception {

ExecutorService exec = Executors.newCachedThreadPool();

final Semaphore semaphore = new Semaphore(3);

for (int i = 0; i < threadCount; i++) {
final int threadNum = i;
exec.execute(() -> {
try {
semaphore.acquire(); // 获取一个许可
test(threadNum);
semaphore.release(); // 释放一个许可
} catch (Exception e) {
log.error("exception", e);
}
});
}
exec.shutdown();
}

private static void test(int threadNum) throws Exception {
log.info("{}", threadNum);
Thread.sleep(1000);
}
}
java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
package com.machine.concurrency.example.aqs;

import lombok.extern.slf4j.Slf4j;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;

@Slf4j
public class SemaphoreExample2 {

private final static int threadCount = 20;

public static void main(String[] args) throws Exception {

ExecutorService exec = Executors.newCachedThreadPool();

final Semaphore semaphore = new Semaphore(3);

for (int i = 0; i < threadCount; i++) {
final int threadNum = i;
exec.execute(() -> {
try {
semaphore.acquire(3); // 获取多个许可
test(threadNum);
semaphore.release(3); // 释放多个许可
} catch (Exception e) {
log.error("exception", e);
}
});
}
exec.shutdown();
}

private static void test(int threadNum) throws Exception {
log.info("{}", threadNum);
Thread.sleep(1000);
}
}

2、并发很高,想要超过允许的并发数之后,就抛弃

java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
package com.machine.concurrency.example.aqs;

import lombok.extern.slf4j.Slf4j;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;

@Slf4j
public class SemaphoreExample3 {

private final static int threadCount = 20;

public static void main(String[] args) throws Exception {

ExecutorService exec = Executors.newCachedThreadPool();

final Semaphore semaphore = new Semaphore(3);

for (int i = 0; i < threadCount; i++) {
final int threadNum = i;
exec.execute(() -> {
try {
//tryAcquire(int permits) permits:尝试获取的许可数量
//tryAcquire(long timeout, TimeUnit unit)
//tryAcquire(int permits, long timeout, TimeUnit unit)
if (semaphore.tryAcquire()) { // 尝试获取一个许可,获取不到就抛弃这个请求
test(threadNum);
semaphore.release(); // 释放一个许可
}
} catch (Exception e) {
log.error("exception", e);
}
});
}
exec.shutdown();
}

private static void test(int threadNum) throws Exception {
log.info("{}", threadNum);
Thread.sleep(1000);
}
}

7-4 CyclicBarrier

同步辅助类,允许一组线程相互等待,知道所有线程都准备就绪后,才能继续操作,当某个线程调用了await方法之后,就会进入等待状态,并将计数器-1,直到所有线程调用await方法使计数器为0,才可以继续执行,由于计数器可以重复使用,所以我们又叫他循环屏障

![屏幕快照 2018-10-28 下午7.39.43](20181027234153307/屏幕快照 2018-10-28 下午7.39.43.png)

使用场景

多线程计算数据,最后合并计算结果的应用场景:

比如用Excel保存了用户的银行流水,每一页保存了一个用户近一年的每一笔银行流水,现在需要统计用户的日均银行流水,这时候我们就可以用多线程处理每一页里的银行流水,都执行完以后,得到每一个页的日均银行流水,之后通过CyclicBarrier的action,利用这些线程的计算结果,计算出整个excel的日均流水

java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
package com.machine.concurrency.example.aqs;

import lombok.extern.slf4j.Slf4j;

import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/**
* 5个线程相互等待,当5个线程的ready操作都准备好了,再执行continue操作
*/
@Slf4j
public class CyclicBarrierExample1 {

private static CyclicBarrier barrier = new CyclicBarrier(5);

public static void main(String[] args) throws Exception {

ExecutorService executor = Executors.newCachedThreadPool();

for (int i = 0; i < 10; i++) {
final int threadNum = i;
Thread.sleep(1000);
executor.execute(() -> {
try {
race(threadNum);
} catch (Exception e) {
log.error("exception", e);
}
});
}
executor.shutdown();
}

private static void race(int threadNum) throws Exception {
Thread.sleep(1000);
log.info("{} is ready", threadNum);
//计数器-1,等待直到为0
barrier.await();
log.info("{} continue", threadNum);
}
}
java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
package com.machine.concurrency.example.aqs;

import lombok.extern.slf4j.Slf4j;

import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

@Slf4j
public class CyclicBarrierExample2 {

private static CyclicBarrier barrier = new CyclicBarrier(5);

public static void main(String[] args) throws Exception {

ExecutorService executor = Executors.newCachedThreadPool();

for (int i = 0; i < 10; i++) {
final int threadNum = i;
Thread.sleep(1000);
executor.execute(() -> {
try {
race(threadNum);
} catch (Exception e) {
log.error("exception", e);
}
});
}
executor.shutdown();
}

private static void race(int threadNum) throws Exception {
Thread.sleep(1000);
log.info("{} is ready", threadNum);
try {
//由于状态可能会改变,所以会抛出BarrierException异常,如果想继续往下执行,需要加上try-catch
barrier.await(2000, TimeUnit.MILLISECONDS);
} catch (Exception e) {
log.warn("BarrierException", e);
}
log.info("{} continue", threadNum);
}
}
java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
package com.machine.concurrency.example.aqs;

import lombok.extern.slf4j.Slf4j;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

@Slf4j
public class CyclicBarrierExample3 {

//当线程全部到达屏障时,优先执行这里的runable
//在ready和continue之间会执行的代码
private static CyclicBarrier barrier = new CyclicBarrier(5, () -> {
log.info("callback is running");
});

public static void main(String[] args) throws Exception {

ExecutorService executor = Executors.newCachedThreadPool();

for (int i = 0; i < 10; i++) {
final int threadNum = i;
Thread.sleep(1000);
executor.execute(() -> {
try {
race(threadNum);
} catch (Exception e) {
log.error("exception", e);
}
});
}
executor.shutdown();
}

private static void race(int threadNum) throws Exception {
Thread.sleep(1000);
log.info("{} is ready", threadNum);
barrier.await();
log.info("{} continue", threadNum);
}
}

CyclicBarrier与CountDownLatch区别

  1. CyclicBarrier可以重复使用(使用reset方法),CountDownLatch只能用一次

  2. CountDownLatch主要用于实现一个或n个线程需要等待其他线程完成某项操作之后,才能继续往下执行,描述的是一个或n个线程等待其他线程的关系,而CyclicBarrier是多个线程相互等待,知道满足条件以后再一起往下执行。描述的是多个线程相互等待的场景

7-5 ReentrantLock与锁

java一共分为两类锁,一类是由synchornized修饰的锁,还有一种是JUC里提供的锁,核心就是ReentrantLock

synchornized与ReentrantLock的区别对比

对比维度 synchornized ReentrantLock
可重入性(进入锁的时候计数器自增1) 可重入 可重入
锁的实现 JVM实现,很难操作源码,得到实现 JDK实现
性能 在引入轻量级锁后性能大大提升,建议都可以选择的时候选择synchornized -
功能区别 方便简洁,由编译器负责加锁和释放锁 手工操作
粗粒度,不灵活 细粒度,可灵活控制
可否指定公平锁 不可以 可以
可否放弃锁 不可以 可以

ReentrantLock独有功能:

  • 可指定是公平锁还是非公平锁
  • 提供了一个Condition类,可以分组唤醒需要唤醒的线程
  • 提供能够中断等待锁的线程的机制,lock.lockInterruptibly()

ReentrantLock实现:自旋锁,循环调用CAS操作来实现加锁,避免了使线程进入内核态的阻塞状态。想办法阻止线程进入内核态的阻塞状态,是我们分析和理解锁的关键钥匙

基本使用:类似synchronized

java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
package com.machine.concurrency.example.lock;

import com.machine.concurrency.annotations.ThreadSafe;
import lombok.extern.slf4j.Slf4j;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

@Slf4j
@ThreadSafe
public class LockExample2 {

// 请求总数
public static int clientTotal = 5000;

// 同时并发执行的线程数
public static int threadTotal = 200;

public static int count = 0;

private final static Lock lock = new ReentrantLock();

public static void main(String[] args) throws Exception {
ExecutorService executorService = Executors.newCachedThreadPool();
final Semaphore semaphore = new Semaphore(threadTotal);
final CountDownLatch countDownLatch = new CountDownLatch(clientTotal);
for (int i = 0; i < clientTotal ; i++) {
executorService.execute(() -> {
try {
semaphore.acquire();
add();
semaphore.release();
} catch (Exception e) {
log.error("exception", e);
}
countDownLatch.countDown();
});
}
countDownLatch.await();
executorService.shutdown();
log.info("count:{}", count);
}

private static void add() {
lock.lock();
try {
count++;
} finally {
lock.unlock();
}
}
}

ReentrantLock内置函数

基础特性:

  • tryLock():仅在调用时锁定未被另一个线程保持的情况下才获取锁定。

  • tryLock(long timeout, TimeUnit unit):如果锁定在给定的时间内没有被另一个线程保持且当前线程没有被中断,则获取这个锁定。

  • lockInterruptbily:如果当前线程没有被中断的话,那么就获取锁定。如果中断了就抛出异常。

  • isLocked:查询此锁定是否由任意线程保持

  • isHeldByCurrentThread:查询当前线程是否保持锁定状态。

  • isFair:判断是不是公平锁

ReentrantReadWriteLock

在没有任何读写锁的时候才能取得写入的锁,可用于实现悲观读取(读优先,没有读时才能写),读多写少的场景下可能会出现线程饥饿

java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
package com.machine.concurrency.example.lock;

import lombok.extern.slf4j.Slf4j;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantReadWriteLock;

@Slf4j
public class LockExample3 {

private final Map<String, Data> map = new TreeMap<>();

private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();

private final Lock readLock = lock.readLock();

private final Lock writeLock = lock.writeLock();

public Data get(String key) {
readLock.lock();
try {
return map.get(key);
} finally {
readLock.unlock();
}
}

public Set<String> getAllKeys() {
readLock.lock();
try {
return map.keySet();
} finally {
readLock.unlock();
}
}

//在没有任何读写锁的时候才可以进行写入操作
public Data put(String key, Data value) {
writeLock.lock();
try {
return map.put(key, value);
} finally {
readLock.unlock();
}
}

class Data {

}
}

StempedLock

它控制锁有三种模式(写、读、乐观读)。一个StempedLock的状态是由版本和模式两个部分组成。锁获取方法返回一个数字作为票据(stamp),他用相应的锁状态表示并控制相关的访问。数字0表示没有写锁被锁写访问,在读锁上分为悲观锁和乐观锁。

乐观读:
如果读的操作很多写的很少,我们可以乐观的认为读的操作与写的操作同时发生的情况很少,因此不悲观的使用完全的读取锁定。程序可以查看读取资料之后是否遭到写入资料的变更,再采取之后的措施。

示例源码:

java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
package com.machine.concurrency.example.lock;

import java.util.concurrent.locks.StampedLock;

public class LockExample4 {

class Point {
private double x, y;
private final StampedLock sl = new StampedLock();

void move(double deltaX, double deltaY) { // an exclusively locked method
long stamp = sl.writeLock();
try {
x += deltaX;
y += deltaY;
} finally {
sl.unlockWrite(stamp);
}
}

//下面看看乐观读锁案例
double distanceFromOrigin() { // A read-only method
long stamp = sl.tryOptimisticRead(); //获得一个乐观读锁
double currentX = x, currentY = y; //将两个字段读入本地局部变量
//stamp=0表示没有写锁入侵,
if (!sl.validate(stamp)) { //检查发出乐观读锁后同时是否有其他写锁发生?
stamp = sl.readLock(); //如果没有,我们再次获得一个读悲观锁
try {
currentX = x; // 将两个字段读入本地局部变量
currentY = y; // 将两个字段读入本地局部变量
} finally {
sl.unlockRead(stamp);
}
}
return Math.sqrt(currentX * currentX + currentY * currentY);
}

//下面是悲观读锁案例
void moveIfAtOrigin(double newX, double newY) { // upgrade
// Could instead start with optimistic, not read mode
long stamp = sl.readLock();
try {
while (x == 0.0 && y == 0.0) { //循环,检查当前状态是否符合
long ws = sl.tryConvertToWriteLock(stamp); //将读锁转为写锁
if (ws != 0L) { //这是确认转为写锁是否成功
stamp = ws; //如果成功 替换票据
x = newX; //进行状态改变
y = newY; //进行状态改变
break;
} else { //如果不能成功转换为写锁
sl.unlockRead(stamp); //我们显式释放读锁
stamp = sl.writeLock(); //显式直接进行写锁 然后再通过循环再试
}
}
} finally {
sl.unlock(stamp); //释放读锁或写锁
}
}
}
}

简单使用

java
1
2
3
4
5
6
7
8
9
10
private final static StampedLock lock = new StampedLock();

private static void add() {
long stamp = lock.writeLock();
try {
count++;
} finally {
lock.unlock(stamp);
}
}

总结关于锁的几个类:

synchronized:JVM实现,不但可以通过一些监控工具监控,而且在出现未知异常的时候JVM也会自动帮我们释放锁

ReentrantLock、ReentrantRead/WriteLock、StempedLock 他们都是对象层面的锁定,要想保证锁一定被释放,要放到finally里面,才会更安全一些;StempedLock对性能有很大的改进,特别是在读线程越来越多的情况下,StempedLock有一个复杂的API。

如何选择锁?

  • 当只有少量竞争者,使用synchronized
  • 竞争者不少但是线程增长的趋势是能预估的,使用ReetrantLock
  • synchronized不会造成死锁,jvm会自动释放死锁。

Condition

多线程间协调通信的工具类

java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
package com.machine.concurrency.example.lock;

import lombok.extern.slf4j.Slf4j;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

@Slf4j
public class LockExample6 {

public static void main(String[] args) {
ReentrantLock reentrantLock = new ReentrantLock();
// 从reentrantLock实例里获取了condition
Condition condition = reentrantLock.newCondition();

new Thread(() -> {
try {
// 线程1调用了lock方法,加入到了AQS的等待队里里面去
reentrantLock.lock();
log.info("wait signal"); // 1 等待信号
// 调用await方法后,从AQS队列里移除了,进入到了condition队列里面去,等待一个信号
condition.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
log.info("get signal"); // 4 得到信号
// 线程1释放锁
reentrantLock.unlock();
}).start();

new Thread(() -> {
// 线程1await释放锁以后,这里就获取了锁,加入到了AQS等待队列中
reentrantLock.lock();
log.info("get lock"); // 2 获取锁
try {
Thread.sleep(3000);
} catch (InterruptedException e) {
e.printStackTrace();
}
//调用signalAll发送信号的方法,Condition节点的线程1节点元素被取出,放在了AQS等待队列里(注意并没有被唤醒)
condition.signalAll();
log.info("send signal ~ "); // 3 发送信号
// 线程2释放锁,这时候AQS队列中只剩下线程1,线程1开始执行
reentrantLock.unlock();
}).start();
}

}
文章作者: Machine
文章链接: https://machine4869.gitee.io/2018/10/27/20181027234153307/
版权声明: 本博客所有文章除特别声明外,均采用 CC BY-NC-SA 4.0 许可协议。转载请注明来自 哑舍
打赏
  • 微信
    微信
  • 支付宝
    支付宝

评论