RabbitMQ介绍与使用

谁借莪1个温暖的怀抱¢ 2023-02-25 10:23 7阅读 0赞

RabbitMQ介绍与使用

前言

RabbitMQ,俗称“兔子MQ”(可见其轻巧,敏捷),是目前非常热门的一款开源消息中间件,不管是互联网行业还是传统行业都广泛使用(最早是为了解决电信行业系统之间的可靠通信而设计)。

  • 1 高可靠性、易扩展、高可用、功能丰富等特点
  • 2支持大多数(甚至冷门)的编程语言客户端。
  • 3 RabbitMQ遵循AMQP协议,自身采用Erlang(一种由爱立信开发的通用面向并发编程的语言)编写。
  • 4 RabbitMQ也支持MQTT等其他协议。

RabbitMQ具有很强大的插件扩展能力,官方和社区提供了非常丰富的插件可供选择:https://www.rabbitmq.com/community-plugins.html

RabbitMQ整体逻辑架构

在这里插入图片描述

如上图所示,RabbitMq整体架构图由一下几部分组成:

Server(broker):接收客户端连接,实现AMQP消息队列的路由功能的进程.简单来说就是消息队列服务器实体。

Vhost:虚拟主机,一个broker里可以开设多个vhost,用作不同用户的权限分离。权限控制组,用户只能关联到一个vhost上,一个vhost中可以有若干个Exchange和Queue,默认的vhost是”/“

Exchange:接收生产者发送的消息,并根据Binding规则将消息路由给服务器中的队列 Exchange Type决定了Exchange路由消息额行为,例如,在RabbitMQ中,ExchangeType有Direct、Fanout和Topic三种,不同类型的Exchange路由得到行为是不一样的

queue:用于存储还未消费的消息。消息队列载体,每个消息都会被投入到一个或多个队列。

Message:由Header和Body组成,Header是由生产者添加到各种属性的集合,包括Message是否被持久化,是由哪个Message Queue接收优先级是多少等,而Body是真正需要传输的APP数据

Binding: 绑定,它的作用就是把exchange和queue按照路由规则绑定起来。:

BindingKey: 在mq中设置的绑定key

Routing Key:路由关键字,exchange根据这个关键字进行消息投递。
producer:消息生产者,就是投递消息的程序。

consumer:消息消费者,就是接受消息的程序。
channel:消息通道,在客户端的每个连接里,可建立多个channel,每个channel代表一个会话任务

生产者发送消息的流程

  1. 生产者连接RabbitMQ,建立TCP连接( Connection),开启信道(Channel) 2. 生产者声明一个Exchange(交换器),并设置相关属性,比如交换器类型、是否持久化等
  2. 生产者声明一个队列井设置相关属性,比如是否排他、是否持久化、是否自动删除等
  3. 生产者通过 bindingKey (绑定Key)将交换器和队列绑定( binding )起来
  4. 生产者发送消息至RabbitMQ Broker,其中包含 routingKey (路由键)、交换器等信息
  5. 相应的交换器根据接收到的 routingKey 查找相匹配的队列。
  6. 如果找到,则将从生产者发送过来的消息存入相应的队列中。
  7. 如果没有找到,则根据生产者配置的属性选择丢弃还是回退给生产者
  8. 关闭信道。
  9. 关闭连接。

消费者接收消息的过程

  1. 消费者连接到RabbitMQ Broker ,建立一个连接(Connection ) ,开启一个信道(Channel) 。
  2. 消费者向RabbitMQ Broker 请求消费相应队列中的消息,可能会设置相应的回调函数, 以及
    做一些准备工作
  3. 等待RabbitMQ Broker 回应并投递相应队列中的消息, 消费者接收消息。
  4. 消费者确认( ack) 接收到的消息。
  5. RabbitMQ 从队列中删除相应己经被确认的消息。
  6. 关闭信道。
  7. 关闭连接。

RabbitMQ Exchange类型

RabbitMQ 官网 文档 提供7种消息队列模式,如下图所示

在这里插入图片描述
在这里插入图片描述

依赖导入

  1. <dependency>
  2. <groupId>com.github.luues</groupId>
  3. <artifactId>spring-boot-starter-rabbitmq</artifactId>
  4. <version>1.2.9.1.RELEASE</version>
  5. </dependency>

普通消息队列

Hello World一对一的简单模式。生产者直接发送消息给RabbitMQ,另一端消费。未定义和指定
Exchange的情况下,使用的是AMQP default这个内置的Exchange。

在这里插入图片描述

