RabbitMQ(二)实践篇 动手理解RabbitMQ五种模式 两小时搞定Rabbit应用 摸清路由器的的路由、广播、Topic 类型 了解Comfirm Return的过程
上一章RabbitMq (一)理论篇部分,通过画图的的方式,我觉得比较易懂的方式,介绍了一下整体,本章是用代码实现几个模式, 例程我已经上传git,其中basic的例程时本章的案例。有兴趣的回头也可以阅读高级特性部分的简单介绍。RabbitMQ (三)高级特性安装的部分,大家自行百度,傻瓜式安装。回头我也会把需要的安装包上传。下载不到的朋友直接拿也是可以。
代码git链接
文章目录
- 前言
- 准备工作
- 安装
- 启动登录
- 创建虚拟机
- 创建用户
- 配置用户给虚拟机
- 准备一个连接工具类,和常量定义
- 简单模式
- 生产者
- 消费者
- WorkQueue
- 信息类
- 生产者
- 消费者
- 发布订阅
- 生产者
- 新浪消费者
- 百度消费者
- 路由模式
- 生产者
- 百度消费者
- 新浪消费者
- Topic
- 生产者
- 百度消费者
- 新浪消费者
- Comfirm和Return
- 生产者
前言
这还有什么前言啊,没有,直接开干!
准备工作
安装
找到一个安装步骤,没几步,大佬们自行安装吧。
启动登录
安装好,启动,关闭防火墙拿到ip加上端口15672就可以访问UI界面了。
账号密码都是guest
创建虚拟机
我创建的虚拟机是antry_vm
创建用户
我创建的用户是antry
注意tags填administrator
这里的tags,定义权限用,具体的看链接,目前也可以不用知道。用户权限
配置用户给虚拟机
我在antry_vm给antry设置权限
准备工作完成,撸代码了。
创建一个maven工程,引入包
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>5.3.0</version>
</dependency>
准备一个连接工具类,和常量定义
这个不能用注释了吧,参数名称这么人性化,和我前面创建的虚拟机,用户名一一对上就好了。改成大佬你自己的参数就行了。
/** * 连接工具 */
public class RabbitUtils {
private static ConnectionFactory connectionFactory = new ConnectionFactory();
static {
connectionFactory.setHost("192.168.200.131");
connectionFactory.setPort(5672);//5672是RabbitMQ的默认端口号
connectionFactory.setUsername("antry");
connectionFactory.setPassword("antry");
connectionFactory.setVirtualHost("antry_vm");
}
public static Connection getConnection(){
Connection conn = null;
try {
conn = connectionFactory.newConnection();
return conn;
} catch (Exception e) {
throw new RuntimeException(e);
}
}
}
定义一些一会儿会用到的常量,目的呢,有时候多处用到,如果不小心打错了一点,就会出问题,所以定义成常量,容错率高点,编辑器能提示。
/** * 参数参量 */
public class RabbitConstant {
public static final String QUEUE_HELLOWORLD = "helloword";
public static final String QUEUE_SMS = "sms";
public static final String EXCHANGE_WEATHER = "weather";
public static final String EXCHANGE_WEATHER_ROUTING = "weather_routing";
public static final String QUEUE_BAIDU = "baidu";
public static final String QUEUE_SINA = "sina";
public static final String EXCHANGE_WEATHER_TOPIC = "weather_topic";
}
简单模式
生产者
生产者获取连接-创建通道-声明队列-发送消息
public class Producer {
public static void main(String[] args) throws IOException, TimeoutException {
//获取TCP长连接
Connection conn = RabbitUtils.getConnection();
//创建通信“通道”,相当于TCP中的虚拟连接
Channel channel = conn.createChannel();
//创建队列,声明并创建一个队列,如果队列已存在,则使用这个队列
//第一个参数:队列名称ID
//第二个参数:是否持久化,false对应不持久化数据,MQ停掉数据就会丢失
//第三个参数:是否队列私有化,false则代表所有消费者都可以访问,true代表只有第一次拥有它的消费者才能一直使用,其他消费者不让访问
//第四个:是否自动删除,false代表连接停掉后不自动删除掉这个队列
//其他额外的参数, null
channel.queueDeclare(RabbitConstant.QUEUE_HELLOWORLD,false, false, false, null);
String message = "antry";
//四个参数
//exchange 交换机,暂时用不到,在后面进行发布订阅时才会用到
//队列名称
//额外的设置属性
//最后一个参数是要传递的消息字节数组
channel.basicPublish("", RabbitConstant.QUEUE_HELLOWORLD, null,message.getBytes());
channel.close();
conn.close();
System.out.println("===发送成功===");
}
}
可以跑起来瞧瞧了
显示发送成功
此时通过antry用户登录,查看队列,就可以看到创建了一个helloword的队列,不好意思,单词写错,将就吧。
这里有一条消息,未被读取。
消费者
同样是获取连接,创建通道(其实也可以不用创建了,因为Producer已经创建过了,存在不会再创建,所以问题不大),创建队列,创建消费者,消费者的最后一个参数是实现Customer借口的实现类,我们可以选择实现这个接口,也可以用适配器DefaultCustomer。本次案例使用的是适配器,适配器自动挡用起来爽。这里面手动编辑监听到消息要做什么。
public class Customer {
public static void main(String[] args) throws IOException {
//获取TCP长连接
Connection conn = RabbitUtils.getConnection();
//创建通信“通道”,相当于TCP中的虚拟连接
Channel channel = conn.createChannel();
//创建队列,声明并创建一个队列,如果队列已存在,则使用这个队列
//第一个参数:队列名称ID
//第二个参数:是否持久化,false对应不持久化数据,MQ停掉数据就会丢失
//第三个参数:是否队列私有化,false则代表所有消费者都可以访问,true代表只有第一次拥有它的消费者才能一直使用,其他消费者不让访问
//第四个:是否自动删除,false代表连接停掉后不自动删除掉这个队列
//其他额外的参数, null
channel.queueDeclare(RabbitConstant.QUEUE_HELLOWORLD,false, false, false, null);
//从MQ服务器中获取数据
//创建一个消息消费者
//第一个参数:队列名
//第二个参数代表是否自动确认收到消息,false代表手动编程来确认消息,这是MQ的推荐做法
//第三个参数要传入DefaultConsumer的实现类
channel.basicConsume(RabbitConstant.QUEUE_HELLOWORLD, false, new Reciver(channel));
}
private static class Reciver extends DefaultConsumer {
private Channel channel;
//重写构造函数,Channel通道对象需要从外层传入,在handleDelivery中要用到
public Reciver(Channel channel) {
super(channel);
this.channel = channel;
}
@Override
public void handleDelivery(String s, Envelope envelope, AMQP.BasicProperties basicProperties, byte[] body) throws IOException {
String message = new String(body);
System.out.println("消费者接收到的消息:"+message);
System.out.println("消息的TagId:"+envelope.getDeliveryTag());
//false只确认签收当前的消息,设置为true的时候则代表签收该消费者所有未签收的消息
channel.basicAck(envelope.getDeliveryTag(), false);
}
}
}
消费完了之后我们在ui界面上也可以看到空空如也,都被消费了。
WorkQueue
这个模式是多个消费者一起消费同一个队列中的任务。因此我假设有一个要发100条信息的任务,有三个消费者都能发信息。
信息类
public class SMS {
/** * 接收人姓名 */
private String name;
/** * 接收人电话 */
private String mobile;
/** * 短信内容 */
private String content;
public SMS(String name, String mobile, String content) {
this.name = name;
this.mobile = mobile;
this.content = content;
}
public String getName() {
return name;
}
public void setName(String name) {
this.name = name;
}
public String getMobile() {
return mobile;
}
public void setMobile(String mobile) {
this.mobile = mobile;
}
public String getContent() {
return content;
}
public void setContent(String content) {
this.content = content;
}
}
生产者
因为这里发送对象,需要json转换,所以引入了一个包
<dependency>
<groupId>com.google.code.gson</groupId>
<artifactId>gson</artifactId>
<version>2.8.5</version>
</dependency>
生产者向MQ发送100条任务
public class OrderSystem {
public static void main(String[] args) throws IOException, TimeoutException {
Connection connection = RabbitUtils.getConnection();
Channel channel = connection.createChannel();
channel.queueDeclare(RabbitConstant.QUEUE_SMS, false, false, false, null);
for(int i = 1 ; i <= 100 ; i++) {
SMS sms = new SMS("乘客" + i, "13900000" + i, "您的车票已预订成功");
String jsonSMS = new Gson().toJson(sms);
channel.basicPublish("" , RabbitConstant.QUEUE_SMS , null , jsonSMS.getBytes());
}
System.out.println("发送数据成功");
channel.close();
connection.close();
}
}
消费者
可以创建多个消费者,代码都一样,为了区分,我建了三个,分别为SMSSender1、SMSSender2、SMSSender3
public class SMSSender1 {
public static void main(String[] args) throws IOException {
Connection connection = RabbitUtils.getConnection();
final Channel channel = connection.createChannel();
channel.queueDeclare(RabbitConstant.QUEUE_SMS, false, false, false, null);
//如果不写basicQos(1),则自动MQ会将所有请求平均发送给所有消费者
//basicQos,MQ不再对消费者一次发送多个请求,而是消费者处理完一个消息后(确认后),在从队列中获取一个新的
channel.basicQos(1);//处理完一个取一个
channel.basicConsume(RabbitConstant.QUEUE_SMS , false , new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
String jsonSMS = new String(body);
System.out.println("SMSSender1-短信发送成功:" + jsonSMS);
try {
Thread.sleep(10);
} catch (InterruptedException e) {
e.printStackTrace();
}
channel.basicAck(envelope.getDeliveryTag() , false);
}
});
}
}
先启动三个消费者,然后启动生产者。
发布订阅
生产者
注意这里发送消息的时候,选择了交换机,而不是像前面两种模式直接选择队列。前面两个模式并不是没有选择交换机,只是使用了默认的交换机。
public class WeatherBurean {
public static void main(String[] args) throws IOException, TimeoutException {
Connection connection = RabbitUtils.getConnection();
String input = new Scanner(System.in).next();
Channel channel = connection.createChannel();
//第一个参数交换机名字 其他参数和之前的一样
channel.basicPublish(RabbitConstant.EXCHANGE_WEATHER,"" , null , input.getBytes());
channel.close();
connection.close();
}
}
既然这里用到了交换机,因此就需要在新增要给交换机。
选择自己的交换机,填上交换机名称,选择fanout(广播)模式
新浪消费者
public class Sina {
public static void main(String[] args) throws IOException {
//获取TCP长连接
Connection connection = RabbitUtils.getConnection();
//获取虚拟连接
final Channel channel = connection.createChannel();
//声明队列信息
channel.queueDeclare(RabbitConstant.QUEUE_SINA, false, false, false, null);
//queueBind用于将队列与交换机绑定
//参数1:队列名 参数2:交互机名 参数三:路由key(暂时用不到)
channel.queueBind(RabbitConstant.QUEUE_SINA, RabbitConstant.EXCHANGE_WEATHER, "");
channel.basicQos(1);
channel.basicConsume(RabbitConstant.QUEUE_SINA , false , new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("新浪天气收到气象信息:" + new String(body));
channel.basicAck(envelope.getDeliveryTag() , false);
}
});
}
}
百度消费者
public class BaiDu {
public static void main(String[] args) throws IOException {
//获取TCP长连接
Connection connection = RabbitUtils.getConnection();
//获取虚拟连接
final Channel channel = connection.createChannel();
//声明队列信息
channel.queueDeclare(RabbitConstant.QUEUE_BAIDU, false, false, false, null);
//queueBind用于将队列与交换机绑定
//参数1:队列名 参数2:交互机名 参数三:路由key(暂时用不到)
channel.queueBind(RabbitConstant.QUEUE_BAIDU, RabbitConstant.EXCHANGE_WEATHER, "");
channel.basicQos(1);
channel.basicConsume(RabbitConstant.QUEUE_BAIDU , false , new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("百度天气收到气象信息:" + new String(body));
channel.basicAck(envelope.getDeliveryTag() , false);
}
});
}
}
这两个消费者,和前面的模式不同之处在于,这里申明完队列之后,将队列和交换机进行绑定,也就是绑定到天气的交换机去获取天气信息
先运行新浪和百度的消费者,然后运行生产者,输入信息123
可以分别在xinna和baidu看到获取到的信息
路由模式
生产者
这次发送,不仅有交换机,第二个参数作为routingkey,为什么这里的第二个参数被认为是routingkey呢?
public class WeatherBureau {
public static void main(String[] args) throws IOException, TimeoutException {
Map area = new LinkedHashMap<String, String>();
area.put("china.hunan.changsha.20201127", "中国湖南长沙20201127天气数据");
area.put("china.hubei.wuhan.20201127", "中国湖北武汉20201127天气数据");
area.put("china.hunan.zhuzhou.20201127", "中国湖南株洲20201128天气数据");
area.put("us.cal.lsj.20201127", "美国加州洛杉矶20201127天气数据");
area.put("china.hebei.shijiazhuang.20201128", "中国河北石家庄20201128天气数据");
area.put("china.hubei.wuhan.20201128", "中国湖北武汉20201128天气数据");
area.put("china.henan.zhengzhou.20201128", "中国河南郑州20201128天气数据");
area.put("us.cal.lsj.20201128", "美国加州洛杉矶20201128天气数据");
Connection connection = RabbitUtils.getConnection();
Channel channel = connection.createChannel();
Iterator<Map.Entry<String, String>> itr = area.entrySet().iterator();
while (itr.hasNext()) {
Map.Entry<String, String> me = itr.next();
//第一个参数交换机名字 第二个参数作为 消息的routing key
channel.basicPublish(RabbitConstant.EXCHANGE_WEATHER_ROUTING,me.getKey() , null , me.getValue().getBytes());
}
channel.close();
connection.close();
}
}
因为我们在创建这个交换机的失火,我们选择的是direct定向的类型,就会把这个参数当作routingkey
百度消费者
这里给百度绑定了两个routingkey,分别是china.hunan.changsha.20201127和china.hebei.shijiazhuang.20201128
public class Baidu {
public static void main(String[] args) throws IOException {
Connection connection = RabbitUtils.getConnection();
final Channel channel = connection.createChannel();
channel.queueDeclare(RabbitConstant.QUEUE_BAIDU, false, false, false, null);
//queueBind用于将队列与交换机绑定
//参数1:队列名 参数2:交互机名 参数三:路由key
channel.queueBind(RabbitConstant.QUEUE_BAIDU, RabbitConstant.EXCHANGE_WEATHER_ROUTING, "china.hunan.changsha.20201127");
channel.queueBind(RabbitConstant.QUEUE_BAIDU, RabbitConstant.EXCHANGE_WEATHER_ROUTING, "china.hebei.shijiazhuang.20201128");
channel.basicQos(1);
channel.basicConsume(RabbitConstant.QUEUE_BAIDU , false , new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("百度天气收到气象信息:" + new String(body));
channel.basicAck(envelope.getDeliveryTag() , false);
}
});
}
}
新浪消费者
这里给新浪绑定了四个routingkey
public class Sina {
public static void main(String[] args) throws IOException {
//获取TCP长连接
Connection connection = RabbitUtils.getConnection();
//获取虚拟连接
final Channel channel = connection.createChannel();
//声明队列信息
channel.queueDeclare(RabbitConstant.QUEUE_SINA, false, false, false, null);
//指定队列与交换机以及routing key之间的关系
channel.queueBind(RabbitConstant.QUEUE_SINA, RabbitConstant.EXCHANGE_WEATHER_ROUTING, "us.cal.lsj.20201127");
channel.queueBind(RabbitConstant.QUEUE_SINA, RabbitConstant.EXCHANGE_WEATHER_ROUTING, "china.hubei.wuhan.20201127");
channel.queueBind(RabbitConstant.QUEUE_SINA, RabbitConstant.EXCHANGE_WEATHER_ROUTING, "us.cal.lsj.20201128");
channel.queueBind(RabbitConstant.QUEUE_SINA, RabbitConstant.EXCHANGE_WEATHER_ROUTING, "china.henan.zhengzhou.20201012");
channel.basicQos(1);
channel.basicConsume(RabbitConstant.QUEUE_SINA , false , new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("新浪天气收到气象信息:" + new String(body));
channel.basicAck(envelope.getDeliveryTag() , false);
}
});
}
}
运行之后,可以看到新浪和百度都收到了符合各自routingkey的消息
在UI中也可以看到交换机通过routingkey绑定了对应的queue,这里我们也可以看到,key没有绑定的信息,发送给交换机直接就丢失了。也证明了交换机没有存储的功能。
Topic
生产者
可以看出生产者没有什么变化
public class WeaherBureau {
public static void main(String[] args) throws IOException, TimeoutException {
Map area = new LinkedHashMap<String, String>();
area.put("china.hunan.changsha.20201127", "中国湖南长沙20201127天气数据");
area.put("china.hubei.wuhan.20201127", "中国湖北武汉20201127天气数据");
area.put("china.hunan.zhuzhou.20201127", "中国湖南株洲20201127天气数据");
area.put("us.cal.lsj.20201127", "美国加州洛杉矶20201127天气数据");
area.put("china.hebei.shijiazhuang.20201128", "中国河北石家庄20201128天气数据");
area.put("china.hubei.wuhan.20201128", "中国湖北武汉20201128天气数据");
area.put("china.henan.zhengzhou.20201128", "中国河南郑州20201128天气数据");
area.put("us.cal.lsj.20201128", "美国加州洛杉矶20201128天气数据");
Connection connection = RabbitUtils.getConnection();
Channel channel = connection.createChannel();
Iterator<Map.Entry<String, String>> itr = area.entrySet().iterator();
while (itr.hasNext()) {
Map.Entry<String, String> me = itr.next();
//第一个参数交换机名字 第二个参数作为 消息的routing key
channel.basicPublish(RabbitConstant.EXCHANGE_WEATHER_TOPIC,me.getKey() , null , me.getValue().getBytes());
}
channel.close();
connection.close();
}
}
只是在新建交换机的时候要选择topic类型
百度消费者
可以看到消费者的变化,只是roukingkey绑定时,用了通配符。
public class Baidu {
public static void main(String[] args) throws IOException {
Connection connection = RabbitUtils.getConnection();
final Channel channel = connection.createChannel();
channel.queueDeclare(RabbitConstant.QUEUE_BAIDU, false, false, false, null);
//queueBind用于将队列与交换机绑定
//参数1:队列名 参数2:交互机名 参数三:路由key
channel.queueBind(RabbitConstant.QUEUE_BAIDU, RabbitConstant.EXCHANGE_WEATHER_TOPIC, "*.*.*.20201127");
// channel.queueBind(RabbitConstant.QUEUE_BAIDU, RabbitConstant.EXCHANGE_WEATHER_ROUTING, "china.hebei.shijiazhuang.20201128");
channel.basicQos(1);
channel.basicConsume(RabbitConstant.QUEUE_BAIDU , false , new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("百度天气收到气象信息:" + new String(body));
channel.basicAck(envelope.getDeliveryTag() , false);
}
});
}
}
新浪消费者
public class Sina {
public static void main(String[] args) throws IOException {
//获取TCP长连接
Connection connection = RabbitUtils.getConnection();
//获取虚拟连接
final Channel channel = connection.createChannel();
//声明队列信息
channel.queueDeclare(RabbitConstant.QUEUE_SINA, false, false, false, null);
//指定队列与交换机以及routing key之间的关系
channel.queueBind(RabbitConstant.QUEUE_SINA, RabbitConstant.EXCHANGE_WEATHER_TOPIC, "us.#");
channel.basicQos(1);
channel.basicConsume(RabbitConstant.QUEUE_SINA , false , new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("新浪天气收到气象信息:" + new String(body));
channel.basicAck(envelope.getDeliveryTag() , false);
}
});
}
}
运行之后可以看到各自都拿到对应匹配的信息
Comfirm和Return
生产者
因为我们前面都跑过了消费者,这个地方可以不需要新建消费者。因为Comfirm和Return时发生在Producer和Broker之间的
public class WeatherBureau {
public static void main(String[] args) throws IOException, TimeoutException {
Map area = new LinkedHashMap<String, String>();
area.put("china.hunan.changsha.20201127", "中国湖南长沙20201127天气数据");
area.put("china.hubei.wuhan.20201127", "中国湖北武汉20201127天气数据");
area.put("china.hunan.zhuzhou.20201127", "中国湖南株洲20201127天气数据");
area.put("us.cal.lsj.20201127", "美国加州洛杉矶20201127天气数据");
area.put("china.hebei.shijiazhuang.20201128", "中国河北石家庄20201128天气数据");
area.put("china.hubei.wuhan.20201128", "中国湖北武汉20201128天气数据");
area.put("china.henan.zhengzhou.20201128", "中国河南郑州20201128天气数据");
area.put("us.cal.lsj.20201128", "美国加州洛杉矶20201128天气数据");
Connection connection = RabbitUtils.getConnection();
Channel channel = connection.createChannel();
//开启confirm监听模式
channel.confirmSelect();
channel.addConfirmListener(new ConfirmListener() {
public void handleAck(long l, boolean b) throws IOException {
//第二个参数代表接收的数据是否为批量接收,一般我们用不到。
System.out.println("消息已被Broker接收,Tag:" + l );
}
public void handleNack(long l, boolean b) throws IOException {
System.out.println("消息已被Broker拒收,Tag:" + l);
}
});
channel.addReturnListener(new ReturnCallback() {
public void handle(Return r) {
System.err.println("===========================");
System.err.println("Return编码:" + r.getReplyCode() + "-Return描述:" + r.getReplyText());
System.err.println("交换机:" + r.getExchange() + "-路由key:" + r.getRoutingKey() );
System.err.println("Return主题:" + new String(r.getBody()));
System.err.println("===========================");
}
});
Iterator<Map.Entry<String, String>> itr = area.entrySet().iterator();
while (itr.hasNext()) {
Map.Entry<String, String> me = itr.next();
//Routing key 第二个参数相当于数据筛选的条件
//第三个参数为:mandatory true代表如果消息无法正常投递则return回生产者,如果false,则直接将消息放弃。
channel.basicPublish(RabbitConstant.EXCHANGE_WEATHER_TOPIC,me.getKey() ,true, null , me.getValue().getBytes());
}
//如果关闭则无法进行监听,因此此处不需要关闭
/*channel.close(); connection.close();*/
}
}
先运行一下
从结果可以看到,这三个被return就是我们前面发现发送出,而丢失不见的三条消息。8条消息都被Broker接收到了,只是有三条找不到对应的队列进行投递,因此进行Return了,可以再创建几个队列来绑定这个几个routingkey,随后就会发现,所有的消息都被接收了。
还没有评论,来说两句吧...