分布式事务入门3——使用本地消息表+RabbitMQ实现分布式事务
·
本地消息表+可靠消息队列
核心思想:利用可靠消息队列把服务调用改为异步消息交互,利用本地消息表保持一致性
处理过程如下:主服务使用本地消息表记录待处理的事务,本地消息状态置为待发送;主服务定时扫描待处理事务,分配给线程处理,本地消息状态置为发送中;主服务使用可靠消息队列(如rabbitmq)发送消息,本地消息状态置为已发送。被调用的服务订阅事务消息并处理消息,发送处理结果消息。主服务订阅处理结果消息,并更新本地消息状态为处理成功或处理失败;对于处理失败的消息同时发送一条延迟处理消息,收到延迟处理消息后更新为关闭/已取消状态。
这种异步处理模式在收到请求后只需更新当前状态即可立即返回,性能极佳。特别适用于对可用性要求极高、并发极高,且只需保持最终一致性的场景,如秒杀。
关键代码如下:
本地消息表:
package com.decotest.orderservice.entity;
import jakarta.persistence.*;
import lombok.Data;
import java.time.LocalDateTime;
/**
 * Local message ("outbox") row: one message that has to be delivered through the
 * message queue, persisted in table {@code local_message}.
 *
 * <p>Getters, setters, equals/hashCode and toString are generated by Lombok's
 * {@code @Data}.
 */
@Entity
@Table(name = "local_message")
@Data
public class Local_Message {

    /** Database-generated primary key. */
    @Id
    @GeneratedValue(strategy = GenerationType.IDENTITY)
    private Long id;

    /** Unique identifier of this message (unique column). */
    @Column(name = "message_id", nullable = false, unique = true)
    private String messageId;

    /** Key of the business object the message belongs to. */
    @Column(name = "business_key", nullable = false)
    private String businessKey;

    /** Kind of business operation the message represents. */
    @Column(name = "business_type", nullable = false)
    private String businessType;

    /** Message payload exactly as it is published to the queue. */
    @Column(name = "content", nullable = false)
    private String content;

    /**
     * Delivery state:
     * 0 = pending send, 1 = sent, 2 = processed successfully,
     * 3 = processing failed, 4 = sending in progress.
     */
    @Column(name = "status", nullable = false)
    private int status;

    /** Number of delivery retries recorded for this message. */
    @Column(name = "retry_count")
    private int retryCount;

    /** Earliest time of the next retry, if one is planned. */
    @Column(name = "next_retry_time")
    private LocalDateTime nextRetryTime;

    /** Insert timestamp, filled in by {@link #onCreate()}. */
    @Column(name = "created_time")
    private LocalDateTime createdTime;

    /** Timestamp of the last modification (column {@code updated_time}). */
    @Column(name = "updated_time")
    private LocalDateTime lastUpdateTime;

    /**
     * Stamps both timestamps before the row is inserted, so {@code updated_time}
     * is never null for a freshly created message.
     */
    @PrePersist
    public void onCreate() {
        LocalDateTime now = LocalDateTime.now();
        this.createdTime = now;
        this.lastUpdateTime = now;
    }

    /**
     * Refreshes the modification timestamp before an entity update is flushed.
     * JPA does not invoke lifecycle callbacks for bulk JPQL update statements,
     * so those must set the column themselves.
     */
    @PreUpdate
    public void onUpdate() {
        this.lastUpdateTime = LocalDateTime.now();
    }
}
Repository类:
package com.decotest.orderservice.repository;
import com.decotest.orderservice.entity.Local_Message;
import net.bytebuddy.asm.Advice;
import org.springframework.data.jpa.repository.JpaRepository;
import org.springframework.data.jpa.repository.Modifying;
import org.springframework.data.jpa.repository.Query;
import org.springframework.stereotype.Repository;
import org.springframework.transaction.annotation.Transactional;
import java.util.List;
import java.util.Optional;
/**
 * Spring Data JPA repository for {@link Local_Message} rows.
 *
 * <p>The two update methods are compare-and-set style state transitions: each one
 * only matches a row that is still in the expected source state and returns the
 * number of rows changed, so a return value of 0 means "somebody else already
 * moved this message on".
 */
@Repository
public interface LocalMessageRespository extends JpaRepository<Local_Message, Long> {

