在微服务架构中,分布式事务控制是一个重要的挑战。当不同服务间需要保证数据一致性时,传统的单机事务管理已不再适用,而事务消息成为一种有效的解决方案。本文将详细介绍如何在Spring Cloud项目中使用RocketMQ的事务消息实现分布式事务,结合spring-cloud-starter-stream-rocketmq依赖进行配置和操作。

1. 为什么需要事务消息?

在分布式系统中,不同微服务可能涉及多个数据库或资源的操作。为了保证跨服务的数据一致性,我们可以采用分布式事务。事务消息是一种典型的实现方式,能够确保消息的发送与业务操作的执行具有一致性。

例如,在一个电商系统中,用户下单后,库存服务需要减库存,支付服务需要扣款。通过事务消息可以确保订单创建与库存减少、支付扣款等操作在分布式环境下的一致性。

2. 事务消息的原理

RocketMQ的事务消息原理大致可以分为三个步骤:

  1. 半消息:先发送一个状态为"待确认"的消息,RocketMQ称之为“半消息”,此时消息不会被消费。
  2. 本地事务执行:在发送半消息后执行本地事务,如果本地事务成功,则提交事务消息;如果本地事务失败,则回滚消息。
  3. 消息回查:RocketMQ会定时查询消息的事务状态,如果一直没有收到事务确认,RocketMQ会触发消息回查,调用事务检查接口以确保数据一致性。
  4. 消费者消费:如果提交了事务消息,则消费者进行消费。

下面是官方给出的事务消息的处理流程:

3. 引入依赖

pom.xml中引入Spring Cloud推荐的RocketMQ依赖:

<dependency>
    <groupId>com.alibaba.cloud</groupId>
    <artifactId>spring-cloud-starter-stream-rocketmq</artifactId>
</dependency>

此依赖提供了Spring Cloud Stream的RocketMQ集成,可以简化RocketMQ的使用和事务管理。

4. 配置RocketMQ事务消息

application.yml中配置RocketMQ的基本信息:

生产者

spring:
  cloud:
    stream:
      function:
        definition: buy; #配置了 Spring Cloud Stream 使用的函数入口名称buy,Spring Cloud Stream 会根据这个配置来调用 buy 函数进行消息处理
      rocketmq:
        binder:
          name-server: 127.0.0.1:9876 # NameServer地址
        bindings:
          buy-out-0: #配置了一个名为 buy-out-0 的输出通道
            producer:
              transactionListener: inventoryDeductTransactionListener # 事务监听器
              producerType: Trans # 类型为事务消息
      bindings:
        buy-out-0: #buy-out-0输出通道的配置
          content-type: application/json
          destination: buy-topic #发送到指定的topic
          binder: rocketmq  #MQ的类型

消费者

spring:
    stream:
      rocketmq:
        binder:
          name-server: localhost:9876 # NameServer地址
      bindings:
        buy-in-0: #buy-in-0输入通道的配置,对应前面的buy-out-0
          content-type: application/json # 消息内容类型为 JSON
          destination: buy-topic # 消息目标主题(RocketMQ 中的 Topic)
          group: trade-group # 消费者组(消费者组可以订阅同一主题的消息,保证消费的幂等性)
          binder: rocketmq # 指定 RocketMQ 作为消息中间件的绑定器

5. 编写事务消息的生产者

事务消息生产者负责发送半消息、执行本地事务,并在需要时进行事务检查。

5.1 定义业务接口

假设我们有一个订单服务,接口如下:

public interface OrderService {
    void createOrder(String productId, int quantity);
}
5.2 编写事务消息生产者

事务消息生产者发送事务消息。

@Service
public class OrderServiceImpl implements OrderService {

    @Autowired
    private StreamBridge streamBridge;

    @Override
    public void createOrder(String userId, String productId, int quantity) {
        // 发送事务消息
        Message<String> message = MessageBuilder.withPayload("订单创建请求")
                .setHeader("identifier", UUID.randomUUID().toString()) // 设置消息唯一标识
                .build();

        // 使用StreamBridge发送消息到RocketMQ,buy-out-0对应配置文件中的配置
        boolean sendResult = streamBridge.send("buy-out-0", message);

        if (!sendResult) {
            throw new RuntimeException("消息发送失败");
        }
    }
}

在上面的代码中,StreamBridge用于发送事务消息。buy-out-0绑定了buy-topic,我们可以将事务消息发送到指定的主题。

6. 事务监听器配置

事务监听器用于处理RocketMQ的事务消息回查逻辑。在Spring Cloud Stream中,可以通过实现TransactionListener接口来管理事务的回查逻辑。

@Component
public class InventoryDeductTransactionListener implements TransactionListener {

