【RocketMQ】(一) 简单消息(可靠的同步,可靠的异步和单向传输)

拼搏现实的明天。 2023-07-25 11:14 130阅读 0赞

本篇文章将使用RocketMQ以三种方式发送消息:可靠的同步,可靠的异步和单向传输。并介绍下负载均衡模式与广播模式的区别:

(1)同步发送消息:

  1. ** 在重要的通知消息,SMS通知,SMS营销系统等广泛的场景中使用可靠的同步传输。**

生产者代码:

  1. /**
  2. * 发送同步消息
  3. * 这种可靠性同步地发送方式使用的比较广泛,比如:重要的消息通知,短信通知。
  4. */
  5. public class SyncProducer {
  6. public static void main(String[] args) throws MQClientException, RemotingException, InterruptedException, MQBrokerException {
  7. //1.创建消息生产者producer,并指定生产者组名
  8. DefaultMQProducer defaultMQProducer = new DefaultMQProducer("group1");
  9. //2.指定NameServer地址
  10. defaultMQProducer.setNamesrvAddr("192.168.160.131:9876");
  11. //3.启动producer
  12. defaultMQProducer.start();
  13. //4.创建消息对象,指定主题topic、tag和消息体
  14. for (int i = 0; i < 5; i++) {
  15. /*
  16. * 参数一:消息主题Topic
  17. * 参数二:消息tag
  18. * 参数三:消息内容
  19. */
  20. Message message = new Message("base", "tag1", ("hello world" + i).getBytes());
  21. //5.发送消息
  22. SendResult result = defaultMQProducer.send(message);
  23. System.out.println("result:"+result);
  24. //睡眠1秒
  25. TimeUnit.SECONDS.sleep(1);
  26. }
  27. //6.关闭生产者producer
  28. defaultMQProducer.shutdown();
  29. }
  30. }

(2)异步发送消息

  1. ** 异步传输通常用于对时间敏感的业务场景中。**

生产者代码:

  1. /**
  2. * 发送异步消息
  3. * 异步消息通常用在对响应时间敏感的业务场景,即发送端不能容忍长时间地等待Broker的响应。
  4. */
  5. @Slf4j
  6. public class AsyncProducer {
  7. public static void main(String[] args) throws MQClientException, RemotingException, InterruptedException {
  8. //1.创建消息生产者producer,并指定生产者组名
  9. DefaultMQProducer defaultMQProducer = new DefaultMQProducer("group1");
  10. //2.指定NameServer地址
  11. defaultMQProducer.setNamesrvAddr("192.168.160.131:9876");
  12. //3.启动producer
  13. defaultMQProducer.start();
  14. for (int i = 0; i < 10; i++) {
  15. //4.创建消息对象,指定主题topic、tag和消息体
  16. /*
  17. * 参数一:消息主题Topic
  18. * 参数二:消息tag
  19. * 参数三:消息内容
  20. */
  21. Message message = new Message("base", "tag2", ("hello world" + i).getBytes());
  22. //5.发送异步消息
  23. defaultMQProducer.send(message, new SendCallback() {
  24. //发送成功回调
  25. @Override
  26. public void onSuccess(SendResult sendResult) {
  27. log.debug("sendResult:{}", sendResult);
  28. }
  29. //发送失败回调
  30. @Override
  31. public void onException(Throwable throwable) {
  32. log.error("throwable:",throwable);
  33. }
  34. });
  35. TimeUnit.SECONDS.sleep(1);
  36. }
  37. //6.关闭生产者producer
  38. defaultMQProducer.shutdown();
  39. }
  40. }

(3)以单向模式发送消息

  1. **单向传输用于要求中等可靠性的情况,例如日志收集。**

生产者代码:

  1. /**
  2. * 单向发送
  3. * 这种方式主要用在不特别关心发送结果的场景,例如日志发送
  4. * @author 13871
  5. */
  6. @Slf4j
  7. public class OneWayProducer {
  8. public static void main(String[] args) throws MQClientException, RemotingException, InterruptedException {
  9. DefaultMQProducer defaultMQProducer = new DefaultMQProducer("group1");
  10. defaultMQProducer.setNamesrvAddr("192.168.160.131:9876");
  11. defaultMQProducer.start();
  12. for (int i = 0; i < 10; i++) {
  13. Message message = new Message("base", "tag3", ("单向消息" + i).getBytes());
  14. defaultMQProducer.sendOneway(message);
  15. // TimeUnit.SECONDS.sleep(1);
  16. }
  17. defaultMQProducer.shutdown();
  18. }
  19. }