    /**
     * Looks up the message recorded for a business key.
     *
     * @param businessKey business key stored on the message
     * @return the matching message, or empty if none exists
     */
    Optional<Local_Message> findByBusinessKey(String businessKey);

    /**
     * Returns every message that is still pending (status 0) and has been
     * retried fewer than 5 times.
     */
    @Query("select lm from Local_Message lm where lm.status=0 and lm.retryCount < 5")
    List<Local_Message> findPendingMessage();

    /**
     * Claims a pending message for sending: status 0 -> 4.
     *
     * <p>The {@code status=0} guard makes the claim exclusive; without it every
     * concurrent scanner would see an update count of 1 and send the message again.
     *
     * @param id primary key of the message
     * @return 1 if this call claimed the message, 0 if it was not pending any more
     */
    @Modifying
    @Transactional
    @Query("update Local_Message m set m.status=4, m.lastUpdateTime=CURRENT_TIMESTAMP where m.id=?1 and m.status=0")
    int updateToSending(long id);

    /**
     * Marks a message that is being sent as sent: status 4 -> 1.
     *
     * <p>The {@code status=4} guard keeps a late "sent" update from overwriting a
     * final state (2/3) written when the processing result arrived first.
     *
     * @param id primary key of the message
     * @return 1 if the row was updated, 0 if it was no longer in the sending state
     */
    @Modifying
    @Transactional
    @Query("update Local_Message m set m.status=1, m.lastUpdateTime=CURRENT_TIMESTAMP where m.id=?1 and m.status=4")
    int updateToSent(long id);
}
线程池:
package com.decotest.orderservice.service.multit_thread;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.annotation.EnableAsync;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
@Configuration
@EnableAsync
public class ThreadPoolConfig {
private static final int THREAD_COUNT = 8;
@Bean
public ExecutorService[] threadPools() {
ExecutorService[] pools = new ExecutorService[THREAD_COUNT];
for(int i=0; i<THREAD_COUNT; i++){
int finalI = i;
pools[i]= Executors.newSingleThreadExecutor(r->
new Thread(r, "msg-processor-"+ finalI)
);
}
return pools;
}
}
order-service队列设置:
package com.decotest.orderservice.config;
import org.springframework.amqp.core.*;
import org.springframework.amqp.rabbit.core.RabbitAdmin;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import java.util.HashMap;
import java.util.Map;
/**
 * RabbitMQ topology declared by the order service: the pay exchange/queue, and a
 * TTL + dead-letter queue pair that implements delayed delivery.
 */
@Configuration
public class OrderRabbitMQConfig {

    private static final String PAY_EXCHANGE = "order.exchange";
    private static final String PAY_QUEUE = "order.pay.queue";
    private static final String PAY_ROUTING_KEY = "order.pay";

    private static final String DELAY_EXCHANGE = "order.delay.exchange";
    private static final String DELAY_QUEUE = "order.delay.queue";
    private static final String DELAY_ROUTING_KEY = "order.delay";

    private static final String DELAY_PROCESS_EXCHANGE = "order.delay.process.exchange";
    private static final String DELAY_PROCESS_QUEUE = "order.delay.process.queue";
    private static final String DELAY_PROCESS_ROUTING_KEY = "order.delay.process";

    /** Time a message waits in the delay queue before it is dead-lettered: 1 minute. */
    private static final int DELAY_TTL_MILLIS = 1 * 60 * 1000;

    /** Durable, non-auto-delete direct exchange with the given name. */
    private static DirectExchange durableDirectExchange(String name) {
        return new DirectExchange(name, true, false);
    }

    /** Exchange that order pay messages are published to. */
    @Bean
    public DirectExchange orderExchange() {
        return durableDirectExchange(PAY_EXCHANGE);
    }

    /** Entry exchange for messages that should be delivered after a delay. */
    @Bean
    public DirectExchange orderDelayExchange() {
        return durableDirectExchange(DELAY_EXCHANGE);
    }

    /** Durable queue holding pay messages. */
    @Bean
    public Queue orderPayQueue() {
        return new Queue(PAY_QUEUE, true);
    }

