Java 并发 - 阻塞队列

275

一、概念

具有以下性质的队列:

  • 阻塞队列空,从队列取元素操作会被阻塞,直到有新元素加入
  • 阻塞队列满,从队列加元素操作会被阻塞,直到有旧元素加入

二、作用

阻塞队列的作用也就是其好处:自动管理了阻塞 / 唤醒线程的时机

在这东西出来前,程序员需要自动管理,通过 wait / notify 这样的机制,写起来麻烦

三、BlockingQueue

3.1 体系

继承 Collection 接口,具有七个实现类,常用的三个:

  • ArrayBlockingQueue:数组构成的有界阻塞队列
  • LinkedBlockingQueue:链表构成的有界(Integer.MAX_VALUE)阻塞队列
  • SynchronousQueue:只存单个元素的阻塞队列

3.2 API

插入、删除对应四种形式的 API:

  • 抛出异常
  • 特殊值
  • 阻塞
  • 超时

检查对应两种形式的 API:

  • 抛出异常
  • 特殊值
抛出异常特殊值阻塞超时
插入add(e)offer(e)put(e)offer(e, time, unit)
移除remove(e)poll()take()poll(time, unit)
检查element()peek()
  • 抛出异常
    • 插入、移除、检查失败则抛出异常
  • 特殊值
    • 插入,返回成功 true,失败 false
    • 移除,成功返回队列移除元素
  • 阻塞
    • 插入、移除检查失败则阻塞

四、应用

4.1 生产者 - 消费者

4.1.1 实现方式

  • 基于 synchronized / wait / notifyAll :重量锁与 Object 自带方法
  • 基于 ReentrantLock / await / signalAll:可重入锁与 Condition 方法
  • 基于 ArrayBlockingQueue / put / take:阻塞队列及自带方法

4.2 ReentrantLock / Condition

多生产者与多消费者 Condition 版本:

class Product {
    private int num;
    private int capacity = 1;
    private Lock lock = new ReentrantLock();
    private Condition condition = lock.newCondition();

    public void increment() throws InterruptedException {
        lock.lock();
        try {
            while (num == capacity) {
                condition.await();
            }
            num++;
            System.out.println(Thread.currentThread().getName() +
                               " create, and product remain:\t" +
                               num);
            condition.signalAll();
        } finally {
            lock.unlock();
        }
    }

    public void decrement() throws InterruptedException {
        lock.lock();
        try {
            while (num == 0) {
                condition.await();
            }
            num--;
            System.out.println(Thread.currentThread().getName() +
                               " cost, and product remain:\t"   +
                               num);
            condition.signalAll();
        } finally {
            lock.unlock();
        }
    }
}

class Test {
    public static void main(String[] args) {
        Product product = new Product();
        for (int i = 1; i <= 5; i++) {
            new Thread(() -> {
                try {
                    product.increment();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }, "T" + i).start();
        }
        for (int i = 6; i <= 10; i++) {
            new Thread(() -> {
                try {
                    product.decrement();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }, "T" + i).start();
        }
    }
}

生产者消费者阻塞队列版:

class Producer extends Thread {
    private BlockingQueue blockingQueue;

    public Producer(BlockingQueue blockingQueue) {
        this.blockingQueue = blockingQueue;
    }

    @Override
    public void run() {
        try {
            this.blockingQueue.put("product");
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            System.out.println(Thread.currentThread().getName() + " is producing...");
        }

    }
}

class Consumer extends Thread {
    private BlockingQueue blockingQueue;

    public Consumer(BlockingQueue blockingQueue) {
        this.blockingQueue = blockingQueue;
    }

    @Override
    public void run() {
        try {
            this.blockingQueue.take();
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            System.out.println(Thread.currentThread().getName() + " is consuming...");
        }

    }
}

class Test {
    public static void main(String[] args) throws InterruptedException {
        BlockingQueue<String> blockingQueue = new ArrayBlockingQueue<>(1);
        for (int i = 1; i <= 3; i++) {
            new Producer(blockingQueue).start();
        }
        for (int i = 1; i <= 3; i++) {
            new Consumer(blockingQueue).start();
        }
    }
}
  • 注意:线程取值和读值具有无序性,所以可能发生消费在生产之前,但底层绝不会存在资源不够,但还能被消费者消费的现象,若资源不够 / 资源满,就会阻塞,这是肯定的

4.3 线程池

内部的任务队列,其实就是使用了 LinkedBlockingQueue 等阻塞队列实现的

4.4 消息队列

消息队列是基于 生产者-消费者 为核心的架构设计的,而一般可以使用阻塞队列来实现这一模式