springboot中rabbitmq的集成与使用

短命女 2022-02-23 04:30 259阅读 0赞

RabbitMQ是一个基于AMQP协议的轻量级,可靠,可扩展且可移植的消息代理。Spring的一个springAMQP项目将核心Spring概念应用于基于AMQP的消息传递解决方案的开发。SpringBoot则是将springAMQP包装了一层,提供了pring-boot-starter-amqp“Starter”来为通过RabbitMQ使用AMQP提供了便利。

rabbitmq的安装可参考《ubuntu下安装rabbitmq及使用》

springboot集成rabbitmq

引入依赖

  1. <dependency>
  2. <groupId>org.springframework.boot</groupId>
  3. <artifactId>spring-boot-starter-amqp</artifactId>
  4. </dependency>

spring-boot-starter-amqp

在springboot中,spring-boot-starter-amqp为我们提供了与rabbtmq的交互方式,其主要引入了如下依赖:

  • org.springframework.amqp:spring-rabbit

    • org.springframework.amqp:spring-amqp
    • com.rabbitmq:amqp-client

我们在使用的时候主要是spring-amqp和spring-rabbit里面的类及方法。

org.springframework.amqp:spring-amqp提供了amqp协议的交换器(exchange)、绑定(bind)、queue(队列)、message(消息)、template(模板)等的定义与封装,如下:

此处仅列出部分类的类名,有关类详情和更多类信息可自行查看源码

  • exchange(交换器)

    • AbstractExchange 交换器的抽象
    • DirectExchange direct交换器的定义
    • FanoutExchange fanout交换器的定义
    • HeadersExchange headers交换器的定义
    • TopicExchange topic交换器的定义
    • CustomExchange 自定义交换器的定义
    • ExchangeBuilder 交换器的builder构造器
  • bind(绑定)

    • Binding 绑定的定义
    • BindingBuilder 绑定的builder构造器
  • message(消息)

    • Message 消息的定义
    • MessageBuilder 消息的builder构造器
  • queue(队列)

    • Queue 队列的定义
    • QueueBuilder 队列构造器
  • template(模板)

    • AmqpTemplate amqp协议消息同步发送/接收等操作模板接口
    • AmqpAdmin amqp协议的绑定、交换器、队列等申明/删除等操作的接口
    • AsyncAmqpTemplate amqp协议消息异步发送/接收等操作模板接口

org.springframework.amqp:spring-rabbit主要提供了注解,方便我们基于注解使用,如下:

  • @EnableRabbit 用于配置rabbitmq信息时使用
  • @Exchange 交换器定义
  • @Queue 队列定义
  • @QueueBinding 队列绑定定义
  • @RabbitHandler rabbitmq默认消息处理器
  • @RabbitListeners、@RabbitListeners rabbitmq消息监听定义

同时还实现了AmqpTemplatej接口的BatchingRabbitTemplate、RabbitTemplate,实现了RabbitAdmin接口的RabbitAdmin,实现了AsyncAmqpTemplate的AsyncRabbitTemplate。

springboot中rabbitmq的使用

创建一个springboot项目,引入“spring-boot-starter-amqp”依赖,在application.yml文件中添加rabbitmq信息,如下所示:

  1. spring:
  2. rabbitmq:
  3. virtual-host: sbac-rabbitmq
  4. host: 127.0.0.1
  5. port: 5672
  6. username: admin
  7. password: admin

这里没有用默认的“/”vhost,而使用了新创建的vhost “sbac-rabbitmq”,并设置账号admin对该vhost的权限信息,如下:

  1. sudo rabbitmqctl add_vhost sbac-rabbitmq
  2. sudo rabbitmqctl list_vhosts
  3. sudo rabbitmqctl set_permissions -p sbac-rabbitmq admin ".*" ".*" ".*"

