【RocketMQ】(一) 简单消息(可靠的同步,可靠的异步和单向传输)
本篇文章将使用RocketMQ以三种方式发送消息:可靠的同步,可靠的异步和单向传输。并介绍下负载均衡模式与广播模式的区别:
(1)同步发送消息:
** 在重要的通知消息,SMS通知,SMS营销系统等广泛的场景中使用可靠的同步传输。**
生产者代码:
/**
* 发送同步消息
* 这种可靠性同步地发送方式使用的比较广泛,比如:重要的消息通知,短信通知。
*/
public class SyncProducer {
public static void main(String[] args) throws MQClientException, RemotingException, InterruptedException, MQBrokerException {
//1.创建消息生产者producer,并指定生产者组名
DefaultMQProducer defaultMQProducer = new DefaultMQProducer("group1");
//2.指定NameServer地址
defaultMQProducer.setNamesrvAddr("192.168.160.131:9876");
//3.启动producer
defaultMQProducer.start();
//4.创建消息对象,指定主题topic、tag和消息体
for (int i = 0; i < 5; i++) {
/*
* 参数一:消息主题Topic
* 参数二:消息tag
* 参数三:消息内容
*/
Message message = new Message("base", "tag1", ("hello world" + i).getBytes());
//5.发送消息
SendResult result = defaultMQProducer.send(message);
System.out.println("result:"+result);
//睡眠1秒
TimeUnit.SECONDS.sleep(1);
}
//6.关闭生产者producer
defaultMQProducer.shutdown();
}
}
(2)异步发送消息
** 异步传输通常用于对时间敏感的业务场景中。**
生产者代码:
/**
* 发送异步消息
* 异步消息通常用在对响应时间敏感的业务场景,即发送端不能容忍长时间地等待Broker的响应。
*/
@Slf4j
public class AsyncProducer {
public static void main(String[] args) throws MQClientException, RemotingException, InterruptedException {
//1.创建消息生产者producer,并指定生产者组名
DefaultMQProducer defaultMQProducer = new DefaultMQProducer("group1");
//2.指定NameServer地址
defaultMQProducer.setNamesrvAddr("192.168.160.131:9876");
//3.启动producer
defaultMQProducer.start();
for (int i = 0; i < 10; i++) {
//4.创建消息对象,指定主题topic、tag和消息体
/*
* 参数一:消息主题Topic
* 参数二:消息tag
* 参数三:消息内容
*/
Message message = new Message("base", "tag2", ("hello world" + i).getBytes());
//5.发送异步消息
defaultMQProducer.send(message, new SendCallback() {
//发送成功回调
@Override
public void onSuccess(SendResult sendResult) {
log.debug("sendResult:{}", sendResult);
}
//发送失败回调
@Override
public void onException(Throwable throwable) {
log.error("throwable:",throwable);
}
});
TimeUnit.SECONDS.sleep(1);
}
//6.关闭生产者producer
defaultMQProducer.shutdown();
}
}
(3)以单向模式发送消息
**单向传输用于要求中等可靠性的情况,例如日志收集。**
生产者代码:
/**
* 单向发送
* 这种方式主要用在不特别关心发送结果的场景,例如日志发送
* @author 13871
*/
@Slf4j
public class OneWayProducer {
public static void main(String[] args) throws MQClientException, RemotingException, InterruptedException {
DefaultMQProducer defaultMQProducer = new DefaultMQProducer("group1");
defaultMQProducer.setNamesrvAddr("192.168.160.131:9876");
defaultMQProducer.start();
for (int i = 0; i < 10; i++) {
Message message = new Message("base", "tag3", ("单向消息" + i).getBytes());
defaultMQProducer.sendOneway(message);
// TimeUnit.SECONDS.sleep(1);
}
defaultMQProducer.shutdown();
}
}
消费者代码:(用于以上三种生产者消费)
/**
* 消费者
*/
@Slf4j
public class consumer {
public static void main(String[] args) throws MQClientException {
//1.创建消费者Consumer,指定组名
DefaultMQPushConsumer defaultMQPushConsumer = new DefaultMQPushConsumer("group1");
//2.指定NameServer地址
defaultMQPushConsumer.setNamesrvAddr("192.168.160.131:9876");
//3.订阅主题topic和tag
defaultMQPushConsumer.subscribe("base", "*");
//消费者默认是负载均衡,设置成广播模式
//defaultMQPushConsumer.setMessageModel(MessageModel.BROADCASTING);
//4.设置回调函数,处理消息
defaultMQPushConsumer.registerMessageListener(new MessageListenerConcurrently() {
//指定消息内容
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
for (MessageExt msg:list) {
System.out.println("msg:"+new String(msg.getBody()));
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
//5.启动consumer
defaultMQPushConsumer.start();
}
}
ps:消费者默认是负载均衡,可以设置成广播模式;
如果消费者是负载均衡的话:
consumer1:消费者端
msg:hello world0
msg:hello world3
msg:hello world4
consumer2:消费者端
msg:hello world1
msg:hello world2
也就是采用默认负载均衡模式的话,一个生产者端发送5条消息,两个消费者端去消费,是两者分摊,轮着来的,这时候一个消费3条,一个消费2条;
广播模式:
由于广播模式下要求一条消息需要投递到一个消费组下面所有的消费者实例,所以也就 没有消息被分摊消费的说法。 在实现上,其中一个不同就是在consumer分配queue的时候,所有consumer都分到所 有的queue。
以同步发送消息的生产者代码为例,启动生产者端,然后启动两个消费者端,查看控制台:
生产者端,控制台:(发送5条消息)
result:SendResult [sendStatus=SEND_OK, msgId=24098A1E7A3ECE402C3D4AADC1A10079000018B4AAC2469D7E3D0000, offsetMsgId=C0A8A08300002A9F0000000000019EE5, messageQueue=MessageQueue [topic=base, brokerName=localhost.localdomain, queueId=0], queueOffset=19]
result:SendResult [sendStatus=SEND_OK, msgId=24098A1E7A3ECE402C3D4AADC1A10079000018B4AAC2469D82350001, offsetMsgId=C0A8A08300002A9F0000000000019FBD, messageQueue=MessageQueue [topic=base, brokerName=localhost.localdomain, queueId=1], queueOffset=20]
result:SendResult [sendStatus=SEND_OK, msgId=24098A1E7A3ECE402C3D4AADC1A10079000018B4AAC2469D862D0002, offsetMsgId=C0A8A08300002A9F000000000001A095, messageQueue=MessageQueue [topic=base, brokerName=localhost.localdomain, queueId=2], queueOffset=31]
result:SendResult [sendStatus=SEND_OK, msgId=24098A1E7A3ECE402C3D4AADC1A10079000018B4AAC2469D8A190003, offsetMsgId=C0A8A08300002A9F000000000001A16D, messageQueue=MessageQueue [topic=base, brokerName=localhost.localdomain, queueId=3], queueOffset=30]
result:SendResult [sendStatus=SEND_OK, msgId=24098A1E7A3ECE402C3D4AADC1A10079000018B4AAC2469D8E040004, offsetMsgId=C0A8A08300002A9F000000000001A245, messageQueue=MessageQueue [topic=base, brokerName=localhost.localdomain, queueId=0], queueOffset=20]
17:05:31.676 [NettyClientSelector_1] INFO RocketmqRemoting - closeChannel: close the connection to remote address[192.168.160.131:10911] result: true
17:05:31.684 [NettyClientSelector_1] INFO RocketmqRemoting - closeChannel: close the connection to remote address[192.168.160.131:9876] result: true
消费者端,控制台:(两个消费者端,会消费同样的消息,均是5条)
msg:hello world0
msg:hello world1
msg:hello world2
msg:hello world3
msg:hello world4
还没有评论,来说两句吧...