RabbitMQ的5种模式——再探RabbitMQ的模式,简单、工作,发布订阅(广播),路由、主题 & 页面分析

叁歲伎倆 2024-02-21 07:27 15阅读 0赞

在这里插入图片描述

前言

RabbitMQ作为一款常用的消息中间件,在微服务项目中得到大量应用,其本身是微服务中的重点和难点,有不少概念我自己的也是一知半解,本系列博客尝试结合实际应用场景阐述RabbitMQ的应用,分析其为什么使用,并给出怎么用的案例。

本篇博客结合场景来阐述RabbitMQ的几种模式,描述了不同模式的应用场景,并给出相应的代码。(文末有惊喜~)

在这里插入图片描述
其他相关的rabbitmq博客文章列表如下:

RabbitMQ基础(1)——生产者消费者模型 & RabbitMQ简介 & Docker版本的安装配置 & RabbitMQ的helloworld + 分模块构建 & 解决大量注册案例

RabbitMQ基础(2)——发布订阅/fanout模式 & topic模式 & rabbitmq回调确认 & 延迟队列(死信)设计

RabbitMQ的Docker版本安装 + 延迟插件安装 & QQ邮箱和阿里云短信验证码的主题模式发送

目录

  • 前言
  • 引出
  • MQ场景:
    • 1异步处理
    • 2解耦
    • 3削峰填谷
    • 常见的MQ
  • 拆解控制台页面
    • 总览页面
    • 连接connection
    • 队列页面
  • 简单模式
    • 工具类:建立连接
    • 生产者:生产消息
    • 消费者:消费消息
    • 进行测试
      • Ack:项目中必须false
  • 工作模式
    • 一个生产者,两个消费者
    • QOS: 限流
      • Tips:队列参数怎么变?
    • 队列相关参数
      • x-max-length
      • x-overflow
      • x-max-length-bytes
  • 发布者订阅模式
    • 生产者:给fanout交换机发消息
    • 消费者1:队列q32
    • 消费者2:队列q321
  • 路由模式
    • 生产者:发给交换机,告诉路由键
    • 消费者1:根据路由键,接收3种消息
    • 消费者2:根据路由键,接收1种消息
  • 主题模式(Topic)
    • 生产者:主题交换机,带路由键
    • 消费者1:通配符的路由键
    • 消费者2:统配符
  • 总结

引出


1.MQ,消息队列的应用场景,几种MQ简单对比;
2.分析RabbitMQ的浏览器控制台页面;
3.结合场景来阐述RabbitMQ的几种模式,描述了不同模式的应用场景,并给出相应的代码;

在这里插入图片描述

MQ场景:

Message Queue 消息队列。

有比较大的负载,而且这些负载不用立刻马上给程序返回结构,可以有一些等待时间。可以用消息队列。

1异步处理

过去的项目,大部分都是同步解决问题。

在这里插入图片描述

  1. UserinfoService{
  2. register(){
  3. //典型的同步
  4. //插入数据库
  5. //发短信
  6. //发邮件
  7. }
  8. }
  9. //如果你的操作基本没有任何延时操作,或者瓶颈,没有压力。那么你没有必要用MQ
  10. UserinfoService{
  11. register(){
  12. //发起的是一个异步请求
  13. 都不等待对方给我返回的结果
  14. //异步插入数据库
  15. //异步发短信
  16. //异步发邮件
  17. }
  18. }

2解耦

在这里插入图片描述

库存有没有可能挂了。或者访问量巨大。因为库存慢,导致订单也慢。

在这里插入图片描述

解耦。

在这里插入图片描述

3削峰填谷

双12的时候,叫好早上的8点。秒杀的时候。

有些瞬间服务器压力是超大的,过了这个瞬间,几乎没有消耗量。

等服务器能够正常的时候,我慢慢执行就行了。

在这里插入图片描述

常见的MQ

MQ的前身,就是一个发布者订阅模式。

Kafka RabbitMQ(1W/s) RocketMQ ActiveMQ

Kafka : 10w/S 主要用于日志

