需求

订单管理系统需要处理大量的订单数据。为了提高用户体验和系统效率,需要实现一个订单超时自动取消的功能。该功能能够在用户下单后一定时间内(例如10分钟),如果用户未完成支付或其他必要的操作,则自动取消订单。

功能实现的关键点

1、消息队列的选择与配置

  • 选择合适的消息队列:根据业务需求选择合适的消息队列系统(如RocketMQ、Kafka等),确保其具备高可用性、高性能和可扩展性。
  • 配置生产者和消费者:正确配置生产者的发送策略和消费者的消费策略,包括重试机制、批量处理、消息确认等。

2、延时消息支持

  • 利用延时消息功能:使用消息队列提供的延时消息功能来延迟处理订单超时逻辑。
  • 合理设置延迟级别:根据业务需求合理设置延迟级别或自定义延迟时间,确保准确触发超时检查。

3、事务一致性

  • 保证数据一致性:确保订单状态更新与消息发送的一致性,避免因网络故障等原因导致的数据不一致问题。
  • 使用分布式事务:可以考虑使用两阶段提交(2PC)、Saga模式或其他分布式事务解决方案来保证事务的原子性。

4、幂等性

  • 防止重复处理:确保消息消费者在处理重复消息时不产生错误结果。
  • 实现幂等性机制:通过唯一标识符(如订单ID)或版本号来判断消息是否已经处理过。

5、监控与报警

  • 实时监控:实现对消息队列和订单系统的实时监控,及时发现异常情况。
  • 报警机制:设置报警机制,以便在出现问题时能够快速响应和解决。

6、日志记录

  • 详细日志:记录关键操作的日志,便于后续排查问题。
  • 集中日志管理:使用统一的日志管理系统,方便集中管理和分析日志数据。

7、性能优化

  • 优化消息生产和消费:优化消息生产和消费逻辑,提高系统的吞吐量和响应速度。
  • 水平扩展:考虑水平扩展方案,以应对高并发场景。

8、容错处理

  • 重试策略:设计合理的重试策略,确保在临时性失败情况下能够自动恢复。
  • 手动干预机制:提供手动干预机制,以便在系统无法自动恢复时人工介入解决问题。

9、安全性

  • 数据加密:确保敏感信息的安全传输和存储。
  • 身份验证和授权:实现身份验证和授权机制,防止未经授权的操作。

10、测试与调试

  • 全面测试:进行全面的单元测试、集成测试和压力测试,确保系统的稳定性。
  • 模拟故障场景:模拟各种网络条件和故障场景,确保系统能够正常应对。

11、资源调度与优化

  • 合理分配资源:合理分配计算资源和网络带宽,确保系统的高效运行。
  • 资源约束:考虑各种资源约束条件,例如内存限制、CPU使用率等。

功能需求

1、订单创建
触发条件:当用户成功提交订单后。
动作:
将订单信息存储到MySQL数据库中。

向RocketMQ发送一条延时消息。

2、订单超时检测
触发条件:RocketMQ消费者接收到延时消息。
动作:
查询MySQL数据库以获取订单状态。

如果订单状态仍为“CREATED”(即用户尚未完成支付或其他必要操作),则将订单状态更新为“CANCELLED”。

3、幂等性保障
实现方式:在订单表中添加messageId字段,用于唯一标识每条消息。
逻辑:
在接收消息时,首先检查messageId是否已经存在于数据库中。

如果存在,说明该消息已经被处理过,直接忽略;否则继续执行取消逻辑。

代码实操

在pom.xml中添加所需的依赖:

<dependencies>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-web</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-devtools</artifactId>
        <scope>runtime</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.rocketmq</groupId>
        <artifactId>rocketmq-spring-boot-starter</artifactId>
        <version>2.2.1</version>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-data-jpa</artifactId>
    </dependency>
    <dependency>
        <groupId>mysql</groupId>
        <artifactId>mysql-connector-java</artifactId>
        <scope>runtime</scope>
    </dependency>
</dependencies>

配置RocketMQ和MySQL
在application.yml中添加RocketMQ的配置,并配置MySQL数据库:

server:
  port:8080

rocketmq:
name-server:localhost:9876# 替换为你的NameServer地址
producer:
    group:order-producer-group
consumer:
    group:order-consumer-group

spring:
datasource:
    url:jdbc:mysql://localhost:3306/order_db?useSSL=false&serverTimezone=UTC
    username:root
    password:root
    driver-class-name:com.mysql.cj.jdbc.Driver
