一、核心概念理解

事务消息解决什么问题?

java

复制

下载

// 分布式事务典型问题:本地事务与消息发送的一致性
// 传统方式存在的问题:
1. 先发消息,后执行本地事务 → 消息发送成功但本地事务失败
2. 先执行本地事务,后发消息 → 本地事务成功但消息发送失败

RocketMQ事务消息的核心机制

text

复制

下载

Producer发送Half消息 → Broker存储Half消息 → 执行本地事务
       ↓
Broker等待事务状态回查 ← Producer返回本地事务结果
       ↓
根据结果提交或回滚消息

二、两阶段提交详细流程

第一阶段:发送Half消息

java

复制

下载

public class TransactionProducer {
    public static void main(String[] args) throws Exception {
        // 1. 创建事务消息生产者
        TransactionMQProducer producer = new TransactionMQProducer("TransactionProducerGroup");
        producer.setNamesrvAddr("127.0.0.1:9876");
        
        // 2. 设置事务监听器(核心)
        producer.setTransactionListener(new TransactionListener() {
            /**
             * 执行本地事务
             * @param msg Half消息
             * @param arg 业务参数
             * @return 本地事务状态
             */
            @Override
            public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {
                try {
                    // 执行本地数据库事务
                    boolean success = doLocalBusinessTransaction(msg, arg);
                    
                    if (success) {
                        System.out.println("本地事务执行成功,提交消息");
                        return LocalTransactionState.COMMIT_MESSAGE;
                    } else {
                        System.out.println("本地事务执行失败,回滚消息");
                        return LocalTransactionState.ROLLBACK_MESSAGE;
                    }
                } catch (Exception e) {
                    System.out.println("本地事务执行异常,回查");
                    return LocalTransactionState.UNKNOW;
                }
            }
            
            /**
             * 事务回查(Broker主动查询事务状态)
             * @param msg Half消息
             * @return 事务状态
             */
            @Override
            public LocalTransactionState checkLocalTransaction(MessageExt msg) {
                // 根据业务ID查询本地事务状态
                String transactionId = msg.getTransactionId();
                boolean status = queryLocalTransactionStatus(transactionId);
                
                if (status) {
                    System.out.println("事务回查:本地事务已提交");
                    return LocalTransactionState.COMMIT_MESSAGE;
                } else {
                    System.out.println("事务回查:本地事务已回滚");
                    return LocalTransactionState.ROLLBACK_MESSAGE;
                }
            }
        });
        
        // 3. 启动生产者
        producer.start();
        
        // 4. 发送事务消息
        Message msg = new Message("TransactionTopic", 
                                 "TagA", 
                                 "Order-001".getBytes(StandardCharsets.UTF_8));
        
        // 设置事务ID(关键)
        msg.setKeys("TXN-" + System.currentTimeMillis());
        
        // 发送Half消息(第一阶段)
        SendResult sendResult = producer.sendMessageInTransaction(msg, 
            // 业务参数,会在executeLocalTransaction中传递
            new BusinessParam("orderId", "123456", 100.00)
        );
        
        System.out.println("Half消息发送结果: " + sendResult.getSendStatus());
    }
}

 篇幅限制下面就只能给大家展示小册部分内容了。整理了一份核心面试笔记包括了:Java面试、Spring、JVM、MyBatis、Redis、MySQL、并发编程、微服务、Linux、Springboot、SpringCloud、MQ、Kafc

需要全套面试笔记及答案
【点击此处即可/免费获取】

三、完整执行时序图

text

复制

下载

┌─────────┐      ┌────────┐      ┌────────┐
│ Producer│      │ Broker │      │ 本地DB │
└────┬────┘      └───┬────┘      └────┬───┘
     │ 1.发送Half消息  │               │
     │───────────────>│               │
     │                │               │
     │                │ 2.存储Half消息 │
     │                │───────────────>│
     │                │               │
     │ 3.返回Half成功 │               │
     │<───────────────│               │
     │                │               │
     │ 4.执行本地事务  │               │
     │────────────────────────────────>│
     │                │               │
     │ 5.返回事务状态  │               │
     │───────────────>│               │
     │                │               │
     │                │6.提交/回滚消息 │
     │                │───────────────>│
     │                │               │
     │(可能)7.事务回查 │               │
     │<───────────────│               │
     │                │               │
     │ 8.返回回查结果  │               │
     │───────────────>│               │
     │                │               │
     │                │9.最终提交/回滚 │
     │                │───────────────>│

