工程经验 - 周期任务调度设计与思考

479

一、背景

目前业务工程的任务调度的设计是之前一个老哥负责的,由于留下不少坑,导致生产环境出了个非常严重的问题,影响很大

解决完生产环境的问题也开始思考如果让我自己实现大概会怎么做?

之前的任务调度组件无法很好地解决一个问题:周期性调度

周期性调度任务可以细分:

  1. 某个时间点执行
  2. 某个时间段后执行

注意:本篇文章的想法都是针对「周期性调度」类型的任务讨论,同时我们的需求来源是上述第二种

二、最简单的想法

将任务委托给线程池,业务执行后响应 timer(一个 long 值,单位 ms)给线程,线程进入 sleep,sleep 的时间由 timer 决定

timmer sleep 期间,从 IDEA DEBUG 中显示,线程处于 SLEEP 状态,这种状态下会让出 CPU,但是线程却无法利用,如果线程池开 200 个线程,200 个线程处于 SLEEP 也是非常有可能的

此时会发现 CPU 处于闲置状态,但是线程池没办法跑任务,也就是极大地降低了 CPU 的吞吐量

三、提高线程池的线程利用率

提高线程池的线程池利用率,减少 SLEEP 状态的线程,经过一番调研发现了两种方案:

  • 时间轮算法
  • 延迟队列

时间轮算法粗略看了一下,是一个很古老的论文了,没有细看,做了延迟队列的简单 demo 实现,通过延迟队列,可以避免 SLEEP 状态的线程,这个思路上是比较好做的

  • 有个任务队列,它是优先队列,也是阻塞队列,可以告诉我们队头的就是接下来要执行的任务
  • 抽象任务作为一个 Runnable 实现,不断往队列里面丢任务
  • 跑一个线程监听任务队列,触发任务执行,即可实现周期性调度
  • 这个过程可以发现没有 SLEEP 占用线程池线程

首先简单抽象公共的代码到一个基类:

/**
 * 周期性调度任务基类
 *
 * @author Wayland Zhan[routerhex@qq.com]
 * @since 2021-12-25 上午10:03
 */
public abstract class DelayHandler implements Delayed, Runnable {

    /**
     * 执行周期,默认是 -1(没有下一个周期)
     */
    int timer = -1;

    /**
     * 开始时间
     */
    long start = System.currentTimeMillis();

    private DelayQueue<DelayHandler> delayQueue;

    public DelayHandler(DelayQueue<DelayHandler> delayQueue) {
        this.delayQueue = delayQueue;
    }

    /**
     * 下一次执行:过期时间 - 当前时间
     *
     * @param unit
     * @return
     */
    @Override
    public long getDelay(TimeUnit unit) {
        return unit.convert((start + timer) - System.currentTimeMillis(), TimeUnit.MILLISECONDS);
    }

    /**
     * 内部排序:当前时间的延迟时间 - 比较对象的延迟时间
     *
     * @param o
     * @return
     */
    @Override
    public int compareTo(Delayed o) {
        return (int) (this.getDelay(TimeUnit.MILLISECONDS) - o.getDelay(TimeUnit.MILLISECONDS));
    }

    public abstract int execute();

    @Override
    public void run() {
        int time = execute();
        if (!Objects.equals(time, -1)) {
            this.start = System.currentTimeMillis();
            delayQueue.offer(this);
        }

    }

    public void setTimer(int timer) {
        this.timer = timer;
    }
}

接着,我们可以实现一个业务的周期性调度任务类:

/**
 * 业务周期性调度任务类
 *
 * @author Wayland Zhan[routerhex@qq.com]
 * @since 2021-12-25 上午10:12
 */
@Slf4j
public class MonitorHandler extends DelayHandler {

    public MonitorHandler(DelayQueue<DelayHandler> delayQueue) {
        super(delayQueue);
        timer = 0;
    }

    /**
     * 业务定义执行次数:执行三次,cnt 非原子操作,并发下需要使用原子引用
     */
    private final AtomicInteger cnt = new AtomicInteger(0);

    private final Object o = new Object();