生产者

  1. public class Send {
  2. public static final String QUEUE_NAME = "test_queue";
  3. public static void main(String[] args) {
  4. ConnectionFactory factory = new ConnectionFactory();
  5. Channel channel = null;
  6. Connection connection = null;
  7. factory.setHost("localhost");
  8. //factory.setPort(5671);
  9. try {
  10. connection = factory.newConnection();
  11. channel = connection.createChannel();
  12. //创建生命队列
  13. channel.queueDeclare(QUEUE_NAME, false, false, false, null);
  14. String message = "Hello World! 111 ";
  15. //发送消息
  16. channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
  17. System.out.println(" [x] Sent '" + message + "'");
  18. } catch (Exception e) {
  19. e.printStackTrace();
  20. } finally {
  21. factory.clone();
  22. if (connection != null) {
  23. try {
  24. connection.close();
  25. } catch (IOException e) {
  26. e.printStackTrace();
  27. }
  28. }
  29. }
  30. }
  31. }

发送消息
在这里插入图片描述

消费者

  1. public class Receiving {
  2. public static void main(String[] argv) throws Exception {
  3. ConnectionFactory factory = new ConnectionFactory();
  4. factory.setHost("localhost");
  5. Connection connection = factory.newConnection();
  6. Channel channel = connection.createChannel();
  7. channel.queueDeclare(QUEUE_NAME, false, false, false, null);
  8. System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
  9. while (true){
  10. DeliverCallback deliverCallback = (consumerTag, delivery) -> {
  11. String message = new String(delivery.getBody(), "UTF-8");
  12. System.out.println(" [x] Received '" + message + "'");
  13. };
  14. //true 监听队列 有消息就获取 没有就阻塞
  15. channel.basicConsume(QUEUE_NAME, true, deliverCallback, consumerTag -> { });
  16. }
  17. }
  18. }

接受消息
在这里插入图片描述
可以看到接受完消息后main方法没有停止,而是在监听消息处于阻塞状态。

work模式

生产者发消息,启动多个消费者实例来消费消息,每个消费者仅消费部分信息,可达到负载均衡的效果。

一个生产者 多个消费者,只能有一个消费者接受到一个消息。
在这里插入图片描述

生产者

  1. public static String QUEUE_NAME = "test_queue";
  2. public static void main(String[] args) throws IOException, TimeoutException {
  3. ConnectionFactory factory = new ConnectionFactory();
  4. factory.setHost("localhost");
  5. factory.setVirtualHost("/test");
  6. factory.setUsername("test");
  7. factory.setPassword("test");
  8. Connection connection = factory.newConnection();
  9. Channel channel = connection.createChannel();
  10. channel.queueDeclare(
  11. QUEUE_NAME, false, false,
  12. false, null
  13. );
  14. //发送消息
  15. for (int i = 0; i < 50; i++) {
  16. String message = "message: " + i;
  17. channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
  18. System.out.println(message);
  19. try {
  20. Thread.sleep(i * 10);
  21. } catch (InterruptedException e) {
  22. e.printStackTrace();
  23. }
  24. }
  25. channel.close();
  26. connection.close();
  27. }

消费者1 消费者2 代码一样 这里只写一个

  1. public static String QUEUE_NAME = "test_queue";
  2. public static void main(String[] args) throws IOException, TimeoutException {
  3. ConnectionFactory factory = new ConnectionFactory();
  4. factory.setHost("localhost");
  5. factory.setVirtualHost("/test");
  6. factory.setUsername("test");
  7. factory.setPassword("test");
  8. Connection connection = factory.newConnection();
  9. Channel channel = connection.createChannel();
  10. //同一时刻只能发送一个消息给消费者
  11. channel.basicQos(1);
  12. channel.queueDeclare(
  13. QUEUE_NAME, false, false,
  14. false, null
  15. );
  16. DefaultConsumer consumer = new DefaultConsumer(channel) {
  17. @Override
  18. public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
  19. String message = new String(body, "UTF-8");
  20. System.out.println(" [x] Received '" + message + "'");
  21. try {
  22. Thread.sleep(2 * 1000);
  23. } catch (InterruptedException e) {
  24. e.printStackTrace();
  25. }
  26. channel.basicAck(envelope.getDeliveryTag(), false);
  27. }
  28. };
  29. channel.basicConsume(QUEUE_NAME, false, consumer);
  30. }

需要注意的是

  1. //同一时刻只能发送一个消息给消费者 那个消费者早消费完 早可以拿消息进行消费 能者多劳
  2. channel.basicQos(1);