消费者代码:(用于以上三种生产者消费)

  1. /**
  2. * 消费者
  3. */
  4. @Slf4j
  5. public class consumer {
  6. public static void main(String[] args) throws MQClientException {
  7. //1.创建消费者Consumer,指定组名
  8. DefaultMQPushConsumer defaultMQPushConsumer = new DefaultMQPushConsumer("group1");
  9. //2.指定NameServer地址
  10. defaultMQPushConsumer.setNamesrvAddr("192.168.160.131:9876");
  11. //3.订阅主题topic和tag
  12. defaultMQPushConsumer.subscribe("base", "*");
  13. //消费者默认是负载均衡,设置成广播模式
  14. //defaultMQPushConsumer.setMessageModel(MessageModel.BROADCASTING);
  15. //4.设置回调函数,处理消息
  16. defaultMQPushConsumer.registerMessageListener(new MessageListenerConcurrently() {
  17. //指定消息内容
  18. @Override
  19. public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
  20. for (MessageExt msg:list) {
  21. System.out.println("msg:"+new String(msg.getBody()));
  22. }
  23. return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
  24. }
  25. });
  26. //5.启动consumer
  27. defaultMQPushConsumer.start();
  28. }
  29. }

ps:消费者默认是负载均衡,可以设置成广播模式;

如果消费者是负载均衡的话:

watermark_type_ZmFuZ3poZW5naGVpdGk_shadow_10_text_aHR0cHM6Ly9ibG9nLmNzZG4ubmV0L3FxXzM3NDk1Nzg2_size_16_color_FFFFFF_t_70

watermark_type_ZmFuZ3poZW5naGVpdGk_shadow_10_text_aHR0cHM6Ly9ibG9nLmNzZG4ubmV0L3FxXzM3NDk1Nzg2_size_16_color_FFFFFF_t_70 1

consumer1:消费者端

  1. msg:hello world0
  2. msg:hello world3
  3. msg:hello world4

consumer2:消费者端

  1. msg:hello world1
  2. msg:hello world2

也就是采用默认负载均衡模式的话,一个生产者端发送5条消息,两个消费者端去消费,是两者分摊,轮着来的,这时候一个消费3条,一个消费2条;

广播模式:

由于广播模式下要求一条消息需要投递到一个消费组下面所有的消费者实例,所以也就 没有消息被分摊消费的说法。 在实现上,其中一个不同就是在consumer分配queue的时候,所有consumer都分到所 有的queue。

watermark_type_ZmFuZ3poZW5naGVpdGk_shadow_10_text_aHR0cHM6Ly9ibG9nLmNzZG4ubmV0L3FxXzM3NDk1Nzg2_size_16_color_FFFFFF_t_70 2

以同步发送消息的生产者代码为例,启动生产者端,然后启动两个消费者端,查看控制台:

watermark_type_ZmFuZ3poZW5naGVpdGk_shadow_10_text_aHR0cHM6Ly9ibG9nLmNzZG4ubmV0L3FxXzM3NDk1Nzg2_size_16_color_FFFFFF_t_70 3

生产者端,控制台:(发送5条消息)

  1. result:SendResult [sendStatus=SEND_OK, msgId=24098A1E7A3ECE402C3D4AADC1A10079000018B4AAC2469D7E3D0000, offsetMsgId=C0A8A08300002A9F0000000000019EE5, messageQueue=MessageQueue [topic=base, brokerName=localhost.localdomain, queueId=0], queueOffset=19]
  2. result:SendResult [sendStatus=SEND_OK, msgId=24098A1E7A3ECE402C3D4AADC1A10079000018B4AAC2469D82350001, offsetMsgId=C0A8A08300002A9F0000000000019FBD, messageQueue=MessageQueue [topic=base, brokerName=localhost.localdomain, queueId=1], queueOffset=20]
  3. result:SendResult [sendStatus=SEND_OK, msgId=24098A1E7A3ECE402C3D4AADC1A10079000018B4AAC2469D862D0002, offsetMsgId=C0A8A08300002A9F000000000001A095, messageQueue=MessageQueue [topic=base, brokerName=localhost.localdomain, queueId=2], queueOffset=31]
  4. result:SendResult [sendStatus=SEND_OK, msgId=24098A1E7A3ECE402C3D4AADC1A10079000018B4AAC2469D8A190003, offsetMsgId=C0A8A08300002A9F000000000001A16D, messageQueue=MessageQueue [topic=base, brokerName=localhost.localdomain, queueId=3], queueOffset=30]
  5. result:SendResult [sendStatus=SEND_OK, msgId=24098A1E7A3ECE402C3D4AADC1A10079000018B4AAC2469D8E040004, offsetMsgId=C0A8A08300002A9F000000000001A245, messageQueue=MessageQueue [topic=base, brokerName=localhost.localdomain, queueId=0], queueOffset=20]
  6. 17:05:31.676 [NettyClientSelector_1] INFO RocketmqRemoting - closeChannel: close the connection to remote address[192.168.160.131:10911] result: true
  7. 17:05:31.684 [NettyClientSelector_1] INFO RocketmqRemoting - closeChannel: close the connection to remote address[192.168.160.131:9876] result: true

消费者端,控制台:(两个消费者端,会消费同样的消息,均是5条)

  1. msg:hello world0
  2. msg:hello world1
  3. msg:hello world2
  4. msg:hello world3
  5. msg:hello world4

发表评论

表情:
评论列表 (有 0 条评论,130人围观)

还没有评论,来说两句吧...

相关阅读