Java 并发 - 线程池

306

一、概念

线程池就是一种可以线程复用、控制最大并发数、管理线程的结构

基本处理是将任务放入队列,然后在线程创建后启动这些任务,如果线程数量超过了最大数量,那么超出数量的线程排队等候,等待其他线程执行完毕,再从队列中取出线程任务继续执行

二、作用

  • 降低资源消耗,通过重复利用已创建的线程来降低线程创建和销毁造成的消耗
  • 提高响应速度,任务到达时,可以不用等待线程创建就立即执行
  • 提高线程管理性,线程池可以对线程统一分配,调优和监控

三、体系

可以粗略地作以下继承层次对应:

  • Executor 接口理解为 Collection 接口一个级别
  • ExecutorService 接口就对应 List 接口
  • Executors 工具类对应于 Collections 工具类

四、使用

4.1 种类

线程池算到 Java 8 已经有五种了,但常用是三种实现方式:

  • Executors.newFixedThreadPool( int ),固定线程数量的池,适合:长期执行的任务
  • Executors.newSingleThreadExecutor(),单个线程的池,适合:只需一个线程只需的任务
  • Executors.newCachedThreadPool(),一个任务创建一个线程的池,适合:短期异步执行的任务

4.2 FixedThreadPool

线程复用测试 Demo:

class Main{
    public static void main(String[] args) {
        ExecutorService threadPool = Executors.newFixedThreadPool(5);
        try {
            for (int i = 0; i < 10; i++) {
                threadPool.execute(()->{
                    System.out.println(Thread.currentThread().getName()+" is working");
                });
            }
        }finally {
            threadPool.shutdown();
        }
    }
}

结果:

pool-1-thread-1 is working
pool-1-thread-1 is working
pool-1-thread-1 is working
pool-1-thread-1 is working
pool-1-thread-1 is working
pool-1-thread-1 is working
pool-1-thread-3 is working
pool-1-thread-2 is working
pool-1-thread-4 is working
pool-1-thread-5 is working

4.3 SingleThreadExecutor

测试单个线程执行任务 Demo:

class Main{
    public static void main(String[] args) {
//        ExecutorService threadPool = Executors.newFixedThreadPool(5);
        ExecutorService threadPool = Executors.newSingleThreadExecutor();
        try {
            for (int i = 0; i < 10; i++) {
                threadPool.execute(()->{
                    System.out.println(Thread.currentThread().getName()+" is working");
                });
            }
        }finally {
            threadPool.shutdown();
        }
    }
}

结果:

pool-1-thread-1 is working
pool-1-thread-1 is working
pool-1-thread-1 is working
pool-1-thread-1 is working
pool-1-thread-1 is working
pool-1-thread-1 is working
pool-1-thread-1 is working
pool-1-thread-1 is working
pool-1-thread-1 is working
pool-1-thread-1 is working

4.4 CachedThreadPool

测试一个任务对应一个线程 Demo

class Main{
    public static void main(String[] args) {
//        ExecutorService threadPool = Executors.newFixedThreadPool(5);
//        ExecutorService threadPool = Executors.newSingleThreadExecutor();
        ExecutorService threadPool = Executors.newCachedThreadPool();
        try {
            for (int i = 0; i < 10; i++) {
                threadPool.execute(()->{
                    System.out.println(Thread.currentThread().getName()+" is working");
                });
            }
        }finally {
            threadPool.shutdown();
        }
    }
}

结果:

pool-1-thread-2 is working
pool-1-thread-1 is working
pool-1-thread-3 is working
pool-1-thread-4 is working
pool-1-thread-5 is working
pool-1-thread-6 is working
pool-1-thread-7 is working
pool-1-thread-9 is working
pool-1-thread-10 is working
pool-1-thread-8 is working

五、原理

5.1 构造

三种常用线程池:

  • CachedThreadPool
  • FixedThreadPool
  • SingleThreadExecutor

都是由 ThreadPoolExecutor 产生的,不同点就是三种 ThreadPoolExecutor 的构造器参数不同,使得产生了三种不同线程池

ThreadPoolExecutor 构造器有五个参数,这就是所谓的线程池的核心参数

5.2 核心参数-简介

线程池的参数有 7 个,其中核心参数是 5 个,源码片段如下:

ThreadExecutor:(第一层,五个核心参数)

