springboot中rabbitmq的集成与使用
RabbitMQ是一个基于AMQP协议的轻量级,可靠,可扩展且可移植的消息代理。Spring的一个springAMQP项目将核心Spring概念应用于基于AMQP的消息传递解决方案的开发。SpringBoot则是将springAMQP包装了一层,提供了pring-boot-starter-amqp“Starter”来为通过RabbitMQ使用AMQP提供了便利。
rabbitmq的安装可参考《ubuntu下安装rabbitmq及使用》
springboot集成rabbitmq
引入依赖
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
spring-boot-starter-amqp
在springboot中,spring-boot-starter-amqp为我们提供了与rabbtmq的交互方式,其主要引入了如下依赖:
org.springframework.amqp:spring-rabbit
- org.springframework.amqp:spring-amqp
- com.rabbitmq:amqp-client
我们在使用的时候主要是spring-amqp和spring-rabbit里面的类及方法。
org.springframework.amqp:spring-amqp提供了amqp协议的交换器(exchange)、绑定(bind)、queue(队列)、message(消息)、template(模板)等的定义与封装,如下:
此处仅列出部分类的类名,有关类详情和更多类信息可自行查看源码
exchange(交换器)
- AbstractExchange 交换器的抽象
- DirectExchange direct交换器的定义
- FanoutExchange fanout交换器的定义
- HeadersExchange headers交换器的定义
- TopicExchange topic交换器的定义
- CustomExchange 自定义交换器的定义
- ExchangeBuilder 交换器的builder构造器
bind(绑定)
- Binding 绑定的定义
- BindingBuilder 绑定的builder构造器
message(消息)
- Message 消息的定义
- MessageBuilder 消息的builder构造器
queue(队列)
- Queue 队列的定义
- QueueBuilder 队列构造器
template(模板)
- AmqpTemplate amqp协议消息同步发送/接收等操作模板接口
- AmqpAdmin amqp协议的绑定、交换器、队列等申明/删除等操作的接口
- AsyncAmqpTemplate amqp协议消息异步发送/接收等操作模板接口
org.springframework.amqp:spring-rabbit主要提供了注解,方便我们基于注解使用,如下:
- @EnableRabbit 用于配置rabbitmq信息时使用
- @Exchange 交换器定义
- @Queue 队列定义
- @QueueBinding 队列绑定定义
- @RabbitHandler rabbitmq默认消息处理器
- @RabbitListeners、@RabbitListeners rabbitmq消息监听定义
同时还实现了AmqpTemplatej接口的BatchingRabbitTemplate、RabbitTemplate,实现了RabbitAdmin接口的RabbitAdmin,实现了AsyncAmqpTemplate的AsyncRabbitTemplate。
springboot中rabbitmq的使用
创建一个springboot项目,引入“spring-boot-starter-amqp”依赖,在application.yml文件中添加rabbitmq信息,如下所示:
spring:
rabbitmq:
virtual-host: sbac-rabbitmq
host: 127.0.0.1
port: 5672
username: admin
password: admin
这里没有用默认的“/”vhost,而使用了新创建的vhost “sbac-rabbitmq”,并设置账号admin对该vhost的权限信息,如下:
sudo rabbitmqctl add_vhost sbac-rabbitmq
sudo rabbitmqctl list_vhosts
sudo rabbitmqctl set_permissions -p sbac-rabbitmq admin ".*" ".*" ".*"
下面将来看一下springboot项目中,rabbitmq的,direct、fanout、topic、headers等四种交换器模式的使用。在此之前,先定义一个junit测试类,用于测试我们的消息发送,如下所示:
package com.lazycece.sbac.rabbitmq.producer;
import com.lazycece.sbac.rabbitmq.entity.Message;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;
import javax.annotation.Resource;
import java.util.UUID;
/** * @author lazycece * @date 2019/04/04 */
@SpringBootTest
@RunWith(SpringRunner.class)
public class RabbitMqProducer {
@Resource
private RabbitTemplate rabbitTemplate;
private Message<String> message;
@Before
public void buildMessage() {
message = new Message<>();
message.setId(UUID.randomUUID().toString());
message.setContent("Hello, springboot-ac-rabbitmq !");
}
/** * 测试方法暂时省略 */
}
这里在测试类中只先给出了消息的定义,而至于几种模式的测试方法,将在介绍交换器模式使用的时候详细说明。
Meesage类的定义如下:
package com.lazycece.sbac.rabbitmq.entity;
import lombok.Data;
import java.io.Serializable;
/** * @author lazycece * @date 2019/04/04 */
@Data
public class Message<T> implements Serializable {
private String id;
private T content;
}
direct模式
direct为直连模式,通过direct交换器下发的消息是严格发送到按照指定路由键绑定的队列上。
下面是direct交换器模式的配置:
package com.lazycece.sbac.rabbitmq.config;
import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/** * @author lazycece * @date 2019/04/04 */
@Configuration
public class DirectConfig {
@Bean
public Queue directQueue() {
return QueueBuilder.durable("direct.queue").build();
}
@Bean
public DirectExchange directExchange() {
return (DirectExchange) ExchangeBuilder.directExchange("direct.exchange").build();
}
@Bean
public Binding directBinding() {
return BindingBuilder.bind(directQueue()).to(directExchange()).withQueueName();
}
}
如交换器的绑定设置所示,这里设置队列名为绑定路由键。下面给队列监听代码:
package com.lazycece.sbac.rabbitmq.consumer;
import com.lazycece.sbac.rabbitmq.entity.Message;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
/** * @author lazycece * @date 2019/04/04 */
@Component
@Slf4j
public class DirectConsumer {
@RabbitListener(queues = { "direct.queue"})
public void directQueueConsumer(Message message) {
log.info("direct.queue -> {} ", message.toString());
}
}
现在来测试一下direct交换器模式的消息发送,测试方法代码如下:
@Test
public void directProducer() {
rabbitTemplate.convertAndSend("direct.exchange", "direct.queue", message);
}
此处向命名“direct.exchange”的交换器中发送消息,并设置路由键为“direct.queue”。方法运行之后,便可以看见队列“direct.queue”监听到了消息,并打印出了如下信息:
direct.queue -> Message(id=25b23bac-6c89-4350-8743-bbb274da89e4, content=Hello, springboot-ac-rabbitmq !)
如果发送消息的时候换一个路由键,比如“direct.queue.one”,那么就不会收到消息。
fanout模式
fanout交换器为分发交换器,或者叫广播模式更为合适,因为,其不会根据路由键去区分消息到底该下发到哪儿一个队列,绑定在该交换器上的队列都会收到下发致fanout交换器的消息。
fanout交换器的配置如下:
package com.lazycece.sbac.rabbitmq.config;
import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/** * @author lazycece * @date 2019/04/04 */
@Configuration
public class FanoutConfig {
@Bean
public Queue fanoutQueueOne() {
return QueueBuilder.durable("fanout.queue.one").build();
}
@Bean
public Queue fanoutQueueTwo() {
return QueueBuilder.durable("fanout.queue.two").build();
}
@Bean
public Queue fanoutQueueThree() {
return QueueBuilder.durable("fanout.queue.three").build();
}
@Bean
public FanoutExchange fanoutExchange() {
return (FanoutExchange) ExchangeBuilder.fanoutExchange("fanout.exchange").build();
}
@Bean
public Binding fanoutBindingOne() {
return BindingBuilder.bind(fanoutQueueOne()).to(fanoutExchange());
}
@Bean
public Binding fanoutBindingTwo() {
return BindingBuilder.bind(fanoutQueueTwo()).to(fanoutExchange());
}
@Bean
public Binding fanoutBindingThree() {
return BindingBuilder.bind(fanoutQueueThree()).to(fanoutExchange());
}
}
这里为此fanout交换器取名为“fanout.exchange”,并为其绑定了三个队列。下面给出三个队列的监听:
package com.lazycece.sbac.rabbitmq.consumer;
import com.lazycece.sbac.rabbitmq.entity.Message;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
/**
* @author lazycece
* @date 2019/04/04
*/
@Component
@Slf4j
public class FanoutConsumer {
@RabbitListener(queues = {"fanout.queue.one"})
public void fanoutQueueOneConsumer(Message message) {
log.info("fanout.queue.one -> {} ", message.toString());
}
@RabbitListener(queues = {"fanout.queue.two"})
public void fanoutQueueTwoConsumer(Message message) {
log.info("fanout.queue.two-> {} ", message.toString());
}
@RabbitListener(queues = {"fanout.queue.three"})
public void fanoutQueueThreeConsumer(Message message) {
log.info("fanout.queue.three -> {} ", message.toString());
}
}
现在来测试一下fanout交换器模式的消息发送,测试方法代码如下:
@Test
public void fanoutProducer() {
rabbitTemplate.convertAndSend("fanout.exchange", "", message);
}
此处向命名“fanout.exchange”的交换器中发送消息,方法运行之后,便可以看见其三个监听队列均收到了消息,并打印出了如下信息:
fanout.queue.three -> Message(id=66c08bda-654a-4c16-a66b-8537a96ff2ee, content=Hello, springboot-ac-rabbitmq !)
fanout.queue.one -> Message(id=66c08bda-654a-4c16-a66b-8537a96ff2ee, content=Hello, springboot-ac-rabbitmq !)
fanout.queue.two-> Message(id=66c08bda-654a-4c16-a66b-8537a96ff2ee, content=Hello, springboot-ac-rabbitmq !)
topic模式
topic交换器是一个灵活的交换器,其可以根据路由键的规则,灵活的将消息发送到想要发送的队列中去。
topic交换器的配置如下:
package com.lazycece.sbac.rabbitmq.config;
import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/** * @author lazycece * @date 2019/04/04 */
@Configuration
public class TopicConfig {
@Bean
public Queue topicQueueOne() {
return QueueBuilder.durable("topic.queue.one").build();
}
@Bean
public Queue topicQueueTwo() {
return QueueBuilder.durable("topic.queue.two").build();
}
@Bean
public TopicExchange topicExchange() {
return (TopicExchange) ExchangeBuilder.topicExchange("topic.exchange").build();
}
@Bean
public Binding topicBindingOne() {
return BindingBuilder.bind(topicQueueOne()).to(topicExchange()).with("routing-key");
}
@Bean
public Binding topicBindingTwo() {
return BindingBuilder.bind(topicQueueTwo()).to(topicExchange()).with("#");
}
}
这里为此topic交换器取名为“topic.exchange”,并为其绑定了两个队列,一个路由规则为“routing-key”,另一个为“#”。下面给出两个队列的监听:
package com.lazycece.sbac.rabbitmq.consumer;
import com.lazycece.sbac.rabbitmq.entity.Message;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
/**
* @author lazycece
* @date 2019/04/04
*/
@Component
@Slf4j
public class TopicConsumer {
@RabbitListener(queues = {"topic.queue.one"})
public void topicQueueOneConsumer(Message message) {
log.info("topic.queue.one -> {} ", message.toString());
}
@RabbitListener(queues = {"topic.queue.two"})
public void topicQueueTwoConsumer(Message message) {
log.info("topic.queue.two -> {} ", message.toString());
}
}
现在来测试一下topic交换器模式的消息发送,测试方法代码如下:
@Test
public void topicProducer() {
rabbitTemplate.convertAndSend("topic.exchange", "routing-key", message);
}
此处向命名“topic.exchange”的交换器中发送消息,并指定路由规则为“routing-key”,方法运行之后,便可以看见其了两个监听队列均收到了消息,并打印出了如下信息:
topic.queue.two -> Message(id=dc79c754-2ea5-4941-b8e0-c41511d7b328, content=Hello, springboot-ac-rabbitmq !)
topic.queue.one -> Message(id=dc79c754-2ea5-4941-b8e0-c41511d7b328, content=Hello, springboot-ac-rabbitmq !)
为什么呢?因为“#”为通配符,可以匹配任意路由键。如果在发送消息的时候换路由规则为“routing”时,就会发现只有“topic.queue.two”队列收到消息了,因为“routing-key”无法和“routing”匹配,而“#”可以。
headers模式
headers交换器亦可以说是灵活的交换器,因为其是根据消息的headers中的信息来判断是否分发消息致某一个队列的。
headers交换器的配置如下:
package com.lazycece.sbac.rabbitmq.config;
import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/** * @author lazycece * @date 2019/04/04 */
@Configuration
public class HeadersConfig {
@Bean
public Queue headersQueue() {
return QueueBuilder.durable("headers.queue").build();
}
@Bean
public HeadersExchange headersExchange() {
return (HeadersExchange) ExchangeBuilder.headersExchange("headers.exchange").build();
}
@Bean
public Binding headersBinding() {
return BindingBuilder.bind(headersQueue()).to(headersExchange()).where("headers-key").exists();
}
}
这里为此headers交换器取名为“headers.exchange”,并为其绑定了一个队列“headers.queue”,然后设置只要headers中存在一个key名为“headers-key”时,便可以被分发消息。下面给出队列的监听:
package com.lazycece.sbac.rabbitmq.consumer;
import com.lazycece.sbac.rabbitmq.entity.Message;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
/**
* @author lazycece
* @date 2019/04/04
*/
@Component
@Slf4j
public class HeadersConsumer {
@RabbitListener(queues = {"headers.queue"})
public void headersQueueConsumer(Message message) {
log.info("headers.queue.one -> {} ", message.toString());
}
}
现在来测试一下headers交换器模式的消息发送,测试方法代码如下:
@Test
public void headersProducer() {
rabbitTemplate.convertAndSend("headers.exchange", "", message,
m -> {
m.getMessageProperties().getHeaders().put("headers-key", null);
return m;
});
}
此处向命名“headers.exchange”的交换器中发送消息,并为消息的headers中加入“headers-key”键,方法运行之后,便可以看见其监听队列收到了消息,并打印出了如下信息:
headers.queue.one -> Message(id=02d3d652-6fcc-4d3c-a18e-7fe90abe066e, content=Hello, springboot-ac-rabbitmq !)
注解使用
上面都是先给出rabbitmq的exchange、queue、binding相关的配置信息,再来监听消息,这里来看一下直接使用注解@RabbitListener来绑定和简单的用法,代码如下:
package com.lazycece.sbac.rabbitmq.consumer;
import com.lazycece.sbac.rabbitmq.entity.Message;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.ExchangeTypes;
import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
/** * @author lazycece * @date 2019/04/04 */
@Component
@Slf4j
public class AnnotationConsumer {
@RabbitListener(
bindings = {
@QueueBinding(
value = @Queue(name = "topic.queue.annotation"),
exchange = @Exchange(name = "topic.exchange", type = ExchangeTypes.TOPIC),
key = { "routing"}
)
}
)
public void topicQueueOneConsumer(Message message) {
log.info("topic.queue.annotation -> {} ", message.toString());
}
}
这里仍然是创建topic类型的交换器“topic.exchange”,然后创建队列“topic.queue.annotation”,再进行绑定路由键“routing”。这里用上文提到的topic模式的测试方法,毫无意外,“topic.queue.two”和“topic.queue.annotation”收到了消息,而“topic.queue.one”没有收到消息。
topic.queue.two -> Message(id=7fe62e14-8a78-43bf-a18a-422880e30c99, content=Hello, springboot-ac-rabbitmq !)
topic.queue.annotation -> Message(id=7fe62e14-8a78-43bf-a18a-422880e30c99, content=Hello, springboot-ac-rabbitmq !)
案例源码
案例源码地址:https://github.com/lazycece/springboot-actual-combat/tree/master/springboot-ac-rabbitmq
参考文档
- spring-amqp官方文档:https://spring.io/projects/spring-amqp
- springboot官方文档:https://docs.spring.io/spring-boot/docs/2.1.3.RELEASE/reference/htmlsingle/#boot-features-amqp
还没有评论,来说两句吧...