Java 并发 - 阻塞队列
一、概念
具有以下性质的队列:
- 阻塞队列空,从队列取元素操作会被阻塞,直到有新元素加入
- 阻塞队列满,从队列加元素操作会被阻塞,直到有旧元素加入
二、作用
阻塞队列的作用也就是其好处:自动管理了阻塞 / 唤醒线程的时机
在这东西出来前,程序员需要自动管理,通过 wait / notify 这样的机制,写起来麻烦
三、BlockingQueue
3.1 体系
继承 Collection 接口,具有七个实现类,常用的三个:
- ArrayBlockingQueue:数组构成的有界阻塞队列
- LinkedBlockingQueue:链表构成的有界(Integer.MAX_VALUE)阻塞队列
- SynchronousQueue:只存单个元素的阻塞队列
3.2 API
插入、删除对应四种形式的 API:
- 抛出异常
- 特殊值
- 阻塞
- 超时
检查对应两种形式的 API:
- 抛出异常
- 特殊值
抛出异常 | 特殊值 | 阻塞 | 超时 | |
---|---|---|---|---|
插入 | add(e) | offer(e) | put(e) | offer(e, time, unit) |
移除 | remove(e) | poll() | take() | poll(time, unit) |
检查 | element() | peek() | 无 | 无 |
- 抛出异常
- 插入、移除、检查失败则抛出异常
- 特殊值
- 插入,返回成功 true,失败 false
- 移除,成功返回队列移除元素
- 阻塞
- 插入、移除检查失败则阻塞
四、应用
4.1 生产者 - 消费者
4.1.1 实现方式
- 基于 synchronized / wait / notifyAll :重量锁与 Object 自带方法
- 基于 ReentrantLock / await / signalAll:可重入锁与 Condition 方法
- 基于 ArrayBlockingQueue / put / take:阻塞队列及自带方法
4.2 ReentrantLock / Condition
多生产者与多消费者 Condition 版本:
class Product {
private int num;
private int capacity = 1;
private Lock lock = new ReentrantLock();
private Condition condition = lock.newCondition();
public void increment() throws InterruptedException {
lock.lock();
try {
while (num == capacity) {
condition.await();
}
num++;
System.out.println(Thread.currentThread().getName() +
" create, and product remain:\t" +
num);
condition.signalAll();
} finally {
lock.unlock();
}
}
public void decrement() throws InterruptedException {
lock.lock();
try {
while (num == 0) {
condition.await();
}
num--;
System.out.println(Thread.currentThread().getName() +
" cost, and product remain:\t" +
num);
condition.signalAll();
} finally {
lock.unlock();
}
}
}
class Test {
public static void main(String[] args) {
Product product = new Product();
for (int i = 1; i <= 5; i++) {
new Thread(() -> {
try {
product.increment();
} catch (InterruptedException e) {
e.printStackTrace();
}
}, "T" + i).start();
}
for (int i = 6; i <= 10; i++) {
new Thread(() -> {
try {
product.decrement();
} catch (InterruptedException e) {
e.printStackTrace();
}
}, "T" + i).start();
}
}
}
生产者消费者阻塞队列版:
class Producer extends Thread {
private BlockingQueue blockingQueue;
public Producer(BlockingQueue blockingQueue) {
this.blockingQueue = blockingQueue;
}
@Override
public void run() {
try {
this.blockingQueue.put("product");
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
System.out.println(Thread.currentThread().getName() + " is producing...");
}
}
}
class Consumer extends Thread {
private BlockingQueue blockingQueue;
public Consumer(BlockingQueue blockingQueue) {
this.blockingQueue = blockingQueue;
}
@Override
public void run() {
try {
this.blockingQueue.take();
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
System.out.println(Thread.currentThread().getName() + " is consuming...");
}
}
}
class Test {
public static void main(String[] args) throws InterruptedException {
BlockingQueue<String> blockingQueue = new ArrayBlockingQueue<>(1);
for (int i = 1; i <= 3; i++) {
new Producer(blockingQueue).start();
}
for (int i = 1; i <= 3; i++) {
new Consumer(blockingQueue).start();
}
}
}
- 注意:线程取值和读值具有无序性,所以可能发生消费在生产之前,但底层绝不会存在资源不够,但还能被消费者消费的现象,若资源不够 / 资源满,就会阻塞,这是肯定的
4.3 线程池
内部的任务队列,其实就是使用了 LinkedBlockingQueue
等阻塞队列实现的
4.4 消息队列
消息队列是基于 生产者-消费者
为核心的架构设计的,而一般可以使用阻塞队列来实现这一模式
0