Spring Cloud中怎么使用RocketMQ的事务消息实现分布式事务?
在微服务架构中,分布式事务控制是一个重要的挑战。当不同服务间需要保证数据一致性时,传统的单机事务管理已不再适用,而事务消息成为一种有效的解决方案。本文将详细介绍如何在Spring Cloud项目中使用RocketMQ的事务消息实现分布式事务,结合
spring-cloud-starter-stream-rocketmq依赖进行配置和操作。
1. 为什么需要事务消息?
在分布式系统中,不同微服务可能涉及多个数据库或资源的操作。为了保证跨服务的数据一致性,我们可以采用分布式事务。事务消息是一种典型的实现方式,能够确保消息的发送与业务操作的执行具有一致性。
例如,在一个电商系统中,用户下单后,库存服务需要减库存,支付服务需要扣款。通过事务消息可以确保订单创建与库存减少、支付扣款等操作在分布式环境下的一致性。
2. 事务消息的原理
RocketMQ的事务消息原理大致可以分为三个步骤:
- 半消息:先发送一个状态为"待确认"的消息,RocketMQ称之为“半消息”,此时消息不会被消费。
- 本地事务执行:在发送半消息后执行本地事务,如果本地事务成功,则提交事务消息;如果本地事务失败,则回滚消息。
- 消息回查:RocketMQ会定时查询消息的事务状态,如果一直没有收到事务确认,RocketMQ会触发消息回查,调用事务检查接口以确保数据一致性。
- 消费者消费:如果提交了事务消息,则消费者进行消费。
下面是官方给出的事务消息的处理流程:

3. 引入依赖
在pom.xml中引入Spring Cloud推荐的RocketMQ依赖:
<dependency>
<groupId>com.alibaba.cloud</groupId>
<artifactId>spring-cloud-starter-stream-rocketmq</artifactId>
</dependency>
此依赖提供了Spring Cloud Stream的RocketMQ集成,可以简化RocketMQ的使用和事务管理。
4. 配置RocketMQ事务消息
在application.yml中配置RocketMQ的基本信息:
生产者
spring:
cloud:
stream:
function:
definition: buy; #配置了 Spring Cloud Stream 使用的函数入口名称buy,Spring Cloud Stream 会根据这个配置来调用 buy 函数进行消息处理
rocketmq:
binder:
name-server: 127.0.0.1:9876 # NameServer地址
bindings:
buy-out-0: #配置了一个名为 buy-out-0 的输出通道
producer:
transactionListener: inventoryDeductTransactionListener # 事务监听器
producerType: Trans # 类型为事务消息
bindings:
buy-out-0: #buy-out-0输出通道的配置
content-type: application/json
destination: buy-topic #发送到指定的topic
binder: rocketmq #MQ的类型
消费者
spring:
stream:
rocketmq:
binder:
name-server: localhost:9876 # NameServer地址
bindings:
buy-in-0: #buy-in-0输入通道的配置,对应前面的buy-out-0
content-type: application/json # 消息内容类型为 JSON
destination: buy-topic # 消息目标主题(RocketMQ 中的 Topic)
group: trade-group # 消费者组(消费者组可以订阅同一主题的消息,保证消费的幂等性)
binder: rocketmq # 指定 RocketMQ 作为消息中间件的绑定器
5. 编写事务消息的生产者
事务消息生产者负责发送半消息、执行本地事务,并在需要时进行事务检查。
5.1 定义业务接口
假设我们有一个订单服务,接口如下:
public interface OrderService {
void createOrder(String productId, int quantity);
}
5.2 编写事务消息生产者
事务消息生产者发送事务消息。
@Service
public class OrderServiceImpl implements OrderService {
@Autowired
private StreamBridge streamBridge;
@Override
public void createOrder(String userId, String productId, int quantity) {
// 发送事务消息
Message<String> message = MessageBuilder.withPayload("订单创建请求")
.setHeader("identifier", UUID.randomUUID().toString()) // 设置消息唯一标识
.build();
// 使用StreamBridge发送消息到RocketMQ,buy-out-0对应配置文件中的配置
boolean sendResult = streamBridge.send("buy-out-0", message);
if (!sendResult) {
throw new RuntimeException("消息发送失败");
}
}
}
在上面的代码中,StreamBridge用于发送事务消息。buy-out-0绑定了buy-topic,我们可以将事务消息发送到指定的主题。
6. 事务监听器配置
事务监听器用于处理RocketMQ的事务消息回查逻辑。在Spring Cloud Stream中,可以通过实现TransactionListener接口来管理事务的回查逻辑。
@Component
public class InventoryDeductTransactionListener implements TransactionListener {
private static final Logger LOG = LoggerFactory.getLogger(InventoryDeductTransactionListener.class);
@Override
public LocalTransactionState executeLocalTransaction(Message message, Object o) {
try {
// 执行本地事务,如订单创建
LOG.info("Execute local Transaction");
LOG.info(message.toString());
// 本地事务成功,返回COMMIT_MESSAGE
return LocalTransactionState.COMMIT_MESSAGE;
} catch (Exception e) {
// 本地事务失败,返回ROLLBACK_MESSAGE
LOG.error(e.getMessage(), e);
return LocalTransactionState.ROLLBACK_MESSAGE;
}
}
@Override
public LocalTransactionState checkLocalTransaction(MessageExt messageExt) {
LOG.info("Check local Transaction");
LOG.info(messageExt.toString());
return LocalTransactionState.COMMIT_MESSAGE;
}
}
InventoryDeductTransactionListener中的executeLocalTransaction方法执行本地事务,并根据结果返回事务状态。而checkLocalTransaction方法则用于RocketMQ消息回查,以确定事务消息的最终状态。
7. 编写事务消息的消费者
事务消息消费者处理生产者本地事务提交后的逻辑,如扣减库存等。
@Component
public class BuyMessageListener {
private static final Logger LOG = LoggerFactory.getLogger(BuyMessageListener.class);
@Bean
public Consumer<Message<String>> buy() { //方法名和前面生产者配置文件相对应
return msg -> {
Object arg = msg.getHeaders();
LOG.info("{} Receive New Messages: {} ARG:{}", Thread.currentThread().getName(), msg.getPayload(), arg);
// 处理后业务,如库存扣减等
};
}
}
8. 测试和验证
通过OrderService创建订单服务,测试事务消息的发送与回查机制是否正常工作。可以在本地事务执行代码中引入异常,测试在不同事务状态下,RocketMQ如何进行消息的提交与回查。
8.1 发送事务消息且本地事务成功提交
通过调用 OrderService 的 createOrder 方法发送事务消息并执行本地事务。