    /**
     * Delay queue built on dead-lettering: messages sit here until the TTL expires
     * and are then routed to the delay-process exchange.
     */
    @Bean
    public Queue orderDelayQueue() {
        return QueueBuilder.durable(DELAY_QUEUE)
                .withArgument("x-dead-letter-exchange", DELAY_PROCESS_EXCHANGE)
                .withArgument("x-dead-letter-routing-key", DELAY_PROCESS_ROUTING_KEY)
                .withArgument("x-message-ttl", DELAY_TTL_MILLIS)
                .build();
    }

    /** Exchange that receives expired (dead-lettered) delay messages. */
    @Bean
    public DirectExchange orderDelayProcessExchange() {
        return durableDirectExchange(DELAY_PROCESS_EXCHANGE);
    }

    /** Durable queue from which expired delay messages are consumed. */
    @Bean
    public Queue orderDelayProcessQueue() {
        return new Queue(DELAY_PROCESS_QUEUE, true);
    }

    /** Binds the pay queue to the pay exchange. */
    @Bean
    public Binding bindingOrderPay() {
        Queue queue = orderPayQueue();
        DirectExchange exchange = orderExchange();
        return BindingBuilder.bind(queue).to(exchange).with(PAY_ROUTING_KEY);
    }

    /** Binds the delay queue to the delay exchange. */
    @Bean
    public Binding bindingOrderDelay() {
        Queue queue = orderDelayQueue();
        DirectExchange exchange = orderDelayExchange();
        return BindingBuilder.bind(queue).to(exchange).with(DELAY_ROUTING_KEY);
    }

    /** Binds the delay-process queue to the delay-process exchange. */
    @Bean
    public Binding bindingOrderDelayProcess() {
        Queue queue = orderDelayProcessQueue();
        DirectExchange exchange = orderDelayProcessExchange();
        return BindingBuilder.bind(queue).to(exchange).with(DELAY_PROCESS_ROUTING_KEY);
    }

