RabbitMQ(二)实践篇 动手理解RabbitMQ五种模式 两小时搞定Rabbit应用 摸清路由器的的路由、广播、Topic 类型 了解Comfirm Return的过程

青旅半醒 2022-09-09 12:41 21阅读 0赞

上一章RabbitMq (一)理论篇部分,通过画图的的方式,我觉得比较易懂的方式,介绍了一下整体,本章是用代码实现几个模式, 例程我已经上传git,其中basic的例程时本章的案例。有兴趣的回头也可以阅读高级特性部分的简单介绍。RabbitMQ (三)高级特性安装的部分,大家自行百度,傻瓜式安装。回头我也会把需要的安装包上传。下载不到的朋友直接拿也是可以。

代码git链接

文章目录

  • 前言
  • 准备工作
    • 安装
    • 启动登录
    • 创建虚拟机
    • 创建用户
    • 配置用户给虚拟机
    • 准备一个连接工具类,和常量定义
  • 简单模式
    • 生产者
    • 消费者
  • WorkQueue
    • 信息类
    • 生产者
    • 消费者
  • 发布订阅
    • 生产者
    • 新浪消费者
    • 百度消费者
  • 路由模式
    • 生产者
    • 百度消费者
    • 新浪消费者
  • Topic
    • 生产者
    • 百度消费者
    • 新浪消费者
  • Comfirm和Return
    • 生产者

前言

这还有什么前言啊,没有,直接开干!


准备工作

安装

找到一个安装步骤,没几步,大佬们自行安装吧。
在这里插入图片描述

启动登录

安装好,启动,关闭防火墙拿到ip加上端口15672就可以访问UI界面了。

账号密码都是guest

创建虚拟机

我创建的虚拟机是antry_vm
在这里插入图片描述

创建用户

我创建的用户是antry
注意tags填administrator
c90dc91048fd414daef7e1eb154227a3.png
这里的tags,定义权限用,具体的看链接,目前也可以不用知道。用户权限
在这里插入图片描述

配置用户给虚拟机

我在antry_vm给antry设置权限
在这里插入图片描述
准备工作完成,撸代码了。
创建一个maven工程,引入包

  1. <dependency>
  2. <groupId>com.rabbitmq</groupId>
  3. <artifactId>amqp-client</artifactId>
  4. <version>5.3.0</version>
  5. </dependency>

准备一个连接工具类,和常量定义

这个不能用注释了吧,参数名称这么人性化,和我前面创建的虚拟机,用户名一一对上就好了。改成大佬你自己的参数就行了。

  1. /** * 连接工具 */
  2. public class RabbitUtils {
  3. private static ConnectionFactory connectionFactory = new ConnectionFactory();
  4. static {
  5. connectionFactory.setHost("192.168.200.131");
  6. connectionFactory.setPort(5672);//5672是RabbitMQ的默认端口号
  7. connectionFactory.setUsername("antry");
  8. connectionFactory.setPassword("antry");
  9. connectionFactory.setVirtualHost("antry_vm");
  10. }
  11. public static Connection getConnection(){
  12. Connection conn = null;
  13. try {
  14. conn = connectionFactory.newConnection();
  15. return conn;
  16. } catch (Exception e) {
  17. throw new RuntimeException(e);
  18. }
  19. }
  20. }

定义一些一会儿会用到的常量,目的呢,有时候多处用到,如果不小心打错了一点,就会出问题,所以定义成常量,容错率高点,编辑器能提示。

  1. /** * 参数参量 */
  2. public class RabbitConstant {
  3. public static final String QUEUE_HELLOWORLD = "helloword";
  4. public static final String QUEUE_SMS = "sms";
  5. public static final String EXCHANGE_WEATHER = "weather";
  6. public static final String EXCHANGE_WEATHER_ROUTING = "weather_routing";
  7. public static final String QUEUE_BAIDU = "baidu";
  8. public static final String QUEUE_SINA = "sina";
  9. public static final String EXCHANGE_WEATHER_TOPIC = "weather_topic";
  10. }