public ThreadPoolExecutor(int corePoolSize,
                          int maximumPoolSize,
                          long keepAliveTime,
                          TimeUnit unit,
                          BlockingQueue<Runnable> workQueue) {
    this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
         Executors.defaultThreadFactory(), defaultHandler);
}

上面代码片段的 this 源码如下:

public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              ThreadFactory threadFactory,
                              RejectedExecutionHandler handler) {
        if (corePoolSize < 0 ||
            maximumPoolSize <= 0 ||
            maximumPoolSize < corePoolSize ||
            keepAliveTime < 0)
            throw new IllegalArgumentException();
        if (workQueue == null || threadFactory == null || handler == null)
            throw new NullPointerException();
        this.corePoolSize = corePoolSize;
        this.maximumPoolSize = maximumPoolSize;
        this.workQueue = workQueue;
        this.keepAliveTime = unit.toNanos(keepAliveTime);
        this.threadFactory = threadFactory;
        this.handler = handler;
    }

所以参数总共有 7 个:

  1. corePoolSize:线程池的常驻核心线程数
  2. maximumPoolSize:线程池能够容纳同时执行的最大线程数
  3. keepAliveTime:非核心线程空闲存活时机,到达该设置的值,多余线程会被销毁,直到剩下 corePoolSize 数量的线程
  4. unit:keepAliveTime 的单位
  5. workQueue:任务队列,存储被提交单尚未被执行的任务
  6. threadFactory:线程池中工作线程的线程工厂,一般默认
  7. handle:拒绝策略。表示当线程队列满了,且工作线程大于等于 maximumPoolSize 时如何拒绝

5.3 CacheThreadPool

源码片段:

public static ExecutorService newCachedThreadPool() {
    return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                  60L, TimeUnit.SECONDS,
                                  new SynchronousQueue<Runnable>());
}

5.4 FixedThreadPool

源码片段:

public static ExecutorService newFixedThreadPool(int nThreads) {
    return new ThreadPoolExecutor(nThreads, nThreads,
                                  0L, TimeUnit.MILLISECONDS,
                                  new LinkedBlockingQueue<Runnable>());
}

5.5 SingleThreadExecutor

源码片段:

public static ExecutorService newSingleThreadExecutor() {
    return new FinalizableDelegatedExecutorService
        (new ThreadPoolExecutor(1, 1,
                                0L, TimeUnit.MILLISECONDS,
                                new LinkedBlockingQueue<Runnable>()));
}

六、核心参数

1. corePoolSize

创建了线程池后,当有请求任务来之后,就会安排池中的线程去执行请求任务

概念:线程池中的数目达到 corePoolSize 后,就会把到达的任务放到缓存队列中

所以,这个参数就理解为初始化的线程数即可,就是目前的上限

2. maximumPoolSize

概念:线程池能够容纳同时执行的最大线程数,此值必须大于等于 1

一般 corePoolSize < maximumPoolSize。当任务不断来,corePoolSize 数目的线程搞不定这些任务了,并且任务队列也已经满了,放不下了,就继续开新的线程,直到达到 maximumPoolSize 就不能继续开了

所以,这个参数就理解为线程池的线程数的最大上限即可,就是可以允许的最大上限

3. keepAliveTime

概念:多余空闲线程的存活时间

当线程池数量超过 corePoolSize 时,且当空闲时间到达 keepAliveTime 值时,多余空闲线程回被销毁直到只剩下 corePoolSize 个线程为止

这些销毁的线程就是任务持续进来,然后目前线程池的线程数大于 corePoolSize 新开的那些线程

所以,这个理解为过了一段时间线程一直在闲着,就把空闲的不处理任务的线程销毁

4. unit

概念:设置 keepAliveTime 的单位

5. workQueue

概念:任务队列,实现原理是阻塞队列,存放被提交但尚未被执行的任务,线程池中的数目达到 corePoolSize 之后,就会把到达的任务放到这个阻塞队列中

所以,这个理解为当前线程已经达到最大能执行的数目,搞不定新任务了就把这个任务放到一个阻塞队列

6. threadFactory

生成线程池中工作线程的线程工厂,用于创建心线程一般用默认的即可

所以,这个就理解为创建线程的方式,一般不需要深入了解

7. handler

概念:拒绝策略,表示当任务队列满了并且工作线程大于线程池的最大线程数(maximumPoolSize)时执行的拒绝策略

所以拒绝策略执行的时候有两个条件:

  • 任务队列满了
  • 线程目前可允许的最 大线程数也满了