消息的确认模式

消费者从队列中获取消息,服务端如何知道消息已经被消费呢?

  • 模式1:自动确认

只要消息从队列中获取,无论消费者获取到消息后是否成功消息,都认为是消息已经成功消费。

  1. channel.basicConsume(QUEUE_NAME, true, consumer);

只需再监听的时候设置为true就可以了,只要消费者拿到消息就会确认。

  • 模式2:手动确认

消费者从队列中获取消息后,服务器会将该消息标记为不可用状态,等待消费者的反馈,如果消费者一直没有反馈,那么该消息将一直处于不可用状态。

  1. channel.basicConsume(QUEUE_NAME, false, consumer);

false 表示不自动确认消息

手动提交确认消息

  1. DefaultConsumer consumer = new DefaultConsumer(channel) {
  2. @Override
  3. public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
  4. String message = new String(body, "UTF-8");
  5. System.out.println(" [x] Received '" + message + "'");
  6. try {
  7. Thread.sleep(2 * 1000);
  8. } catch (InterruptedException e) {
  9. e.printStackTrace();
  10. }
  11. //手动进行确认
  12. channel.basicAck(envelope.getDeliveryTag(), false);
  13. }
  14. };
  15. //监听队列 不自动提交确认
  16. channel.basicConsume(QUEUE_NAME, false, consumer);

订阅模式(fanout模式)

订阅模式(fanout)会把所有发送到该交换器的消息路由到所有与该交换器绑定的队列中。

一个生产者 多个消费者订阅 然后同时接受到消息,也叫广播模式。
在这里插入图片描述

X 表示交换机

红色两个队列绑定到X交换机

订阅者模式:

  • 1、1个生产者,多个消费者
  • 2、每一个消费者都有自己的一一个队列
  • 3、生产者没有将消息直接发送到队列,而是发送到了交换机
  • 4、每个队列都要绑定到交换机
  • 5、生产者发送的消息,经过交换机,到达队列,实现,-一个消息被多个消费者获取的目的

发布者

  1. //交换机
  2. private static final String EXCHANGE_NAME = "exchange_logs";
  3. public static void main(String[] argv) throws Exception {
  4. ConnectionFactory factory = new ConnectionFactory();
  5. factory.setHost("localhost");
  6. factory.setVirtualHost("/test");
  7. factory.setUsername("test");
  8. factory.setPassword("test");
  9. Connection connection = factory.newConnection();
  10. Channel channel = connection.createChannel();
  11. //绑定交换机
  12. channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
  13. //消息
  14. String msg = "订单更新..... ";
  15. channel.basicPublish(EXCHANGE_NAME,"",null,msg.getBytes());
  16. System.out.println("发送的消息 : " + msg);
  17. channel.close();
  18. connection.close();
  19. }

两个订阅者

队列为 test_queue1_name 的订阅者

  1. package com.example.demo.publish;
  2. import com.rabbitmq.client.Channel;
  3. import com.rabbitmq.client.Connection;
  4. import com.rabbitmq.client.ConnectionFactory;
  5. import com.rabbitmq.client.DeliverCallback;
  6. import java.io.IOException;
  7. import java.util.concurrent.TimeoutException;
  8. /** * @description: * @author: Administrator * @create: 2020-07-12 17:55 **/
  9. public class Rece1 {
  10. //交换机
  11. private static final String EXCHANGE_NAME = "exchange_logs";
  12. //队列
  13. public static final String QUEUE_NAME = "test_queue1_name";
  14. public static void main(String[] args) throws IOException, TimeoutException {
  15. ConnectionFactory factory = new ConnectionFactory();
  16. factory.setHost("localhost");
  17. factory.setVirtualHost("/test");
  18. factory.setUsername("test");
  19. factory.setPassword("test");
  20. Connection connection = factory.newConnection();
  21. Channel channel = connection.createChannel();
  22. //声明队列
  23. channel.queueDeclare(QUEUE_NAME,false,false,false,null);
  24. //绑定交换机
  25. channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
  26. //绑定队列到交换机
  27. channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "");
  28. System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
  29. DeliverCallback deliverCallback = (consumerTag, delivery) -> {
  30. String message = new String(delivery.getBody(), "UTF-8");
  31. System.out.println(" [x] Received1 '" + message + "'");
  32. };
  33. //自动确认消息
  34. channel.basicConsume(QUEUE_NAME, true, deliverCallback, consumerTag -> {
  35. });
  36. }
  37. }