简单模式

生产者

生产者获取连接-创建通道-声明队列-发送消息

  1. public class Producer {
  2. public static void main(String[] args) throws IOException, TimeoutException {
  3. //获取TCP长连接
  4. Connection conn = RabbitUtils.getConnection();
  5. //创建通信“通道”,相当于TCP中的虚拟连接
  6. Channel channel = conn.createChannel();
  7. //创建队列,声明并创建一个队列,如果队列已存在,则使用这个队列
  8. //第一个参数:队列名称ID
  9. //第二个参数:是否持久化,false对应不持久化数据,MQ停掉数据就会丢失
  10. //第三个参数:是否队列私有化,false则代表所有消费者都可以访问,true代表只有第一次拥有它的消费者才能一直使用,其他消费者不让访问
  11. //第四个:是否自动删除,false代表连接停掉后不自动删除掉这个队列
  12. //其他额外的参数, null
  13. channel.queueDeclare(RabbitConstant.QUEUE_HELLOWORLD,false, false, false, null);
  14. String message = "antry";
  15. //四个参数
  16. //exchange 交换机,暂时用不到,在后面进行发布订阅时才会用到
  17. //队列名称
  18. //额外的设置属性
  19. //最后一个参数是要传递的消息字节数组
  20. channel.basicPublish("", RabbitConstant.QUEUE_HELLOWORLD, null,message.getBytes());
  21. channel.close();
  22. conn.close();
  23. System.out.println("===发送成功===");
  24. }
  25. }

可以跑起来瞧瞧了
在这里插入图片描述

显示发送成功
此时通过antry用户登录,查看队列,就可以看到创建了一个helloword的队列,不好意思,单词写错,将就吧。
这里有一条消息,未被读取。
在这里插入图片描述

消费者

同样是获取连接,创建通道(其实也可以不用创建了,因为Producer已经创建过了,存在不会再创建,所以问题不大),创建队列,创建消费者,消费者的最后一个参数是实现Customer借口的实现类,我们可以选择实现这个接口,也可以用适配器DefaultCustomer。本次案例使用的是适配器,适配器自动挡用起来爽。这里面手动编辑监听到消息要做什么。

  1. public class Customer {
  2. public static void main(String[] args) throws IOException {
  3. //获取TCP长连接
  4. Connection conn = RabbitUtils.getConnection();
  5. //创建通信“通道”,相当于TCP中的虚拟连接
  6. Channel channel = conn.createChannel();
  7. //创建队列,声明并创建一个队列,如果队列已存在,则使用这个队列
  8. //第一个参数:队列名称ID
  9. //第二个参数:是否持久化,false对应不持久化数据,MQ停掉数据就会丢失
  10. //第三个参数:是否队列私有化,false则代表所有消费者都可以访问,true代表只有第一次拥有它的消费者才能一直使用,其他消费者不让访问
  11. //第四个:是否自动删除,false代表连接停掉后不自动删除掉这个队列
  12. //其他额外的参数, null
  13. channel.queueDeclare(RabbitConstant.QUEUE_HELLOWORLD,false, false, false, null);
  14. //从MQ服务器中获取数据
  15. //创建一个消息消费者
  16. //第一个参数:队列名
  17. //第二个参数代表是否自动确认收到消息,false代表手动编程来确认消息,这是MQ的推荐做法
  18. //第三个参数要传入DefaultConsumer的实现类
  19. channel.basicConsume(RabbitConstant.QUEUE_HELLOWORLD, false, new Reciver(channel));
  20. }
  21. private static class Reciver extends DefaultConsumer {
  22. private Channel channel;
  23. //重写构造函数,Channel通道对象需要从外层传入,在handleDelivery中要用到
  24. public Reciver(Channel channel) {
  25. super(channel);
  26. this.channel = channel;
  27. }
  28. @Override
  29. public void handleDelivery(String s, Envelope envelope, AMQP.BasicProperties basicProperties, byte[] body) throws IOException {
  30. String message = new String(body);
  31. System.out.println("消费者接收到的消息:"+message);
  32. System.out.println("消息的TagId:"+envelope.getDeliveryTag());
  33. //false只确认签收当前的消息,设置为true的时候则代表签收该消费者所有未签收的消息
  34. channel.basicAck(envelope.getDeliveryTag(), false);
  35. }
  36. }
  37. }