    private static final Logger LOG = LoggerFactory.getLogger(InventoryDeductTransactionListener.class);
    @Override
    public LocalTransactionState executeLocalTransaction(Message message, Object o) {
       try {
            // 执行本地事务,如订单创建
            LOG.info("Execute local Transaction");
            LOG.info(message.toString());
            // 本地事务成功,返回COMMIT_MESSAGE
            return LocalTransactionState.COMMIT_MESSAGE;
        } catch (Exception e) {
            // 本地事务失败,返回ROLLBACK_MESSAGE
            LOG.error(e.getMessage(), e);
            return LocalTransactionState.ROLLBACK_MESSAGE;
        }
    }

    @Override
    public LocalTransactionState checkLocalTransaction(MessageExt messageExt) {
        LOG.info("Check local Transaction");
        LOG.info(messageExt.toString());
        return LocalTransactionState.COMMIT_MESSAGE;
    }
}

InventoryDeductTransactionListener中的executeLocalTransaction方法执行本地事务,并根据结果返回事务状态。而checkLocalTransaction方法则用于RocketMQ消息回查,以确定事务消息的最终状态。

7. 编写事务消息的消费者

事务消息消费者处理生产者本地事务提交后的逻辑,如扣减库存等。

@Component
public class BuyMessageListener {

    private static final Logger LOG = LoggerFactory.getLogger(BuyMessageListener.class);

    @Bean
    public Consumer<Message<String>> buy() { //方法名和前面生产者配置文件相对应
        return msg -> {
            Object arg = msg.getHeaders();
            LOG.info("{} Receive New Messages: {} ARG:{}", Thread.currentThread().getName(), msg.getPayload(), arg);
            // 处理后业务,如库存扣减等
        };
    }
}

8. 测试和验证

通过OrderService创建订单服务,测试事务消息的发送与回查机制是否正常工作。可以在本地事务执行代码中引入异常,测试在不同事务状态下,RocketMQ如何进行消息的提交与回查。

8.1 发送事务消息且本地事务成功提交

通过调用 OrderServicecreateOrder 方法发送事务消息并执行本地事务。

从上面的日志可以看出,先发送一条半消息,然后成功执行了本地事务, 最后提交这个事务消息。消费者收到消息进行业务处理。

8.2 发送事务消息且本地事务处理失败

在事务监听器中模拟一个异常,代码改造如下。

@Override
    public LocalTransactionState executeLocalTransaction(Message message, Object o) {
        String str = null;
        try {
            // 执行本地事务,如扣减库存、记录订单等
            LOG.info("Execute local Transaction");
            LOG.info(message.toString());
            // 访问空对象的属性,触发 NullPointerException
            LOG.info(String.valueOf(str.length()));
            // 本地事务成功,返回COMMIT_MESSAGE
            return LocalTransactionState.COMMIT_MESSAGE;
        } catch (Exception e) {
            // 本地事务失败,返回ROLLBACK_MESSAGE
            LOG.error(e.getMessage());
            return LocalTransactionState.ROLLBACK_MESSAGE;
        }
    }

日志如下:

从日志中可以看到,当本地事务执行失败,返回ROLLBACK_MESSAGE的时候,消费者不会收到这条消息,从而保证了一致性。

 8.3 发送事务消息且本地事务执行超时触发消息回查

在事务监听器中阻塞30s,代码改造如下。

@Override
    public LocalTransactionState executeLocalTransaction(Message message, Object o) {
        try {
            // 执行本地事务,如扣减库存、记录订单等
            LOG.info("Execute local Transaction");
            LOG.info(message.toString());
            // 阻塞 30 秒
            TimeUnit.SECONDS.sleep(30);
            // 本地事务成功,返回COMMIT_MESSAGE
            return LocalTransactionState.COMMIT_MESSAGE;
        } catch (Exception e) {
            // 本地事务失败,返回ROLLBACK_MESSAGE
            LOG.error(e.getMessage());
            return LocalTransactionState.ROLLBACK_MESSAGE;
        }
    }

日志如下:

 从日志中可以看出,当在执行本地事务中超时的时候(我们手动阻塞了30s),会触发检查本地事务的方法提交或回滚事务消息,我们模拟提交事务消息,然后消费者会收到消息进行消费。

9. RocketMQ 事务消息的优势和适用场景

使用 RocketMQ 事务消息在分布式环境下确保数据一致性,适用于以下场景:

  • 金融交易:确保用户扣款成功时更新余额。
  • 库存管理:在订单创建和库存扣减过程中保障一致性。
  • 电商系统:下单成功后确保库存和支付服务成功执行。

事务消息可以提供最终一致性保障,但并不适合对实时性要求特别高的场景。

10.总结

本文介绍了在Spring Cloud项目中如何使用RocketMQ的事务消息实现分布式事务管理,重点解析了事务消息的原理及配置方式。通过RocketMQ的事务消息,我们可以实现跨服务的数据一致性,增强分布式系统的可靠性。使用spring-cloud-starter-stream-rocketmq依赖简化了开发,使事务消息的配置和实现更加高效。

 

Logo

更多推荐