消息队列 - RabbitMQ 集成 SpringBoot (Topic)
从零开始一个 Topic 模式的 Demo,做了以下的事情:
- 定义两个主题:
- 中国.天气
- 中国.*
- 定义生产者通过三个 Routing-Key 发送三条消息
- 中国.天气
- 中国.新闻
- 中国.艺术
- 定义两个队列,绑定两个主题
- 中国.天气
- 中国.*
- 定义两个消费者,监听消费以上定义的两个队列
- 查看结果,正确的话,是绑定 “中国.天气” 的消费者消费了一条消息,绑定 “中国.*” 的消费者消费了三条消息!
一、依赖配置
还是老样子,和上边依赖,导入三个 artifactid 为以下的依赖:
- spring-boot-starter-amqp
- hutool-all
- spring-boot-starter-test
- lombok
配置一下基本 RabbitMQ 服务基本信息
spring:
rabbitmq:
port: 5672
username: admin
password: admin
host: localhost
二、常量类
定义一些基本的常量信息,消息队列的操作中,常涉及比较多的组件的时候,用常量信息比较好区分,这里就是定义了基本的一些涉及组件,例如交换器、队列、主题的名称都是什么:
public class Constants {
/**
* 队列一的名字
*/
public static final String TOPIC_QUEUE_ONE = "topic_queue_one";
/**
* 队列二的名字
*/
public static final String TOPIC_QUEUE_TWO = "topic_queue_two";
/**
* 交换器的名字
*/
public static final String TOPIC_EXCHANGE = "topic_exchange";
/**
* 话题一
*/
public static final String TOPIC_ONE = "中国.天气";
/**
* 话题二
*/
public static final String TOPIC_TWO = "中国.*";
}
三、配置类
依次创建我们想要的信息:
- 两个队列
- 一个交换器
- 绑定一个队列到主题一
- 绑定一个队列到主题二
一般,在组件配置类和绑定配置类要抽离出来
组件配置 : 创建队列、交换器
@Configuration
public class MqComponentConfig {
/**
* autoDelete() 是配置消息通信后自动删除,否则会一直用同一个 queue,
* 后面的 topic 一直累加绑定上去!!!
*
* @return
*/
@Bean(autowire = Autowire.BY_NAME, value = "queueOne")
public Queue queueOne() {
return QueueBuilder.durable(TOPIC_QUEUE_ONE).autoDelete().build();
}
@Bean(autowire = Autowire.BY_NAME, value = "queueTwo")
public Queue queueTwo() {
return QueueBuilder.durable(TOPIC_QUEUE_TWO).autoDelete().build();
}
@Bean(autowire = Autowire.BY_NAME, value = "topicExchangeOne")
public TopicExchange topicExchangeOne() {
return new TopicExchange(TOPIC_EXCHANGE);
}
}
绑定配置:创建绑定关系
@Configuration
public class MqBindingConfig {
@Resource(name = "queueOne")
private Queue queueOne;
@Resource(name = "queueTwo")
private Queue queueTwo;
@Resource(name = "topicExchangeOne")
private TopicExchange topicExchangeOne;
/**
* 将创建的队列一绑定到主题一
*
* @return
*/
@Bean
public Binding bindingExchangeOne() {
return BindingBuilder.bind(queueOne).to(topicExchangeOne).with(TOPIC_ONE);
}
/**
* 将创建的队列二绑定到主题二
*
* @return
*/
@Bean
public Binding bindingExchangeTwo() {
return BindingBuilder.bind(queueTwo).to(topicExchangeOne).with(TOPIC_TWO);
}
}
但其实可以写到一个配置类里面,如下,因为 @Bean 没有用 value 指定 bean 的名字,注入进入就是方法名,所以在绑定关系中:
- 绑定关系一的队列参数名和注入的 bean 的方法要一致!也就是 queueOne
- 绑定关系二同理
- 而 exchange 只有一个却不用匹配,是因为只注入一个,所以默认选它了,如果有两个就要指定一下了!
@Configuration
public class MqConfig {
@Bean
public Queue queueOne() {
return QueueBuilder.durable(Constants.TOPIC_QUEUE_ONE).autoDelete().build();
}
@Bean
public Queue queueTwo() {
return QueueBuilder.durable(Constants.TOPIC_QUEUE_TWO).autoDelete().build();
}
@Bean
public TopicExchange topicExchangeOne() {
return new TopicExchange(Constants.TOPIC_EXCHANGE);
}
@Bean
public Binding bindingExchangeOne(Queue queueOne, TopicExchange exchange) {
return BindingBuilder.bind(queueOne).to(exchange).with(Constants.TOPIC_ONE);
}
@Bean
public Binding bindingExchangeTwo(Queue queueTwo, TopicExchange exchange) {
return BindingBuilder.bind(queueTwo).to(exchange).with(Constants.TOPIC_TWO);
}
}
四、生产者
由于路由键目前只与这一个生产值有关,所以路由键常量就定义在这里了
@Component
public class Producer {
public static final String ROUTING_KEY_ONE = "中国.天气";
public static final String ROUTING_KEY_TWO = "中国.新闻";
public static final String ROUTING_KEY_THREE = "中国.艺术";
private static final Log LOGGER = LogFactory.get();
@Autowired
private AmqpTemplate rabbitTemplate;
public void produceStrMessage(String exchangeName, String routingKey, String msgId) {
String msg = "Hi, it's message " + msgId;
LOGGER.warn("springboot-topic | producer : " + msg);
this.rabbitTemplate.convertAndSend(exchangeName, routingKey, msg);
}
}
五、消费者
定义一个消费者接口(这里闲着蛋疼定义的,不定义也可以)
public interface Consumer {
void consumeStrMessage(String msg);
}
定义绑定第一个主题(中国.天气)的队列的消费者
@Component
@RabbitListener(queues = {Constants.TOPIC_QUEUE_ONE})
public class ConsumerTopicOne implements Consumer {
private static final Log LOGGER = LogFactory.get();
@RabbitHandler
@Override
public void consumeStrMessage(String msg) {
LOGGER.warn("ConsumerTopicOne | consume msg : " + msg);
}
}
定义第二个主题(中国.*)的队列的消费者
@Component
@RabbitListener(queues = Constants.TOPIC_QUEUE_TWO)
public class ConsumerTopicTwo implements Consumer {
private static final Log LOGGER = LogFactory.get();
@RabbitHandler
@Override
public void consumeStrMessage(String msg) {
LOGGER.warn("ConsumerTopicTwo | consume msg : " + msg);
}
}
六、测试
定义一个基于 springboot 的,生产者的测试类,对其 produce() 方法进行测试,分别发送消息
@SpringBootTest
@RunWith(SpringRunner.class)
public class ProducerTest {
@Autowired
private Producer producer;
@Test
public void produceStrMessage() {
producer.produceStrMessage(
TOPIC_EXCHANGE,
Producer.ROUTING_KEY_ONE,
"message-one, my routing-key is --- " + Producer.ROUTING_KEY_ONE);
producer.produceStrMessage(
TOPIC_EXCHANGE,
Producer.ROUTING_KEY_TWO,
"message-two, my routing-key is --- " + Producer.ROUTING_KEY_TWO);
producer.produceStrMessage(
TOPIC_EXCHANGE,
Producer.ROUTING_KEY_THREE,
"message-third, my routing-key is --- " + Producer.ROUTING_KEY_THREE);
}
}
输出结果
可以看到
- 生产者生成了三条消息
- 绑定队列一("中国.天气")消费者消费了一条消息
- 绑定队列二("中国.*")的消费者消费了三条消息
而且,如果将绑定配置里的 bindingExchangeOne()
,将队列一改为队列二,如下:
@Bean
public Binding bindingExchangeOne() {
return BindingBuilder.bind(queueTwo).to(topicExchangeOne).with(TOPIC_ONE);
}
那么,现在就队列一没绑定主题,队列二绑定了两个主题,如果发送,监听队列一的消费者肯定接收不到消息,测试:
可以看到的确如此,消费者一监听队列一,没有消费消息