Java 并发 - AQS 组件

328

一、CountDownLatch

1.1 特性

让某一线程到达某一时刻才开始做事

1.2 实例

1.2.1 需求

需要所有线程都完成了才运行主线程提示所有线程运行结束?

class Test {
    public static void main(String[] args) {
        CountDownLatch countDownLatch = new CountDownLatch(6);
        for (int i = 1; i <= 5; i++) {
            new Thread(() -> {
                System.out.println(Thread.currentThread().getName() + " is running...");
            }, "T-" + i).start();
        }
        System.out.println(Thread.currentThread().getName() + ": every thread is over!");

    }
}

1.2.2 问题

T-1 is running...
T-2 is running...
main: every thread is over! //--> 提前出现了!不对!
T-4 is running...
T-3 is running...
T-5 is running...

发现主线程会穿插其中...

1.2.3 解决

使用 CountDownLatch 来计数同步,加大线程数来测试几次,没有出错:

class Test {
    public static void main(String[] args) {
        CountDownLatch countDownLatch = new CountDownLatch(500);
        for (int i = 1; i <= 10; i++) {
            new Thread(() -> {
                System.out.println(Thread.currentThread().getName() + " is running...");
                countDownLatch.countDown();
            }, "T-" + i).start();
        }
        try {
            countDownLatch.await();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println(Thread.currentThread().getName() + ": every thread is over!");
    }
}

输出:

T-2 is running...
T-1 is running...
T-4 is running...
T-5 is running...
T-3 is running...
T-6 is running...
T-7 is running...
T-8 is running...
T-9 is running...
T-10 is running...
main: every thread is over!

1.2.4 注意

countDownLatch 的计数操作一定要在线程操作后,因为如果提前计数,那么可能导致线程还未操作,最后的主线程提前到达操作条件了,于是又出现主线程比其他线程更快执行的问题

1.3 总结

CountDownLatch 主要有两个方法常用:

  • 调用 countDown () 的线程,计数器会减 1,不会阻塞

  • 调用 await() 的线程会被阻塞,计数器变为 0 就会被唤醒

二、CyclicBarrier

2.1 特性

让线程达到某一时刻才开始做事

2.2 实例

2.2.1 需求

同 1.2.1 需求

2.2.2 问题

同 1.2.2 问题

2.2.3 解决

使用 CyclicBarrier 来解决:

class Test {
    public static void main(String[] args) {
        CyclicBarrier cyclicBarrier = new CyclicBarrier(10, () -> {
            System.out.println(Thread.currentThread().getName() + ": every thread is over!");
        });
        for (int i = 1; i <= 10; i++) {
            new Thread(() -> {
                System.out.println(Thread.currentThread().getName() + " is running...");
                try {
                    cyclicBarrier.await();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                } catch (BrokenBarrierException e) {
                    e.printStackTrace();
                }
            }, "T-" + i).start();
        }
    }
}

输出:

T-1 is running...
T-2 is running...
T-3 is running...
T-4 is running...
T-5 is running...
T-6 is running...
T-7 is running...
T-8 is running...
T-9 is running...
T-10 is running...
T-10: every thread is over!

2.2.4 注意

CyclicBarrier 是基于加法,初始化一个计数值,在一定数量的线程执行后才执行指定的代码

并且指定的代码是由符合条件计数条件时执行的,跟执行它的那个线程没有关系,例如一千个线程,可能在这里的 Demo 执行到输出 "every thread is over" 的,会是 837 号线程

1.3 总结

CyclicBarrier 常用的东西:

  • 两个构造参数的构造器,指定一个 int parties ,Runnable action;

