在分布式系统的事务难题中,“最终一致性” 是很多场景的务实选择。今天聊聊 基于 RocketMQ 可靠消息的最终一致性方案 —— 它把本地消息表的逻辑封装到 MQ 内部,用 RocketMQ 的事务消息能力,优雅解决跨服务数据同步问题。

一、核心逻辑:MQ 化身事务协调者

RocketMQ 4.3+ 版本支持 事务消息,核心思路是:
让 MQ 作为 “事务协调者”,保证 “Producer 本地事务” 和 “MQ 消息投递” 的原子性。即便 Producer 或网络出问题,也能通过 事务回查 机制,保证最终一致。

以 “注册送积分” 为例(用户服务新增用户,积分服务加积分),流程分 5 步:

  1. 发半消息(Prepared 状态):Producer(用户服务)发一条 “半消息” 到 MQ,此时消费者(积分服务)无法消费
  2. 执行本地事务:Producer 执行本地业务(新增用户),用数据库事务保证操作原子性。
  3. 提交 / 回滚消息:本地事务成功 → 发 commit 指令,MQ 标记消息为 “可消费”;失败 → 发 rollback 指令,MQ 删除消息。
  4. 消费者消费消息:MQ 把可消费的消息投递给消费者(积分服务),消费者执行 “加积分”,并通过 ACK 确认。
  5. 事务回查(兜底):若 Producer 超时未发 commit/rollback,MQ 会主动回查 Producer 的事务状态,决定消息 fate。

二、流程拆解:从注册到积分到账

(一)阶段 1:发送半消息(Prepared 状态)

Producer(用户服务)先给 MQ 发一条 “半消息”,内容是 “用户注册成功,需加积分”。此时 MQ 会把消息标记为 Prepared(预提交),消费者无法订阅到这条消息。

代码示例(Java + RocketMQ)

// 1. 创建事务消息
Message message = new Message(
    "积分主题", 
    "tag_register", 
    "userId:123, points:10".getBytes()
);

// 2. 发送半消息,返回事务 ID
SendResult sendResult = producer.sendMessageInTransaction(
    message, 
    new LocalTransactionExecuter() {
        // 本地事务执行逻辑(后续阶段 2 调用)
        @Override
        public LocalTransactionState execute(Message msg, Object arg) {
            // 这里先返回 UNKNOW,等阶段 2 再处理
            return LocalTransactionState.UNKNOW; 
        }
    }, 
    null // 额外参数
);

这一步的关键:半消息是 “占位符”,确保 MQ 先确认消息可达,再执行本地事务。

(二)阶段 2:执行本地事务 + 提交 / 回滚

Producer 收到 MQ 的 “半消息发送成功” 响应后,执行本地事务(新增用户),并根据结果通知 MQ 是 commit 还是 rollback。

本地事务执行逻辑(补充阶段 1 的代码)

// 实现 LocalTransactionExecuter,处理本地事务
public LocalTransactionState execute(Message msg, Object arg) {
    try {
        // 执行本地事务:新增用户
        userMapper.insertUser(userId); 
        // 本地事务成功,返回 COMMIT:通知 MQ 提交消息
        return LocalTransactionState.COMMIT_MESSAGE; 
    } catch (Exception e) {
        // 本地事务失败,返回 ROLLBACK:通知 MQ 回滚消息
        return LocalTransactionState.ROLLBACK_MESSAGE; 
    }
}

这一步的关键:本地事务与 MQ 消息状态强关联。本地事务成功,MQ 才会把消息标记为 “可消费”;失败则 MQ 丢弃消息。

(三)阶段 3:消费者消费消息(积分服务)

MQ 收到 commit 指令后,把消息标记为 “可消费”,投递给积分服务。积分服务消费消息时,需保证 幂等性(防止 MQ 重试导致重复加积分)。

积分服务消费逻辑