从上面的日志可以看出,先发送一条半消息,然后成功执行了本地事务, 最后提交这个事务消息。消费者收到消息进行业务处理。
8.2 发送事务消息且本地事务处理失败
在事务监听器中模拟一个异常,代码改造如下。
@Override
public LocalTransactionState executeLocalTransaction(Message message, Object o) {
String str = null;
try {
// 执行本地事务,如扣减库存、记录订单等
LOG.info("Execute local Transaction");
LOG.info(message.toString());
// 访问空对象的属性,触发 NullPointerException
LOG.info(String.valueOf(str.length()));
// 本地事务成功,返回COMMIT_MESSAGE
return LocalTransactionState.COMMIT_MESSAGE;
} catch (Exception e) {
// 本地事务失败,返回ROLLBACK_MESSAGE
LOG.error(e.getMessage());
return LocalTransactionState.ROLLBACK_MESSAGE;
}
}
日志如下:


从日志中可以看到,当本地事务执行失败,返回ROLLBACK_MESSAGE的时候,消费者不会收到这条消息,从而保证了一致性。
8.3 发送事务消息且本地事务执行超时触发消息回查
在事务监听器中阻塞30s,代码改造如下。
@Override
public LocalTransactionState executeLocalTransaction(Message message, Object o) {
try {
// 执行本地事务,如扣减库存、记录订单等
LOG.info("Execute local Transaction");
LOG.info(message.toString());
// 阻塞 30 秒
TimeUnit.SECONDS.sleep(30);
// 本地事务成功,返回COMMIT_MESSAGE
return LocalTransactionState.COMMIT_MESSAGE;
} catch (Exception e) {
// 本地事务失败,返回ROLLBACK_MESSAGE
LOG.error(e.getMessage());
return LocalTransactionState.ROLLBACK_MESSAGE;
}
}
日志如下:

从日志中可以看出,当在执行本地事务中超时的时候(我们手动阻塞了30s),会触发检查本地事务的方法提交或回滚事务消息,我们模拟提交事务消息,然后消费者会收到消息进行消费。
9. RocketMQ 事务消息的优势和适用场景
使用 RocketMQ 事务消息在分布式环境下确保数据一致性,适用于以下场景:
- 金融交易:确保用户扣款成功时更新余额。
- 库存管理:在订单创建和库存扣减过程中保障一致性。
- 电商系统:下单成功后确保库存和支付服务成功执行。
事务消息可以提供最终一致性保障,但并不适合对实时性要求特别高的场景。
10.总结
本文介绍了在Spring Cloud项目中如何使用RocketMQ的事务消息实现分布式事务管理,重点解析了事务消息的原理及配置方式。通过RocketMQ的事务消息,我们可以实现跨服务的数据一致性,增强分布式系统的可靠性。使用spring-cloud-starter-stream-rocketmq依赖简化了开发,使事务消息的配置和实现更加高效。
更多推荐

所有评论(0)