四、关键配置参数

yaml

复制

下载

# Broker端配置
broker.conf:
  transactionCheckMax: 15           # 最大回查次数
  transactionCheckInterval: 60000   # 回查间隔(ms)
  transactionTimeOut: 6000          # 超时时间(ms)
  
# Producer端配置
producer:
  checkThreadPoolMinSize: 1         # 回查线程池最小
  checkThreadPoolMaxSize: 1         # 回查线程池最大
  checkRequestHoldMax: 2000         # 回查请求队列大小

五、代码实现最佳实践

1. 完整的订单事务示例

java

复制

下载

@Service
public class OrderTransactionService {
    
    @Resource
    private OrderMapper orderMapper;
    
    @Resource
    private TransactionMQProducer transactionMQProducer;
    
    /**
     * 创建订单事务消息
     */
    public void createOrderWithTransaction(OrderDTO orderDTO) {
        // 构建消息
        Message msg = new Message("ORDER_TOPIC", 
                                 "CREATE",
                                 JSON.toJSONBytes(orderDTO));
        
        // 设置业务标识
        msg.setKeys("ORDER_" + orderDTO.getOrderNo());
        msg.putUserProperty("businessType", "ORDER_CREATE");
        
        // 发送事务消息
        SendResult sendResult = transactionMQProducer.sendMessageInTransaction(
            msg, 
            new OrderTransactionArg(orderDTO)
        );
        
        if (!sendResult.getSendStatus().equals(SendStatus.SEND_OK)) {
            throw new RuntimeException("Half消息发送失败");
        }
    }
    
    /**
     * 事务监听器实现
     */
    @Component
    public class OrderTransactionListener implements TransactionListener {
        
        @Override
        public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {
            OrderTransactionArg transactionArg = (OrderTransactionArg) arg;
            OrderDTO orderDTO = transactionArg.getOrderDTO();
            
            try {
                // 1. 保存订单到数据库
                Order order = convertToOrder(orderDTO);
                orderMapper.insert(order);
                
                // 2. 扣减库存(调用库存服务)
                boolean deductResult = inventoryService.deductStock(
                    orderDTO.getProductId(), 
                    orderDTO.getQuantity()
                );
                
                if (!deductResult) {
                    // 库存不足,回滚本地事务
                    orderMapper.deleteById(order.getId());
                    return LocalTransactionState.ROLLBACK_MESSAGE;
                }
                
                // 3. 记录事务日志(用于回查)
                transactionLogService.saveTransactionLog(
                    msg.getTransactionId(),
                    "ORDER_CREATE",
                    order.getId(),
                    LocalTransactionState.COMMIT_MESSAGE.name()
                );
                
                return LocalTransactionState.COMMIT_MESSAGE;
                
            } catch (Exception e) {
                log.error("订单本地事务执行异常", e);
                
                // 记录异常状态
                transactionLogService.saveTransactionLog(
                    msg.getTransactionId(),
                    "ORDER_CREATE",
                    null,
                    LocalTransactionState.UNKNOW.name()
                );
                
                return LocalTransactionState.UNKNOW;
            }
        }
        
        @Override
        public LocalTransactionState checkLocalTransaction(MessageExt msg) {
            // 根据事务ID查询事务日志
            String transactionId = msg.getTransactionId();
            TransactionLog log = transactionLogService.getByTransactionId(transactionId);
            
            if (log == null) {
                // 没有事务记录,需要回滚
                return LocalTransactionState.ROLLBACK_MESSAGE;
            }
            
            if ("COMMIT_MESSAGE".equals(log.getStatus())) {
                // 事务已提交
                return LocalTransactionState.COMMIT_MESSAGE;
            } else {
                // 事务需要回滚
                return LocalTransactionState.ROLLBACK_MESSAGE;
            }
        }
    }
    