jpa:
    hibernate:
      ddl-auto:update
    show-sql:true
    properties:
      hibernate:
        format_sql:true

实现消息生产者
创建一个服务类来发送订单创建的消息,并确保幂等性:

import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Service;

import java.util.UUID;

@Service
publicclass OrderProducerService {

    @Autowired
    private RocketMQTemplate rocketMQTemplate;

    public void sendOrderCreatedMessage(Order order) {
        String topic = "order_created_topic";
        // 发送延时消息,延迟级别为3(默认是1s, 5s, 10s, 30s, 1m, 2m, 3m, 4m, 5m, 6m, 7m, 8m, 9m, 10m, 20m, 30m, 1h, 2h)
        int delayLevel = 3; // 延迟10秒
        UUID messageId = UUID.randomUUID(); // 生成唯一的messageId
        order.setMessageId(messageId);
        rocketMQTemplate.syncSend(topic, MessageBuilder.withPayload(order).build(), 3000, delayLevel);
    }
}

实现消息消费者
创建一个监听器来消费订单超时取消的消息,并确保幂等性:

import org.apache.rocketmq.spring.annotation.ConsumeMode;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

@Service
@RocketMQMessageListener(topic = "order_created_topic", consumerGroup = "${rocketmq.consumer.group}", consumeMode = ConsumeMode.ORDERLY)
publicclass OrderConsumerService implements RocketMQListener<Order> {

    @Autowired
    private OrderRepository orderRepository;

    @Override
    public void onMessage(Order order) {
        System.out.println("Received order: " + order.getId());
        // 检查消息是否已经处理过
        if (orderRepository.findByMessageId(order.getMessageId()).isPresent()) {
            System.out.println("Duplicate message received, skipping processing.");
            return;
        }
        // 模拟订单超时取消逻辑
        cancelOrderIfTimeout(order);
    }

    private void cancelOrderIfTimeout(Order order) {
        Order storedOrder = orderRepository.findById(order.getId()).orElse(null);
        if (storedOrder != null && storedOrder.getStatus().equals("CREATED")) {
            storedOrder.setStatus("CANCELLED");
            orderRepository.save(storedOrder);
            System.out.println("Canceling order due to timeout: " + order.getId());
        }
    }
}

订单实体类
创建一个简单的订单实体类,并使用JPA注解进行持久化:

import javax.persistence.Entity;
import javax.persistence.GeneratedValue;
import javax.persistence.GenerationType;
import javax.persistence.Id;
import java.util.UUID;

@Entity
publicclass Order {
    @Id
    @GeneratedValue(strategy = GenerationType.IDENTITY)
    private Long id;
    private String status;
    private UUID messageId; // 唯一标识符,用于幂等性

    // Getters and Setters
    public Long getId() {
        return id;
    }

    public void setId(Long id) {
        this.id = id;
    }

    public String getStatus() {
        return status;
    }

    public void setStatus(String status) {
        this.status = status;
    }

    public UUID getMessageId() {
        return messageId;
    }

    public void setMessageId(UUID messageId) {
        this.messageId = messageId;
    }

    @Override
    public String toString() {
        return"Order{" +
                "id=" + id +
                ", status='" + status + '\'' +
                ", messageId=" + messageId +
                '}';
    }
}

订单仓库接口
创建一个JPA仓库接口来管理订单数据:

import org.springframework.data.jpa.repository.JpaRepository;
import java.util.Optional;
import java.util.UUID;

public interface OrderRepository extends JpaRepository<Order, Long> {
    Optional<Order> findByMessageId(UUID messageId);
}

控制器
创建一个控制器来模拟订单创建和查询订单状态:

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.*;

@RestController
@RequestMapping("/orders")
publicclass OrderController {

    @Autowired
    private OrderProducerService orderProducerService;

    @Autowired
    private OrderRepository orderRepository;

    @PostMapping("/create")
    public String createOrder(@RequestBody Order order) {
        order.setStatus("CREATED");
        Order savedOrder = orderRepository.save(order);
        orderProducerService.sendOrderCreatedMessage(savedOrder);
        return"Order created with ID: " + savedOrder.getId();
    }

    @GetMapping("/{id}")
    public Order getOrderById(@PathVariable Long id) {
        return orderRepository.findById(id).orElse(null);
    }
}

测试结果
在浏览器访问http://localhost:8080/orders/1,你会先看到订单状态为CREATED,然后在10秒后再次访问时,订单状态应变为CANCELLED。

控制台看到输出:

Received order: 1
Canceling order due to timeout: 1
Logo

更多推荐