rabbitMQ最终一致性处理分布式事务简单demo

以你之姓@ 2023-07-12 14:50 42阅读 0赞
  1. demo地址:链接:https://pan.baidu.com/s/1kGaSCHlfhm6UvbPcRUYp4g
  2. 提取码:11hs
  3. 分布式事务产生的背景?
  4. 1.RPC通讯中每个服务都有自己独立的数据源,每个数据源都互不影响.
  5. 2.在单个项目中存在多个不同jdbc连接(多数据源)
  6. 如何基于我们的MQ解决我们的分布式事务的问题(最终一致性)
  7. 1.确保我们的生产者往我们的MQ投递消息一定要成功.(生产者消息确认机制confirm),实现重试.
  8. 2.确保我们的消费者能够消费成功(手动ack机制),如果消费失败情况下,MQ自动帮消费者重试.
  9. 3.确保我们的生产者第一事务先执行成功,如果执行失败采用补单队列.

watermark_type_ZmFuZ3poZW5naGVpdGk_shadow_10_text_aHR0cHM6Ly9ibG9nLmNzZG4ubmV0L3poYW5nc2hlbmdxaWFuZzE2OA_size_16_color_FFFFFF_t_70

生产者

配置交换机 队列 绑定 和创建

  1. package com.zhang.config;
  2. import org.springframework.amqp.core.Binding;
  3. import org.springframework.amqp.core.BindingBuilder;
  4. import org.springframework.amqp.core.DirectExchange;
  5. import org.springframework.amqp.core.Queue;
  6. import org.springframework.amqp.rabbit.connection.ConnectionFactory;
  7. import org.springframework.amqp.rabbit.core.RabbitAdmin;
  8. import org.springframework.beans.factory.annotation.Autowired;
  9. import org.springframework.context.annotation.Bean;
  10. import org.springframework.stereotype.Component;
  11. @Component
  12. public class OrderRabbitMQConfig {
  13. @Autowired
  14. RabbitAdmin rabbitAdmin;
  15. /**
  16. * 派单队列
  17. */
  18. public static final String ORDER_DIC_QUEUE = "order_dic_queue";
  19. /**
  20. * 补单对接
  21. */
  22. public static final String ORDER_CREATE_QUEUE = "order_create_queue";
  23. /**
  24. * 订单交换机
  25. */
  26. private static final String ORDER_EXCHANGE_NAME = "order_exchange_name";
  27. /**
  28. * 定义派单队列
  29. */
  30. @Bean
  31. public Queue directOrderDicQueue() {
  32. return new Queue(ORDER_DIC_QUEUE);
  33. }
  34. /**
  35. * 定义补派单队列
  36. */
  37. @Bean
  38. public Queue directCreateOrderQueue() {
  39. return new Queue(ORDER_CREATE_QUEUE);
  40. }
  41. /**
  42. * 定义订单交换机
  43. */
  44. @Bean
  45. DirectExchange directOrderExchange() {
  46. return new DirectExchange(ORDER_EXCHANGE_NAME);
  47. }
  48. /**
  49. * 派单队列与交换机绑定
  50. */
  51. @Bean
  52. Binding bindingExchangeOrderDicQueue() {
  53. return BindingBuilder.bind(directOrderDicQueue()).to(directOrderExchange()).with("orderRoutingKey");
  54. }
  55. /**
  56. * 补单队列与交换机绑定
  57. */
  58. @Bean
  59. Binding bindingExchangeCreateOrder() {
  60. return BindingBuilder.bind(directCreateOrderQueue()).to(directOrderExchange()).with("orderRoutingKey");
  61. }
  62. //创建初始化RabbitAdmin对象
  63. @Bean
  64. public RabbitAdmin rabbitAdmin(ConnectionFactory connectionFactory) {
  65. String string = connectionFactory.toString();
  66. RabbitAdmin rabbitAdmin = new RabbitAdmin(connectionFactory);
  67. // 只有设置为 true,spring 才会加载 RabbitAdmin 这个类
  68. rabbitAdmin.setAutoStartup(true);
  69. return rabbitAdmin;
  70. }
  71. //创建交换机和对列
  72. @Bean
  73. public void createExchangeQueue (){
  74. rabbitAdmin.declareExchange(directOrderExchange());
  75. rabbitAdmin.declareQueue(directOrderDicQueue());
  76. rabbitAdmin.declareQueue(directCreateOrderQueue());
  77. }
  78. }

