Java 并发 - AQS 组件
一、CountDownLatch
1.1 特性
让某一线程到达某一时刻才开始做事
1.2 实例
1.2.1 需求
需要所有线程都完成了才运行主线程提示所有线程运行结束?
class Test {
public static void main(String[] args) {
CountDownLatch countDownLatch = new CountDownLatch(6);
for (int i = 1; i <= 5; i++) {
new Thread(() -> {
System.out.println(Thread.currentThread().getName() + " is running...");
}, "T-" + i).start();
}
System.out.println(Thread.currentThread().getName() + ": every thread is over!");
}
}
1.2.2 问题
T-1 is running...
T-2 is running...
main: every thread is over! //--> 提前出现了!不对!
T-4 is running...
T-3 is running...
T-5 is running...
发现主线程会穿插其中...
1.2.3 解决
使用 CountDownLatch 来计数同步,加大线程数来测试几次,没有出错:
class Test {
public static void main(String[] args) {
CountDownLatch countDownLatch = new CountDownLatch(500);
for (int i = 1; i <= 10; i++) {
new Thread(() -> {
System.out.println(Thread.currentThread().getName() + " is running...");
countDownLatch.countDown();
}, "T-" + i).start();
}
try {
countDownLatch.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(Thread.currentThread().getName() + ": every thread is over!");
}
}
输出:
T-2 is running...
T-1 is running...
T-4 is running...
T-5 is running...
T-3 is running...
T-6 is running...
T-7 is running...
T-8 is running...
T-9 is running...
T-10 is running...
main: every thread is over!
1.2.4 注意
countDownLatch 的计数操作一定要在线程操作后,因为如果提前计数,那么可能导致线程还未操作,最后的主线程提前到达操作条件了,于是又出现主线程比其他线程更快执行的问题
1.3 总结
CountDownLatch 主要有两个方法常用:
-
调用 countDown () 的线程,计数器会减 1,不会阻塞
-
调用 await() 的线程会被阻塞,计数器变为 0 就会被唤醒
二、CyclicBarrier
2.1 特性
让线程达到某一时刻才开始做事
2.2 实例
2.2.1 需求
同 1.2.1 需求
2.2.2 问题
同 1.2.2 问题
2.2.3 解决
使用 CyclicBarrier 来解决:
class Test {
public static void main(String[] args) {
CyclicBarrier cyclicBarrier = new CyclicBarrier(10, () -> {
System.out.println(Thread.currentThread().getName() + ": every thread is over!");
});
for (int i = 1; i <= 10; i++) {
new Thread(() -> {
System.out.println(Thread.currentThread().getName() + " is running...");
try {
cyclicBarrier.await();
} catch (InterruptedException e) {
e.printStackTrace();
} catch (BrokenBarrierException e) {
e.printStackTrace();
}
}, "T-" + i).start();
}
}
}
输出:
T-1 is running...
T-2 is running...
T-3 is running...
T-4 is running...
T-5 is running...
T-6 is running...
T-7 is running...
T-8 is running...
T-9 is running...
T-10 is running...
T-10: every thread is over!
2.2.4 注意
CyclicBarrier 是基于加法,初始化一个计数值,在一定数量的线程执行后才执行指定的代码
并且指定的代码是由符合条件计数条件时执行的,跟执行它的那个线程没有关系,例如一千个线程,可能在这里的 Demo 执行到输出 "every thread is over" 的,会是 837 号线程
1.3 总结
CyclicBarrier 常用的东西:
-
两个构造参数的构造器,指定一个 int parties ,Runnable action;
- parties:线程执行的数量
- action:在数量达到时执行的代码
-
await() 方法,这个在线程使用的时候,就表示阻塞一次线程构造器里的那段代码,阻塞到 parties 数量后就会通过其中一个线程取执行 action 代码
三、Semaphore
3.1 特性
信号量,可以指定多个线程同时访问某个资源
3.2 实例
3.2.1 需求
锁相当于一个互斥资源,假设争抢一个互斥资源可以做到,三个互斥资源的竞争,该如何做到呢?
3.2.2 问题
Java 提供的锁实现多个互斥量的同步
3.3.3 解决
虽然自己写一个类似的互斥资源类,但 Java 已经提供了 AQS 组件 Semaphore 来给我们使用
class Test {
public static void main(String[] args) {
Semaphore semaphore = new Semaphore(3);
for (int i = 1; i <= 6; i++) {
new Thread(() -> {
try {
semaphore.acquire();
System.out.println(Thread.currentThread().getName()+" get resource!");
TimeUnit.SECONDS.sleep(2);
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
System.out.println(Thread.currentThread().getName()+" release resource!");
semaphore.release();
}
}, "T-" + i).start();
}
}
}
输出:
T-1 get resource!
T-2 get resource!
T-3 get resource!
T-1 release resource!
T-4 get resource!
T-2 release resource!
T-5 get resource!
T-3 release resource!
T-6 get resource!
T-4 release resource!
T-5 release resource!
T-6 release resource!
3.3.4 注意
semaphore 的 release() 操作要放在线程全部操作之后
3.3 总结
上面说的其他两种组件就好像是时刻性的同步组件,而 Semaphore 是属于资源类的信号量,针对的是互斥资源数量这个角度的
semaphore 有两个常用的东西:
-
acquire() 尝试取得一个互斥资源
-
release() 释放当前占用的互斥资源
-
构造器常用的是初始化指定数量的互斥量:Semaphore(int permit) ,permit 指定了互斥资源数量
四、Condition
4.1 概念
Condition 与 Lock 配合完成等待 / 通知 机制,比起重量锁的对象监视器的等待 / 通知机制,Condition 可控制性和拓展性更强
4.2 性质
- 超时操作
- 中断
- 多个等待队列
4.3 原理
4.3.1 newCondition()
Conditino 的创建与使用是与 Lock 相互绑定的:
- Lock lock = new ReentrantLock();
- Condition c = lock.newCondition();
这两步发生了什么?看看源码片段
创建可重复锁,默认是非公平的:
public ReentrantLock() {
sync = new NonfairSync();
}
通过 sync 对象创建 Condition:
public Condition newCondition() {
return sync.newCondition();
}
sync 创建的是一个 ConditionObject 对象:
final ConditionObject newCondition() {
return new ConditionObject();
}
好吧,再往下走,会发现 ConditionObject 来自 AQS(AbstractQueuedSynchronizer):
public class ConditionObject implements Condition, java.io.Serializable {
private static final long serialVersionUID = 1173984872572414699L;
private transient Node firstWaiter;
...
}
所以,这里取得的 c 其实是一个 ConditionObject 对象, ConditionObject 是 AQS 的一个内部类
回顾一下,在机制实现中, AQS 维护了一个同步队列:
- 如果是独占锁,所有获取锁失败的线程的都会插入到同步队列的尾部
Condition 也是一样的方式,内部维护了一个等待队列,所有调用 condition.await() 的线程都会加入到等待队列中,并且线程状态转换为无限期等待状态,
Condition 通过持有等待队列的头尾指针来管理等待队列,Node 复用了 AQS 中的 Node 内部类,每个线程就会被封装到不同的 Node 节点中
Condition 维护的等待队列是单向队列,AQS 维护的是双向队列
lock.newCondition() 实质上,就是创建单向队列的过程,所以,一个 lock 可以有多个等待队列
4.3.2 await()
上面知道了是 lock 生出了 condition,那么condition.await() 一旦调用,就会把当前获取到锁的线程加入到等待队列
4.3.3 signal() / signalAll
当调用了 signal() / signalAll 会使得线程从等待队列移入到同步队列中,直到获取到 lock 后才会从 await() 方法中返回逃出来!或者在等待时被中断,也可以做中断处理
移入到同步队列的线程,就是唤醒的节点,将会通过自旋方式来进行锁竞争