    /**
     * RabbitAdmin with auto-startup switched on.
     *
     * @param connectionFactory broker connection factory
     * @return the configured admin
     */
    @Bean
    public RabbitAdmin rabbitAdmin(org.springframework.amqp.rabbit.connection.ConnectionFactory connectionFactory) {
        RabbitAdmin rabbitAdmin = new RabbitAdmin(connectionFactory);
        rabbitAdmin.setAutoStartup(true);
        return rabbitAdmin;
    }
}
多线程发送消息:
package com.decotest.orderservice.service.multit_thread;
import com.decotest.orderservice.entity.Local_Message;
import com.decotest.orderservice.repository.LocalMessageRespository;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.scheduling.annotation.Async;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.stereotype.Service;
import java.math.BigDecimal;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ThreadPoolExecutor;
@Service
@Slf4j
@RequiredArgsConstructor
public class MultiThreadService {
private final LocalMessageRespository localMessageRespository;
private final RabbitTemplate rabbitTemplate;
private final ExecutorService[] threadPools;
@Scheduled(fixedRate = 1000) //500毫秒扫描一遍
public void scanAndSendMessage(){
List<Local_Message> messages = localMessageRespository.findPendingMessage();
log.info("扫描pending消息数量:{}", messages.size());
messages.forEach(message -> {
int updateCnt = localMessageRespository.updateToSending(message.getId());
if(updateCnt > 0){
int index = (int)(message.getId() % threadPools.length);
log.info("异步消息更新为待发送 order_sn={}, index={}", message.getBusinessKey(), index);
threadPools[index].submit(() -> {
sendPayMessage(message.getId());
});
}
});
}
private void sendPayMessage(long messageId){
Optional<Local_Message> localMessageOptional = localMessageRespository.findById(messageId);
if(localMessageOptional.isPresent()){
Local_Message localMessage = localMessageOptional.get();
rabbitTemplate.convertAndSend("order.exchange", "order.pay", localMessage.getContent());
log.info("异步消息已发送 order_sn={}", localMessage.getBusinessKey());
int updateCnt = localMessageRespository.updateToSent(messageId);
if(updateCnt <= 0){
log.error("异步消息更新已发送更新失败 order_sn={}", localMessage.getBusinessKey());
}
}else{
log.error("找不到本地异步消息,messageId={}", messageId);
}
}
}
order-service消息监听:
package com.decotest.orderservice.listeners;
import com.alibaba.fastjson.JSON;
import com.alibaba.nacos.shaded.com.google.gson.Gson;
import com.decotest.orderservice.entity.dto.DelayMessage;
import com.decotest.orderservice.entity.dto.PayResultMessage;
import com.decotest.orderservice.service.OrderService;
import com.decotest.orderservice.service.local_message.OrderLMService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
@Slf4j
@Component
public class PayResultListener {
@Autowired
private OrderLMService orderLMService;
@RabbitListener(queues = "pay.result.queue")
public void onPayResult(String message){
try {
PayResultMessage payResultMessage = JSON.parseObject(message, PayResultMessage.class);
orderLMService.handlePayResultMessage(payResultMessage);
} catch (Exception e) {
log.error("解析支付结果消息失败,message:{}",message);
}
}
@RabbitListener(queues = "order.delay.process.queue")
public void onOrderDelayProcess(String message){
try{
DelayMessage delayMessage = JSON.parseObject(message, DelayMessage.class);
orderLMService.handleOrderDelayMessage(delayMessage);
}catch (Exception e){
log.error("解析延迟报文出错,message:{}",message);
}
}
}
order-service主要服务:
package com.decotest.orderservice.service.local_message;
import com.alibaba.fastjson.JSON;
import com.decotest.orderservice.common.ApiResponse;
import com.decotest.orderservice.entity.Goods;
import com.decotest.orderservice.entity.Local_Message;
import com.decotest.orderservice.entity.Order;
import com.decotest.orderservice.entity.dto.DelayMessage;
import com.decotest.orderservice.entity.dto.PayMessage;
import com.decotest.orderservice.entity.dto.PayResultMessage;
import com.decotest.orderservice.repository.GoodsRepository;
import com.decotest.orderservice.repository.LocalMessageRespository;
import com.decotest.orderservice.repository.OrderRepository;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
import org.springframework.transaction.support.TransactionSynchronization;
import org.springframework.transaction.support.TransactionSynchronizationAdapter;
import org.springframework.transaction.support.TransactionSynchronizationManager;
import javax.swing.text.html.Option;
import java.math.BigDecimal;
import java.util.Optional;
import java.util.UUID;
@Service
@Slf4j
public class OrderLMService {
@Autowired
private GoodsRepository goodsRepos;
@Autowired
private OrderRepository orderRepository;
@Autowired
private LocalMessageRespository localMessageRespository;
@Autowired
private RabbitTemplate rabbitTemplate;
@Transactional
public void handleOrderDelayMessage(DelayMessage delayMessage) {
String order_sn = delayMessage.getOrderSn();
log.info("处理延期通知消息,order_sn={}",order_sn);
Optional<Order> optionalOrder = orderRepository.findByOrderSn(order_sn);
if(optionalOrder.isPresent()){
Order order = optionalOrder.get();
char orderStatus = order.getStatus();
if(orderStatus!='1'){ //非终态
order.setStatus('3'); //自动关闭(过期)
orderRepository.save(order);
}
long goodsId = order.getGoodsId();
Goods goods = goodsRepos.findById(goodsId);
if(goods != null){
goods.setFrozen_stock(goods.getFrozen_stock()-1);
goods.setStock(goods.getStock()+1);
goodsRepos.save(goods);
}
}
}
@Transactional
public void handlePayResultMessage(PayResultMessage payResultMessage){
String order_sn = payResultMessage.getOrderSn();
int status = payResultMessage.getStatus();
log.info("处理支付结果消息,order_sn={}",order_sn);
Optional<Local_Message> optionalLocalMessage = localMessageRespository.findByBusinessKey(order_sn);
if(optionalLocalMessage.isPresent()){
Local_Message localMessage = optionalLocalMessage.get();
if(status==0){
localMessage.setStatus(2); //处理完成-成功
}else if(status==1){
localMessage.setStatus(3); //处理完成-失败
}
localMessageRespository.save(localMessage);
}else{
log.error("handlePayResultMessage.未找到本地消息,order_sn={}",order_sn);
}
Optional<Order> optionalOrder = orderRepository.findByOrderSn(order_sn);
if(optionalOrder.isPresent()){
Order order = optionalOrder.get();
if(status==0){
order.setStatus('1'); //支付成功
Goods goods = goodsRepos.findById(order.getGoodsId());
goods.setFrozen_stock(goods.getFrozen_stock()-1);
goodsRepos.save(goods);
}else{
order.setStatus('2'); //支付失败
sendDelayMessage(order_sn);
}
orderRepository.save(order);
}else{
log.error("handlePayResultMessage.未找到Order,order_sn={}",order_sn);
}
}
@Transactional
public ApiResponse<?> createOrder(long userId, long goodsId){
log.info("createOrder 开始, userId={}, goodsId={}", userId, goodsId);
Optional<Goods> goodsOptional = goodsRepos.findByIdForUpdate(goodsId);
if(goodsOptional.isPresent()){
Goods goods = goodsOptional.get();
if(goods.getStock()<1){
return ApiResponse.badRequest("库存不足");
}
BigDecimal price = goods.getPrice();
String order_sn = UUID.randomUUID().toString();
Order order = new Order();
order.setOrderSn(order_sn);
order.setStatus('0'); //0-待支付
order.setGoodsId(goodsId);
order.setUserid(userId);
orderRepository.save(order);
goods.setStock(goods.getStock()-1);
goods.setFrozen_stock(goods.getFrozen_stock()+1);
goodsRepos.save(goods);
createLocalMessage(order_sn, userId, price);
//由异步任务发送,提升处理效率
/*
TransactionSynchronizationManager.registerSynchronization(
new TransactionSynchronization() {
@Override
public void afterCommit(){
sendPayMessage(order_sn, userId, price);
}
});
*/
log.info("订单创建成功, order_sn={}", order_sn);
return ApiResponse.success("已提交成功,请等待");
}else{
return ApiResponse.badRequest("商品不存在");
}
}
private void createLocalMessage(String order_sn, long userId, BigDecimal amount){
PayMessage payMessage = new PayMessage();
payMessage.setOrderSn(order_sn);
payMessage.setUserId(userId);
payMessage.setAmount(amount);
payMessage.setTimestamp(System.currentTimeMillis());
Local_Message localMessage = new Local_Message();
localMessage.setMessageId(UUID.randomUUID().toString());
localMessage.setBusinessKey(order_sn);
localMessage.setContent(JSON.toJSONString(payMessage));
localMessage.setStatus(0); //0-待发送
localMessage.setRetryCount(0);
localMessage.setBusinessType("pay"); //pay-支付
localMessageRespository.save(localMessage);
}
private void sendPayMessage(String order_sn, long userId, BigDecimal amount){
Optional<Local_Message> localMessageOptional = localMessageRespository.findByBusinessKey(order_sn);
if(localMessageOptional.isPresent()){
Local_Message localMessage = localMessageOptional.get();
rabbitTemplate.convertAndSend("order.exchange", "order.pay", localMessage.getContent());
log.info("支付消息已发送 order_sn={}", order_sn);
localMessage.setStatus(1); //已发送
localMessageRespository.save(localMessage);
}else{
log.error("找不到本地消息,order_sn={}", order_sn);
}
}
private void sendDelayMessage(String order_sn){
DelayMessage delayMessage = new DelayMessage();
delayMessage.setOrderSn(order_sn);
delayMessage.setType('0'); //0-PAY-CHECK
rabbitTemplate.convertAndSend("order.delay.exchange", "order.delay", JSON.toJSONString(delayMessage));
log.info("延迟消息已发送 order_sn={}", order_sn);
}
}
user-service队列设置:
package com.decotest.userservice.config;
import org.springframework.amqp.core.*;
import org.springframework.amqp.rabbit.core.RabbitAdmin;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
 * RabbitMQ topology declared by the user service: the pay-result exchange, its
 * queue and the binding between them.
 */