Kafka是由Apache软件基金会开发的一个开源流处理平台,由Scala和Java编写。Kafka是一种高吞吐量的分布式发布订阅消息系统,它可以处理消费者在网站中的所有动作流数据。 这种动作(网页浏览,搜索和其他用户的行动)是在现代网络上的许多社会功能的一个关键因素。 这些数据通常是由于吞吐量的要求而通过处理日志和日志聚合来解决。 对于像Hadoop一样的日志数据和离线分析系统,但又要求实时处理的限制,这是一个可行的解决方案。Kafka的目的是通过Hadoop的并行加载机制来统一线上和离线的消息处理,也是为了通过集群来提供实时的消息。

在这里插入图片描述

拆解控制台页面

总览页面

包括刷新时间间隔,rabbitmq节点,连接端口的信息等

在这里插入图片描述

配置可以导出和导入

在这里插入图片描述

连接connection

在这里插入图片描述

队列页面

包括队列的名字,状态,准备好的消息数量,未确认的消息数量,总计消息数量

在这里插入图片描述

简单模式

在这里插入图片描述

使用的依赖

  1. <!-- https://mvnrepository.com/artifact/com.rabbitmq/amqp-client -->
  2. <dependency>
  3. <groupId>com.rabbitmq</groupId>
  4. <artifactId>amqp-client</artifactId>
  5. <version>5.7.3</version>
  6. </dependency>

工具类:建立连接

在这里插入图片描述

  1. package com.tianju.config;
  2. import com.rabbitmq.client.Connection;
  3. import java.io.IOException;
  4. import java.util.concurrent.TimeoutException;
  5. /**
  6. * 建立连接的工具类
  7. */
  8. public class ConnectionFactory {
  9. public static Connection createConnection() throws IOException, TimeoutException {
  10. com.rabbitmq.client.ConnectionFactory connectionFactory = new com.rabbitmq.client.ConnectionFactory();
  11. connectionFactory.setHost("192.168.111.130"); // http://192.168.111.130/
  12. connectionFactory.setPort(5672);
  13. connectionFactory.setUsername("admin");
  14. connectionFactory.setPassword("123");
  15. connectionFactory.setVirtualHost("/demo");
  16. // amqp://admin@192.168.111.130:5672/
  17. return connectionFactory.newConnection();
  18. }
  19. }

在这里插入图片描述

生产者:生产消息

在这里插入图片描述

  1. package com.tianju.simple;
  2. import com.rabbitmq.client.Channel;
  3. import com.rabbitmq.client.Connection;
  4. import com.tianju.config.ConnectionFactory;
  5. import org.springframework.amqp.rabbit.core.RabbitTemplate;
  6. import java.io.IOException;
  7. import java.util.concurrent.TimeoutException;
  8. /**
  9. * 生产者发送消息
  10. * 建立连接--> 创建频道 --> 创建队列 --> 发送消息
  11. */
  12. public class Provider {
  13. private static String QUEUE_ORDER = "queue_order";
  14. public static void main(String[] args) throws IOException, TimeoutException {
  15. // 建立连接
  16. Connection connection = ConnectionFactory.createConnection();
  17. // 创建频道
  18. Channel channel = connection.createChannel();
  19. // 创建队列
  20. channel.queueDeclare(QUEUE_ORDER,false,false,false,null);
  21. // 发送消息,指定给哪个队列上发消息
  22. for (int i = 0; i < 100; i++) {
  23. String msg = "hello rabbitmq--"+i;
  24. channel.basicPublish("", QUEUE_ORDER, null, msg.getBytes());
  25. System.out.println("消息发布成功");
  26. }
  27. connection.close();
  28. }
  29. }

消费者:消费消息