在这里插入图片描述
消费完了之后我们在ui界面上也可以看到空空如也,都被消费了。
在这里插入图片描述

WorkQueue

这个模式是多个消费者一起消费同一个队列中的任务。因此我假设有一个要发100条信息的任务,有三个消费者都能发信息。

信息类

  1. public class SMS {
  2. /** * 接收人姓名 */
  3. private String name;
  4. /** * 接收人电话 */
  5. private String mobile;
  6. /** * 短信内容 */
  7. private String content;
  8. public SMS(String name, String mobile, String content) {
  9. this.name = name;
  10. this.mobile = mobile;
  11. this.content = content;
  12. }
  13. public String getName() {
  14. return name;
  15. }
  16. public void setName(String name) {
  17. this.name = name;
  18. }
  19. public String getMobile() {
  20. return mobile;
  21. }
  22. public void setMobile(String mobile) {
  23. this.mobile = mobile;
  24. }
  25. public String getContent() {
  26. return content;
  27. }
  28. public void setContent(String content) {
  29. this.content = content;
  30. }
  31. }

生产者

因为这里发送对象,需要json转换,所以引入了一个包

  1. <dependency>
  2. <groupId>com.google.code.gson</groupId>
  3. <artifactId>gson</artifactId>
  4. <version>2.8.5</version>
  5. </dependency>

生产者向MQ发送100条任务

  1. public class OrderSystem {
  2. public static void main(String[] args) throws IOException, TimeoutException {
  3. Connection connection = RabbitUtils.getConnection();
  4. Channel channel = connection.createChannel();
  5. channel.queueDeclare(RabbitConstant.QUEUE_SMS, false, false, false, null);
  6. for(int i = 1 ; i <= 100 ; i++) {
  7. SMS sms = new SMS("乘客" + i, "13900000" + i, "您的车票已预订成功");
  8. String jsonSMS = new Gson().toJson(sms);
  9. channel.basicPublish("" , RabbitConstant.QUEUE_SMS , null , jsonSMS.getBytes());
  10. }
  11. System.out.println("发送数据成功");
  12. channel.close();
  13. connection.close();
  14. }
  15. }

消费者

可以创建多个消费者,代码都一样,为了区分,我建了三个,分别为SMSSender1、SMSSender2、SMSSender3

  1. public class SMSSender1 {
  2. public static void main(String[] args) throws IOException {
  3. Connection connection = RabbitUtils.getConnection();
  4. final Channel channel = connection.createChannel();
  5. channel.queueDeclare(RabbitConstant.QUEUE_SMS, false, false, false, null);
  6. //如果不写basicQos(1),则自动MQ会将所有请求平均发送给所有消费者
  7. //basicQos,MQ不再对消费者一次发送多个请求,而是消费者处理完一个消息后(确认后),在从队列中获取一个新的
  8. channel.basicQos(1);//处理完一个取一个
  9. channel.basicConsume(RabbitConstant.QUEUE_SMS , false , new DefaultConsumer(channel){
  10. @Override
  11. public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
  12. String jsonSMS = new String(body);
  13. System.out.println("SMSSender1-短信发送成功:" + jsonSMS);
  14. try {
  15. Thread.sleep(10);
  16. } catch (InterruptedException e) {
  17. e.printStackTrace();
  18. }
  19. channel.basicAck(envelope.getDeliveryTag() , false);
  20. }
  21. });
  22. }
  23. }

先启动三个消费者,然后启动生产者。
在这里插入图片描述

发布订阅

生产者

