[TOC]
第7章 J.U.C之AQS
7-1 简介
AbstractQueuedSynchronizer - AQS
数据结构

- 底层是双向链表,队列的一种实现
- sync queue:同步队列,head节点主要负责后面的调度
- Condition queue:单向链表,不是必须的的,也可以有多个
设计原理
- 使用Node实现FIFO队列,可以用于构建锁或者其他同步装置的基础框架
- 利用了一个int类型标示状态,有一个state的成员变量,表示获取锁的线程数(0没有线程获取锁,1有线程获取锁,大于1表示重入锁的数量),和一个同步组件ReentrantLock,
- 使用方法是继承,基于模板方法
- 子类通过继承并通过实现它的方法管理其状态{acquire和release}的方法操作状态
- 可以实现排它锁和共享锁的模式(独占、共享)
具体实现的思路
- 首先 AQS内部维护了一个CLH队列,来管理锁
- 线程尝试获取锁,如果获取失败,则将等待信息等包装成一个Node结点,加入到同步队列Sync queue里
- 不断重新尝试获取锁(当前结点为head的直接后继才会 尝试),如果获取失败,则会阻塞自己,直到被唤醒
- 当持有锁的线程释放锁的时候,会唤醒队列中的后继线程
AQS同步组件
- CountDownLatch
- Semaphore
- CyclicBarrier
- ReentrantLock
- Condition
- FutureTask
7-2 CountDownLatch
同步阻塞类,可以完成阻塞线程的功能

使用场景
程序执行需要等待某个条件完成后,才能进行后面的操作。比如父任务等待所有子任务都完成的时候,在继续往下进行
基本用法
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47
| package com.machine.concurrency.example.aqs;
import lombok.extern.slf4j.Slf4j; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors;
@Slf4j public class CountDownLatchExample1 {
private final static int threadCount = 200;
public static void main(String[] args) throws Exception {
ExecutorService exec = Executors.newCachedThreadPool();
final CountDownLatch countDownLatch = new CountDownLatch(threadCount);
for (int i = 0; i < threadCount; i++) { final int threadNum = i; exec.execute(() -> { try { test(threadNum); } catch (Exception e) { log.error("exception", e); } finally { countDownLatch.countDown(); } }); }
countDownLatch.await(); log.info("finish"); exec.shutdown(); }
private static void test(int threadNum) throws Exception { Thread.sleep(100); log.info("{}", threadNum); Thread.sleep(100); } }
|
场景:比如有多个线程完成一个任务,但是这个任务只想给他一个指定的时间,超过这个任务就不继续等待了。完成多少算多少
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43
| package com.machine.concurrency.example.aqs;
import lombok.extern.slf4j.Slf4j; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit;
@Slf4j public class CountDownLatchExample2 {
private final static int threadCount = 200;
public static void main(String[] args) throws Exception {
ExecutorService exec = Executors.newCachedThreadPool();
final CountDownLatch countDownLatch = new CountDownLatch(threadCount);
for (int i = 0; i < threadCount; i++) { final int threadNum = i; exec.execute(() -> { try { test(threadNum); } catch (Exception e) { log.error("exception", e); } finally { countDownLatch.countDown(); } }); } countDownLatch.await(10, TimeUnit.MILLISECONDS); log.info("finish"); exec.shutdown(); }
private static void test(int threadNum) throws Exception { Thread.sleep(100); log.info("{}", threadNum); } }
|
7-3 Semaphore
使用场景
1、仅能提供有限访问的资源(控制并发数):比如数据库的连接数最大只有20,而上层的并发数远远大于20,这时候如果不做限制,可能会由于无法获取连接而导致并发异常,这时候可以使用Semaphore来进行控制,当信号量设置为1的时候,就和单线程很相似了
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40
| package com.machine.concurrency.example.aqs;
import lombok.extern.slf4j.Slf4j;
import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Semaphore;
@Slf4j public class SemaphoreExample1 {
private final static int threadCount = 20;
public static void main(String[] args) throws Exception {
ExecutorService exec = Executors.newCachedThreadPool();
final Semaphore semaphore = new Semaphore(3);
for (int i = 0; i < threadCount; i++) { final int threadNum = i; exec.execute(() -> { try { semaphore.acquire(); test(threadNum); semaphore.release(); } catch (Exception e) { log.error("exception", e); } }); } exec.shutdown(); }
private static void test(int threadNum) throws Exception { log.info("{}", threadNum); Thread.sleep(1000); } }
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39
| package com.machine.concurrency.example.aqs;
import lombok.extern.slf4j.Slf4j;
import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Semaphore;
@Slf4j public class SemaphoreExample2 {
private final static int threadCount = 20;
public static void main(String[] args) throws Exception {
ExecutorService exec = Executors.newCachedThreadPool();
final Semaphore semaphore = new Semaphore(3);
for (int i = 0; i < threadCount; i++) { final int threadNum = i; exec.execute(() -> { try { semaphore.acquire(3); test(threadNum); semaphore.release(3); } catch (Exception e) { log.error("exception", e); } }); } exec.shutdown(); }
private static void test(int threadNum) throws Exception { log.info("{}", threadNum); Thread.sleep(1000); } }
|
2、并发很高,想要超过允许的并发数之后,就抛弃
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43
| package com.machine.concurrency.example.aqs;
import lombok.extern.slf4j.Slf4j; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Semaphore; import java.util.concurrent.TimeUnit;
@Slf4j public class SemaphoreExample3 {
private final static int threadCount = 20;
public static void main(String[] args) throws Exception {
ExecutorService exec = Executors.newCachedThreadPool();
final Semaphore semaphore = new Semaphore(3);
for (int i = 0; i < threadCount; i++) { final int threadNum = i; exec.execute(() -> { try { if (semaphore.tryAcquire()) { test(threadNum); semaphore.release(); } } catch (Exception e) { log.error("exception", e); } }); } exec.shutdown(); }
private static void test(int threadNum) throws Exception { log.info("{}", threadNum); Thread.sleep(1000); } }
|
7-4 CyclicBarrier
同步辅助类,允许一组线程相互等待,知道所有线程都准备就绪后,才能继续操作,当某个线程调用了await方法之后,就会进入等待状态,并将计数器-1,直到所有线程调用await方法使计数器为0,才可以继续执行,由于计数器可以重复使用,所以我们又叫他循环屏障

