消息队列 - RabbitMQ 集成 SpringBoot (Topic)

262

从零开始一个 Topic 模式的 Demo,做了以下的事情:

  • 定义两个主题:
    • 中国.天气
    • 中国.*
  • 定义生产者通过三个 Routing-Key 发送三条消息
    • 中国.天气
    • 中国.新闻
    • 中国.艺术
  • 定义两个队列,绑定两个主题
    • 中国.天气
    • 中国.*
  • 定义两个消费者,监听消费以上定义的两个队列
  • 查看结果,正确的话,是绑定 “中国.天气” 的消费者消费了一条消息,绑定 “中国.*” 的消费者消费了三条消息!

一、依赖配置

还是老样子,和上边依赖,导入三个 artifactid 为以下的依赖:

  • spring-boot-starter-amqp
  • hutool-all
  • spring-boot-starter-test
  • lombok

配置一下基本 RabbitMQ 服务基本信息

spring:
  rabbitmq:
    port: 5672
    username: admin
    password: admin
    host: localhost

二、常量类

定义一些基本的常量信息,消息队列的操作中,常涉及比较多的组件的时候,用常量信息比较好区分,这里就是定义了基本的一些涉及组件,例如交换器、队列、主题的名称都是什么:

public class Constants {
    /**
     * 队列一的名字
     */
    public static final String TOPIC_QUEUE_ONE = "topic_queue_one";

    /**
     * 队列二的名字
     */
    public static final String TOPIC_QUEUE_TWO = "topic_queue_two";

    /**
     * 交换器的名字
     */
    public static final String TOPIC_EXCHANGE = "topic_exchange";

    /**
     * 话题一
     */
    public static final String TOPIC_ONE = "中国.天气";

    /**
     * 话题二
     */
    public static final String TOPIC_TWO = "中国.*";

}

三、配置类

依次创建我们想要的信息:

  • 两个队列
  • 一个交换器
  • 绑定一个队列到主题一
  • 绑定一个队列到主题二

一般,在组件配置类和绑定配置类要抽离出来

组件配置 : 创建队列、交换器

@Configuration
public class MqComponentConfig {

    /**
     * autoDelete() 是配置消息通信后自动删除,否则会一直用同一个 queue,
     * 后面的 topic 一直累加绑定上去!!!
     *
     * @return
     */
    @Bean(autowire = Autowire.BY_NAME, value = "queueOne")
    public Queue queueOne() {
        return QueueBuilder.durable(TOPIC_QUEUE_ONE).autoDelete().build();
    }

    @Bean(autowire = Autowire.BY_NAME, value = "queueTwo")
    public Queue queueTwo() {
        return QueueBuilder.durable(TOPIC_QUEUE_TWO).autoDelete().build();
    }

    @Bean(autowire = Autowire.BY_NAME, value = "topicExchangeOne")
    public TopicExchange topicExchangeOne() {
        return new TopicExchange(TOPIC_EXCHANGE);
    }

}

绑定配置:创建绑定关系

@Configuration
public class MqBindingConfig {

    @Resource(name = "queueOne")
    private Queue queueOne;

    @Resource(name = "queueTwo")
    private Queue queueTwo;

    @Resource(name = "topicExchangeOne")
    private TopicExchange topicExchangeOne;

    /**
     * 将创建的队列一绑定到主题一
     *
     * @return
     */
    @Bean
    public Binding bindingExchangeOne() {
        return BindingBuilder.bind(queueOne).to(topicExchangeOne).with(TOPIC_ONE);
    }

    /**
     * 将创建的队列二绑定到主题二
     *
     * @return
     */
    @Bean
    public Binding bindingExchangeTwo() {
        return BindingBuilder.bind(queueTwo).to(topicExchangeOne).with(TOPIC_TWO);
    }

}

但其实可以写到一个配置类里面,如下,因为 @Bean 没有用 value 指定 bean 的名字,注入进入就是方法名,所以在绑定关系中:

  • 绑定关系一的队列参数名和注入的 bean 的方法要一致!也就是 queueOne
  • 绑定关系二同理
  • 而 exchange 只有一个却不用匹配,是因为只注入一个,所以默认选它了,如果有两个就要指定一下了!
@Configuration
public class MqConfig {