下面将来看一下springboot项目中,rabbitmq的,direct、fanout、topic、headers等四种交换器模式的使用。在此之前,先定义一个junit测试类,用于测试我们的消息发送,如下所示:

  1. package com.lazycece.sbac.rabbitmq.producer;
  2. import com.lazycece.sbac.rabbitmq.entity.Message;
  3. import org.junit.Before;
  4. import org.junit.Test;
  5. import org.junit.runner.RunWith;
  6. import org.springframework.amqp.rabbit.core.RabbitTemplate;
  7. import org.springframework.boot.test.context.SpringBootTest;
  8. import org.springframework.test.context.junit4.SpringRunner;
  9. import javax.annotation.Resource;
  10. import java.util.UUID;
  11. /** * @author lazycece * @date 2019/04/04 */
  12. @SpringBootTest
  13. @RunWith(SpringRunner.class)
  14. public class RabbitMqProducer {
  15. @Resource
  16. private RabbitTemplate rabbitTemplate;
  17. private Message<String> message;
  18. @Before
  19. public void buildMessage() {
  20. message = new Message<>();
  21. message.setId(UUID.randomUUID().toString());
  22. message.setContent("Hello, springboot-ac-rabbitmq !");
  23. }
  24. /** * 测试方法暂时省略 */
  25. }

这里在测试类中只先给出了消息的定义,而至于几种模式的测试方法,将在介绍交换器模式使用的时候详细说明。

Meesage类的定义如下:

  1. package com.lazycece.sbac.rabbitmq.entity;
  2. import lombok.Data;
  3. import java.io.Serializable;
  4. /** * @author lazycece * @date 2019/04/04 */
  5. @Data
  6. public class Message<T> implements Serializable {
  7. private String id;
  8. private T content;
  9. }

direct模式

direct为直连模式,通过direct交换器下发的消息是严格发送到按照指定路由键绑定的队列上。

下面是direct交换器模式的配置:

  1. package com.lazycece.sbac.rabbitmq.config;
  2. import org.springframework.amqp.core.*;
  3. import org.springframework.context.annotation.Bean;
  4. import org.springframework.context.annotation.Configuration;
  5. /** * @author lazycece * @date 2019/04/04 */
  6. @Configuration
  7. public class DirectConfig {
  8. @Bean
  9. public Queue directQueue() {
  10. return QueueBuilder.durable("direct.queue").build();
  11. }
  12. @Bean
  13. public DirectExchange directExchange() {
  14. return (DirectExchange) ExchangeBuilder.directExchange("direct.exchange").build();
  15. }
  16. @Bean
  17. public Binding directBinding() {
  18. return BindingBuilder.bind(directQueue()).to(directExchange()).withQueueName();
  19. }
  20. }

如交换器的绑定设置所示,这里设置队列名为绑定路由键。下面给队列监听代码:

  1. package com.lazycece.sbac.rabbitmq.consumer;
  2. import com.lazycece.sbac.rabbitmq.entity.Message;
  3. import lombok.extern.slf4j.Slf4j;
  4. import org.springframework.amqp.rabbit.annotation.RabbitListener;
  5. import org.springframework.stereotype.Component;
  6. /** * @author lazycece * @date 2019/04/04 */
  7. @Component
  8. @Slf4j
  9. public class DirectConsumer {
  10. @RabbitListener(queues = { "direct.queue"})
  11. public void directQueueConsumer(Message message) {
  12. log.info("direct.queue -> {} ", message.toString());
  13. }
  14. }

现在来测试一下direct交换器模式的消息发送,测试方法代码如下:

  1. @Test
  2. public void directProducer() {
  3. rabbitTemplate.convertAndSend("direct.exchange", "direct.queue", message);
  4. }

此处向命名“direct.exchange”的交换器中发送消息,并设置路由键为“direct.queue”。方法运行之后,便可以看见队列“direct.queue”监听到了消息,并打印出了如下信息:

  1. direct.queue -> Message(id=25b23bac-6c89-4350-8743-bbb274da89e4, content=Hello, springboot-ac-rabbitmq !)

如果发送消息的时候换一个路由键,比如“direct.queue.one”,那么就不会收到消息。

fanout模式

fanout交换器为分发交换器,或者叫广播模式更为合适,因为,其不会根据路由键去区分消息到底该下发到哪儿一个队列,绑定在该交换器上的队列都会收到下发致fanout交换器的消息。

