愿隙 发表于 2025-10-24 00:40:00

RocketMQ+Spring Boot的简单实现及其深入分析

Producer搭建


[*]导入RocketMQ依赖和配置RocketMQ地址及producer的group:name
      <dependency>
            <groupId>org.apache.rocketmq</groupId>
            rocketmq-spring-boot-starter</artifactId>
            <version>2.3.4</version>
      </dependency>

[*]创建消费接口

1. 调用接口进行测试
    ## 发送消息模式的类型扩展
   
    > `RocketTemplate`中有许多发送方法,其可应对大多数的场景
    >
    ### syncSend()
   
    > 同步发送,仅当发送过程完成时返回此方法.需严格保证顺序性,其会阻塞调用线程至到Broker获取响应
    >
    - 参数列表
      - `destination`目标主题,格式为`topicName:tags`tags可选
      - `payload`消息体,可以是任意对象,自动序列化
      - `message`Spring Message对象,可自定义headers
      - `timeout`发送超时时间毫秒,默认3000ms
    - 返回对象
      - SendResult:包含消息ID,发送状态,队列偏移值等等
    - 用于大部分对发送结果严格的场景:如电商,金融等等
   
    ### asyncSend()
   
    > 异步发送,没有返回对象.异步传输一般用于响应时间敏感的业务场景.在发送完成会立即调用其参数列表中的sendCallBack方法
    >
    - 参数列表
      - `String destination`
      - `Message<?> message`
      - `SendCallback sendCallback`:发送结果调用方法
            - `onSuccess(SendResult result)`:发送成功回调
            - `onException(Throwable e)`:发送失败回调
    - 适用于:高吞吐,但对结果要求不高的场景如日志采集等等
    ### syncSendOrderly()
   
    > 顺序发送
    >
    - 参数列表
      - `SendResult syncSendOrderly(String destination, Message<?> message, String hashKey);`
      - `SendResult syncSendOrderly(String destination, Object payload, String hashKey);`
      - `hasyKey`:分片见,相同的hashKey会被路由到同一个队列
            - 基本原理:`int queueId = Math.abs(hashKey.hashCode()) % queueCount;`
   
    # SendMesssageInTransaction()
   
    > 发送MQ分布式事务消息,其采用2PC(两端式协议)+补偿机制(事务回查)的分布式事务功能
    >
    - 半事务消息:暂不能投递的消息,消息生产者已经成功将消息发送到RocketMQ服务器中,但暂时为收到生产者对消息的二次确认.此时的消息会被标记为”暂不能投递”的状态.处于这种”暂不能投递”状态的消息被称为半事务消息
    - 消息回查:由于一些网络问题,生产者自身的问题等等,导致某条事务消息二次丢失,RocketMQ通过扫描某条消息长期处于”半事务消息”时,其会向生产者组询查该消息的最终状态(commit或Rollback),这就是消息回查
    - 在RocketMQ中发送食物消息需要三个核心组件
      1. 事务消息发送:使用sendMessageInTransaction()方法
      2. 事务监听器:实现RocketMQLocalTransactionListener接口
      3. 事务监听注册:通过@RocketMQTransactionListener注解注册
            - 返回对象:`TransactionSendResult`:含事务状态`LocalTransactionState`
      - 采用这一套事务消息发送逻辑,本地的Service只需关心发送消息的逻辑,其余的事务逻辑交由给事务监听器处理
    ### 事务基本执行流程
   
    - **第一阶段:发送半事务**
      1. 生产者发送半事务消息:生产者将业务数据封装成数据,并将其发送给RocketMQ,此时消息被标记为”半事务消息”
      2. RocketMQ确认接收消息:RocketMQ接收到消息并将其持久化到存储系统中,此时会向生产者发送一个确认消息(Ack)表示该消息已经被接收
      3. 生产者执行本地事务逻辑:生产者接收到服务端的确认后,则开始本地业务逻辑执行.如更新数据库,修改订单等等
    - **第二阶段:提交或回滚事务**
      1. 生产者提交二次确认结果:根据本地事务执行结果,生产者向RocketMQ提交二次确认结果
            1. 若本地事务执行成功:生产者提交`Commit`操作,服务器端将半事务标记为:”可投递状态”,并将其投递给消费者
            2. 如果本地事务执行失败:生产者提交`Rollback`操作,RocketMQ则会回滚,不会将消息投递给消费者
      2. 但0由于网络问题生产者自身应用问题导致重启,RocketMQ迟迟未收到生产者的二次确认,或收到的消息结果为`Unknown`未知状态.RocketMQ会发起事务回查.
            1. RocketMQ会向生产者发送回查请求,要求查询其本地事务状态
            2. 生产者根据本地事务状态再次提交二次确认结果
    - **第三阶段:消费者进行消费**
      1. 当RocketMQ中的消息被标记为”可投递”之后,消息会被投递到消费者.消费者按其消费逻辑进行消费操作.最后向RocketMQ发送消费结果(成功/失败)
      2. 消息被消费后,RocketMQ会标记其消息为”已消费”,RocketMQ会默认保留所有消息.支持消费者回溯历史消息
    ### 幂等问题
   
    > 幂等性,值对同一操作多次执行,结果与仅执行一次效果相同
    >
    - 出现幂等的原因:
      1. **生产者重复发送**:生产者客户端有可能因为某些网络问题导致发送失败,届时生产者会尝试发送相同的消息从而会导致RocketMQ重复消费
      2. **重试机制**:RocketMQ提供了消息重试机制,在消息发送中出现异常时.消费者会重新拉取相同的消息进行重试.若消费者方没有处理幂等性,则消息会被重复消费
      3. **集群下的消息重复消费**:在RocketMQ下的集群,如果多个消费者订阅相同的主题,且每个消费者都独立消费消息,那么同一个消息就会被不同的消费者组重复消费
   
    ### 使用Redssion实现幂等性
   
    ```java
   consumer.registerMessageListener(new MessageListenerConcurrently() {
                @Override
                public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
                  for (MessageExt msg : msgs) {
                        String msgId = msg.getMsgId();
                        String lockKey = "rocketmq:msg:" + msgId;
                        RLock lock = redissonClient.getLock(lockKey);
                        boolean acquired = false;
   
                        try {
                            acquired = lock.tryLock(1, 10, TimeUnit.SECONDS);
                            if (acquired) {
                              System.out.println("Processing message: " + new String(msg.getBody()) + ", MsgId: " + msgId);
                              Thread.sleep(100); // 模拟业务处理
                              return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
                            } else {
                              System.out.println("Duplicate message skipped: " + msgId);
                              return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
                            }
                        } catch (Exception e) {
                            System.err.println("Error processing message: " + msgId + ", error: " + e.getMessage());
                            return ConsumeConcurrentlyStatus.RECONSUME_LATER;
                        } finally {
                            if (acquired) {
                              lock.unlock();
                            }
                        }
                  }
                  return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
                }
            });
    ```
   
    ```java
    consumer.registerMessageListener(new MessageListenerOrderly() {
      @Override
      public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) {
            for (MessageExt msg : msgs) {
                String msgId = msg.getMsgId();
                String lockKey = "rocketmq:msg:" + msgId;
                RLock lock = redissonClient.getLock(lockKey);
                boolean acquired = false;
   
                try {
                  acquired = lock.tryLock(1, 10, TimeUnit.SECONDS);
                  if (acquired) {
                        System.out.println("Processing message: " + new String(msg.getBody()) + ", MsgId: " + msgId);
                        Thread.sleep(100);
                        return ConsumeOrderlyStatus.SUCCESS;
                  } else {
                        System.out.println("Duplicate message skipped: " + msgId);
                        return ConsumeOrderlyStatus.SUCCESS;
                  }
                } catch (Exception e) {
                  System.err.println("Error processing message: " + msgId + ", error: " + e.getMessage());
                  return ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT;
                } finally {
                  if (acquired) {
                        lock.unlock();
                  }
                }
            }
            return ConsumeOrderlyStatus.SUCCESS;
      }
    });
    ```
   
    ### sendAndReceive()
   
    - 用于实现请求-响应模式的核心方法,其允许在分布式系统中实现类似RCP同步通信的能力
    - 核心特性
      - 同步通信:阻塞调用线程直到收到响应
      - 双向交互:实现生产者与消费者的双向通信
      - 解耦设计:保持MQ解耦特性同时实现同步交互
    - 参数列表
      
      Message<?> sendAndReceive(`String destination,Message<?> requestMessage,long timeout`) throws MessagingException
      
    - 业务场景:实时查询库存信息Consumer搭建


[*]引入依赖配置consumer的group:name
      <dependency>
            <groupId>org.apache.rocketmq</groupId>
            rocketmq-spring-boot-starter</artifactId>
            <version>2.3.4</version>
      </dependency>

[*]创建消息监听器
实现RocketMQListener接口,重写其onMessage()方法完成消费逻辑


[*]使用@RocketMQMessageListener(consumerGroup=””,topic=””)注解:来指定消费者组,及目标topic


来源:程序园用户自行投稿发布,如果侵权,请联系站长删除
免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!

慷规扣 发表于 2025-11-14 00:57:18

感谢,下载保存了

甘子萱 发表于 2025-11-21 19:01:41

分享、互助 让互联网精神温暖你我

毋献仪 发表于 2025-12-10 14:23:49

前排留名,哈哈哈
页: [1]
查看完整版本: RocketMQ+Spring Boot的简单实现及其深入分析