在这里插入图片描述

  1. package com.tianju.simple;
  2. import com.rabbitmq.client.*;
  3. import com.tianju.config.ConnectionFactory;
  4. import java.io.IOException;
  5. import java.util.concurrent.TimeoutException;
  6. public class Consumer {
  7. private static String QUEUE_ORDER = "queue_order";
  8. public static void main(String[] args) throws IOException, TimeoutException {
  9. // 建立连接
  10. Connection connection = ConnectionFactory.createConnection();
  11. // 创建频道
  12. Channel channel = connection.createChannel();
  13. // 创建队列
  14. channel.queueDeclare(QUEUE_ORDER,false,false,false,null);
  15. // 队列必须声明,如果不存在,则自动创建
  16. // 声明一个消费者
  17. DefaultConsumer defaultConsumer = new DefaultConsumer(channel){
  18. /**
  19. * 回调函数,用来接收消息
  20. * @param consumerTag the <i>consumer tag</i> associated with the consumer
  21. * @param envelope packaging data for the message
  22. * @param properties content header data for the message
  23. * @param body the message body (opaque, client-specific byte array),字节数组
  24. * @throws IOException
  25. */
  26. public void handleDelivery(String consumerTag,
  27. Envelope envelope,
  28. AMQP.BasicProperties properties,
  29. byte[] body)
  30. throws IOException
  31. {
  32. // no work to do
  33. String msg = new String(body); // 收到的信息
  34. System.out.println("消费者接收到:"+msg);
  35. try {
  36. Thread.sleep(3000);// 模拟一个耗时操作
  37. } catch (InterruptedException e) {
  38. throw new RuntimeException(e);
  39. }
  40. }
  41. };
  42. // 表明自己是消费者,接收消息
  43. /**
  44. * autoAck:自动确认设置成 true
  45. */
  46. channel.basicConsume(QUEUE_ORDER, true, defaultConsumer);
  47. }
  48. }

进行测试

生产者发送100条消息
在这里插入图片描述

消费者进行消费,假设突然之间服务宕机了,此时消费了4条消息,理论上还应该有96条消息

在这里插入图片描述

打开控制台页面查看,消息全部消失了,出现了数据丢失的情况

在这里插入图片描述

全部用默认值的情况下,如果发生异常,则消息全部丢失。

消费者一次性拿到了所有的消息。

Ack:项目中必须false

生产者 ==投递消息=》队列

消费者=接受==》队列

ack: false 必须要手工确认。

消费者接到这个消息的时候, 这个消息进入 unack状态。

1:手工确认。消息删除。完成

2: 没手工确认,断开连接或者超时。

3:这个消息重新进入ready状态。等待其他消费者进行消费。

在这里插入图片描述

先设置自动ack为false,表示需要手工确认,然后在消费消息的方法中,进行消息的确认。

在这里插入图片描述

  1. package com.tianju.simple;
  2. import com.rabbitmq.client.*;
  3. import com.tianju.config.ConnectionFactory;
  4. import java.io.IOException;
  5. import java.util.concurrent.TimeoutException;
  6. public class Consumer {
  7. private static String QUEUE_ORDER = "queue_order";
  8. public static void main(String[] args) throws IOException, TimeoutException {
  9. // 建立连接
  10. Connection connection = ConnectionFactory.createConnection();
  11. // 创建频道
  12. Channel channel = connection.createChannel();
  13. // 创建队列
  14. channel.queueDeclare(QUEUE_ORDER,false,false,false,null);
  15. // 队列必须声明,如果不存在,则自动创建
  16. // 声明一个消费者
  17. DefaultConsumer defaultConsumer = new DefaultConsumer(channel){
  18. /**
  19. * 回调函数,用来接收消息
  20. * @param consumerTag the <i>consumer tag</i> associated with the consumer
  21. * @param envelope packaging data for the message
  22. * @param properties content header data for the message
  23. * @param body the message body (opaque, client-specific byte array),字节数组
  24. * @throws IOException
  25. */
  26. public void handleDelivery(String consumerTag,
  27. Envelope envelope,
  28. AMQP.BasicProperties properties,
  29. byte[] body)
  30. throws IOException {
  31. // no work to do
  32. String msg = new String(body); // 收到的信息
  33. System.out.println("消费者接收到:"+msg);
  34. try {
  35. Thread.sleep(3000);// 模拟一个耗时操作
  36. } catch (InterruptedException e) {
  37. throw new RuntimeException(e);
  38. }
  39. // 消费者代码可能有失败,消息拿到之后,可能还没有处理,就宕机了
  40. // 消息确认的代码一定在最后一行
  41. // long deliveryTag 消息的下标;
  42. // boolean multiple 是否批量确认;
  43. channel.basicAck(envelope.getDeliveryTag(),true); // 确认消息,批量确认
  44. }
  45. };
  46. // 表明自己是消费者,接收消息
  47. /**
  48. * autoAck:自动确认设置成 true
  49. * 是否自动确认,
  50. * false:不进行自动确认;true:自动确认
  51. * 消费过程中可能产生异常
  52. * 如果产生异常,则必须进行消费补偿
  53. */
  54. channel.basicConsume(QUEUE_ORDER, false, defaultConsumer); // 接收消息
  55. }
  56. }