fanout交换器的配置如下:

  1. package com.lazycece.sbac.rabbitmq.config;
  2. import org.springframework.amqp.core.*;
  3. import org.springframework.context.annotation.Bean;
  4. import org.springframework.context.annotation.Configuration;
  5. /** * @author lazycece * @date 2019/04/04 */
  6. @Configuration
  7. public class FanoutConfig {
  8. @Bean
  9. public Queue fanoutQueueOne() {
  10. return QueueBuilder.durable("fanout.queue.one").build();
  11. }
  12. @Bean
  13. public Queue fanoutQueueTwo() {
  14. return QueueBuilder.durable("fanout.queue.two").build();
  15. }
  16. @Bean
  17. public Queue fanoutQueueThree() {
  18. return QueueBuilder.durable("fanout.queue.three").build();
  19. }
  20. @Bean
  21. public FanoutExchange fanoutExchange() {
  22. return (FanoutExchange) ExchangeBuilder.fanoutExchange("fanout.exchange").build();
  23. }
  24. @Bean
  25. public Binding fanoutBindingOne() {
  26. return BindingBuilder.bind(fanoutQueueOne()).to(fanoutExchange());
  27. }
  28. @Bean
  29. public Binding fanoutBindingTwo() {
  30. return BindingBuilder.bind(fanoutQueueTwo()).to(fanoutExchange());
  31. }
  32. @Bean
  33. public Binding fanoutBindingThree() {
  34. return BindingBuilder.bind(fanoutQueueThree()).to(fanoutExchange());
  35. }
  36. }

这里为此fanout交换器取名为“fanout.exchange”,并为其绑定了三个队列。下面给出三个队列的监听:

  1. package com.lazycece.sbac.rabbitmq.consumer;
  2. import com.lazycece.sbac.rabbitmq.entity.Message;
  3. import lombok.extern.slf4j.Slf4j;
  4. import org.springframework.amqp.rabbit.annotation.RabbitListener;
  5. import org.springframework.stereotype.Component;
  6. /**
  7. * @author lazycece
  8. * @date 2019/04/04
  9. */
  10. @Component
  11. @Slf4j
  12. public class FanoutConsumer {
  13. @RabbitListener(queues = {"fanout.queue.one"})
  14. public void fanoutQueueOneConsumer(Message message) {
  15. log.info("fanout.queue.one -> {} ", message.toString());
  16. }
  17. @RabbitListener(queues = {"fanout.queue.two"})
  18. public void fanoutQueueTwoConsumer(Message message) {
  19. log.info("fanout.queue.two-> {} ", message.toString());
  20. }
  21. @RabbitListener(queues = {"fanout.queue.three"})
  22. public void fanoutQueueThreeConsumer(Message message) {
  23. log.info("fanout.queue.three -> {} ", message.toString());
  24. }
  25. }

现在来测试一下fanout交换器模式的消息发送,测试方法代码如下:

  1. @Test
  2. public void fanoutProducer() {
  3. rabbitTemplate.convertAndSend("fanout.exchange", "", message);
  4. }

此处向命名“fanout.exchange”的交换器中发送消息,方法运行之后,便可以看见其三个监听队列均收到了消息,并打印出了如下信息:

  1. fanout.queue.three -> Message(id=66c08bda-654a-4c16-a66b-8537a96ff2ee, content=Hello, springboot-ac-rabbitmq !)
  2. fanout.queue.one -> Message(id=66c08bda-654a-4c16-a66b-8537a96ff2ee, content=Hello, springboot-ac-rabbitmq !)
  3. fanout.queue.two-> Message(id=66c08bda-654a-4c16-a66b-8537a96ff2ee, content=Hello, springboot-ac-rabbitmq !)

topic模式

topic交换器是一个灵活的交换器,其可以根据路由键的规则,灵活的将消息发送到想要发送的队列中去。

