🌈 我是“没事学AI”,meishixueai, 欢迎咨询、交流,共同学习:
👁️ 【关注】我们一起挖 AI 的各种门道,看看它还有多少新奇玩法等着咱们发现
👍 【点赞】为这些有用的 AI 知识鼓鼓掌,让更多人知道学 AI 也能这么轻松
🔖 【收藏】把这些 AI 小技巧存起来,啥时候想练手了,翻出来就能用
💬 【评论】说说你学 AI 时的想法和疑问,让大家的思路碰出更多火花
👉 关注获取更多AI技术干货,点赞/收藏备用,欢迎评论区交流学习心得! 🚀

一、本地消息表的核心概念与定位

1.1 核心概念

本地消息表是分布式系统中保障跨服务操作最终一致性的一种设计模式。其核心思想是将分布式事务拆分为多个本地事务,通过在发起方的数据库中创建一张消息表,记录事务的执行状态和需要传递的消息内容,再由专门的组件读取消息表中的信息,触发后续的服务调用,从而实现分布式系统中数据的最终一致。

1.2 在技术体系中的定位

在分布式系统架构中,由于服务的拆分,跨服务的业务操作非常普遍,例如电商系统中的下单扣库存操作,涉及订单服务和库存服务。传统的本地事务无法满足跨服务的一致性要求,而分布式事务协议(如2PC、3PC)又存在性能差、可用性低等问题。本地消息表模式作为一种轻量级的最终一致性方案,在实际开发中被广泛应用,它避免了分布式事务协议的复杂性,通过牺牲强一致性换取更好的性能和可用性,适用于对一致性要求不是实时强一致,但需要最终一致的业务场景。

二、本地消息表的运行机制与原理

2.1 运行机制

  1. 业务发起方在执行本地事务时,将需要通知其他服务的消息数据写入本地消息表,这一步操作与本地业务操作在同一个事务中,确保要么本地业务成功且消息表记录成功,要么都失败。
  2. 消息表中每条消息都有一个状态标识(如“待发送”“已发送”“已完成”等),初始状态为“待发送”。
  3. 启动一个消息发送组件(可以是定时任务、消息队列监听等),不断扫描本地消息表中状态为“待发送”的消息。
  4. 消息发送组件将消息发送到接收方服务,接收方服务处理完业务逻辑后,返回处理结果。
  5. 若接收方处理成功,消息发送组件将消息表中对应消息的状态更新为“已完成”;若处理失败,根据重试策略进行重试,若重试多次仍失败,可将消息状态标记为“失败”,等待人工干预。

2.2 内在逻辑

本地消息表模式基于“最终一致性”理论,其内在逻辑是通过将分布式事务转化为一系列可监控、可重试的本地事务,利用消息的可靠传递和重试机制,确保所有相关服务最终都能完成各自的业务操作,从而达到数据一致的状态。它依赖于数据库的事务特性保证消息的可靠写入,依赖消息发送组件的重试机制保证消息的可靠传递,依赖接收方服务的幂等性处理保证重复消息不会导致业务逻辑出错。

三、本地消息表的核心模块及实现

3.1 消息表设计模块

3.1.1 模块描述

消息表是本地消息表模式的核心存储组件,用于记录需要传递的消息信息和事务状态,是整个模式实现的基础。它需要与业务表在同一个数据库中,以便能和业务操作在同一个事务内完成。

3.1.2 原理说明

消息表的设计需要包含足够的信息来描述消息的内容、状态、发送情况等,以便消息发送组件能够准确地读取和处理消息。其中,状态字段是关键,用于跟踪消息的生命周期,确保消息能够被正确发送和处理。

3.1.3 案例与代码实现

案例:在电商下单场景中,订单服务在创建订单的同时,需要向库存服务发送扣减库存的消息,此时订单服务的数据库中需要设计一张订单消息表。

表结构设计代码(MySQL)