    @Override
    public int execute() {
        synchronized (o) {
            log.warn("running....{}", cnt.incrementAndGet());

            // 执行三次
            if (Objects.equals(cnt.get(), 3)) {
                return -1;
            } else {
                return timer;
            }
        }
    }
}

在任务调度时,我们可以给任务调度基类设计一个线程池,延迟队列,就可以完成任务的周期性调度:

/**
 * 测试类
 *
 * @author Wayland Zhan[routerhex@qq.com]
 * @since 2021-12-25 上午10:16
 */
@Slf4j
class DelayJobTest {

    @RepeatedTest(50)
    void testScheduleTimer() throws InterruptedException {

        // 任务执行线程池
        ThreadPoolExecutor executorService = new ThreadPoolExecutor(20,
                200,
                1,
                TimeUnit.MINUTES,
                new SynchronousQueue<>(),
                new ThreadPoolExecutor.AbortPolicy()
        );

        // 延迟队列
        DelayQueue<DelayHandler> delayQueue = new DelayQueue<>();

        // 卡住主线程,等待任务执行完,再关闭主线程
        CountDownLatch latch = new CountDownLatch(3);

        // 延迟队列监听线程
        new Thread(() -> {
            while (true) {
                try {
                    DelayHandler handler = delayQueue.take();

                    // TODO: 2021/12/25 线程池满的问题无法解决,线程池数量达到上线,让线程池休眠
                  
                    while (executorService.getActiveCount() == 200) {
                        Thread.sleep(500);
                    }

                    executorService.execute(handler);
                    latch.countDown();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }).start();

        // 将任务推入任务队列,触发任务启动
        MonitorHandler h = new MonitorHandler(delayQueue);
        delayQueue.offer(h);

        // 挂起主线程,等待线程池的任务执行完
        latch.await(60, TimeUnit.SECONDS);
        log.warn("OVER");
    }
}

从结果来看,可以顺利完成周期性调度(断言不大好写,只能人肉看日志了):

12-25 11:39:36.787 [pool-2-thread-1] WARN  c.y.w.c.job.MonitorHandler [execute] [40] - running....1
12-25 11:39:36.798 [pool-2-thread-2] WARN  c.y.w.c.job.MonitorHandler [execute] [40] - running....2
12-25 11:39:36.799 [pool-2-thread-3] WARN  c.y.w.c.job.MonitorHandler [execute] [40] - running....3
12-25 11:39:36.799 [main] WARN  c.y.w.c.job.DelayJobTest [testScheduleTimer] [67] - OVER

四、从业务根本问题思考

使用线程池和延迟队列做周期性调度的问题,在两个方面:

线程池临界点不好设计

  1. 延迟队列允许放多少个任务?延迟队列是无界的,线程池内的最大线程数是有限的,不说并行,并发数取决于线程池最大线程数,延迟队列要控制放多少个任务?无限放会导致 OOME,放太少无法利用好线程池的线程
  2. 线程池任务执行时间不可估量。线程池并发线程已满,此时监听线程取出了一个需要执行的任务,要怎么办?重新推入队列?让监听线程 SLEEP 500ms 之类的?

业务任务 execute() 需要注意处理并发问题

此外还有一个问题就是从第三节的示例代码中可以看出,如果采用 SLEEP,业务的任务代码其实是单线程执行,没有并发问题,如果采用延迟队列执行,就会产生并发问题,所以需要控制业务代码执行是原子的,可能得在方法加上重量锁之类的

跟 leader 同步我的方案后,leader 觉得方案的实现没有问题,不过还是无法彻底解决根本问题,临界点问题也不好设计。因为周期性调度在我们的业务工程中是作为轮询接口的方案,很明显轮询是可以做成发布-订阅的

如果做成发布-订阅,就可以避免我们需要自行设计周期性调度问题了

五、后记

本次调研过程,粗略探索了一下网红 job 调度框架 xxl-job,从 com.xxl.job.admin.core.thread.JobScheduleHelper 中可以看到,该框架爱是基于时间轮实现的任务调度,不过线程池调度本身也没有做太多智能化参数的控制,轮询线程还是会有 500ms 的 SLEEP

想要完美地利用到 CPU 和解决线程临界点问题是非常困难的,目前能看到最好的优化就是改造为发布-订阅式