    /**
     * 事务参数封装
     */
    @Data
    @AllArgsConstructor
    public static class OrderTransactionArg {
        private OrderDTO orderDTO;
    }
}

2. 消费端幂等处理

java

复制

下载

@Component
@RocketMQMessageListener(
    topic = "ORDER_TOPIC",
    consumerGroup = "ORDER_CONSUMER_GROUP"
)
public class OrderConsumer implements RocketMQListener<MessageExt> {
    
    @Override
    public void onMessage(MessageExt message) {
        // 1. 检查消息幂等性
        String messageId = message.getMsgId();
        if (redisTemplate.hasKey("MSG_" + messageId)) {
            log.info("消息已处理,跳过: {}", messageId);
            return;
        }
        
        // 2. 解析消息
        OrderDTO orderDTO = JSON.parseObject(message.getBody(), OrderDTO.class);
        
        // 3. 业务处理
        try {
            // 更新订单状态为已确认
            orderService.confirmOrder(orderDTO.getOrderNo());
            
            // 4. 记录已处理消息
            redisTemplate.opsForValue().set(
                "MSG_" + messageId, 
                "PROCESSED", 
                1, TimeUnit.HOURS
            );
            
        } catch (Exception e) {
            log.error("订单处理失败,将重试", e);
            throw new RuntimeException(e);
        }
    }
}

六、面试问题回答要点

问题:RocketMQ事务消息如何实现二阶段提交?

回答结构:

  1. 概念解释

    • "RocketMQ事务消息通过二阶段提交保证分布式事务的最终一致性"

    • "核心思想:将本地事务和消息发送绑定,通过Half消息和状态回查机制实现"

  2. 第一阶段(Half消息阶段)

    • "Producer发送Half消息到Broker,Broker存储但不对Consumer可见"

    • "Half消息发送成功后,执行本地事务"

    • "本地事务执行结果返回给Broker:COMMIT、ROLLBACK或UNKNOWN"

篇幅限制下面就只能给大家展示小册部分内容了。整理了一份核心面试笔记包括了:Java面试、Spring、JVM、MyBatis、Redis、MySQL、并发编程、微服务、Linux、Springboot、SpringCloud、MQ、Kafc

需要全套面试笔记及答案
【点击此处即可/免费获取】​​​

  1. 第二阶段(状态确认阶段)

    • "如果本地事务返回COMMIT/ROLLBACK,Broker立即提交/回滚消息"

    • "如果返回UNKNOWN,Broker会发起事务状态回查"

    • "Producer实现TransactionListener.checkLocalTransaction()进行状态查询"

  2. 关键机制

    • "事务状态回查:解决网络超时或生产者宕机问题"

    • "消息幂等性:消费端需要处理重复消息"

    • "超时机制:超过配置时间未确认的消息会自动回滚"

  3. 代码示例

    java

    复制

    下载
    // 简要展示核心代码结构
    producer.setTransactionListener(new TransactionListener() {
        public LocalTransactionState executeLocalTransaction(...) {
            // 执行本地业务
        }
        public LocalTransactionState checkLocalTransaction(...) {
            // 状态回查
        }
    });
  4. 适用场景

    • "订单创建+通知库存"

    • "支付成功+发送通知"

    • "任何需要保证本地事务和消息发送一致性的场景"

  5. 注意事项

    • "事务消息不支持定时和批量消息"

    • "确保checkLocalTransaction方法的幂等性"

    • "合理配置回查次数和间隔"

面试加分项

  • 提到"最大努力通知型事务"

  • 对比TCC、Saga等分布式事务方案

  • 强调消息幂等处理的重要性

  • 提及RocketMQ 4.3+的事务消息优化

这样的回答既展示了理论知识,又体现了实际编码能力,适合中高级Java岗位面试。

Logo

更多推荐