队列为 test_queue2_name 的订阅者

  1. package com.example.demo.publish;
  2. import com.rabbitmq.client.Channel;
  3. import com.rabbitmq.client.Connection;
  4. import com.rabbitmq.client.ConnectionFactory;
  5. import com.rabbitmq.client.DeliverCallback;
  6. import java.io.IOException;
  7. import java.util.concurrent.TimeoutException;
  8. /** * @description: * @author: Administrator * @create: 2020-07-12 17:55 **/
  9. public class Rece2 {
  10. //交换机
  11. private static final String EXCHANGE_NAME = "exchange_logs";
  12. //队列
  13. public static final String QUEUE_NAME = "test_queue2_name";
  14. public static void main(String[] args) throws IOException, TimeoutException {
  15. ConnectionFactory factory = new ConnectionFactory();
  16. factory.setHost("localhost");
  17. factory.setVirtualHost("/test");
  18. factory.setUsername("test");
  19. factory.setPassword("test");
  20. Connection connection = factory.newConnection();
  21. Channel channel = connection.createChannel();
  22. //声明队列
  23. channel.queueDeclare(QUEUE_NAME,false,false,false,null);
  24. //绑定交换机
  25. channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
  26. //绑定队列到交换机
  27. channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "");
  28. System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
  29. DeliverCallback deliverCallback = (consumerTag, delivery) -> {
  30. String message = new String(delivery.getBody(), "UTF-8");
  31. System.out.println(" [x] Received2 '" + message + "'");
  32. };
  33. //自动确认消息
  34. channel.basicConsume(QUEUE_NAME, true, deliverCallback, consumerTag -> {
  35. });
  36. }
  37. }

控制台中可以看到绑定的两个队列

在这里插入图片描述

发布的消息
在这里插入图片描述
订阅的消息
在这里插入图片描述

在这里插入图片描述

路由模式(direct模式)

在这里插入图片描述

x表示交换器

type表示交换器要发送消息的类型key

不同的key发送到不同的队列中去,可以将数据类型分开进行数据订阅,从而进行不同的数据处理。

路由流程图

根据不同的key进行订阅
在这里插入图片描述

如:前台新增一个商品数据,此时我们不需要写到redis中
前台删除一个商品数据,我们需要从redis中删除这条数据,也需要从数据库删除这条数据。

在发布订阅的基础代码上更改

但是必须设置事件类型为
在这里插入图片描述

  1. channel.exchangeDeclare(EXCHANGE_NAME, "direct");

路由发布者

  1. //消息
  2. String msg = "新增商品..... ";
  3. channel.basicPublish(EXCHANGE_NAME,"add",null,msg.getBytes());
  4. System.out.println("发送的消息 : " + msg);

删除redis缓存发布者

  1. //消息
  2. String msg = "删除商品..... ";
  3. channel.basicPublish(EXCHANGE_NAME,"del",null,msg.getBytes());
  4. System.out.println("发送的消息 : " + msg);

路由订阅者

增加订阅者

  1. //绑定队列到交换机
  2. channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "add");
  3. //同时订阅多个路由类型可以多写一行
  4. //channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "update");

删除订阅者

  1. //绑定队列到交换机
  2. channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "del");

通配符匹配模式(Topic模式)

通配符匹配模式是对路由模式的

大白话来说:路由模式是全匹配key必须一样,通配符是模糊匹配。

通配符匹配模式图

在这里插入图片描述

匹配符
在这里插入图片描述

* 表示匹配一个
# 表示匹配多个

例如:

设置事件类型

  1. channel.exchangeDeclare(EXCHANGE_NAME, "topic");

接收iteme.add消息

  1. //交换机
  2. private static final String EXCHANGE_NAME = "exchange_topic";
  3. //队列
  4. public static final String QUEUE_NAME = "router_queue_topic_add";
  5. public static void main(String[] args) throws IOException, TimeoutException {
  6. ConnectionFactory factory = new ConnectionFactory();
  7. factory.setHost("localhost");
  8. factory.setVirtualHost("/test");
  9. factory.setUsername("test");
  10. factory.setPassword("test");
  11. Connection connection = factory.newConnection();
  12. Channel channel = connection.createChannel();
  13. //声明队列
  14. channel.queueDeclare(QUEUE_NAME,false,false,false,null);
  15. //绑定交换机
  16. channel.exchangeDeclare(EXCHANGE_NAME, "topic");
  17. //绑定队列到交换机
  18. channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "iteme.add");
  19. //同一时刻只接受一条消息
  20. channel.basicQos(1);
  21. System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
  22. DeliverCallback deliverCallback = (consumerTag, delivery) -> {
  23. String message = new String(delivery.getBody(), "UTF-8");
  24. System.out.println(" [x] 新增商品 '" + message + "'");
  25. };
  26. //自动确认消息
  27. channel.basicConsume(QUEUE_NAME, true, deliverCallback, consumerTag -> {
  28. });
  29. }

