🎓博主介绍:Java、Python、js全栈开发 “多面手”,精通多种编程语言和技术,痴迷于人工智能领域。秉持着对技术的热爱与执着,持续探索创新,愿在此分享交流和学习,与大家共进步。
📖DeepSeek-行业融合之万象视界(附实战案例详解100+)
📖全栈开发环境搭建运行攻略:多语言一站式指南(环境搭建+运行+调试+发布+保姆级详解)
👉感兴趣的可以先收藏起来,希望帮助更多的人
在这里插入图片描述

SpringBoot分布式事务终极方案:Seata+RabbitMQ可靠消息

一、背景引入

1.1 分布式系统与事务挑战

在当今的互联网应用中,分布式系统已经成为主流架构。随着业务的不断发展和复杂化,一个业务操作往往需要跨多个服务、数据库进行处理。例如,电商系统中的下单流程,可能涉及到订单服务、库存服务、支付服务等多个微服务的协同工作。在这种分布式环境下,如何保证数据的一致性和事务的完整性成为了一个极具挑战性的问题。

传统的单机事务(如数据库的ACID特性)无法直接应用于分布式系统,因为不同的服务可能使用不同的数据库,甚至运行在不同的物理节点上。因此,我们需要引入分布式事务解决方案来解决这些问题。

1.2 常见分布式事务解决方案概述

常见的分布式事务解决方案有两阶段提交(2PC)、三阶段提交(3PC)、TCC(Try-Confirm-Cancel)、SAGA模式以及可靠消息最终一致性等。每种方案都有其优缺点和适用场景。

两阶段提交和三阶段提交虽然能保证强一致性,但存在性能开销大、单点故障等问题;TCC模式需要业务代码实现Try、Confirm、Cancel三个方法,对业务侵入性较大;SAGA模式通过一系列的本地事务来实现最终一致性,但需要处理复杂的补偿逻辑。而可靠消息最终一致性方案通过消息队列来保证数据的最终一致性,具有较好的性能和可扩展性。

二、Seata与RabbitMQ简介

2.1 Seata简介

Seata(Simple Extensible Autonomous Transaction Architecture)是一款开源的分布式事务解决方案,致力于提供高性能和简单易用的分布式事务服务。Seata为用户提供了AT、TCC、SAGA和XA四种事务模式,其中AT模式是无侵入的分布式事务解决方案,它可以在不修改业务代码的情况下实现分布式事务。

Seata的核心组件包括TC(Transaction Coordinator,事务协调器)、TM(Transaction Manager,事务管理器)和RM(Resource Manager,资源管理器)。TC是全局事务的协调者,负责全局事务的注册、提交和回滚;TM是全局事务的发起者,负责开启、提交或回滚全局事务;RM是资源管理器,负责管理本地事务资源,并向TC汇报事务状态。

2.2 RabbitMQ简介

RabbitMQ是一个开源的消息队列中间件,基于AMQP(Advanced Message Queuing Protocol)协议实现。它具有高可靠性、高可用性、灵活的路由机制等特点,广泛应用于各种分布式系统中。

在分布式事务中,RabbitMQ可以作为消息的载体,用于在不同的服务之间传递事务消息。通过RabbitMQ的消息确认机制和持久化特性,可以保证消息的可靠传输,从而实现可靠消息最终一致性。

三、Seata+RabbitMQ可靠消息方案原理

3.1 整体架构

Seata+RabbitMQ可靠消息方案的整体架构主要包括以下几个部分:

  • 业务服务:包含多个微服务,如订单服务、库存服务等,这些服务通过Seata进行分布式事务管理,并通过RabbitMQ进行消息通信。
  • Seata组件:包括TC、TM和RM,负责协调和管理全局事务。
  • RabbitMQ:作为消息队列,用于在不同的服务之间传递事务消息。