@Configuration
public class UserRabbitMQConfig {

    private static final String PAY_RESULT_EXCHANGE = "pay.result.exchange";
    private static final String PAY_RESULT_QUEUE = "pay.result.queue";
    private static final String PAY_RESULT_ROUTING_KEY = "pay.result";

    /** Durable, non-auto-delete direct exchange for pay results. */
    @Bean
    public DirectExchange payResultExchange() {
        return new DirectExchange(PAY_RESULT_EXCHANGE, true, false);
    }

    /** Durable queue holding pay results. */
    @Bean
    public Queue payResultQueue() {
        return new Queue(PAY_RESULT_QUEUE, true);
    }

    /** Binds the pay-result queue to the pay-result exchange. */
    @Bean
    public Binding bindingPayResult() {
        Queue queue = payResultQueue();
        DirectExchange exchange = payResultExchange();
        return BindingBuilder.bind(queue).to(exchange).with(PAY_RESULT_ROUTING_KEY);
    }

    /**
     * RabbitAdmin with auto-startup switched on.
     *
     * @param connectionFactory broker connection factory
     * @return the configured admin
     */
    @Bean
    public RabbitAdmin rabbitAdmin(org.springframework.amqp.rabbit.connection.ConnectionFactory connectionFactory) {
        RabbitAdmin rabbitAdmin = new RabbitAdmin(connectionFactory);
        rabbitAdmin.setAutoStartup(true);
        return rabbitAdmin;
    }
}
user-service消息监听:
package com.decotest.userservice.lsn;
import com.decotest.userservice.entity.dto.PayMessage;
import com.alibaba.fastjson.JSON;
import com.decotest.userservice.service.UserService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
/**
 * User-service message listener: consumes pay messages and passes them to
 * {@link UserService}.
 */