CREATE TABLE `order_message` (
  `id` bigint(20) NOT NULL AUTO_INCREMENT COMMENT '主键ID',
  `order_id` varchar(64) NOT NULL COMMENT '订单ID',
  `product_id` varchar(64) NOT NULL COMMENT '商品ID',
  `quantity` int(11) NOT NULL COMMENT '购买数量',
  `message_content` varchar(2048) NOT NULL COMMENT '消息内容,JSON格式',
  `status` tinyint(4) NOT NULL COMMENT '消息状态:0-待发送,1-已发送,2-已完成,3-失败',
  `retry_count` int(11) NOT NULL DEFAULT 0 COMMENT '重试次数',
  `create_time` datetime NOT NULL COMMENT '创建时间',
  `update_time` datetime NOT NULL COMMENT '更新时间',
  PRIMARY KEY (`id`),
  KEY `idx_status_create_time` (`status`,`create_time`) COMMENT '用于消息发送组件扫描待发送消息'
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='订单消息表,用于记录订单相关的消息'

3.2 本地事务与消息写入模块

3.2.1 模块描述

该模块负责将业务操作和消息写入操作封装在同一个本地事务中,确保业务操作和消息记录的原子性,是保证消息可靠生成的关键环节。

3.2.2 原理说明

利用数据库的ACID特性,将业务逻辑的执行和消息表记录的插入放在同一个事务中。当业务逻辑执行成功后,才会提交事务,此时消息表中才会有对应的记录;如果业务逻辑执行失败或者消息插入失败,事务会回滚,确保不会出现业务操作成功但消息未记录的情况。

3.2.3 案例与代码实现

案例:订单服务创建订单,并在同一事务中写入扣减库存的消息。

代码实现(Java + Spring Boot + MyBatis)

@Service
@Transactional
public class OrderService {

    @Autowired
    private OrderMapper orderMapper;

    @Autowired
    private OrderMessageMapper orderMessageMapper;

    /**
     * 创建订单并写入消息
     * @param order 订单信息
     */
    public void createOrder(Order order) {
        // 1. 执行本地业务:创建订单
        orderMapper.insert(order);

        // 2. 构建消息内容
        OrderMessage message = new OrderMessage();
        message.setOrderId(order.getId());
        message.setProductId(order.getProductId());
        message.setQuantity(order.getQuantity());
        message.setMessageContent("{\"orderId\":\"" + order.getId() + "\",\"productId\":\"" + order.getProductId() + "\",\"quantity\":" + order.getQuantity() + "}");
        message.setStatus(0); // 0-待发送
        message.setCreateTime(new Date());
        message.setUpdateTime(new Date());

        // 3. 写入消息表,与创建订单在同一事务中
        orderMessageMapper.insert(message);
    }
}

OrderMapper 接口

public interface OrderMapper {
    @Insert("INSERT INTO `order` (id, product_id, quantity, status, create_time, update_time) " +
            "VALUES (#{id}, #{productId}, #{quantity}, #{status}, #{createTime}, #{updateTime})")
    void insert(Order order);
}

OrderMessageMapper 接口

public interface OrderMessageMapper {
    @Insert("INSERT INTO order_message (order_id, product_id, quantity, message_content, status, retry_count, create_time, update_time) " +
            "VALUES (#{orderId}, #{productId}, #{quantity}, #{messageContent}, #{status}, #{retryCount}, #{createTime}, #{updateTime})")
    void insert(OrderMessage message);
}

3.3 消息发送与重试模块

3.3.1 模块描述

该模块负责从消息表中读取待发送的消息,并将其发送到接收方服务,同时处理发送失败的情况,通过重试机制确保消息能够被成功送达。

3.3.2 原理说明

消息发送组件通过定时任务(如Quartz、Spring的@Scheduled注解等)定期扫描消息表中状态为“待发送”且重试次数未超过阈值的消息。对于每条待发送的消息,调用接收方服务的接口进行消息发送。如果发送成功,更新消息状态为“已完成”;如果发送失败,增加重试次数,若重试次数达到阈值,则更新消息状态为“失败”,否则保持“待发送”状态等待下次重试。

3.3.3 案例与代码实现

案例:订单服务的消息发送组件扫描订单消息表,将待发送的扣减库存消息发送到库存服务。

代码实现(Java + Spring Boot)

@Component
public class MessageSender {

    @Autowired
    private OrderMessageMapper orderMessageMapper;

    @Autowired
    private RestTemplate restTemplate;

    // 定时任务,每10秒执行一次
    @Scheduled(cron = "0/10 * * * * ?")
    public void sendMessages() {
        // 1. 查询待发送的消息(状态为0,重试次数<3)
        List<OrderMessage> messages = orderMessageMapper.selectPendingMessages(0, 3);
        if (messages.isEmpty()) {
            return;
        }

        // 2. 遍历消息并发送
        for (OrderMessage message : messages) {
            try {
                // 调用库存服务的扣减库存接口
                String url = "http://inventory-service/deduct";
                ResponseEntity<String> response = restTemplate.postForEntity(url, message.getMessageContent(), String.class);
                if (response.getStatusCode().is2xxSuccessful()) {
                    // 发送成功,更新消息状态为已完成
                    message.setStatus(2);
                    message.setUpdateTime(new Date());
                    orderMessageMapper.updateStatus(message);
                } else {
                    // 发送失败,重试次数+1
                    handleSendFailure(message);
                }
            } catch (Exception e) {
                // 发生异常,重试次数+1
                handleSendFailure(message);
            }
        }
    }

    /**
     * 处理发送失败的消息
     * @param message 消息对象
     */
    private void handleSendFailure(OrderMessage message) {
        message.setRetryCount(message.getRetryCount() + 1);
        message.setUpdateTime(new Date());
        if (message.getRetryCount() >= 3) {
            // 重试次数达到阈值,标记为失败
            message.setStatus(3);
        }
        orderMessageMapper.updateStatus(message);
    }
}

OrderMessageMapper 补充方法

public interface OrderMessageMapper {
    // 其他方法...

    @Select("SELECT * FROM order_message WHERE status = #{status} AND retry_count < #{maxRetryCount} ORDER BY create_time ASC LIMIT 100")
    List<OrderMessage> selectPendingMessages(@Param("status") int status, @Param("maxRetryCount") int maxRetryCount);

    @Update("UPDATE order_message SET status = #{status}, retry_count = #{retryCount}, update_time = #{updateTime} WHERE id = #{id}")
    void updateStatus(OrderMessage message);
}

3.4 接收方消息处理与幂等性保障模块

3.4.1 模块描述

该模块负责接收发送方的消息,执行对应的业务逻辑,并确保在消息重复发送的情况下,业务逻辑不会被重复执行,即保证幂等性。

3.4.2 原理说明

接收方服务在接收到消息后,首先需要对消息进行校验,判断是否已经处理过该消息。可以通过在本地数据库中记录已处理的消息ID,或者使用分布式锁等方式来实现幂等性。如果消息未被处理,则执行相应的业务逻辑;如果消息已被处理,则直接返回成功,避免重复处理。

3.4.3 案例与代码实现

案例:库存服务接收订单服务发送的扣减库存消息,处理扣减库存逻辑,并保证幂等性。

代码实现(Java + Spring Boot + MyBatis)

@RestController
@RequestMapping("/deduct")
public class InventoryController {

    @Autowired
    private InventoryService inventoryService;

    @PostMapping
    public String deductInventory(@RequestBody String messageContent) {
        // 解析消息内容,获取订单ID等信息
        JSONObject jsonObject = JSONObject.parseObject(messageContent);
        String orderId = jsonObject.getString("orderId");
        String productId = jsonObject.getString("productId");
        int quantity = jsonObject.getIntValue("quantity");

        // 处理扣减库存逻辑
        boolean result = inventoryService.deduct(productId, quantity, orderId);
        return result ? "success" : "failure";
    }
}

@Service
public class InventoryService {

    @Autowired
    private InventoryMapper inventoryMapper;

    @Autowired
    private ProcessedMessageMapper processedMessageMapper;

    @Transactional
    public boolean deduct(String productId, int quantity, String orderId) {
        // 1. 检查消息是否已处理(幂等性保障)
        ProcessedMessage processedMessage = processedMessageMapper.selectByMessageId(orderId);
        if (processedMessage != null) {
            // 消息已处理,直接返回成功
            return true;
        }

        // 2. 执行扣减库存操作
        Inventory inventory = inventoryMapper.selectByProductId(productId);
        if (inventory == null || inventory.getStock() < quantity) {
            // 库存不足,返回失败
            return false;
        }
        inventory.setStock(inventory.getStock() - quantity);
        inventory.setUpdateTime(new Date());
        inventoryMapper.update(inventory);

        // 3. 记录已处理的消息
        ProcessedMessage message = new ProcessedMessage();
        message.setMessageId(orderId);
        message.setProcessTime(new Date());
        processedMessageMapper.insert(message);

        return true;
    }
}

ProcessedMessage 表结构(MySQL)

CREATE TABLE `processed_message` (
  `id` bigint(20) NOT NULL AUTO_INCREMENT COMMENT '主键ID',
  `message_id` varchar(64) NOT NULL COMMENT '消息ID(如订单ID)',
  `process_time` datetime NOT NULL COMMENT '处理时间',
  PRIMARY KEY (`id`),
  UNIQUE KEY `uk_message_id` (`message_id`) COMMENT '确保消息ID唯一,用于幂等性检查'
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='已处理消息表,用于幂等性保障'

InventoryMapper 接口

public interface InventoryMapper {
    @Select("SELECT * FROM inventory WHERE product_id = #{productId}")
    Inventory selectByProductId(String productId);

    @Update("UPDATE inventory SET stock = #{stock}, update_time = #{updateTime} WHERE product_id = #{productId}")
    void update(Inventory inventory);
}

ProcessedMessageMapper 接口

public interface ProcessedMessageMapper {
    @Select("SELECT * FROM processed_message WHERE message_id = #{messageId}")
    ProcessedMessage selectByMessageId(String messageId);

    @Insert("INSERT INTO processed_message (message_id, process_time) VALUES (#{messageId}, #{processTime})")
    void insert(ProcessedMessage message);
}
Logo

更多推荐