3.2 工作流程

  1. 事务开启:TM发起全局事务,向TC注册全局事务。
  2. 业务处理:业务服务在本地执行事务操作,同时将需要发送的消息发送到RabbitMQ的待确认队列。
  3. 本地事务提交:业务服务在本地事务提交后,向TC汇报本地事务状态。
  4. 消息确认:TC根据全局事务的状态,向业务服务发送消息确认指令。如果全局事务提交成功,则业务服务将待确认队列中的消息转移到实际的业务队列;如果全局事务回滚,则删除待确认队列中的消息。
  5. 消息消费:下游服务从业务队列中消费消息,并执行相应的业务操作。

3.3 可靠性保证

  • 消息持久化:RabbitMQ支持消息的持久化,将消息存储在磁盘上,确保在消息队列重启后消息不会丢失。
  • 消息确认机制:RabbitMQ提供了生产者确认和消费者确认机制,确保消息的可靠发送和消费。
  • Seata事务协调:Seata通过TC协调全局事务的提交和回滚,保证消息的处理与全局事务的状态一致。

四、Seata+RabbitMQ可靠消息方案实践

4.1 环境搭建

4.1.1 安装Seata
  1. 下载Seata Server:从Seata的GitHub仓库(https://github.com/seata/seata/releases)下载最新版本的Seata Server。
  2. 配置Seata Server:修改conf/application.yml文件,配置数据库连接信息、日志存储方式等。
  3. 启动Seata Server:运行bin/seata-server.sh(Linux/Mac)或bin/seata-server.bat(Windows)启动Seata Server。
4.1.2 安装RabbitMQ
  1. 下载并安装RabbitMQ:根据不同的操作系统,从RabbitMQ官方网站(https://www.rabbitmq.com/download.html)下载并安装RabbitMQ。
  2. 启动RabbitMQ:运行rabbitmq-server启动RabbitMQ服务。
4.1.3 创建Spring Boot项目

使用Spring Initializr(https://start.spring.io/)创建一个Spring Boot项目,添加以下依赖:

<dependencies>
    <!-- Spring Boot Starter Web -->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-web</artifactId>
    </dependency>
    <!-- Seata Starter -->
    <dependency>
        <groupId>com.alibaba.cloud</groupId>
        <artifactId>spring-cloud-starter-alibaba-seata</artifactId>
    </dependency>
    <!-- RabbitMQ Starter -->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-amqp</artifactId>
    </dependency>
    <!-- MySQL Connector -->
    <dependency>
        <groupId>mysql</groupId>
        <artifactId>mysql-connector-java</artifactId>
    </dependency>
    <!-- MyBatis Plus Starter -->
    <dependency>
        <groupId>com.baomidou</groupId>
        <artifactId>mybatis-plus-boot-starter</artifactId>
        <version>3.4.3.4</version>
    </dependency>
</dependencies>

4.2 配置Seata和RabbitMQ

4.2.1 配置Seata

application.yml中添加Seata配置:

seata:
  enabled: true
  application-id:${spring.application.name}
  tx-service-group: my_test_tx_group
  config:
    type: nacos
    nacos:
      server-addr: 127.0.0.1:8848
      namespace: ""
      group: SEATA_GROUP
      username: "nacos"
      password: "nacos"
  registry:
    type: nacos
    nacos:
      application: seata-server
      server-addr: 127.0.0.1:8848
      namespace: ""
      group: DEFAULT_GROUP
      username: "nacos"
      password: "nacos"
4.2.2 配置RabbitMQ

application.yml中添加RabbitMQ配置:

spring:
  rabbitmq:
    host: 127.0.0.1
    port: 5672
    username: guest
    password: guest

4.3 代码实现

4.3.1 订单服务
  1. 创建订单实体类:
import com.baomidou.mybatisplus.annotation.TableName;
import lombok.Data;

@Data
@TableName("t_order")
public class Order {
    private Long id;
    private String orderNo;
    private Integer userId;
    private Integer productId;
    private Integer count;
    private Double amount;
}
  1. 创建订单Mapper接口:
import com.baomidou.mybatisplus.core.mapper.BaseMapper;
import org.apache.ibatis.annotations.Mapper;

@Mapper
public interface OrderMapper extends BaseMapper<Order> {
}
  1. 创建订单服务类:
import com.alibaba.nacos.common.utils.UuidUtils;
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import com.baomidou.mybatisplus.core.conditions.update.LambdaUpdateWrapper;
import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
import io.seata.core.context.RootContext;
import io.seata.spring.annotation.GlobalTransactional;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

import java.util.Date;

@Service
public class OrderService extends ServiceImpl<OrderMapper, Order> {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    @GlobalTransactional
    public void createOrder(Order order) {
        System.out.println("当前全局事务ID: " + RootContext.getXID());
        // 生成订单号
        order.setOrderNo(UuidUtils.generateUuid());
        order.setCreateTime(new Date());
        // 保存订单
        this.save(order);

        // 发送消息到RabbitMQ
        rabbitTemplate.convertAndSend("order_exchange", "order_routing_key", order);
    }
}
4.3.2 库存服务
  1. 创建库存实体类:
import com.baomidou.mybatisplus.annotation.TableName;
import lombok.Data;

@Data
@TableName("t_stock")
public class Stock {
    private Long id;
    private Integer productId;
    private Integer stock;
}
  1. 创建库存Mapper接口:
import com.baomidou.mybatisplus.core.mapper.BaseMapper;
import org.apache.ibatis.annotations.Mapper;

@Mapper
public interface StockMapper extends BaseMapper<Stock> {
}
  1. 创建库存服务类:
import com.baomidou.mybatisplus.core.conditions.update.LambdaUpdateWrapper;
import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Service;

@Service
public class StockService extends ServiceImpl<StockMapper, Stock> {

    @RabbitListener(queues = "order_queue")
    public void handleOrderMessage(Order order) {
        // 扣减库存
        LambdaUpdateWrapper<Stock> updateWrapper = new LambdaUpdateWrapper<>();
        updateWrapper.eq(Stock::getProductId, order.getProductId())
                .ge(Stock::getStock, order.getCount())
                .setSql("stock = stock - " + order.getCount());
        this.update(updateWrapper);
    }
}

4.4 测试与验证

创建一个Controller类来测试订单创建功能:

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RestController;

@RestController
public class OrderController {

    @Autowired
    private OrderService orderService;

    @PostMapping("/createOrder")
    public String createOrder(@RequestBody Order order) {
        orderService.createOrder(order);
        return "Order created successfully";
    }
}

启动Spring Boot应用程序,使用Postman或其他工具发送POST请求到http://localhost:8080/createOrder,并传入订单信息,验证订单创建和库存扣减是否正常。

五、总结与展望

5.1 方案优势

  • 高性能:Seata的AT模式无侵入性,不需要修改业务代码,同时RabbitMQ的异步消息机制可以提高系统的吞吐量。
  • 高可靠性:通过Seata的事务协调和RabbitMQ的消息确认机制,保证了分布式事务的可靠性和数据的最终一致性。
  • 易于维护:方案的架构清晰,代码结构简单,易于开发和维护。

5.2 可能存在的问题及解决方案

  • 消息积压:如果消息的生产速度大于消费速度,可能会导致消息积压。可以通过增加消费者实例、优化消费逻辑等方式来解决。
  • 消息重复消费:由于网络故障等原因,可能会导致消息重复消费。可以在消费端实现幂等性处理,确保同一消息只被处理一次。

5.3 未来发展趋势

随着分布式系统的不断发展,分布式事务解决方案也将不断完善。未来,Seata可能会支持更多的事务模式和数据库,RabbitMQ也会不断提升性能和可靠性。同时,分布式事务解决方案将与其他技术(如区块链、人工智能等)进行融合,为分布式系统提供更加安全、高效的事务处理能力。

Logo

更多推荐