再次模拟宕机的情况,一开始消息全部被放到Unack中,当宕机时,又把消息吐了出来,至少消息没有出现丢失的情况。

在这里插入图片描述

工作模式

在这里插入图片描述

一个生产者,两个消费者

在这里插入图片描述

两个消费者消费的是同一个队列中的消息。

两个消费者上来后,默认是按照平均分配。结果: 有人瞬间干完。有人很久都没干完。

QOS: 限流

可以限制你一次拉取几个。

这样两个消费者,也能够节省服务器CPU了。

  1. // 创建频道
  2. Channel channel = connection.createChannel();
  3. channel.basicQos(1); /** 1次只能拿1个 **/

在这里插入图片描述

当消费的速度太慢,觉得不够,加入其他的消费者。

核心,只有一个消息队列。

消息是轮询的方式,发送给不同的消费者。

在这里插入图片描述

Tips:队列参数怎么变?

队列的参数发生变化后,要删除再添加。

在这里插入图片描述

  1. // 创建频道
  2. Channel channel = connection.createChannel();
  3. channel.queueDelete(QUEUE_ORDER); // 先删除旧的队列
  4. boolean durable = true; // 当前队列中的消息持久化操作,重启之后,消息还在
  5. // 创建队列
  6. channel.queueDeclare(QUEUE_ORDER,durable,false,false,null);

在这里插入图片描述

队列相关参数

x-max-length

在这里插入图片描述

  1. package com.tianju.work;
  2. import com.rabbitmq.client.Channel;
  3. import com.rabbitmq.client.Connection;
  4. import com.tianju.config.ConnectionFactory;
  5. import java.io.IOException;
  6. import java.util.HashMap;
  7. import java.util.Map;
  8. import java.util.concurrent.TimeoutException;
  9. /**
  10. * 生产者发送消息
  11. * 建立连接--> 创建频道 --> 创建队列 --> 发送消息
  12. */
  13. public class Provider {
  14. private static String QUEUE_ORDER = "queue_order";
  15. public static void main(String[] args) throws IOException, TimeoutException {
  16. // 建立连接
  17. Connection connection = ConnectionFactory.createConnection();
  18. // 创建频道
  19. Channel channel = connection.createChannel();
  20. channel.queueDelete(QUEUE_ORDER); // 先删除旧的队列
  21. boolean durable = true; // 当前队列中的消息持久化操作,重启之后,消息还在
  22. Map map = new HashMap();
  23. map.put("x-max-length", 10); // 设置最大的长度,只接受10个
  24. // 创建队列
  25. channel.queueDeclare(QUEUE_ORDER,durable,false,false,map);
  26. // 发送消息,指定给哪个队列上发消息
  27. for (int i = 0; i < 1000; i++) {
  28. String msg = "hello rabbitmq--"+i;
  29. channel.basicPublish("", QUEUE_ORDER, null, msg.getBytes());
  30. System.out.println("消息发布成功");
  31. }
  32. connection.close();
  33. }
  34. }

How many (ready) messages a queue can contain before it starts to drop them from its head.

在这里插入图片描述

x-overflow

Sets the queue overflow behaviour. This determines what happens to messages when the maximum length of a queue is reached. Valid values are drop-head, reject-publish or reject-publish-dlx. The quorum queue type only supports drop-head and reject-publish.

在这里插入图片描述
此时就是前10个了

在这里插入图片描述

x-max-length-bytes