// 监听 MQ 消息
@RocketMQMessageListener(
    topic = "积分主题", 
    consumerGroup = "积分消费组"
)
public class PointConsumer implements RocketMQListener<MessageExt> {
    @Override
    public void onMessage(MessageExt msg) {
        // 解析消息:userId、points
        String content = new String(msg.getBody());
        Map<String, Object> data = JSON.parseObject(content, Map.class);
        Long userId = data.get("userId");
        Integer points = data.get("points");
        
        // 幂等性校验:查流水表,判断是否已处理
        if (pointFlowMapper.existsByMsgId(msg.getMsgId())) { 
            return; // 已处理,直接返回
        }
        
        // 执行加积分
        pointService.addPoints(userId, points); 
        // 记录流水(幂等性依据)
        pointFlowMapper.insert(userId, points, msg.getMsgId()); 
    }
}

这一步的关键:幂等性。MQ 可能因网络问题重试投递,消费端需确保重复消费不影响结果。

(四)阶段 4:事务回查(兜底机制)

如果 Producer 执行本地事务时 超时或宕机(比如步骤 2 没返回 commit/rollback),MQ 会触发 事务回查:主动询问 Producer 本地事务的状态,决定消息是提交还是回滚。

Producer 需实现事务回查逻辑

// 实现 TransactionChecker,处理 MQ 的回查请求
public LocalTransactionState check(Message msg) {
    // 查询本地事务状态:查用户是否已新增
    boolean isUserExist = userMapper.exists(userId); 
    if (isUserExist) {
        // 本地事务成功,返回 COMMIT
        return LocalTransactionState.COMMIT_MESSAGE; 
    } else {
        // 本地事务失败,返回 ROLLBACK
        return LocalTransactionState.ROLLBACK_MESSAGE; 
    }
}

这一步的关键:兜底机制。即使 Producer 出问题,MQ 也能通过回查保证消息状态与本地事务一致。

三、方案优势:为何选 RocketMQ 事务消息?

(一)对比本地消息表的优势

  1. 无需维护本地消息表:RocketMQ 把 “消息表” 逻辑封装到 MQ 内部,减少数据库依赖。
  2. 自带事务回查:无需手动写定时任务扫描消息表,MQ 自动处理超时回查。
  3. 高可靠投递:RocketMQ 的消息持久化、重试机制,比手动实现更稳定。

(二)适用场景

  • 对最终一致性要求高,且能容忍短暂不一致(如积分、通知、订单状态同步)。
  • 依赖 MQ 生态:已有 RocketMQ 集群,想简化分布式事务实现。
  • 高并发场景:RocketMQ 的性能优于本地消息表(减少数据库读写压力)。

四、潜在问题与解决

(一)事务回查频率过高

若 Producer 大量超时,MQ 频繁回查会增加压力。
解法

  • 优化本地事务性能,减少执行时间。
  • 调整 RocketMQ 的回查间隔(transactionCheckInterval),避免短时间内大量回查。

(二)消息积压导致延迟

高并发下,MQ 消息可能积压,影响消费端处理时效。
解法

  • 优化消费端逻辑,增加并发消费线程。
  • 对非关键消息(如积分),采用异步 + 批量处理。

五、总结:RocketMQ 事务消息的价值

基于 RocketMQ 的可靠消息方案,本质是 “本地消息表的 MQ 化” —— 把消息持久化、重试、回查逻辑交给 MQ,简化了业务开发。

它的核心优势是 “最终一致性 + 低侵入性”:只需关注本地事务和消费逻辑,MQ 自动处理消息状态、回查、重试。适合依赖 MQ 生态的项目,尤其是阿里系技术栈(如 Dubbo + RocketMQ)。

理解这套流程后,面对跨服务事务需求,就能判断是否用 RocketMQ 事务消息兜底,或结合 TCC、Saga 等方案实现更复杂的一致性要求。

(拓展思考:如何监控 RocketMQ 事务消息的回查次数?生产环境中,如何模拟事务回查场景验证兜底逻辑?可以结合 RocketMQ 的监控工具和压测框架深入实践~)

如果这篇文章对大家有帮助可以点赞关注,你的支持就是我的动力😊!

Logo

更多推荐