工程经验 - 异步计算削峰方案 & 实战

一、背景

我们的产品曾服务于科研行业厂商,为其提供 AI 训练和数据计算功能。其中一个重要功能是【指标计算】:

  • 用户提交指标文件
  • 后端读取文件并进行分数计算
  • 用户可以在提交列表中查看提交记录,其中包括计算指标和各项分数

二、问题

根据需求,我们使用算法工程师提供的 SDK 对计算性能进行边界测试。然而,我们发现计算时间非常长,可能需要几分钟,甚至几小时的时间。从工程角度来看,此需求需要进行异步化处理。因此,我们的负责计算服务的同事提供了一个异步的 gRPC 接口,计算结果会以 MQ 的方式发布。

后来,我们成功地完成了这个功能,并对其进行了性能边界测试。然而,我们发现计算非常耗费资源。在进行 1 个 50 万行数的文件或者 5 个 10 万行数的文件计算时,就会导致内存溢出,计算失败。

通常,用户提交的文件规模在 10 万行左右。由于生产环境的机器配置大约是研发环境的 4 倍,因此意味着如果有 20 个用户同时提交计算请求,服务就会崩溃。

三、初版方案 - 客户端内存队列

为了解决并发计算导致内存溢出问题,我们决定在客户端(也就是业务服务)这一侧,临时采用一个内存队列方案,其中所有的内存队列均为 BlockingQueue。

我们的整体方案如下:

3320-limit-approach1

业务服务:

  • 业务接口接收请求参数,并将其放入缓冲队列中。如果缓冲队列已满,则返回超时错误。缓冲队列最多存储 5000 个请求
  • 建立一个工作线程,从缓冲队列中取出参数,并将其放入等待队列中。等待队列最多存储 5 个请求
  • 工作线程每成功放入一个请求到等待队列中,就会调用一次分数计算服务进行计算
  • 监听 MQ 并订阅计算结果

分数指标计算服务:

  • 新建进程进行分数计算,并在计算完成后将结果发布到 MQ

该方案存在以下缺点:

  • 复杂度高:手动实现队列进行限流,增加了代码的复杂度和维护难度
  • 数据丢失:服务重启时,内存队列中的数据会丢失,无法继续处理

四、优化方案 - MQ 队列

3320-limit-approach2

业务服务

业务接口触发指标提交时,使用发布者向参数队列发布一个参数

/**
* 定义参数队列
*/
@Configuration
public class AssessmentMQConfig {

    /**
     * 参数队列,第二个参数 true 表示开启持久化
     */
    @Bean
    public Queue paramQueue() {
        return new Queue(PARAM_QUEUE, true);
    }

    /**
     * 结果队列,第二个参数 true 表示开启持久化
     */
    @Bean
    public Queue resultQueue() {
        return new Queue(RESULT_QUEUE, true);
    }

}
@Component
@RequiredArgsConstructor
public class ParamPublisher {
    private final AmqpTemplate rabbitTemplate;

    public void publish(String s) {
        rabbitTemplate.convertAndSend(PARAM_QUEUE, s);
    }

}

业务服务监听结果队列,取到参数之后更新数据库,从而让用户查询获取到结果。ACK 注意 - 这里是更新完数据库了,再 ACK 计算结果

@Slf4j
@Component
@RequiredArgsConstructor
public class ResultSubscriber {
  
    @RabbitListener(queues = AssessmentMQConfig.ASSESSMENT_RESULT_QUEUE)
    public void processData(String result, Channel channel, @Header(AmqpHeaders.DELIVERY_TAG) long deliveryTag) throws IOException {
        log.info("从队列订阅到消息:[{}]", result);
        try {
          // update message in db
        } catch (Exception e) {
            // 捕捉异常,防止消息无法被 ack
            log.error(e.getMessage(), e);
        }

        // ack 消息
        channel.basicAck(deliveryTag, true);
    }
}

分数指标计算服务

定义参数监听逻辑,ACK 注意 - 这里是计算完成,结果发布到结果队列了,再去 ACK 参数

@Slf4j
public class MetricsService {

    public void listenAndCalculate() {
        Channel channel = MqManager.getChannel();
        try {
            channel.basicConsume(
                    PARAM_QUEUE,
              			// auto ack false
                    false,
              			// 注册处理逻辑
                    new ParamSubscriber(channel)
            );
        } catch (IOException e) {
            log.error(e.getMessage(), e);
        }
    }

}

定义结果发布逻辑,可以看到无论是参数队列或者结果队列最后对会绑定到 Channel ,Channel 会和一个 Connection 绑定,对 Channel 进行了关键参数 basicQos 的配置,使得从队列获取消息时,消费完了 basicQos 数量的参数,才会取下一批

public class MqManager {
    private static Channel channel;

    private static void setupChannel() throws IOException {
        // 关键参数 - 声明队列的监听一次性最多监听 5 个
        channel.basicQos(5);

        // 声明结果队列的交换机、队列、交换机绑定
        channel.exchangeDeclare(RESULT_FANOUT, BuiltinExchangeType.FANOUT, true);
      	// 第二个参数为 true,表示此队列持久化
        channel.queueDeclare(RESULT_QUEUE, true, false, false, null);
        channel.queueBind(RESULT_QUEUE, RESULT_FANOUT, ROUTE_KEY);
    }

    public static Channel getChannel() {
        return channel;
    }

    public static void resultPublish(String message) throws IOException {
        if (channel == null) {
            initConnection();
        }
      
        channel.basicPublish(RESULT_FANOUT, ROUTE_KEY, null, message.getBytes(StandardCharsets.UTF_8));
    }
}

该方案采用 MQ 和手动 ACK,实现了参数和结果的解耦和持久化,从而保证了业务处理数据的一致性。

  • 如果业务服务重启,业务接口的参数在队列中不会丢失,而且在处理到一半时如果没有 ACK,重启后仍然可以监听到。这保证了参数的可靠性和一致性
  • 如果 MQ 重启,由于参数和结果都已经持久化,启动后可以再次被处理
  • 如果计算服务重启,参数可以重新从队列中监听到,重新开始计算。同时,已经计算成功的结果会被发布到结果队列中,这不会影响后续处理

综上,此方案可以有效地解决业务处理数据的一致性问题,并提高了系统的可靠性