在这里插入图片描述

  1. package com.tianju.work;
  2. import com.rabbitmq.client.Channel;
  3. import com.rabbitmq.client.Connection;
  4. import com.tianju.config.ConnectionFactory;
  5. import java.io.IOException;
  6. import java.util.HashMap;
  7. import java.util.Map;
  8. import java.util.concurrent.TimeoutException;
  9. /**
  10. * 生产者发送消息
  11. * 建立连接--> 创建频道 --> 创建队列 --> 发送消息
  12. */
  13. public class Provider {
  14. private static String QUEUE_ORDER = "queue_order";
  15. public static void main(String[] args) throws IOException, TimeoutException {
  16. // 建立连接
  17. Connection connection = ConnectionFactory.createConnection();
  18. // 创建频道
  19. Channel channel = connection.createChannel();
  20. channel.queueDelete(QUEUE_ORDER); // 先删除旧的队列
  21. boolean durable = true; // 当前队列中的消息持久化操作,重启之后,消息还在
  22. Map map = new HashMap();
  23. map.put("x-max-length", 10); // 设置最大的长度,只接受10个
  24. map.put("x-overflow", "reject-publish"); // 拒绝发布,变成前10ge
  25. map.put("x-max-length-bytes", 4); // 消息的最大字节长度
  26. // 创建队列
  27. channel.queueDeclare(QUEUE_ORDER,durable,false,false,map);
  28. // 发送消息,指定给哪个队列上发消息
  29. for (int i = 0; i < 20; i++) {
  30. String msg = "53"+i;
  31. channel.basicPublish("", QUEUE_ORDER, null, msg.getBytes());
  32. msg = "hello rabbitmq--"+i;
  33. channel.basicPublish("", QUEUE_ORDER, null, msg.getBytes());
  34. System.out.println("消息发布成功");
  35. }
  36. connection.close();
  37. }
  38. }

在这里插入图片描述

参数

在这里插入图片描述

  1. //创建队列
  2. boolean durable = true;//当前队列中的消息进行持久化操作 重启之后,消息还在
  3. Map map = new HashMap();
  4. map.put("x-max-length",10);//设置队列的最大长度
  5. map.put("x-overflow","reject-publish");//设置队列的最大长度 drop head
  6. map.put("x-max-length-bytes",4);//设置消息的最大字节长度
  7. // map.put("x-expires",10000);//超时后,直接删除队列
  8. channel.queueDeclare(QUQUENAME,durable,false,false,map);

发布者订阅模式

在这里插入图片描述

工作模式中,只有一个队列。两个消费者轮询从同一个队列中取数据。

发布者订阅模式: 交换机后面会绑定多个消息队列。每个消息队列都有完整的信息。每个队列后的消费者都会有完整的消息。

交换机:就是一个无意识的广播。行为就是扇出.

在这里插入图片描述

场景:

下订单:

1:订单的数据入数据库
2:发送订单的短信
3: 物流

要每个队列都有完整的信息。

在这里插入图片描述

生产者:给fanout交换机发消息

  1. package com.tianju.publish;
  2. import com.rabbitmq.client.Channel;
  3. import com.rabbitmq.client.Connection;
  4. import com.rabbitmq.client.MessageProperties;
  5. import com.tianju.config.ConnectionFactory;
  6. import java.io.IOException;
  7. import java.util.concurrent.TimeoutException;
  8. /**
  9. * 生产者发送消息
  10. * 建立连接--> 创建频道 --> 创建队列 --> 发送消息
  11. */
  12. public class Provider {
  13. private static String QUEUE_ORDER = "queue_order";
  14. private static String EXCHANGE = "pet_exchange";
  15. public static void main(String[] args) throws IOException, TimeoutException {
  16. // 建立连接
  17. Connection connection = ConnectionFactory.createConnection();
  18. // 创建频道
  19. Channel channel = connection.createChannel();
  20. // 创建交换机
  21. channel.exchangeDeclare(EXCHANGE, "fanout"); // 扇出,类型只能用这个
  22. for (int i = 0; i < 100; i++) {
  23. channel.basicPublish(EXCHANGE, "", MessageProperties.TEXT_PLAIN, ("hello fanout"+i).getBytes());
  24. }
  25. }
  26. }