    @Bean
    public Queue queueOne() {
        return QueueBuilder.durable(Constants.TOPIC_QUEUE_ONE).autoDelete().build();
    }

    @Bean
    public Queue queueTwo() {
        return QueueBuilder.durable(Constants.TOPIC_QUEUE_TWO).autoDelete().build();
    }

    @Bean
    public TopicExchange topicExchangeOne() {
        return new TopicExchange(Constants.TOPIC_EXCHANGE);
    }

    @Bean
    public Binding bindingExchangeOne(Queue queueOne, TopicExchange exchange) {
        return BindingBuilder.bind(queueOne).to(exchange).with(Constants.TOPIC_ONE);
    }

    @Bean
    public Binding bindingExchangeTwo(Queue queueTwo, TopicExchange exchange) {
        return BindingBuilder.bind(queueTwo).to(exchange).with(Constants.TOPIC_TWO);
    }

}

四、生产者

由于路由键目前只与这一个生产值有关,所以路由键常量就定义在这里了

@Component
public class Producer {

    public static final String ROUTING_KEY_ONE = "中国.天气";

    public static final String ROUTING_KEY_TWO = "中国.新闻";

    public static final String ROUTING_KEY_THREE = "中国.艺术";

    private static final Log LOGGER = LogFactory.get();

    @Autowired
    private AmqpTemplate rabbitTemplate;

    public void produceStrMessage(String exchangeName, String routingKey, String msgId) {

        String msg = "Hi, it's message " + msgId;
        LOGGER.warn("springboot-topic | producer : " + msg);
        this.rabbitTemplate.convertAndSend(exchangeName, routingKey, msg);

    }
    
}

五、消费者

定义一个消费者接口(这里闲着蛋疼定义的,不定义也可以)

public interface Consumer {
    void consumeStrMessage(String msg);
}

定义绑定第一个主题(中国.天气)的队列的消费者

@Component
@RabbitListener(queues = {Constants.TOPIC_QUEUE_ONE})
public class ConsumerTopicOne implements Consumer {

    private static final Log LOGGER = LogFactory.get();

    @RabbitHandler
    @Override
    public void consumeStrMessage(String msg) {
        LOGGER.warn("ConsumerTopicOne | consume msg : " + msg);
    }

}

定义第二个主题(中国.*)的队列的消费者

@Component
@RabbitListener(queues = Constants.TOPIC_QUEUE_TWO)
public class ConsumerTopicTwo implements Consumer {

    private static final Log LOGGER = LogFactory.get();

    @RabbitHandler
    @Override
    public void consumeStrMessage(String msg) {
        LOGGER.warn("ConsumerTopicTwo | consume msg : " + msg);
    }

}

六、测试

定义一个基于 springboot 的,生产者的测试类,对其 produce() 方法进行测试,分别发送消息

@SpringBootTest
@RunWith(SpringRunner.class)
public class ProducerTest {

    @Autowired
    private Producer producer;

    @Test
    public void produceStrMessage() {
        producer.produceStrMessage(
                TOPIC_EXCHANGE,
                Producer.ROUTING_KEY_ONE,
                "message-one, my routing-key is --- " + Producer.ROUTING_KEY_ONE);

        producer.produceStrMessage(
                TOPIC_EXCHANGE,
                Producer.ROUTING_KEY_TWO,
                "message-two, my routing-key is --- " + Producer.ROUTING_KEY_TWO);

        producer.produceStrMessage(
                TOPIC_EXCHANGE,
                Producer.ROUTING_KEY_THREE,
                "message-third, my routing-key is --- " + Producer.ROUTING_KEY_THREE);
    }
    
}

输出结果

可以看到

  • 生产者生成了三条消息
  • 绑定队列一("中国.天气")消费者消费了一条消息
  • 绑定队列二("中国.*")的消费者消费了三条消息

而且,如果将绑定配置里的 bindingExchangeOne() ,将队列一改为队列二,如下:

    @Bean
    public Binding bindingExchangeOne() {
        return BindingBuilder.bind(queueTwo).to(topicExchangeOne).with(TOPIC_ONE);
    }

那么,现在就队列一没绑定主题,队列二绑定了两个主题,如果发送,监听队列一的消费者肯定接收不到消息,测试:

可以看到的确如此,消费者一监听队列一,没有消费消息