注意这里发送消息的时候,选择了交换机,而不是像前面两种模式直接选择队列。前面两个模式并不是没有选择交换机,只是使用了默认的交换机。

  1. public class WeatherBurean {
  2. public static void main(String[] args) throws IOException, TimeoutException {
  3. Connection connection = RabbitUtils.getConnection();
  4. String input = new Scanner(System.in).next();
  5. Channel channel = connection.createChannel();
  6. //第一个参数交换机名字 其他参数和之前的一样
  7. channel.basicPublish(RabbitConstant.EXCHANGE_WEATHER,"" , null , input.getBytes());
  8. channel.close();
  9. connection.close();
  10. }
  11. }

既然这里用到了交换机,因此就需要在新增要给交换机。
选择自己的交换机,填上交换机名称,选择fanout(广播)模式
在这里插入图片描述

新浪消费者

  1. public class Sina {
  2. public static void main(String[] args) throws IOException {
  3. //获取TCP长连接
  4. Connection connection = RabbitUtils.getConnection();
  5. //获取虚拟连接
  6. final Channel channel = connection.createChannel();
  7. //声明队列信息
  8. channel.queueDeclare(RabbitConstant.QUEUE_SINA, false, false, false, null);
  9. //queueBind用于将队列与交换机绑定
  10. //参数1:队列名 参数2:交互机名 参数三:路由key(暂时用不到)
  11. channel.queueBind(RabbitConstant.QUEUE_SINA, RabbitConstant.EXCHANGE_WEATHER, "");
  12. channel.basicQos(1);
  13. channel.basicConsume(RabbitConstant.QUEUE_SINA , false , new DefaultConsumer(channel){
  14. @Override
  15. public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
  16. System.out.println("新浪天气收到气象信息:" + new String(body));
  17. channel.basicAck(envelope.getDeliveryTag() , false);
  18. }
  19. });
  20. }
  21. }

百度消费者

  1. public class BaiDu {
  2. public static void main(String[] args) throws IOException {
  3. //获取TCP长连接
  4. Connection connection = RabbitUtils.getConnection();
  5. //获取虚拟连接
  6. final Channel channel = connection.createChannel();
  7. //声明队列信息
  8. channel.queueDeclare(RabbitConstant.QUEUE_BAIDU, false, false, false, null);
  9. //queueBind用于将队列与交换机绑定
  10. //参数1:队列名 参数2:交互机名 参数三:路由key(暂时用不到)
  11. channel.queueBind(RabbitConstant.QUEUE_BAIDU, RabbitConstant.EXCHANGE_WEATHER, "");
  12. channel.basicQos(1);
  13. channel.basicConsume(RabbitConstant.QUEUE_BAIDU , false , new DefaultConsumer(channel){
  14. @Override
  15. public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
  16. System.out.println("百度天气收到气象信息:" + new String(body));
  17. channel.basicAck(envelope.getDeliveryTag() , false);
  18. }
  19. });
  20. }
  21. }

这两个消费者,和前面的模式不同之处在于,这里申明完队列之后,将队列和交换机进行绑定,也就是绑定到天气的交换机去获取天气信息

先运行新浪和百度的消费者,然后运行生产者,输入信息123123

可以分别在xinna和baidu看到获取到的信息
在这里插入图片描述

路由模式

生产者