接收iteme.del删除消息

  1. //交换机
  2. private static final String EXCHANGE_NAME = "exchange_topic";
  3. //队列
  4. public static final String QUEUE_NAME = "router_queue_topic_del";
  5. public static void main(String[] args) throws IOException, TimeoutException {
  6. ConnectionFactory factory = new ConnectionFactory();
  7. factory.setHost("localhost");
  8. factory.setVirtualHost("/test");
  9. factory.setUsername("test");
  10. factory.setPassword("test");
  11. Connection connection = factory.newConnection();
  12. Channel channel = connection.createChannel();
  13. //声明队列
  14. channel.queueDeclare(QUEUE_NAME,false,false,false,null);
  15. //绑定交换机
  16. channel.exchangeDeclare(EXCHANGE_NAME, "topic");
  17. //绑定队列到交换机 绑定del路由 key
  18. channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "iteme.del");
  19. //同一时刻只接受一条消息
  20. channel.basicQos(1);
  21. System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
  22. DeliverCallback deliverCallback = (consumerTag, delivery) -> {
  23. String message = new String(delivery.getBody(), "UTF-8");
  24. System.out.println(" [x] 删除redis缓存 '" + message + "'");
  25. };
  26. //自动确认消息
  27. channel.basicConsume(QUEUE_NAME, true, deliverCallback, consumerTag -> {
  28. });
  29. }

接收所有消息

  1. //交换机
  2. private static final String EXCHANGE_NAME = "exchange_topic";
  3. //队列
  4. public static final String QUEUE_NAME = "router_queue_topic_all";
  5. public static void main(String[] args) throws IOException, TimeoutException {
  6. ConnectionFactory factory = new ConnectionFactory();
  7. factory.setHost("localhost");
  8. factory.setVirtualHost("/test");
  9. factory.setUsername("test");
  10. factory.setPassword("test");
  11. Connection connection = factory.newConnection();
  12. Channel channel = connection.createChannel();
  13. //声明队列
  14. channel.queueDeclare(QUEUE_NAME,false,false,false,null);
  15. //绑定交换机
  16. channel.exchangeDeclare(EXCHANGE_NAME, "topic");
  17. //绑定队列到交换机 绑定del路由 key
  18. channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "iteme.#");
  19. //同一时刻只接受一条消息
  20. channel.basicQos(1);
  21. System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
  22. DeliverCallback deliverCallback = (consumerTag, delivery) -> {
  23. String message = new String(delivery.getBody(), "UTF-8");
  24. System.out.println(" 获取所有消息 '" + message + "'");
  25. };
  26. //自动确认消息
  27. channel.basicConsume(QUEUE_NAME, true, deliverCallback, consumerTag -> {
  28. });
  29. }

发送消息

  1. //交换机
  2. private static final String EXCHANGE_NAME = "exchange_topic";
  3. public static void main(String[] argv) throws Exception {
  4. ConnectionFactory factory = new ConnectionFactory();
  5. factory.setHost("localhost");
  6. factory.setVirtualHost("/test");
  7. factory.setUsername("test");
  8. factory.setPassword("test");
  9. Connection connection = factory.newConnection();
  10. Channel channel = connection.createChannel();
  11. //绑定交换机
  12. channel.exchangeDeclare(EXCHANGE_NAME, "topic");
  13. //消息
  14. String msg = "新增消息接收..... ";
  15. channel.basicPublish(EXCHANGE_NAME,"iteme.add",null,msg.getBytes());
  16. //String msg = "删除消息接收..... ";
  17. //channel.basicPublish(EXCHANGE_NAME,"iteme.del",null,msg.getBytes());
  18. System.out.println("发送的消息 : " + msg);
  19. channel.close();
  20. connection.close();
  21. }

topic消息队列功能更强大,可以兼容路由和发布订阅模式的消息功能。

代码地址

官网文档地址 https://www.rabbitmq.com/getstarted.html
环境安装 https://blog.csdn.net/weixin\_38361347/article/details/107292227

发表评论

表情:
评论列表 (有 0 条评论,7人围观)

还没有评论,来说两句吧...

相关阅读