topic交换器的配置如下:

  1. package com.lazycece.sbac.rabbitmq.config;
  2. import org.springframework.amqp.core.*;
  3. import org.springframework.context.annotation.Bean;
  4. import org.springframework.context.annotation.Configuration;
  5. /** * @author lazycece * @date 2019/04/04 */
  6. @Configuration
  7. public class TopicConfig {
  8. @Bean
  9. public Queue topicQueueOne() {
  10. return QueueBuilder.durable("topic.queue.one").build();
  11. }
  12. @Bean
  13. public Queue topicQueueTwo() {
  14. return QueueBuilder.durable("topic.queue.two").build();
  15. }
  16. @Bean
  17. public TopicExchange topicExchange() {
  18. return (TopicExchange) ExchangeBuilder.topicExchange("topic.exchange").build();
  19. }
  20. @Bean
  21. public Binding topicBindingOne() {
  22. return BindingBuilder.bind(topicQueueOne()).to(topicExchange()).with("routing-key");
  23. }
  24. @Bean
  25. public Binding topicBindingTwo() {
  26. return BindingBuilder.bind(topicQueueTwo()).to(topicExchange()).with("#");
  27. }
  28. }

这里为此topic交换器取名为“topic.exchange”,并为其绑定了两个队列,一个路由规则为“routing-key”,另一个为“#”。下面给出两个队列的监听:

  1. package com.lazycece.sbac.rabbitmq.consumer;
  2. import com.lazycece.sbac.rabbitmq.entity.Message;
  3. import lombok.extern.slf4j.Slf4j;
  4. import org.springframework.amqp.rabbit.annotation.RabbitListener;
  5. import org.springframework.stereotype.Component;
  6. /**
  7. * @author lazycece
  8. * @date 2019/04/04
  9. */
  10. @Component
  11. @Slf4j
  12. public class TopicConsumer {
  13. @RabbitListener(queues = {"topic.queue.one"})
  14. public void topicQueueOneConsumer(Message message) {
  15. log.info("topic.queue.one -> {} ", message.toString());
  16. }
  17. @RabbitListener(queues = {"topic.queue.two"})
  18. public void topicQueueTwoConsumer(Message message) {
  19. log.info("topic.queue.two -> {} ", message.toString());
  20. }
  21. }

现在来测试一下topic交换器模式的消息发送,测试方法代码如下:

  1. @Test
  2. public void topicProducer() {
  3. rabbitTemplate.convertAndSend("topic.exchange", "routing-key", message);
  4. }

此处向命名“topic.exchange”的交换器中发送消息,并指定路由规则为“routing-key”,方法运行之后,便可以看见其了两个监听队列均收到了消息,并打印出了如下信息:

  1. topic.queue.two -> Message(id=dc79c754-2ea5-4941-b8e0-c41511d7b328, content=Hello, springboot-ac-rabbitmq !)
  2. topic.queue.one -> Message(id=dc79c754-2ea5-4941-b8e0-c41511d7b328, content=Hello, springboot-ac-rabbitmq !)

为什么呢?因为“#”为通配符,可以匹配任意路由键。如果在发送消息的时候换路由规则为“routing”时,就会发现只有“topic.queue.two”队列收到消息了,因为“routing-key”无法和“routing”匹配,而“#”可以。

headers模式

headers交换器亦可以说是灵活的交换器,因为其是根据消息的headers中的信息来判断是否分发消息致某一个队列的。

headers交换器的配置如下:

  1. package com.lazycece.sbac.rabbitmq.config;
  2. import org.springframework.amqp.core.*;
  3. import org.springframework.context.annotation.Bean;
  4. import org.springframework.context.annotation.Configuration;
  5. /** * @author lazycece * @date 2019/04/04 */
  6. @Configuration
  7. public class HeadersConfig {
  8. @Bean
  9. public Queue headersQueue() {
  10. return QueueBuilder.durable("headers.queue").build();
  11. }
  12. @Bean
  13. public HeadersExchange headersExchange() {
  14. return (HeadersExchange) ExchangeBuilder.headersExchange("headers.exchange").build();
  15. }
  16. @Bean
  17. public Binding headersBinding() {
  18. return BindingBuilder.bind(headersQueue()).to(headersExchange()).where("headers-key").exists();
  19. }
  20. }

