分布式事务:基于 RocketMQ 可靠消息,实现最终一致性
在分布式系统的事务难题中,“最终一致性” 是很多场景的务实选择。今天聊聊 基于 RocketMQ 可靠消息的最终一致性方案 —— 它把本地消息表的逻辑封装到 MQ 内部,用 RocketMQ 的事务消息能力,优雅解决跨服务数据同步问题。
一、核心逻辑:MQ 化身事务协调者
RocketMQ 4.3+ 版本支持 事务消息,核心思路是:
让 MQ 作为 “事务协调者”,保证 “Producer 本地事务” 和 “MQ 消息投递” 的原子性。即便 Producer 或网络出问题,也能通过 事务回查 机制,保证最终一致。
以 “注册送积分” 为例(用户服务新增用户,积分服务加积分),流程分 5 步:
- 发半消息(Prepared 状态):Producer(用户服务)发一条 “半消息” 到 MQ,此时消费者(积分服务)无法消费。
- 执行本地事务:Producer 执行本地业务(新增用户),用数据库事务保证操作原子性。
- 提交 / 回滚消息:本地事务成功 → 发 commit 指令,MQ 标记消息为 “可消费”;失败 → 发 rollback 指令,MQ 删除消息。
- 消费者消费消息:MQ 把可消费的消息投递给消费者(积分服务),消费者执行 “加积分”,并通过 ACK 确认。
- 事务回查(兜底):若 Producer 超时未发 commit/rollback,MQ 会主动回查 Producer 的事务状态,决定消息 fate。