这次发送,不仅有交换机,第二个参数作为routingkey,为什么这里的第二个参数被认为是routingkey呢?

  1. public class WeatherBureau {
  2. public static void main(String[] args) throws IOException, TimeoutException {
  3. Map area = new LinkedHashMap<String, String>();
  4. area.put("china.hunan.changsha.20201127", "中国湖南长沙20201127天气数据");
  5. area.put("china.hubei.wuhan.20201127", "中国湖北武汉20201127天气数据");
  6. area.put("china.hunan.zhuzhou.20201127", "中国湖南株洲20201128天气数据");
  7. area.put("us.cal.lsj.20201127", "美国加州洛杉矶20201127天气数据");
  8. area.put("china.hebei.shijiazhuang.20201128", "中国河北石家庄20201128天气数据");
  9. area.put("china.hubei.wuhan.20201128", "中国湖北武汉20201128天气数据");
  10. area.put("china.henan.zhengzhou.20201128", "中国河南郑州20201128天气数据");
  11. area.put("us.cal.lsj.20201128", "美国加州洛杉矶20201128天气数据");
  12. Connection connection = RabbitUtils.getConnection();
  13. Channel channel = connection.createChannel();
  14. Iterator<Map.Entry<String, String>> itr = area.entrySet().iterator();
  15. while (itr.hasNext()) {
  16. Map.Entry<String, String> me = itr.next();
  17. //第一个参数交换机名字 第二个参数作为 消息的routing key
  18. channel.basicPublish(RabbitConstant.EXCHANGE_WEATHER_ROUTING,me.getKey() , null , me.getValue().getBytes());
  19. }
  20. channel.close();
  21. connection.close();
  22. }
  23. }

因为我们在创建这个交换机的失火,我们选择的是direct定向的类型,就会把这个参数当作routingkey
在这里插入图片描述

百度消费者

这里给百度绑定了两个routingkey,分别是china.hunan.changsha.20201127和china.hebei.shijiazhuang.20201128

  1. public class Baidu {
  2. public static void main(String[] args) throws IOException {
  3. Connection connection = RabbitUtils.getConnection();
  4. final Channel channel = connection.createChannel();
  5. channel.queueDeclare(RabbitConstant.QUEUE_BAIDU, false, false, false, null);
  6. //queueBind用于将队列与交换机绑定
  7. //参数1:队列名 参数2:交互机名 参数三:路由key
  8. channel.queueBind(RabbitConstant.QUEUE_BAIDU, RabbitConstant.EXCHANGE_WEATHER_ROUTING, "china.hunan.changsha.20201127");
  9. channel.queueBind(RabbitConstant.QUEUE_BAIDU, RabbitConstant.EXCHANGE_WEATHER_ROUTING, "china.hebei.shijiazhuang.20201128");
  10. channel.basicQos(1);
  11. channel.basicConsume(RabbitConstant.QUEUE_BAIDU , false , new DefaultConsumer(channel){
  12. @Override
  13. public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
  14. System.out.println("百度天气收到气象信息:" + new String(body));
  15. channel.basicAck(envelope.getDeliveryTag() , false);
  16. }
  17. });
  18. }
  19. }

新浪消费者

这里给新浪绑定了四个routingkey

  1. public class Sina {
  2. public static void main(String[] args) throws IOException {
  3. //获取TCP长连接
  4. Connection connection = RabbitUtils.getConnection();
  5. //获取虚拟连接
  6. final Channel channel = connection.createChannel();
  7. //声明队列信息
  8. channel.queueDeclare(RabbitConstant.QUEUE_SINA, false, false, false, null);
  9. //指定队列与交换机以及routing key之间的关系
  10. channel.queueBind(RabbitConstant.QUEUE_SINA, RabbitConstant.EXCHANGE_WEATHER_ROUTING, "us.cal.lsj.20201127");
  11. channel.queueBind(RabbitConstant.QUEUE_SINA, RabbitConstant.EXCHANGE_WEATHER_ROUTING, "china.hubei.wuhan.20201127");
  12. channel.queueBind(RabbitConstant.QUEUE_SINA, RabbitConstant.EXCHANGE_WEATHER_ROUTING, "us.cal.lsj.20201128");
  13. channel.queueBind(RabbitConstant.QUEUE_SINA, RabbitConstant.EXCHANGE_WEATHER_ROUTING, "china.henan.zhengzhou.20201012");
  14. channel.basicQos(1);
  15. channel.basicConsume(RabbitConstant.QUEUE_SINA , false , new DefaultConsumer(channel){
  16. @Override
  17. public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
  18. System.out.println("新浪天气收到气象信息:" + new String(body));
  19. channel.basicAck(envelope.getDeliveryTag() , false);
  20. }
  21. });
  22. }
  23. }