所以,这个就理解为线程池爆满的后备方案

七、实例解析

线程池处理任务的流程是这样的:

  • 加入任务,让线程处理
  • 任务变多,大于 corePoolSize,处理不过来,就放入任务队列
  • 任务还是不断进来,任务队列也满了,就开新的线程处理任务,但线程总数会小于或等于 maximumPoolSize
  • 一旦任务队列满了,同时线程池中的线程数也已经大于 maximumPoolSize 了,分两种情况
  • 情况一:假设现在任务量慢慢减少了,那么会一步步地将任务队列的任务取出来处理,最后逐渐销毁大于 corePoolSize 时的新开的线程,然后直到只剩下 corePoolSize 数量的线程,这是常存的线程数
  • 情况二:假设现在任务量还是不断增加,maximumPoolSize 已经达到,任务队列已经满了,那么就会执行拒绝策略
  • 拒绝策略有四种,默认是 AbortPolicy

八、拒绝策略

四种拒绝策略均实现了 RejectedExecutionHandler 接口

8.1 AbortPolicy

直接抛出 RejectedExecutionException 异常阻止系统正常运行

8.2 CallerRunsPolicy

”调用者运行“ 一种调节机制,该策略既不会抛弃任务,也不会抛出异常,而是将某些任务回退到调用者,从而降低新任务的流量

8.3 DiscardOldestPolicy

抛弃任务队列中等待最久的任务,然后把当前任务加入队列中尝试再次提交当前任务

8.4 DiscardPolicy

直接丢弃任务,不给予任务处理也不抛异常。如果允许任务丢失,这是最好的一种方法

九、实际使用

9.1 说明

实际上,JDK 实现的常见三种线程池(单一、固定数、可变)都不会应用于工业生产,而是使用自己改造的线程池

也就是说不是使用 Executors 去创建,而是通过 ThreadPoolExecutor 的方式,写出自己需求的线程池,避免资源耗尽的风险

9.2 缺陷分析

缺陷分析来源于阿里巴巴开发手册

  • 缺陷一:因为任务队列用的 LinkedBlockingQueue,无界阻塞队列,所以它允许的任务队列长度为 Integer.MAX_VALUE,可能会堆积大量的请求,从而导致 OOME,代表两种线程池如下
    • FixedThreadPool(c:n,m:n)
    • SingleThreadPool(c:1,m:1)
  • 缺陷二:允许的创建线程数量为 Integer.MAX_VALUE,可能会创建大量的线程,从而导致 OOME,代表两种线程池如下
    • CachedThreadPool(c:0,m:Integer.MAX_VALUE)
    • ScheduledThreadPool(c:0,m:Integer.MAX_VALUE)

9.3 自定义线程池

不再使用:

ExecutorService threadPool = Executors.new...();

而是通过:

ExecutorService threadPool = new ThreadPoolExecutor();

具体举例如下:

  • corePoolSize:2
  • maximumPoolSize:5
  • keepAliveTime:1L(long 型的一秒)
  • unit:TimeUnit.SECONDS
  • workQueue:new LinkedBlockingQueue<Runnable> (3)
  • threadFactory:Executors.defaultThreadFactory() (默认值)
  • new ThreadPoolExecutor.AbortPolicy()
ExecutorService threadPool = new ThreadPoolExecutor(
    						 2, 
    						 5,
    						 1L, 
                             TimeUnit.SECONDS, 
                        	 new LinkedBlockingQueue<Runnable(3),
                             Executors.defaultThreadFactory(),
							 new ThreadPoolExecutor.AbortPolicy()
);

在这个自定义的线程池中,最大线程数为 5 + 3 ,一旦有 8 个,那么就会抛异常 RejectedExecutionException

Demo 代码如下:

public class Main {
    public static void main(String[] args) {
        ExecutorService threadPool = new ThreadPoolExecutor(
                2,
                5,
                1L,
                TimeUnit.SECONDS,
                new LinkedBlockingDeque<>(3),
                Executors.defaultThreadFactory(),
                new ThreadPoolExecutor.AbortPolicy()

        );

        try {
            for (int i = 1; i <= 9; i++) {
                threadPool.execute(() -> {
                    System.out.println(Thread.currentThread().getName() + " working ");
                });
            }
        } finally {
            threadPool.shutdown();
        }
    }
}

注意:CallerRunsPolicy 就是哪个线程来了,就回退给哪个线程