    • parties:线程执行的数量
    • action:在数量达到时执行的代码
  • await() 方法,这个在线程使用的时候,就表示阻塞一次线程构造器里的那段代码,阻塞到 parties 数量后就会通过其中一个线程取执行 action 代码

三、Semaphore

3.1 特性

信号量,可以指定多个线程同时访问某个资源

3.2 实例

3.2.1 需求

锁相当于一个互斥资源,假设争抢一个互斥资源可以做到,三个互斥资源的竞争,该如何做到呢?

3.2.2 问题

Java 提供的锁实现多个互斥量的同步

3.3.3 解决

虽然自己写一个类似的互斥资源类,但 Java 已经提供了 AQS 组件 Semaphore 来给我们使用

class Test {
    public static void main(String[] args) {
        Semaphore semaphore = new Semaphore(3);
        for (int i = 1; i <= 6; i++) {
            new Thread(() -> {
                try {
                    semaphore.acquire();
                    System.out.println(Thread.currentThread().getName()+" get resource!");
                    TimeUnit.SECONDS.sleep(2);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                } finally {
                    System.out.println(Thread.currentThread().getName()+" release resource!");
                    semaphore.release();
                }
            }, "T-" + i).start();
        }
    }
}

输出:

T-1 get resource!
T-2 get resource!
T-3 get resource!
T-1 release resource!
T-4 get resource!
T-2 release resource!
T-5 get resource!
T-3 release resource!
T-6 get resource!
T-4 release resource!
T-5 release resource!
T-6 release resource!

3.3.4 注意

semaphore 的 release() 操作要放在线程全部操作之后

3.3 总结

上面说的其他两种组件就好像是时刻性的同步组件,而 Semaphore 是属于资源类的信号量,针对的是互斥资源数量这个角度的

semaphore 有两个常用的东西:

  • acquire() 尝试取得一个互斥资源

  • release() 释放当前占用的互斥资源

  • 构造器常用的是初始化指定数量的互斥量:Semaphore(int permit) ,permit 指定了互斥资源数量

四、Condition

4.1 概念

Condition 与 Lock 配合完成等待 / 通知 机制,比起重量锁的对象监视器的等待 / 通知机制,Condition 可控制性和拓展性更强

4.2 性质

  • 超时操作
  • 中断
  • 多个等待队列

4.3 原理

4.3.1 newCondition()

Conditino 的创建与使用是与 Lock 相互绑定的:

  • Lock lock = new ReentrantLock();
  • Condition c = lock.newCondition();

这两步发生了什么?看看源码片段

创建可重复锁,默认是非公平的:

public ReentrantLock() {
    sync = new NonfairSync();
}

通过 sync 对象创建 Condition:

public Condition newCondition() {
    return sync.newCondition();
}

sync 创建的是一个 ConditionObject 对象:

final ConditionObject newCondition() {
    return new ConditionObject();
}

好吧,再往下走,会发现 ConditionObject 来自 AQS(AbstractQueuedSynchronizer):

public class ConditionObject implements Condition, java.io.Serializable {
    private static final long serialVersionUID = 1173984872572414699L;
    private transient Node firstWaiter;
    ...
}

所以,这里取得的 c 其实是一个 ConditionObject 对象, ConditionObject 是 AQS 的一个内部类

回顾一下,在机制实现中, AQS 维护了一个同步队列:

  • 如果是独占锁,所有获取锁失败的线程的都会插入到同步队列的尾部

Condition 也是一样的方式,内部维护了一个等待队列,所有调用 condition.await() 的线程都会加入到等待队列中,并且线程状态转换为无限期等待状态,

Condition 通过持有等待队列的头尾指针来管理等待队列,Node 复用了 AQS 中的 Node 内部类,每个线程就会被封装到不同的 Node 节点中

Condition 维护的等待队列是单向队列,AQS 维护的是双向队列

lock.newCondition() 实质上,就是创建单向队列的过程,所以,一个 lock 可以有多个等待队列

4.3.2 await()

上面知道了是 lock 生出了 condition,那么condition.await() 一旦调用,就会把当前获取到锁的线程加入到等待队列

4.3.3 signal() / signalAll

当调用了 signal() / signalAll 会使得线程从等待队列移入到同步队列中,直到获取到 lock 后才会从 await() 方法中返回逃出来!或者在等待时被中断,也可以做中断处理

移入到同步队列的线程,就是唤醒的节点,将会通过自旋方式来进行锁竞争