使用场景
多线程计算数据,最后合并计算结果的应用场景:
比如用Excel保存了用户的银行流水,每一页保存了一个用户近一年的每一笔银行流水,现在需要统计用户的日均银行流水,这时候我们就可以用多线程处理每一页里的银行流水,都执行完以后,得到每一个页的日均银行流水,之后通过CyclicBarrier的action,利用这些线程的计算结果,计算出整个excel的日均流水
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42
| package com.machine.concurrency.example.aqs;
import lombok.extern.slf4j.Slf4j;
import java.util.concurrent.CyclicBarrier; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors;
@Slf4j public class CyclicBarrierExample1 {
private static CyclicBarrier barrier = new CyclicBarrier(5);
public static void main(String[] args) throws Exception {
ExecutorService executor = Executors.newCachedThreadPool();
for (int i = 0; i < 10; i++) { final int threadNum = i; Thread.sleep(1000); executor.execute(() -> { try { race(threadNum); } catch (Exception e) { log.error("exception", e); } }); } executor.shutdown(); }
private static void race(int threadNum) throws Exception { Thread.sleep(1000); log.info("{} is ready", threadNum); barrier.await(); log.info("{} continue", threadNum); } }
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44
| package com.machine.concurrency.example.aqs;
import lombok.extern.slf4j.Slf4j;
import java.util.concurrent.CyclicBarrier; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit;
@Slf4j public class CyclicBarrierExample2 {
private static CyclicBarrier barrier = new CyclicBarrier(5);
public static void main(String[] args) throws Exception {
ExecutorService executor = Executors.newCachedThreadPool();
for (int i = 0; i < 10; i++) { final int threadNum = i; Thread.sleep(1000); executor.execute(() -> { try { race(threadNum); } catch (Exception e) { log.error("exception", e); } }); } executor.shutdown(); }
private static void race(int threadNum) throws Exception { Thread.sleep(1000); log.info("{} is ready", threadNum); try { barrier.await(2000, TimeUnit.MILLISECONDS); } catch (Exception e) { log.warn("BarrierException", e); } log.info("{} continue", threadNum); } }
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41
| package com.machine.concurrency.example.aqs;
import lombok.extern.slf4j.Slf4j; import java.util.concurrent.CyclicBarrier; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors;
@Slf4j public class CyclicBarrierExample3 {
private static CyclicBarrier barrier = new CyclicBarrier(5, () -> { log.info("callback is running"); });
public static void main(String[] args) throws Exception {
ExecutorService executor = Executors.newCachedThreadPool();
for (int i = 0; i < 10; i++) { final int threadNum = i; Thread.sleep(1000); executor.execute(() -> { try { race(threadNum); } catch (Exception e) { log.error("exception", e); } }); } executor.shutdown(); }
private static void race(int threadNum) throws Exception { Thread.sleep(1000); log.info("{} is ready", threadNum); barrier.await(); log.info("{} continue", threadNum); } }
|
CyclicBarrier与CountDownLatch区别
CyclicBarrier可以重复使用(使用reset方法),CountDownLatch只能用一次
CountDownLatch主要用于实现一个或n个线程需要等待其他线程完成某项操作之后,才能继续往下执行,描述的是一个或n个线程等待其他线程的关系,而CyclicBarrier是多个线程相互等待,知道满足条件以后再一起往下执行。描述的是多个线程相互等待的场景
7-5 ReentrantLock与锁
java一共分为两类锁,一类是由synchornized修饰的锁,还有一种是JUC里提供的锁,核心就是ReentrantLock
synchornized与ReentrantLock的区别对比
| 对比维度 |
synchornized |
ReentrantLock |
| 可重入性(进入锁的时候计数器自增1) |
可重入 |
可重入 |
| 锁的实现 |
JVM实现,很难操作源码,得到实现 |
JDK实现 |
| 性能 |
在引入轻量级锁后性能大大提升,建议都可以选择的时候选择synchornized |
- |
| 功能区别 |
方便简洁,由编译器负责加锁和释放锁 |
手工操作 |
| 粗粒度,不灵活 |
细粒度,可灵活控制 |
|
| 可否指定公平锁 |
不可以 |
可以 |
| 可否放弃锁 |
不可以 |
可以 |
ReentrantLock独有功能:
- 可指定是公平锁还是非公平锁
- 提供了一个Condition类,可以分组唤醒需要唤醒的线程
- 提供能够中断等待锁的线程的机制,lock.lockInterruptibly()
ReentrantLock实现:自旋锁,循环调用CAS操作来实现加锁,避免了使线程进入内核态的阻塞状态。想办法阻止线程进入内核态的阻塞状态,是我们分析和理解锁的关键钥匙
基本使用:类似synchronized
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55
| package com.machine.concurrency.example.lock;
import com.machine.concurrency.annotations.ThreadSafe; import lombok.extern.slf4j.Slf4j; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Semaphore; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock;
@Slf4j @ThreadSafe public class LockExample2 {
public static int clientTotal = 5000;
public static int threadTotal = 200;
public static int count = 0;
private final static Lock lock = new ReentrantLock();
public static void main(String[] args) throws Exception { ExecutorService executorService = Executors.newCachedThreadPool(); final Semaphore semaphore = new Semaphore(threadTotal); final CountDownLatch countDownLatch = new CountDownLatch(clientTotal); for (int i = 0; i < clientTotal ; i++) { executorService.execute(() -> { try { semaphore.acquire(); add(); semaphore.release(); } catch (Exception e) { log.error("exception", e); } countDownLatch.countDown(); }); } countDownLatch.await(); executorService.shutdown(); log.info("count:{}", count); }
private static void add() { lock.lock(); try { count++; } finally { lock.unlock(); } } }
|
ReentrantLock内置函数
基础特性:
tryLock():仅在调用时锁定未被另一个线程保持的情况下才获取锁定。
tryLock(long timeout, TimeUnit unit):如果锁定在给定的时间内没有被另一个线程保持且当前线程没有被中断,则获取这个锁定。
lockInterruptbily:如果当前线程没有被中断的话,那么就获取锁定。如果中断了就抛出异常。
isLocked:查询此锁定是否由任意线程保持
isHeldByCurrentThread:查询当前线程是否保持锁定状态。
isFair:判断是不是公平锁
…
ReentrantReadWriteLock
在没有任何读写锁的时候才能取得写入的锁,可用于实现悲观读取(读优先,没有读时才能写),读多写少的场景下可能会出现线程饥饿
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52
| package com.machine.concurrency.example.lock;
import lombok.extern.slf4j.Slf4j; import java.util.Map; import java.util.Set; import java.util.TreeMap; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantReadWriteLock;
@Slf4j public class LockExample3 {
private final Map<String, Data> map = new TreeMap<>();
private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
private final Lock readLock = lock.readLock();
private final Lock writeLock = lock.writeLock();
public Data get(String key) { readLock.lock(); try { return map.get(key); } finally { readLock.unlock(); } }
public Set<String> getAllKeys() { readLock.lock(); try { return map.keySet(); } finally { readLock.unlock(); } }
public Data put(String key, Data value) { writeLock.lock(); try { return map.put(key, value); } finally { readLock.unlock(); } }
class Data {
} }
|
StempedLock
它控制锁有三种模式(写、读、乐观读)。一个StempedLock的状态是由版本和模式两个部分组成。锁获取方法返回一个数字作为票据(stamp),他用相应的锁状态表示并控制相关的访问。数字0表示没有写锁被锁写访问,在读锁上分为悲观锁和乐观锁。
乐观读:
如果读的操作很多写的很少,我们可以乐观的认为读的操作与写的操作同时发生的情况很少,因此不悲观的使用完全的读取锁定。程序可以查看读取资料之后是否遭到写入资料的变更,再采取之后的措施。
示例源码:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60
| package com.machine.concurrency.example.lock;
import java.util.concurrent.locks.StampedLock;
public class LockExample4 {
class Point { private double x, y; private final StampedLock sl = new StampedLock();
void move(double deltaX, double deltaY) { long stamp = sl.writeLock(); try { x += deltaX; y += deltaY; } finally { sl.unlockWrite(stamp); } }
double distanceFromOrigin() { long stamp = sl.tryOptimisticRead(); double currentX = x, currentY = y; if (!sl.validate(stamp)) { stamp = sl.readLock(); try { currentX = x; currentY = y; } finally { sl.unlockRead(stamp); } } return Math.sqrt(currentX * currentX + currentY * currentY); }
void moveIfAtOrigin(double newX, double newY) { long stamp = sl.readLock(); try { while (x == 0.0 && y == 0.0) { long ws = sl.tryConvertToWriteLock(stamp); if (ws != 0L) { stamp = ws; x = newX; y = newY; break; } else { sl.unlockRead(stamp); stamp = sl.writeLock(); } } } finally { sl.unlock(stamp); } } } }
|
简单使用
1 2 3 4 5 6 7 8 9 10
| private final static StampedLock lock = new StampedLock();
private static void add() { long stamp = lock.writeLock(); try { count++; } finally { lock.unlock(stamp); } }
|
总结关于锁的几个类:
synchronized:JVM实现,不但可以通过一些监控工具监控,而且在出现未知异常的时候JVM也会自动帮我们释放锁
ReentrantLock、ReentrantRead/WriteLock、StempedLock 他们都是对象层面的锁定,要想保证锁一定被释放,要放到finally里面,才会更安全一些;StempedLock对性能有很大的改进,特别是在读线程越来越多的情况下,StempedLock有一个复杂的API。
如何选择锁?
- 当只有少量竞争者,使用synchronized
- 竞争者不少但是线程增长的趋势是能预估的,使用ReetrantLock
- synchronized不会造成死锁,jvm会自动释放死锁。
Condition
多线程间协调通信的工具类
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47
| package com.machine.concurrency.example.lock;
import lombok.extern.slf4j.Slf4j; import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.ReentrantLock;
@Slf4j public class LockExample6 {
public static void main(String[] args) { ReentrantLock reentrantLock = new ReentrantLock();
Condition condition = reentrantLock.newCondition();
new Thread(() -> { try { reentrantLock.lock(); log.info("wait signal"); condition.await(); } catch (InterruptedException e) { e.printStackTrace(); } log.info("get signal");
reentrantLock.unlock(); }).start();
new Thread(() -> {
reentrantLock.lock(); log.info("get lock"); try { Thread.sleep(3000); } catch (InterruptedException e) { e.printStackTrace(); }
condition.signalAll(); log.info("send signal ~ ");
reentrantLock.unlock(); }).start(); }
}
|