@Slf4j
@Component
public class UserMessageListener {
    @Autowired
    private UserService userService;

    /**
     * Parses a pay message and delegates it to the user service. Any exception is
     * logged with its stack trace and not propagated.
     *
     * @param message JSON text of a {@link PayMessage}
     */
    @RabbitListener(queues = "order.pay.queue")
    public void handlePayMessage(String message) {
        try {
            userService.handlePayMessage(JSON.parseObject(message, PayMessage.class));
        } catch (Exception e) {
            log.error("处理支付消息失败", e);
        }
    }
}
user-service主要服务:
package com.decotest.userservice.service;
import com.alibaba.fastjson.JSON;
import com.decotest.userservice.common.ApiResponse;
import com.decotest.userservice.entity.Pay;
import com.decotest.userservice.entity.User;
import com.decotest.userservice.entity.dto.*;
import com.decotest.userservice.repository.PayRepository;
import com.decotest.userservice.repository.UserRepository;
import io.seata.core.context.RootContext;
import jakarta.transaction.Transactional;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import java.math.BigDecimal;
import java.time.LocalDate;
import java.time.LocalDateTime;
import java.time.LocalTime;
import java.util.List;
import java.util.Optional;
@Slf4j
@Service
public class UserService {
@Autowired
private UserRepository userRepos;
@Autowired
private PayRepository payRepos;
@Autowired
private RabbitTemplate rabbitTemplate;
@Transactional
public void handlePayMessage(PayMessage payMessage) {
String order_sn = payMessage.getOrderSn();
log.info("handlePayMessage()处理支付消息,order_sn:{}", order_sn);
if(payRepos.existsByXid(order_sn)){
log.info("handlePayMessage 该笔订单已支付,order_sn:{}", order_sn);
return;
}
long userId = payMessage.getUserId();
User user=userRepos.findUserById(userId);
if(user==null){
log.error("handlePayMessage user not found,userId:{}",userId);
sendPayResult(order_sn, 1, "用户不存在");
return;
}
BigDecimal amount = payMessage.getAmount();
if(user.getBalance().compareTo(amount)<=0){
log.error("handlePayMessage balance not enough,userId:{}",userId);
sendPayResult(order_sn, 1, "余额不足");
return;
}
user.setBalance(user.getBalance().subtract(amount));
userRepos.save(user);
Pay pay=new Pay();
pay.setXid(order_sn);
pay.setAmount(amount);
pay.setStatus('1');
pay.setUserid(userId);
pay.setTrantype('1');
payRepos.save(pay);
sendPayResult(order_sn, 0, "");
}
/**
* 发送支付结果
*/
//status:0-success, 1:fail
private void sendPayResult(String orderSn, int status, String errorMsg) {
PayResultMessage result = new PayResultMessage();
result.setOrderSn(orderSn);
result.setStatus(status);
result.setErrorMessage(errorMsg);
result.setProcessTime(LocalDateTime.now());
rabbitTemplate.convertAndSend(
"pay.result.exchange",
"pay.result",
JSON.toJSONString(result)
);
log.info("发送支付结果: {} - {}", orderSn, status);
}
}
更多推荐

所有评论(0)