发送订单消息

  1. @Component
  2. @Slf4j
  3. public class OrderProducer implements RabbitTemplate.ConfirmCallback{
  4. @Autowired
  5. private OrderMapper orderMapper;
  6. @Autowired
  7. private RabbitTemplate rabbitTemplate;
  8. @Transactional
  9. public String sendOrder() {
  10. // 1.先创建我们订单信息
  11. String orderId = System.currentTimeMillis() + "";
  12. OrderEntity orderEntity = createOrder(orderId);
  13. // 2.添加到我们的数据库中
  14. int result = orderMapper.addOrder(orderEntity);
  15. if (result <= 0) {
  16. return null;
  17. }
  18. // 3.订单数据库插入成功的情况下, 使用MQ异步发送派单信息
  19. String msgJson = JSONObject.toJSONString(orderEntity);
  20. System.out.println("<<<<<<<1线程名称是>>>>>:"+Thread.currentThread().getName());
  21. sendMsg(msgJson);
  22. int i=1/0;
  23. return orderId;
  24. }
  25. //发送消息
  26. public void sendMsg(String msgJson) {
  27. // 设置生产者消息确认机制
  28. this.rabbitTemplate.setMandatory(true);
  29. this.rabbitTemplate.setConfirmCallback(this);
  30. CorrelationData correlationData = new CorrelationData();
  31. correlationData.setId(msgJson);
  32. String orderExchange = "order_exchange_name"; //订单交换机
  33. String orderRoutingKey = "orderRoutingKey"; //订单路由key
  34. rabbitTemplate.convertAndSend(orderExchange, orderRoutingKey, msgJson, correlationData);
  35. }
  36. //创建订单信息
  37. public OrderEntity createOrder(String orderId) {
  38. OrderEntity orderEntity = new OrderEntity();
  39. orderEntity.setName("每特教育第六期平均就业薪资破10万");
  40. orderEntity.setOrderCreatetime(new Date());
  41. // 价格是300元
  42. orderEntity.setOrderMoney(300d);
  43. // 状态为 未支付
  44. orderEntity.setOrderState(0);
  45. Long commodityId = 30L;
  46. // 商品id
  47. orderEntity.setCommodityId(commodityId);
  48. orderEntity.setOrderId(orderId);
  49. return orderEntity;
  50. }
  51. /**
  52. * 实现ConfirmCallback接口 重写confirm方法
  53. * @param correlationData 投递失败回调消息
  54. * @param ack true 投递到MQ成功 false投递消息失败
  55. * @param cause
  56. */
  57. @Override
  58. public void confirm(CorrelationData correlationData, boolean ack, String cause) {
  59. String msg = correlationData.getId();
  60. if(!ack){
  61. log.info("<<<往MQ投递消息失败>>>>: {}" , msg);
  62. //采用递归重试
  63. sendMsg(msg);
  64. return;
  65. }
  66. log.info("<<<往MQ投递消息成功>>>>: {}" , msg);
  67. // 生产者投递多次还是is的情况下应该 人工记录
  68. }
  69. }

消费者

派单消费者

  1. @Component
  2. @Slf4j
  3. public class DispatchConsumer {
  4. @Autowired
  5. private DispatchMapper dispatchMapper;
  6. @RabbitListener(queues = "order_dic_queue")
  7. public void dispatchConsumer(Message message , Channel channel) throws IOException {
  8. // 1.获取消息
  9. String msg = new String(message.getBody());
  10. // 2.转换json
  11. JSONObject jsonObject = JSONObject.parseObject(msg);
  12. String orderId = jsonObject.getString("orderId");
  13. //主动根据orderId查询派单是否已经派单过,如果派单,不走下面插入代码
  14. // 计算分配的快递员id
  15. DispatchEntity dispatchEntity = new DispatchEntity(orderId, 1234L);
  16. // 3.插入我们的数据库
  17. int result = dispatchMapper.insertDistribute(dispatchEntity);
  18. if (result > 0) {
  19. // 手动将该消息删除
  20. // 手动ack 删除该消息
  21. channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
  22. log.info("<<<消费者派单消费成功>>> 时间: {} " ,new Date());
  23. }
  24. }
  25. }

补单消费者

  1. @Component
  2. @Slf4j
  3. public class OrderConsumer {
  4. @Autowired
  5. private OrderMapper orderMapper;
  6. /***
  7. * 补单的消费不应该和 订单生产者放到一个服务器节点
  8. * 补单消费者如果不存在的情况下 队列缓存补单消息
  9. * 补偿分布式事务解决框架 思想最终一致性
  10. */
  11. @RabbitListener(queues = "order_create_queue")
  12. public void dispatchConsumer(Message message , Channel channel) throws IOException {
  13. // 1.获取消息
  14. String msg = new String(message.getBody());
  15. // 2.转换json
  16. OrderEntity orderEntity = JSONObject.parseObject(msg,OrderEntity.class);
  17. String orderId = orderEntity.getOrderId();
  18. OrderEntity result = orderMapper.findOrderId(orderId);
  19. if (null!=result ) {
  20. // 手动将该消息删除
  21. // 手动ack 删除该消息
  22. channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
  23. log.info("<<<消费者消费成功>>> 时间: {}",new Date());
  24. return;
  25. }
  26. //补单
  27. int i = orderMapper.addOrder(orderEntity);
  28. if(i>0){
  29. // 手动将该消息删除
  30. // 手动ack 删除该消息
  31. channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
  32. log.info("<<<消费者补单消费成功>>> 时间: {}",new Date());
  33. }
  34. }
  35. }

发表评论

表情:
评论列表 (有 0 条评论,42人围观)

还没有评论,来说两句吧...

相关阅读