运行之后,可以看到新浪和百度都收到了符合各自routingkey的消息
在这里插入图片描述
在这里插入图片描述

在UI中也可以看到交换机通过routingkey绑定了对应的queue,这里我们也可以看到,key没有绑定的信息,发送给交换机直接就丢失了。也证明了交换机没有存储的功能。
在这里插入图片描述

Topic

生产者

可以看出生产者没有什么变化

  1. public class WeaherBureau {
  2. public static void main(String[] args) throws IOException, TimeoutException {
  3. Map area = new LinkedHashMap<String, String>();
  4. area.put("china.hunan.changsha.20201127", "中国湖南长沙20201127天气数据");
  5. area.put("china.hubei.wuhan.20201127", "中国湖北武汉20201127天气数据");
  6. area.put("china.hunan.zhuzhou.20201127", "中国湖南株洲20201127天气数据");
  7. area.put("us.cal.lsj.20201127", "美国加州洛杉矶20201127天气数据");
  8. area.put("china.hebei.shijiazhuang.20201128", "中国河北石家庄20201128天气数据");
  9. area.put("china.hubei.wuhan.20201128", "中国湖北武汉20201128天气数据");
  10. area.put("china.henan.zhengzhou.20201128", "中国河南郑州20201128天气数据");
  11. area.put("us.cal.lsj.20201128", "美国加州洛杉矶20201128天气数据");
  12. Connection connection = RabbitUtils.getConnection();
  13. Channel channel = connection.createChannel();
  14. Iterator<Map.Entry<String, String>> itr = area.entrySet().iterator();
  15. while (itr.hasNext()) {
  16. Map.Entry<String, String> me = itr.next();
  17. //第一个参数交换机名字 第二个参数作为 消息的routing key
  18. channel.basicPublish(RabbitConstant.EXCHANGE_WEATHER_TOPIC,me.getKey() , null , me.getValue().getBytes());
  19. }
  20. channel.close();
  21. connection.close();
  22. }
  23. }

只是在新建交换机的时候要选择topic类型
在这里插入图片描述

百度消费者

可以看到消费者的变化,只是roukingkey绑定时,用了通配符。

  1. public class Baidu {
  2. public static void main(String[] args) throws IOException {
  3. Connection connection = RabbitUtils.getConnection();
  4. final Channel channel = connection.createChannel();
  5. channel.queueDeclare(RabbitConstant.QUEUE_BAIDU, false, false, false, null);
  6. //queueBind用于将队列与交换机绑定
  7. //参数1:队列名 参数2:交互机名 参数三:路由key
  8. channel.queueBind(RabbitConstant.QUEUE_BAIDU, RabbitConstant.EXCHANGE_WEATHER_TOPIC, "*.*.*.20201127");
  9. // channel.queueBind(RabbitConstant.QUEUE_BAIDU, RabbitConstant.EXCHANGE_WEATHER_ROUTING, "china.hebei.shijiazhuang.20201128");
  10. channel.basicQos(1);
  11. channel.basicConsume(RabbitConstant.QUEUE_BAIDU , false , new DefaultConsumer(channel){
  12. @Override
  13. public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
  14. System.out.println("百度天气收到气象信息:" + new String(body));
  15. channel.basicAck(envelope.getDeliveryTag() , false);
  16. }
  17. });
  18. }
  19. }

新浪消费者

  1. public class Sina {
  2. public static void main(String[] args) throws IOException {
  3. //获取TCP长连接
  4. Connection connection = RabbitUtils.getConnection();
  5. //获取虚拟连接
  6. final Channel channel = connection.createChannel();
  7. //声明队列信息
  8. channel.queueDeclare(RabbitConstant.QUEUE_SINA, false, false, false, null);
  9. //指定队列与交换机以及routing key之间的关系
  10. channel.queueBind(RabbitConstant.QUEUE_SINA, RabbitConstant.EXCHANGE_WEATHER_TOPIC, "us.#");
  11. channel.basicQos(1);
  12. channel.basicConsume(RabbitConstant.QUEUE_SINA , false , new DefaultConsumer(channel){
  13. @Override
  14. public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
  15. System.out.println("新浪天气收到气象信息:" + new String(body));
  16. channel.basicAck(envelope.getDeliveryTag() , false);
  17. }
  18. });
  19. }
  20. }

