SpringBoot与RocketMQ整合,实现分布式场景下的订单超时自动取消功能
SpringBoot与RocketMQ整合,实现分布式场景下的订单超时自动取消功能
需求
订单管理系统需要处理大量的订单数据。为了提高用户体验和系统效率,需要实现一个订单超时自动取消的功能。该功能能够在用户下单后一定时间内(例如10分钟),如果用户未完成支付或其他必要的操作,则自动取消订单。
功能实现的关键点
1、消息队列的选择与配置
- 选择合适的消息队列:根据业务需求选择合适的消息队列系统(如RocketMQ、Kafka等),确保其具备高可用性、高性能和可扩展性。
- 配置生产者和消费者:正确配置生产者的发送策略和消费者的消费策略,包括重试机制、批量处理、消息确认等。
2、延时消息支持
- 利用延时消息功能:使用消息队列提供的延时消息功能来延迟处理订单超时逻辑。
- 合理设置延迟级别:根据业务需求合理设置延迟级别或自定义延迟时间,确保准确触发超时检查。
3、事务一致性
- 保证数据一致性:确保订单状态更新与消息发送的一致性,避免因网络故障等原因导致的数据不一致问题。
- 使用分布式事务:可以考虑使用两阶段提交(2PC)、Saga模式或其他分布式事务解决方案来保证事务的原子性。
4、幂等性
- 防止重复处理:确保消息消费者在处理重复消息时不产生错误结果。
- 实现幂等性机制:通过唯一标识符(如订单ID)或版本号来判断消息是否已经处理过。
5、监控与报警
- 实时监控:实现对消息队列和订单系统的实时监控,及时发现异常情况。
- 报警机制:设置报警机制,以便在出现问题时能够快速响应和解决。
6、日志记录
- 详细日志:记录关键操作的日志,便于后续排查问题。
- 集中日志管理:使用统一的日志管理系统,方便集中管理和分析日志数据。
7、性能优化
- 优化消息生产和消费:优化消息生产和消费逻辑,提高系统的吞吐量和响应速度。
- 水平扩展:考虑水平扩展方案,以应对高并发场景。
8、容错处理
- 重试策略:设计合理的重试策略,确保在临时性失败情况下能够自动恢复。
- 手动干预机制:提供手动干预机制,以便在系统无法自动恢复时人工介入解决问题。
9、安全性
- 数据加密:确保敏感信息的安全传输和存储。
- 身份验证和授权:实现身份验证和授权机制,防止未经授权的操作。
10、测试与调试
- 全面测试:进行全面的单元测试、集成测试和压力测试,确保系统的稳定性。
- 模拟故障场景:模拟各种网络条件和故障场景,确保系统能够正常应对。
11、资源调度与优化
- 合理分配资源:合理分配计算资源和网络带宽,确保系统的高效运行。
- 资源约束:考虑各种资源约束条件,例如内存限制、CPU使用率等。
功能需求
1、订单创建
触发条件:当用户成功提交订单后。
动作:
将订单信息存储到MySQL数据库中。
向RocketMQ发送一条延时消息。
2、订单超时检测
触发条件:RocketMQ消费者接收到延时消息。
动作:
查询MySQL数据库以获取订单状态。
如果订单状态仍为“CREATED”(即用户尚未完成支付或其他必要操作),则将订单状态更新为“CANCELLED”。
3、幂等性保障
实现方式:在订单表中添加messageId字段,用于唯一标识每条消息。
逻辑:
在接收消息时,首先检查messageId是否已经存在于数据库中。
如果存在,说明该消息已经被处理过,直接忽略;否则继续执行取消逻辑。
代码实操
在pom.xml中添加所需的依赖:
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-devtools</artifactId>
<scope>runtime</scope>
</dependency>
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-spring-boot-starter</artifactId>
<version>2.2.1</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-jpa</artifactId>
</dependency>
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<scope>runtime</scope>
</dependency>
</dependencies>
配置RocketMQ和MySQL
在application.yml中添加RocketMQ的配置,并配置MySQL数据库:
server:
port:8080
rocketmq:
name-server:localhost:9876# 替换为你的NameServer地址
producer:
group:order-producer-group
consumer:
group:order-consumer-group
spring:
datasource:
url:jdbc:mysql://localhost:3306/order_db?useSSL=false&serverTimezone=UTC
username:root
password:root
driver-class-name:com.mysql.cj.jdbc.Driver
jpa:
hibernate:
ddl-auto:update
show-sql:true
properties:
hibernate:
format_sql:true
实现消息生产者
创建一个服务类来发送订单创建的消息,并确保幂等性:
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Service;
import java.util.UUID;
@Service
publicclass OrderProducerService {
@Autowired
private RocketMQTemplate rocketMQTemplate;
public void sendOrderCreatedMessage(Order order) {
String topic = "order_created_topic";
// 发送延时消息,延迟级别为3(默认是1s, 5s, 10s, 30s, 1m, 2m, 3m, 4m, 5m, 6m, 7m, 8m, 9m, 10m, 20m, 30m, 1h, 2h)
int delayLevel = 3; // 延迟10秒
UUID messageId = UUID.randomUUID(); // 生成唯一的messageId
order.setMessageId(messageId);
rocketMQTemplate.syncSend(topic, MessageBuilder.withPayload(order).build(), 3000, delayLevel);
}
}
实现消息消费者
创建一个监听器来消费订单超时取消的消息,并确保幂等性:
import org.apache.rocketmq.spring.annotation.ConsumeMode;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
@Service
@RocketMQMessageListener(topic = "order_created_topic", consumerGroup = "${rocketmq.consumer.group}", consumeMode = ConsumeMode.ORDERLY)
publicclass OrderConsumerService implements RocketMQListener<Order> {
@Autowired
private OrderRepository orderRepository;
@Override
public void onMessage(Order order) {
System.out.println("Received order: " + order.getId());
// 检查消息是否已经处理过
if (orderRepository.findByMessageId(order.getMessageId()).isPresent()) {
System.out.println("Duplicate message received, skipping processing.");
return;
}
// 模拟订单超时取消逻辑
cancelOrderIfTimeout(order);
}
private void cancelOrderIfTimeout(Order order) {
Order storedOrder = orderRepository.findById(order.getId()).orElse(null);
if (storedOrder != null && storedOrder.getStatus().equals("CREATED")) {
storedOrder.setStatus("CANCELLED");
orderRepository.save(storedOrder);
System.out.println("Canceling order due to timeout: " + order.getId());
}
}
}
订单实体类
创建一个简单的订单实体类,并使用JPA注解进行持久化:
import javax.persistence.Entity;
import javax.persistence.GeneratedValue;
import javax.persistence.GenerationType;
import javax.persistence.Id;
import java.util.UUID;
@Entity
publicclass Order {
@Id
@GeneratedValue(strategy = GenerationType.IDENTITY)
private Long id;
private String status;
private UUID messageId; // 唯一标识符,用于幂等性
// Getters and Setters
public Long getId() {
return id;
}
public void setId(Long id) {
this.id = id;
}
public String getStatus() {
return status;
}
public void setStatus(String status) {
this.status = status;
}
public UUID getMessageId() {
return messageId;
}
public void setMessageId(UUID messageId) {
this.messageId = messageId;
}
@Override
public String toString() {
return"Order{" +
"id=" + id +
", status='" + status + '\'' +
", messageId=" + messageId +
'}';
}
}
订单仓库接口
创建一个JPA仓库接口来管理订单数据:
import org.springframework.data.jpa.repository.JpaRepository;
import java.util.Optional;
import java.util.UUID;
public interface OrderRepository extends JpaRepository<Order, Long> {
Optional<Order> findByMessageId(UUID messageId);
}
控制器
创建一个控制器来模拟订单创建和查询订单状态:
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.*;
@RestController
@RequestMapping("/orders")
publicclass OrderController {
@Autowired
private OrderProducerService orderProducerService;
@Autowired
private OrderRepository orderRepository;
@PostMapping("/create")
public String createOrder(@RequestBody Order order) {
order.setStatus("CREATED");
Order savedOrder = orderRepository.save(order);
orderProducerService.sendOrderCreatedMessage(savedOrder);
return"Order created with ID: " + savedOrder.getId();
}
@GetMapping("/{id}")
public Order getOrderById(@PathVariable Long id) {
return orderRepository.findById(id).orElse(null);
}
}
测试结果
在浏览器访问http://localhost:8080/orders/1,你会先看到订单状态为CREATED,然后在10秒后再次访问时,订单状态应变为CANCELLED。
控制台看到输出:
Received order: 1
Canceling order due to timeout: 1
更多推荐


所有评论(0)