这里为此headers交换器取名为“headers.exchange”,并为其绑定了一个队列“headers.queue”,然后设置只要headers中存在一个key名为“headers-key”时,便可以被分发消息。下面给出队列的监听:

  1. package com.lazycece.sbac.rabbitmq.consumer;
  2. import com.lazycece.sbac.rabbitmq.entity.Message;
  3. import lombok.extern.slf4j.Slf4j;
  4. import org.springframework.amqp.rabbit.annotation.RabbitListener;
  5. import org.springframework.stereotype.Component;
  6. /**
  7. * @author lazycece
  8. * @date 2019/04/04
  9. */
  10. @Component
  11. @Slf4j
  12. public class HeadersConsumer {
  13. @RabbitListener(queues = {"headers.queue"})
  14. public void headersQueueConsumer(Message message) {
  15. log.info("headers.queue.one -> {} ", message.toString());
  16. }
  17. }

现在来测试一下headers交换器模式的消息发送,测试方法代码如下:

  1. @Test
  2. public void headersProducer() {
  3. rabbitTemplate.convertAndSend("headers.exchange", "", message,
  4. m -> {
  5. m.getMessageProperties().getHeaders().put("headers-key", null);
  6. return m;
  7. });
  8. }

此处向命名“headers.exchange”的交换器中发送消息,并为消息的headers中加入“headers-key”键,方法运行之后,便可以看见其监听队列收到了消息,并打印出了如下信息:

  1. headers.queue.one -> Message(id=02d3d652-6fcc-4d3c-a18e-7fe90abe066e, content=Hello, springboot-ac-rabbitmq !)

注解使用

上面都是先给出rabbitmq的exchange、queue、binding相关的配置信息,再来监听消息,这里来看一下直接使用注解@RabbitListener来绑定和简单的用法,代码如下:

  1. package com.lazycece.sbac.rabbitmq.consumer;
  2. import com.lazycece.sbac.rabbitmq.entity.Message;
  3. import lombok.extern.slf4j.Slf4j;
  4. import org.springframework.amqp.core.ExchangeTypes;
  5. import org.springframework.amqp.rabbit.annotation.Exchange;
  6. import org.springframework.amqp.rabbit.annotation.Queue;
  7. import org.springframework.amqp.rabbit.annotation.QueueBinding;
  8. import org.springframework.amqp.rabbit.annotation.RabbitListener;
  9. import org.springframework.stereotype.Component;
  10. /** * @author lazycece * @date 2019/04/04 */
  11. @Component
  12. @Slf4j
  13. public class AnnotationConsumer {
  14. @RabbitListener(
  15. bindings = {
  16. @QueueBinding(
  17. value = @Queue(name = "topic.queue.annotation"),
  18. exchange = @Exchange(name = "topic.exchange", type = ExchangeTypes.TOPIC),
  19. key = { "routing"}
  20. )
  21. }
  22. )
  23. public void topicQueueOneConsumer(Message message) {
  24. log.info("topic.queue.annotation -> {} ", message.toString());
  25. }
  26. }

这里仍然是创建topic类型的交换器“topic.exchange”,然后创建队列“topic.queue.annotation”,再进行绑定路由键“routing”。这里用上文提到的topic模式的测试方法,毫无意外,“topic.queue.two”和“topic.queue.annotation”收到了消息,而“topic.queue.one”没有收到消息。

  1. topic.queue.two -> Message(id=7fe62e14-8a78-43bf-a18a-422880e30c99, content=Hello, springboot-ac-rabbitmq !)
  2. topic.queue.annotation -> Message(id=7fe62e14-8a78-43bf-a18a-422880e30c99, content=Hello, springboot-ac-rabbitmq !)

案例源码

案例源码地址:https://github.com/lazycece/springboot-actual-combat/tree/master/springboot-ac-rabbitmq

参考文档

  • spring-amqp官方文档:https://spring.io/projects/spring-amqp
  • springboot官方文档:https://docs.spring.io/spring-boot/docs/2.1.3.RELEASE/reference/htmlsingle/#boot-features-amqp

发表评论

表情:
评论列表 (有 0 条评论,259人围观)

还没有评论,来说两句吧...

相关阅读