消息队列 - RabbitMQ 集成 SpringBoot (Topic)
从零开始一个 Topic 模式的 Demo,做了以下的事情:
- 定义两个主题:
- 中国.天气
- 中国.*
- 定义生产者通过三个 Routing-Key 发送三条消息
- 中国.天气
- 中国.新闻
- 中国.艺术
- 定义两个队列,绑定两个主题
- 中国.天气
- 中国.*
- 定义两个消费者,监听消费以上定义的两个队列
- 查看结果,正确的话,是绑定 “中国.天气” 的消费者消费了一条消息,绑定 “中国.*” 的消费者消费了三条消息!
一、依赖配置
还是老样子,和上边依赖,导入三个 artifactid 为以下的依赖:
- spring-boot-starter-amqp
- hutool-all
- spring-boot-starter-test
- lombok
配置一下基本 RabbitMQ 服务基本信息
spring:
  rabbitmq:
    port: 5672
    username: admin
    password: admin
    host: localhost
二、常量类
定义一些基本的常量信息,消息队列的操作中,常涉及比较多的组件的时候,用常量信息比较好区分,这里就是定义了基本的一些涉及组件,例如交换器、队列、主题的名称都是什么:
public class Constants {
    /**
     * 队列一的名字
     */
    public static final String TOPIC_QUEUE_ONE = "topic_queue_one";
    /**
     * 队列二的名字
     */
    public static final String TOPIC_QUEUE_TWO = "topic_queue_two";
    /**
     * 交换器的名字
     */
    public static final String TOPIC_EXCHANGE = "topic_exchange";
    /**
     * 话题一
     */
    public static final String TOPIC_ONE = "中国.天气";
    /**
     * 话题二
     */
    public static final String TOPIC_TWO = "中国.*";
}
三、配置类
依次创建我们想要的信息:
- 两个队列
- 一个交换器
- 绑定一个队列到主题一
- 绑定一个队列到主题二
一般,在组件配置类和绑定配置类要抽离出来
组件配置 : 创建队列、交换器
@Configuration
public class MqComponentConfig {
    /**
     * autoDelete() 是配置消息通信后自动删除,否则会一直用同一个 queue,
     * 后面的 topic 一直累加绑定上去!!!
     *
     * @return
     */
    @Bean(autowire = Autowire.BY_NAME, value = "queueOne")
    public Queue queueOne() {
        return QueueBuilder.durable(TOPIC_QUEUE_ONE).autoDelete().build();
    }
    @Bean(autowire = Autowire.BY_NAME, value = "queueTwo")
    public Queue queueTwo() {
        return QueueBuilder.durable(TOPIC_QUEUE_TWO).autoDelete().build();
    }
    @Bean(autowire = Autowire.BY_NAME, value = "topicExchangeOne")
    public TopicExchange topicExchangeOne() {
        return new TopicExchange(TOPIC_EXCHANGE);
    }
}
绑定配置:创建绑定关系
@Configuration
public class MqBindingConfig {
    @Resource(name = "queueOne")
    private Queue queueOne;
    @Resource(name = "queueTwo")
    private Queue queueTwo;
    @Resource(name = "topicExchangeOne")
    private TopicExchange topicExchangeOne;
    /**
     * 将创建的队列一绑定到主题一
     *
     * @return
     */
    @Bean
    public Binding bindingExchangeOne() {
        return BindingBuilder.bind(queueOne).to(topicExchangeOne).with(TOPIC_ONE);
    }
    /**
     * 将创建的队列二绑定到主题二
     *
     * @return
     */
    @Bean
    public Binding bindingExchangeTwo() {
        return BindingBuilder.bind(queueTwo).to(topicExchangeOne).with(TOPIC_TWO);
    }
}
但其实可以写到一个配置类里面,如下,因为 @Bean 没有用 value 指定 bean 的名字,注入进入就是方法名,所以在绑定关系中:
- 绑定关系一的队列参数名和注入的 bean 的方法要一致!也就是 queueOne
- 绑定关系二同理
- 而 exchange 只有一个却不用匹配,是因为只注入一个,所以默认选它了,如果有两个就要指定一下了!
@Configuration
public class MqConfig {
    @Bean
    public Queue queueOne() {
        return QueueBuilder.durable(Constants.TOPIC_QUEUE_ONE).autoDelete().build();
    }
    @Bean
    public Queue queueTwo() {
        return QueueBuilder.durable(Constants.TOPIC_QUEUE_TWO).autoDelete().build();
    }
    @Bean
    public TopicExchange topicExchangeOne() {
        return new TopicExchange(Constants.TOPIC_EXCHANGE);
    }
    @Bean
    public Binding bindingExchangeOne(Queue queueOne, TopicExchange exchange) {
        return BindingBuilder.bind(queueOne).to(exchange).with(Constants.TOPIC_ONE);
    }
    @Bean
    public Binding bindingExchangeTwo(Queue queueTwo, TopicExchange exchange) {
        return BindingBuilder.bind(queueTwo).to(exchange).with(Constants.TOPIC_TWO);
    }
}
四、生产者
由于路由键目前只与这一个生产值有关,所以路由键常量就定义在这里了
@Component
public class Producer {
    public static final String ROUTING_KEY_ONE = "中国.天气";
    public static final String ROUTING_KEY_TWO = "中国.新闻";
    public static final String ROUTING_KEY_THREE = "中国.艺术";
    private static final Log LOGGER = LogFactory.get();
    @Autowired
    private AmqpTemplate rabbitTemplate;
    public void produceStrMessage(String exchangeName, String routingKey, String msgId) {
        String msg = "Hi, it's message " + msgId;
        LOGGER.warn("springboot-topic | producer : " + msg);
        this.rabbitTemplate.convertAndSend(exchangeName, routingKey, msg);
    }
    
}
五、消费者
定义一个消费者接口(这里闲着蛋疼定义的,不定义也可以)
public interface Consumer {
    void consumeStrMessage(String msg);
}
定义绑定第一个主题(中国.天气)的队列的消费者
@Component
@RabbitListener(queues = {Constants.TOPIC_QUEUE_ONE})
public class ConsumerTopicOne implements Consumer {
    private static final Log LOGGER = LogFactory.get();
    @RabbitHandler
    @Override
    public void consumeStrMessage(String msg) {
        LOGGER.warn("ConsumerTopicOne | consume msg : " + msg);
    }
}
定义第二个主题(中国.*)的队列的消费者
@Component
@RabbitListener(queues = Constants.TOPIC_QUEUE_TWO)
public class ConsumerTopicTwo implements Consumer {
    private static final Log LOGGER = LogFactory.get();
    @RabbitHandler
    @Override
    public void consumeStrMessage(String msg) {
        LOGGER.warn("ConsumerTopicTwo | consume msg : " + msg);
    }
}
六、测试
定义一个基于 springboot 的,生产者的测试类,对其 produce() 方法进行测试,分别发送消息
@SpringBootTest
@RunWith(SpringRunner.class)
public class ProducerTest {
    @Autowired
    private Producer producer;
    @Test
    public void produceStrMessage() {
        producer.produceStrMessage(
                TOPIC_EXCHANGE,
                Producer.ROUTING_KEY_ONE,
                "message-one, my routing-key is --- " + Producer.ROUTING_KEY_ONE);
        producer.produceStrMessage(
                TOPIC_EXCHANGE,
                Producer.ROUTING_KEY_TWO,
                "message-two, my routing-key is --- " + Producer.ROUTING_KEY_TWO);
        producer.produceStrMessage(
                TOPIC_EXCHANGE,
                Producer.ROUTING_KEY_THREE,
                "message-third, my routing-key is --- " + Producer.ROUTING_KEY_THREE);
    }
    
}
输出结果

可以看到
- 生产者生成了三条消息
- 绑定队列一("中国.天气")消费者消费了一条消息
- 绑定队列二("中国.*")的消费者消费了三条消息
而且,如果将绑定配置里的 bindingExchangeOne() ,将队列一改为队列二,如下:
    @Bean
    public Binding bindingExchangeOne() {
        return BindingBuilder.bind(queueTwo).to(topicExchangeOne).with(TOPIC_ONE);
    }
那么,现在就队列一没绑定主题,队列二绑定了两个主题,如果发送,监听队列一的消费者肯定接收不到消息,测试:

可以看到的确如此,消费者一监听队列一,没有消费消息
0