二、流程拆解:从注册到积分到账
(一)阶段 1:发送半消息(Prepared 状态)
Producer(用户服务)先给 MQ 发一条 “半消息”,内容是 “用户注册成功,需加积分”。此时 MQ 会把消息标记为 Prepared(预提交),消费者无法订阅到这条消息。
代码示例(Java + RocketMQ):
// 1. 创建事务消息
Message message = new Message(
"积分主题",
"tag_register",
"userId:123, points:10".getBytes()
);
// 2. 发送半消息,返回事务 ID
SendResult sendResult = producer.sendMessageInTransaction(
message,
new LocalTransactionExecuter() {
// 本地事务执行逻辑(后续阶段 2 调用)
@Override
public LocalTransactionState execute(Message msg, Object arg) {
// 这里先返回 UNKNOW,等阶段 2 再处理
return LocalTransactionState.UNKNOW;
}
},
null // 额外参数
);
这一步的关键:半消息是 “占位符”,确保 MQ 先确认消息可达,再执行本地事务。
(二)阶段 2:执行本地事务 + 提交 / 回滚
Producer 收到 MQ 的 “半消息发送成功” 响应后,执行本地事务(新增用户),并根据结果通知 MQ 是 commit 还是 rollback。
本地事务执行逻辑(补充阶段 1 的代码):
// 实现 LocalTransactionExecuter,处理本地事务
public LocalTransactionState execute(Message msg, Object arg) {
try {
// 执行本地事务:新增用户
userMapper.insertUser(userId);
// 本地事务成功,返回 COMMIT:通知 MQ 提交消息
return LocalTransactionState.COMMIT_MESSAGE;
} catch (Exception e) {
// 本地事务失败,返回 ROLLBACK:通知 MQ 回滚消息
return LocalTransactionState.ROLLBACK_MESSAGE;
}
}
这一步的关键:本地事务与 MQ 消息状态强关联。本地事务成功,MQ 才会把消息标记为 “可消费”;失败则 MQ 丢弃消息。
(三)阶段 3:消费者消费消息(积分服务)
MQ 收到 commit 指令后,把消息标记为 “可消费”,投递给积分服务。积分服务消费消息时,需保证 幂等性(防止 MQ 重试导致重复加积分)。
积分服务消费逻辑:
// 监听 MQ 消息
@RocketMQMessageListener(
topic = "积分主题",
consumerGroup = "积分消费组"
)
public class PointConsumer implements RocketMQListener<MessageExt> {
@Override
public void onMessage(MessageExt msg) {
// 解析消息:userId、points
String content = new String(msg.getBody());
Map<String, Object> data = JSON.parseObject(content, Map.class);
Long userId = data.get("userId");
Integer points = data.get("points");
// 幂等性校验:查流水表,判断是否已处理
if (pointFlowMapper.existsByMsgId(msg.getMsgId())) {
return; // 已处理,直接返回
}
// 执行加积分
pointService.addPoints(userId, points);
// 记录流水(幂等性依据)
pointFlowMapper.insert(userId, points, msg.getMsgId());
}
}
这一步的关键:幂等性。MQ 可能因网络问题重试投递,消费端需确保重复消费不影响结果。
(四)阶段 4:事务回查(兜底机制)
如果 Producer 执行本地事务时 超时或宕机(比如步骤 2 没返回 commit/rollback),MQ 会触发 事务回查:主动询问 Producer 本地事务的状态,决定消息是提交还是回滚。
Producer 需实现事务回查逻辑:
// 实现 TransactionChecker,处理 MQ 的回查请求
public LocalTransactionState check(Message msg) {
// 查询本地事务状态:查用户是否已新增
boolean isUserExist = userMapper.exists(userId);
if (isUserExist) {
// 本地事务成功,返回 COMMIT
return LocalTransactionState.COMMIT_MESSAGE;
} else {
// 本地事务失败,返回 ROLLBACK
return LocalTransactionState.ROLLBACK_MESSAGE;
}
}
这一步的关键:兜底机制。即使 Producer 出问题,MQ 也能通过回查保证消息状态与本地事务一致。
三、方案优势:为何选 RocketMQ 事务消息?
(一)对比本地消息表的优势
- 无需维护本地消息表:RocketMQ 把 “消息表” 逻辑封装到 MQ 内部,减少数据库依赖。
- 自带事务回查:无需手动写定时任务扫描消息表,MQ 自动处理超时回查。
- 高可靠投递:RocketMQ 的消息持久化、重试机制,比手动实现更稳定。
(二)适用场景
- 对最终一致性要求高,且能容忍短暂不一致(如积分、通知、订单状态同步)。
- 依赖 MQ 生态:已有 RocketMQ 集群,想简化分布式事务实现。
- 高并发场景:RocketMQ 的性能优于本地消息表(减少数据库读写压力)。
四、潜在问题与解决
(一)事务回查频率过高
若 Producer 大量超时,MQ 频繁回查会增加压力。
解法:
- 优化本地事务性能,减少执行时间。
- 调整 RocketMQ 的回查间隔(
transactionCheckInterval),避免短时间内大量回查。
(二)消息积压导致延迟
高并发下,MQ 消息可能积压,影响消费端处理时效。
解法:
- 优化消费端逻辑,增加并发消费线程。
- 对非关键消息(如积分),采用异步 + 批量处理。
五、总结:RocketMQ 事务消息的价值
基于 RocketMQ 的可靠消息方案,本质是 “本地消息表的 MQ 化” —— 把消息持久化、重试、回查逻辑交给 MQ,简化了业务开发。
它的核心优势是 “最终一致性 + 低侵入性”:只需关注本地事务和消费逻辑,MQ 自动处理消息状态、回查、重试。适合依赖 MQ 生态的项目,尤其是阿里系技术栈(如 Dubbo + RocketMQ)。
理解这套流程后,面对跨服务事务需求,就能判断是否用 RocketMQ 事务消息兜底,或结合 TCC、Saga 等方案实现更复杂的一致性要求。
(拓展思考:如何监控 RocketMQ 事务消息的回查次数?生产环境中,如何模拟事务回查场景验证兜底逻辑?可以结合 RocketMQ 的监控工具和压测框架深入实践~)
如果这篇文章对大家有帮助可以点赞关注,你的支持就是我的动力😊!
更多推荐

所有评论(0)