消费者1:队列q32

  1. package com.tianju.publish;
  2. import com.rabbitmq.client.*;
  3. import com.tianju.config.ConnectionFactory;
  4. import java.io.IOException;
  5. import java.util.concurrent.TimeoutException;
  6. public class Consumer {
  7. private static String EXCHANGE = "pet_exchange";
  8. public static void main(String[] args) throws IOException, TimeoutException {
  9. // 建立连接
  10. Connection connection = ConnectionFactory.createConnection();
  11. // 创建频道
  12. Channel channel = connection.createChannel();
  13. // 创建交换机
  14. channel.exchangeDeclare(EXCHANGE, "fanout"); // 扇出,类型只能用这个
  15. channel.queueDeclare("q32", false, false, false, null);
  16. channel.queueBind("q32", EXCHANGE, ""); // 路由键fanout模式,必须为空,即使写了是无效的
  17. channel.basicConsume("q32", new DefaultConsumer(channel){
  18. public void handleDelivery(String consumerTag,
  19. Envelope envelope,
  20. AMQP.BasicProperties properties,
  21. byte[] body)
  22. throws IOException
  23. {
  24. // no work to do
  25. String msg = new String(body);
  26. System.out.println("消费者1:"+msg);
  27. channel.basicAck(envelope.getDeliveryTag(), false);
  28. }
  29. });
  30. }
  31. }

消费者2:队列q321

  1. package com.tianju.publish;
  2. import com.rabbitmq.client.*;
  3. import com.tianju.config.ConnectionFactory;
  4. import java.io.IOException;
  5. import java.util.concurrent.TimeoutException;
  6. public class Consumer2 {
  7. private static String EXCHANGE = "pet_exchange";
  8. public static void main(String[] args) throws IOException, TimeoutException {
  9. // 建立连接
  10. Connection connection = ConnectionFactory.createConnection();
  11. // 创建频道
  12. Channel channel = connection.createChannel();
  13. // 创建交换机
  14. channel.exchangeDeclare(EXCHANGE, "fanout"); // 扇出,类型只能用这个
  15. channel.queueDeclare("q321", false, false, false, null);
  16. channel.queueBind("q321", EXCHANGE, ""); // 路由键fanout模式,必须为空,即使写了是无效的
  17. channel.basicConsume("q321", new DefaultConsumer(channel){
  18. public void handleDelivery(String consumerTag,
  19. Envelope envelope,
  20. AMQP.BasicProperties properties,
  21. byte[] body)
  22. throws IOException
  23. {
  24. // no work to do
  25. String msg = new String(body);
  26. System.out.println("消费者2:"+msg);
  27. channel.basicAck(envelope.getDeliveryTag(), false);
  28. }
  29. });
  30. }
  31. }

路由模式

允许用不同的路由键来接受不同的信息。

在这里插入图片描述

队列会接到完整的全部的信息。

日志系统:

日志级别:

OFF INFO DEBUG ERROR FATAL ALL

如果是致命错误: 立刻给运维发短信。

全部信息: 放到专门的日志系统
Error: 进行发邮件

在这里插入图片描述

生产者:发给交换机,告诉路由键

  1. package com.tianju.routing;
  2. import com.rabbitmq.client.Channel;
  3. import com.rabbitmq.client.Connection;
  4. import com.rabbitmq.client.MessageProperties;
  5. import com.tianju.config.ConnectionFactory;
  6. import java.io.IOException;
  7. import java.util.concurrent.TimeoutException;
  8. /**
  9. * 生产者发送消息
  10. * 建立连接--> 创建频道 --> 创建队列 --> 发送消息
  11. */
  12. public class Provider {
  13. private static String EXCHANGE = "exchange_58";
  14. public static void main(String[] args) throws IOException, TimeoutException {
  15. // 建立连接
  16. Connection connection = ConnectionFactory.createConnection();
  17. // 创建频道
  18. Channel channel = connection.createChannel();
  19. // 创建交换机
  20. channel.exchangeDeclare(EXCHANGE, "direct"); // 直接交换机
  21. String msg = "this is fatal message";
  22. channel.basicPublish(EXCHANGE,"fatal", MessageProperties.TEXT_PLAIN,msg.getBytes());
  23. msg = "this is error message";
  24. channel.basicPublish(EXCHANGE,"error", MessageProperties.TEXT_PLAIN,msg.getBytes());
  25. msg = "this is debug message";
  26. channel.basicPublish(EXCHANGE,"debug", MessageProperties.TEXT_PLAIN,msg.getBytes());
  27. }
  28. }

