宇树Java面试被问:RocketMQ事务消息的二阶段提交实现
RocketMQ事务消息通过二阶段提交机制解决分布式系统中本地事务与消息发送的一致性问题。第一阶段发送Half消息到Broker并执行本地事务,第二阶段根据事务结果提交或回滚消息。核心机制包括:事务监听器实现本地业务逻辑,Broker定期回查未知状态的事务,以及消费端的幂等处理。典型应用场景包括订单创建与库存扣减等需要保证最终一致性的业务。该方案相比传统方式能有效避免消息丢失或重复问题,需注意合理
一、核心概念理解
事务消息解决什么问题?
java
复制
下载
// 分布式事务典型问题:本地事务与消息发送的一致性 // 传统方式存在的问题: 1. 先发消息,后执行本地事务 → 消息发送成功但本地事务失败 2. 先执行本地事务,后发消息 → 本地事务成功但消息发送失败
RocketMQ事务消息的核心机制
text
复制
下载
Producer发送Half消息 → Broker存储Half消息 → 执行本地事务
↓
Broker等待事务状态回查 ← Producer返回本地事务结果
↓
根据结果提交或回滚消息
二、两阶段提交详细流程
第一阶段:发送Half消息
java
复制
下载
public class TransactionProducer {
public static void main(String[] args) throws Exception {
// 1. 创建事务消息生产者
TransactionMQProducer producer = new TransactionMQProducer("TransactionProducerGroup");
producer.setNamesrvAddr("127.0.0.1:9876");
// 2. 设置事务监听器(核心)
producer.setTransactionListener(new TransactionListener() {
/**
* 执行本地事务
* @param msg Half消息
* @param arg 业务参数
* @return 本地事务状态
*/
@Override
public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {
try {
// 执行本地数据库事务
boolean success = doLocalBusinessTransaction(msg, arg);
if (success) {
System.out.println("本地事务执行成功,提交消息");
return LocalTransactionState.COMMIT_MESSAGE;
} else {
System.out.println("本地事务执行失败,回滚消息");
return LocalTransactionState.ROLLBACK_MESSAGE;
}
} catch (Exception e) {
System.out.println("本地事务执行异常,回查");
return LocalTransactionState.UNKNOW;
}
}
/**
* 事务回查(Broker主动查询事务状态)
* @param msg Half消息
* @return 事务状态
*/
@Override
public LocalTransactionState checkLocalTransaction(MessageExt msg) {
// 根据业务ID查询本地事务状态
String transactionId = msg.getTransactionId();
boolean status = queryLocalTransactionStatus(transactionId);
if (status) {
System.out.println("事务回查:本地事务已提交");
return LocalTransactionState.COMMIT_MESSAGE;
} else {
System.out.println("事务回查:本地事务已回滚");
return LocalTransactionState.ROLLBACK_MESSAGE;
}
}
});
// 3. 启动生产者
producer.start();
// 4. 发送事务消息
Message msg = new Message("TransactionTopic",
"TagA",
"Order-001".getBytes(StandardCharsets.UTF_8));
// 设置事务ID(关键)
msg.setKeys("TXN-" + System.currentTimeMillis());
// 发送Half消息(第一阶段)
SendResult sendResult = producer.sendMessageInTransaction(msg,
// 业务参数,会在executeLocalTransaction中传递
new BusinessParam("orderId", "123456", 100.00)
);
System.out.println("Half消息发送结果: " + sendResult.getSendStatus());
}
}
篇幅限制下面就只能给大家展示小册部分内容了。整理了一份核心面试笔记包括了:Java面试、Spring、JVM、MyBatis、Redis、MySQL、并发编程、微服务、Linux、Springboot、SpringCloud、MQ、Kafc
需要全套面试笔记及答案
【点击此处即可/免费获取】
三、完整执行时序图
text
复制
下载
┌─────────┐ ┌────────┐ ┌────────┐
│ Producer│ │ Broker │ │ 本地DB │
└────┬────┘ └───┬────┘ └────┬───┘
│ 1.发送Half消息 │ │
│───────────────>│ │
│ │ │
│ │ 2.存储Half消息 │
│ │───────────────>│
│ │ │
│ 3.返回Half成功 │ │
│<───────────────│ │
│ │ │
│ 4.执行本地事务 │ │
│────────────────────────────────>│
│ │ │
│ 5.返回事务状态 │ │
│───────────────>│ │
│ │ │
│ │6.提交/回滚消息 │
│ │───────────────>│
│ │ │
│(可能)7.事务回查 │ │
│<───────────────│ │
│ │ │
│ 8.返回回查结果 │ │
│───────────────>│ │
│ │ │
│ │9.最终提交/回滚 │
│ │───────────────>│
四、关键配置参数
yaml
复制
下载
# Broker端配置 broker.conf: transactionCheckMax: 15 # 最大回查次数 transactionCheckInterval: 60000 # 回查间隔(ms) transactionTimeOut: 6000 # 超时时间(ms) # Producer端配置 producer: checkThreadPoolMinSize: 1 # 回查线程池最小 checkThreadPoolMaxSize: 1 # 回查线程池最大 checkRequestHoldMax: 2000 # 回查请求队列大小
五、代码实现最佳实践
1. 完整的订单事务示例
java
复制
下载
@Service
public class OrderTransactionService {
@Resource
private OrderMapper orderMapper;
@Resource
private TransactionMQProducer transactionMQProducer;
/**
* 创建订单事务消息
*/
public void createOrderWithTransaction(OrderDTO orderDTO) {
// 构建消息
Message msg = new Message("ORDER_TOPIC",
"CREATE",
JSON.toJSONBytes(orderDTO));
// 设置业务标识
msg.setKeys("ORDER_" + orderDTO.getOrderNo());
msg.putUserProperty("businessType", "ORDER_CREATE");
// 发送事务消息
SendResult sendResult = transactionMQProducer.sendMessageInTransaction(
msg,
new OrderTransactionArg(orderDTO)
);
if (!sendResult.getSendStatus().equals(SendStatus.SEND_OK)) {
throw new RuntimeException("Half消息发送失败");
}
}
/**
* 事务监听器实现
*/
@Component
public class OrderTransactionListener implements TransactionListener {
@Override
public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {
OrderTransactionArg transactionArg = (OrderTransactionArg) arg;
OrderDTO orderDTO = transactionArg.getOrderDTO();
try {
// 1. 保存订单到数据库
Order order = convertToOrder(orderDTO);
orderMapper.insert(order);
// 2. 扣减库存(调用库存服务)
boolean deductResult = inventoryService.deductStock(
orderDTO.getProductId(),
orderDTO.getQuantity()
);
if (!deductResult) {
// 库存不足,回滚本地事务
orderMapper.deleteById(order.getId());
return LocalTransactionState.ROLLBACK_MESSAGE;
}
// 3. 记录事务日志(用于回查)
transactionLogService.saveTransactionLog(
msg.getTransactionId(),
"ORDER_CREATE",
order.getId(),
LocalTransactionState.COMMIT_MESSAGE.name()
);
return LocalTransactionState.COMMIT_MESSAGE;
} catch (Exception e) {
log.error("订单本地事务执行异常", e);
// 记录异常状态
transactionLogService.saveTransactionLog(
msg.getTransactionId(),
"ORDER_CREATE",
null,
LocalTransactionState.UNKNOW.name()
);
return LocalTransactionState.UNKNOW;
}
}
@Override
public LocalTransactionState checkLocalTransaction(MessageExt msg) {
// 根据事务ID查询事务日志
String transactionId = msg.getTransactionId();
TransactionLog log = transactionLogService.getByTransactionId(transactionId);
if (log == null) {
// 没有事务记录,需要回滚
return LocalTransactionState.ROLLBACK_MESSAGE;
}
if ("COMMIT_MESSAGE".equals(log.getStatus())) {
// 事务已提交
return LocalTransactionState.COMMIT_MESSAGE;
} else {
// 事务需要回滚
return LocalTransactionState.ROLLBACK_MESSAGE;
}
}
}
/**
* 事务参数封装
*/
@Data
@AllArgsConstructor
public static class OrderTransactionArg {
private OrderDTO orderDTO;
}
}
2. 消费端幂等处理
java
复制
下载
@Component
@RocketMQMessageListener(
topic = "ORDER_TOPIC",
consumerGroup = "ORDER_CONSUMER_GROUP"
)
public class OrderConsumer implements RocketMQListener<MessageExt> {
@Override
public void onMessage(MessageExt message) {
// 1. 检查消息幂等性
String messageId = message.getMsgId();
if (redisTemplate.hasKey("MSG_" + messageId)) {
log.info("消息已处理,跳过: {}", messageId);
return;
}
// 2. 解析消息
OrderDTO orderDTO = JSON.parseObject(message.getBody(), OrderDTO.class);
// 3. 业务处理
try {
// 更新订单状态为已确认
orderService.confirmOrder(orderDTO.getOrderNo());
// 4. 记录已处理消息
redisTemplate.opsForValue().set(
"MSG_" + messageId,
"PROCESSED",
1, TimeUnit.HOURS
);
} catch (Exception e) {
log.error("订单处理失败,将重试", e);
throw new RuntimeException(e);
}
}
}
六、面试问题回答要点
问题:RocketMQ事务消息如何实现二阶段提交?
回答结构:
-
概念解释
-
"RocketMQ事务消息通过二阶段提交保证分布式事务的最终一致性"
-
"核心思想:将本地事务和消息发送绑定,通过Half消息和状态回查机制实现"
-
-
第一阶段(Half消息阶段)
-
"Producer发送Half消息到Broker,Broker存储但不对Consumer可见"
-
"Half消息发送成功后,执行本地事务"
-
"本地事务执行结果返回给Broker:COMMIT、ROLLBACK或UNKNOWN"
-
篇幅限制下面就只能给大家展示小册部分内容了。整理了一份核心面试笔记包括了:Java面试、Spring、JVM、MyBatis、Redis、MySQL、并发编程、微服务、Linux、Springboot、SpringCloud、MQ、Kafc
需要全套面试笔记及答案
【点击此处即可/免费获取】
-
第二阶段(状态确认阶段)
-
"如果本地事务返回COMMIT/ROLLBACK,Broker立即提交/回滚消息"
-
"如果返回UNKNOWN,Broker会发起事务状态回查"
-
"Producer实现TransactionListener.checkLocalTransaction()进行状态查询"
-
-
关键机制
-
"事务状态回查:解决网络超时或生产者宕机问题"
-
"消息幂等性:消费端需要处理重复消息"
-
"超时机制:超过配置时间未确认的消息会自动回滚"
-
-
代码示例
java
复制 下载// 简要展示核心代码结构 producer.setTransactionListener(new TransactionListener() { public LocalTransactionState executeLocalTransaction(...) { // 执行本地业务 } public LocalTransactionState checkLocalTransaction(...) { // 状态回查 } }); -
适用场景
-
"订单创建+通知库存"
-
"支付成功+发送通知"
-
"任何需要保证本地事务和消息发送一致性的场景"
-
-
注意事项
-
"事务消息不支持定时和批量消息"
-
"确保checkLocalTransaction方法的幂等性"
-
"合理配置回查次数和间隔"
-
面试加分项
-
提到"最大努力通知型事务"
-
对比TCC、Saga等分布式事务方案
-
强调消息幂等处理的重要性
-
提及RocketMQ 4.3+的事务消息优化
这样的回答既展示了理论知识,又体现了实际编码能力,适合中高级Java岗位面试。
更多推荐

所有评论(0)