运行之后可以看到各自都拿到对应匹配的信息
在这里插入图片描述
在这里插入图片描述

Comfirm和Return

生产者

因为我们前面都跑过了消费者,这个地方可以不需要新建消费者。因为Comfirm和Return时发生在Producer和Broker之间的

  1. public class WeatherBureau {
  2. public static void main(String[] args) throws IOException, TimeoutException {
  3. Map area = new LinkedHashMap<String, String>();
  4. area.put("china.hunan.changsha.20201127", "中国湖南长沙20201127天气数据");
  5. area.put("china.hubei.wuhan.20201127", "中国湖北武汉20201127天气数据");
  6. area.put("china.hunan.zhuzhou.20201127", "中国湖南株洲20201127天气数据");
  7. area.put("us.cal.lsj.20201127", "美国加州洛杉矶20201127天气数据");
  8. area.put("china.hebei.shijiazhuang.20201128", "中国河北石家庄20201128天气数据");
  9. area.put("china.hubei.wuhan.20201128", "中国湖北武汉20201128天气数据");
  10. area.put("china.henan.zhengzhou.20201128", "中国河南郑州20201128天气数据");
  11. area.put("us.cal.lsj.20201128", "美国加州洛杉矶20201128天气数据");
  12. Connection connection = RabbitUtils.getConnection();
  13. Channel channel = connection.createChannel();
  14. //开启confirm监听模式
  15. channel.confirmSelect();
  16. channel.addConfirmListener(new ConfirmListener() {
  17. public void handleAck(long l, boolean b) throws IOException {
  18. //第二个参数代表接收的数据是否为批量接收,一般我们用不到。
  19. System.out.println("消息已被Broker接收,Tag:" + l );
  20. }
  21. public void handleNack(long l, boolean b) throws IOException {
  22. System.out.println("消息已被Broker拒收,Tag:" + l);
  23. }
  24. });
  25. channel.addReturnListener(new ReturnCallback() {
  26. public void handle(Return r) {
  27. System.err.println("===========================");
  28. System.err.println("Return编码:" + r.getReplyCode() + "-Return描述:" + r.getReplyText());
  29. System.err.println("交换机:" + r.getExchange() + "-路由key:" + r.getRoutingKey() );
  30. System.err.println("Return主题:" + new String(r.getBody()));
  31. System.err.println("===========================");
  32. }
  33. });
  34. Iterator<Map.Entry<String, String>> itr = area.entrySet().iterator();
  35. while (itr.hasNext()) {
  36. Map.Entry<String, String> me = itr.next();
  37. //Routing key 第二个参数相当于数据筛选的条件
  38. //第三个参数为:mandatory true代表如果消息无法正常投递则return回生产者,如果false,则直接将消息放弃。
  39. channel.basicPublish(RabbitConstant.EXCHANGE_WEATHER_TOPIC,me.getKey() ,true, null , me.getValue().getBytes());
  40. }
  41. //如果关闭则无法进行监听,因此此处不需要关闭
  42. /*channel.close(); connection.close();*/
  43. }
  44. }

先运行一下
在这里插入图片描述
从结果可以看到,这三个被return就是我们前面发现发送出,而丢失不见的三条消息。8条消息都被Broker接收到了,只是有三条找不到对应的队列进行投递,因此进行Return了,可以再创建几个队列来绑定这个几个routingkey,随后就会发现,所有的消息都被接收了。

发表评论

表情:
评论列表 (有 0 条评论,21人围观)

还没有评论,来说两句吧...

相关阅读