消费者1:根据路由键,接收3种消息

  1. package com.tianju.routing;
  2. import com.rabbitmq.client.*;
  3. import com.tianju.config.ConnectionFactory;
  4. import java.io.IOException;
  5. import java.util.Calendar;
  6. import java.util.concurrent.TimeoutException;
  7. public class Consumer {
  8. private static String EXCHANGE = "exchange_58";
  9. public static void main(String[] args) throws IOException, TimeoutException {
  10. // 建立连接
  11. Connection connection = ConnectionFactory.createConnection();
  12. // 创建频道
  13. Channel channel = connection.createChannel();
  14. // 创建队列
  15. // 创建交换机
  16. channel.exchangeDeclare(EXCHANGE, "direct"); // 直接交换机
  17. // 队列必须声明,如果不存在,则自动创建
  18. channel.queueDeclare("q58", false, false, false, null);
  19. channel.queueBind("q58", EXCHANGE, "fatal");
  20. channel.queueBind("q58", EXCHANGE, "error");
  21. channel.queueBind("q58", EXCHANGE, "debug");
  22. channel.basicConsume("q58", new DefaultConsumer(channel){
  23. public void handleDelivery(String consumerTag,
  24. Envelope envelope,
  25. AMQP.BasicProperties properties,
  26. byte[] body)
  27. throws IOException
  28. {
  29. // no work to do
  30. String msg = new String(body);
  31. System.out.println("消费者1:"+msg);
  32. channel.basicAck(envelope.getDeliveryTag(), false);
  33. }
  34. });
  35. }
  36. }

消费者2:根据路由键,接收1种消息

  1. package com.tianju.routing;
  2. import com.rabbitmq.client.*;
  3. import com.tianju.config.ConnectionFactory;
  4. import java.io.IOException;
  5. import java.util.concurrent.TimeoutException;
  6. public class Consumer2 {
  7. private static String EXCHANGE = "exchange_58";
  8. public static void main(String[] args) throws IOException, TimeoutException {
  9. // 建立连接
  10. Connection connection = ConnectionFactory.createConnection();
  11. // 创建频道
  12. Channel channel = connection.createChannel();
  13. // 创建队列
  14. // 创建交换机
  15. channel.exchangeDeclare(EXCHANGE, "direct"); // 直接交换机
  16. // 队列必须声明,如果不存在,则自动创建
  17. channel.queueDeclare("q581", false, false, false, null);
  18. channel.queueBind("q581", EXCHANGE, "fatal");
  19. channel.basicConsume("q581", new DefaultConsumer(channel){
  20. public void handleDelivery(String consumerTag,
  21. Envelope envelope,
  22. AMQP.BasicProperties properties,
  23. byte[] body)
  24. throws IOException
  25. {
  26. // no work to do
  27. String msg = new String(body);
  28. System.out.println("消费者1:"+msg);
  29. channel.basicAck(envelope.getDeliveryTag(), false);
  30. }
  31. });
  32. }
  33. }

主题模式(Topic)

在这里插入图片描述

可以采取通配符模式:

  1. * (star) can substitute for exactly one word.
  2. # (hash) can substitute for zero or more words.

#`:匹配0个或多个词

*:匹配不多不少恰好1个词

举例:

item.#:能够匹配item.insert.abc 或者 item.insert

item.*:只能匹配item.insert

场景:

京东: 无锡,河南下订单。退货

==========总部 收到这个订单 .

无锡分销商也有这个消息 *.wuxi

在这里插入图片描述

生产者:主题交换机,带路由键

  1. package com.tianju.topic;
  2. import com.rabbitmq.client.Channel;
  3. import com.rabbitmq.client.Connection;
  4. import com.rabbitmq.client.MessageProperties;
  5. import com.tianju.config.ConnectionFactory;
  6. import java.io.IOException;
  7. import java.util.concurrent.TimeoutException;
  8. /**
  9. * 生产者发送消息
  10. * 建立连接--> 创建频道 --> 创建队列 --> 发送消息
  11. */
  12. public class Provider {
  13. private static String EXCHANGE = "exchange_58";
  14. public static void main(String[] args) throws IOException, TimeoutException {
  15. // 建立连接
  16. Connection connection = ConnectionFactory.createConnection();
  17. // 创建频道
  18. Channel channel = connection.createChannel();
  19. // 创建交换机
  20. channel.exchangeDeclare(EXCHANGE, "topic"); // 主题模式交换机
  21. String msg = "this is wuxi order";
  22. channel.basicPublish(EXCHANGE,"order.wuxi", MessageProperties.TEXT_PLAIN,msg.getBytes());
  23. msg = "this is henan order";
  24. channel.basicPublish(EXCHANGE,"order.henan", MessageProperties.TEXT_PLAIN,msg.getBytes());
  25. msg = "this is wuxi back";
  26. channel.basicPublish(EXCHANGE,"back.wuxi", MessageProperties.TEXT_PLAIN,msg.getBytes());
  27. msg = "this is henan back";
  28. channel.basicPublish(EXCHANGE,"back.henan", MessageProperties.TEXT_PLAIN,msg.getBytes());
  29. }
  30. }

消费者1:通配符的路由键

  1. package com.tianju.topic;
  2. import com.rabbitmq.client.*;
  3. import com.tianju.config.ConnectionFactory;
  4. import java.io.IOException;
  5. import java.util.concurrent.TimeoutException;
  6. public class Consumer {
  7. private static String EXCHANGE = "exchange_58";
  8. public static void main(String[] args) throws IOException, TimeoutException {
  9. // 建立连接
  10. Connection connection = ConnectionFactory.createConnection();
  11. // 创建频道
  12. Channel channel = connection.createChannel();
  13. // 创建队列
  14. // 创建交换机
  15. channel.exchangeDeclare(EXCHANGE, "topic"); // 主题交换机
  16. // 队列必须声明,如果不存在,则自动创建
  17. channel.queueDeclare("q58", false, false, false, null);
  18. channel.queueBind("q58", EXCHANGE, "*.wuxi"); // 意味着无锡的订单和退货都能收到
  19. channel.basicConsume("q58", new DefaultConsumer(channel){
  20. public void handleDelivery(String consumerTag,
  21. Envelope envelope,
  22. AMQP.BasicProperties properties,
  23. byte[] body)
  24. throws IOException
  25. {
  26. // no work to do
  27. String msg = new String(body);
  28. System.out.println("无锡仓库:"+msg);
  29. channel.basicAck(envelope.getDeliveryTag(), false);
  30. }
  31. });
  32. }
  33. }

消费者2:统配符

  1. package com.tianju.topic;
  2. import com.rabbitmq.client.*;
  3. import com.tianju.config.ConnectionFactory;
  4. import java.io.IOException;
  5. import java.util.concurrent.TimeoutException;
  6. public class Consumer2 {
  7. private static String EXCHANGE = "exchange_58";
  8. public static void main(String[] args) throws IOException, TimeoutException {
  9. // 建立连接
  10. Connection connection = ConnectionFactory.createConnection();
  11. // 创建频道
  12. Channel channel = connection.createChannel();
  13. // 创建队列
  14. // 创建交换机
  15. channel.exchangeDeclare(EXCHANGE, "topic"); // 主题交换机
  16. // 队列必须声明,如果不存在,则自动创建
  17. channel.queueDeclare("q581", false, false, false, null);
  18. channel.queueBind("q581", EXCHANGE, "*.*"); // 意味着所有的订单和退货都能收到
  19. channel.basicConsume("q581", new DefaultConsumer(channel){
  20. public void handleDelivery(String consumerTag,
  21. Envelope envelope,
  22. AMQP.BasicProperties properties,
  23. byte[] body)
  24. throws IOException
  25. {
  26. // no work to do
  27. String msg = new String(body);
  28. System.out.println("总部仓库:"+msg);
  29. channel.basicAck(envelope.getDeliveryTag(), false);
  30. }
  31. });
  32. }
  33. }

总结

1.MQ,消息队列的应用场景,几种MQ简单对比;
2.分析RabbitMQ的浏览器控制台页面;
3.结合场景来阐述RabbitMQ的几种模式,描述了不同模式的应用场景,并给出相应的代码;

在这里插入图片描述

发表评论

表情:
评论列表 (有 0 条评论,15人围观)

还没有评论,来说两句吧...

相关阅读