本地消息表+可靠消息队列

核心思想:利用可靠消息队列把服务调用改为异步消息交互,利用本地消息表保持一致性

处理过程如下:主服务使用本地消息表记录待处理的事务,本地消息状态置为待发送;主服务定时扫描待处理事务,分配给线程处理,本地消息状态置为发送中;主服务使用可靠消息队列(如rabbitmq)发送消息,本地消息状态置为已发送。被调用的服务订阅事务消息并处理消息,发送处理结果消息。主服务订阅处理结果消息,并更新本地消息状态为处理成功或处理失败;对于处理失败的消息同时发送一条延迟处理消息,收到延迟处理消息后更新为关闭/已取消状态。

这种异步处理模式收到请求后只管更新当前状态后马上返回,性能极佳。特别适用于对可用性要求极高、并发极高,且只需保持最终一致性的场景,如秒杀。

关键代码如下:

本地消息表:

package com.decotest.orderservice.entity;

import jakarta.persistence.*;
import lombok.Data;

import java.time.LocalDateTime;

@Entity
@Table(name="local_message")
@Data
public class Local_Message {
    @Id
    @GeneratedValue(strategy = GenerationType.IDENTITY)
    private Long id;

    @Column(name="message_id", nullable=false, unique=true)
    private String messageId;

    @Column(name="business_key", nullable=false)
    private String businessKey;

    @Column(name="business_type", nullable=false)
    private String businessType;

    @Column(name="content", nullable=false)
    private String content;

    @Column(name="status", nullable=false)
    private int status;     //0-待发送 1-已发送 2-已处理成功 3-已处理失败 4-发送处理中

    @Column(name="retry_count")
    private int retryCount;

    @Column(name="next_retry_time")
    private LocalDateTime nextRetryTime;

    @Column(name="created_time")
    private LocalDateTime createdTime;

    @Column(name="updated_time")
    private LocalDateTime lastUpdateTime;

    @PrePersist
    public void onCreate() {
        this.createdTime = LocalDateTime.now();
    }
    @PreUpdate
    public void onUpdate() {
        this.lastUpdateTime = LocalDateTime.now();
    }
}

Resposityory类:

package com.decotest.orderservice.repository;

import com.decotest.orderservice.entity.Local_Message;
import net.bytebuddy.asm.Advice;
import org.springframework.data.jpa.repository.JpaRepository;
import org.springframework.data.jpa.repository.Modifying;
import org.springframework.data.jpa.repository.Query;
import org.springframework.stereotype.Repository;
import org.springframework.transaction.annotation.Transactional;

import java.util.List;
import java.util.Optional;

@Repository
public interface LocalMessageRespository extends JpaRepository<Local_Message,Long>
{
    Optional<Local_Message> findByBusinessKey(String businessKey);

    @Query("select lm from Local_Message lm where lm.status=0 and lm.retryCount < 5")
    List<Local_Message> findPendingMessage();

    @Modifying
    @Transactional
    @Query("update Local_Message m set m.status=4, m.lastUpdateTime=CURRENT TIMESTAMP where m.id=?1")
    int updateToSending(long id);


    @Modifying
    @Transactional
    @Query("update Local_Message m set m.status=1, m.lastUpdateTime=CURRENT TIMESTAMP where m.id=?1")
    int updateToSent(long id);

}

线程池:

package com.decotest.orderservice.service.multit_thread;

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.annotation.EnableAsync;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

@Configuration
@EnableAsync
public class ThreadPoolConfig {
    private static final int THREAD_COUNT = 8;

    @Bean
    public ExecutorService[] threadPools() {
        ExecutorService[] pools = new ExecutorService[THREAD_COUNT];

        for(int i=0; i<THREAD_COUNT; i++){
            int finalI = i;
            pools[i]= Executors.newSingleThreadExecutor(r->
                    new Thread(r, "msg-processor-"+ finalI)
            );
        }

        return pools;
    }
}

order-service队列设置:

package com.decotest.orderservice.config;

import org.springframework.amqp.core.*;
import org.springframework.amqp.rabbit.core.RabbitAdmin;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.util.HashMap;
import java.util.Map;

// Order-Service配置
@Configuration
public class OrderRabbitMQConfig {

    // 订单支付交换机
    @Bean
    public DirectExchange orderExchange() {
        return new DirectExchange("order.exchange", true, false);
    }

    // 延迟交换机
    @Bean
    public DirectExchange orderDelayExchange() {
        return new DirectExchange("order.delay.exchange", true, false);
    }

    // 支付队列
    @Bean
    public Queue orderPayQueue() {
        return new Queue("order.pay.queue", true);
    }

    // 延迟队列(使用死信队列实现延迟)
    @Bean
    public Queue orderDelayQueue() {
        Map<String, Object> args = new HashMap<>();
        args.put("x-dead-letter-exchange", "order.delay.process.exchange");
        args.put("x-dead-letter-routing-key", "order.delay.process");
        args.put("x-message-ttl", 1 * 60 * 1000); // 5分钟
        return QueueBuilder.durable("order.delay.queue")
                .withArguments(args)
                .build();
    }

    // 延迟处理交换机
    @Bean
    public DirectExchange orderDelayProcessExchange() {
        return new DirectExchange("order.delay.process.exchange", true, false);
    }

    // 延迟处理队列
    @Bean
    public Queue orderDelayProcessQueue() {
        return new Queue("order.delay.process.queue", true);
    }

    // 绑定关系
    @Bean
    public Binding bindingOrderPay() {
        return BindingBuilder.bind(orderPayQueue())
                .to(orderExchange())
                .with("order.pay");
    }

    @Bean
    public Binding bindingOrderDelay() {
        return BindingBuilder.bind(orderDelayQueue())
                .to(orderDelayExchange())
                .with("order.delay");
    }

    @Bean
    public Binding bindingOrderDelayProcess() {
        return BindingBuilder.bind(orderDelayProcessQueue())
                .to(orderDelayProcessExchange())
                .with("order.delay.process");
    }

    @Bean
    public RabbitAdmin rabbitAdmin(org.springframework.amqp.rabbit.connection.ConnectionFactory connectionFactory) {
        RabbitAdmin admin = new RabbitAdmin(connectionFactory);
        // 确保RabbitAdmin能自动处理本配置类中声明的组件
        admin.setAutoStartup(true);
        return admin;
    }
}

多线程发送消息:

package com.decotest.orderservice.service.multit_thread;

import com.decotest.orderservice.entity.Local_Message;
import com.decotest.orderservice.repository.LocalMessageRespository;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.scheduling.annotation.Async;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.stereotype.Service;

import java.math.BigDecimal;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ThreadPoolExecutor;

@Service
@Slf4j
@RequiredArgsConstructor
public class MultiThreadService {
    private final LocalMessageRespository localMessageRespository;
    private final RabbitTemplate rabbitTemplate;
    private final ExecutorService[] threadPools;

    @Scheduled(fixedRate = 1000) //500毫秒扫描一遍
    public void scanAndSendMessage(){
        List<Local_Message> messages = localMessageRespository.findPendingMessage();
        log.info("扫描pending消息数量:{}", messages.size());
        messages.forEach(message -> {
            int updateCnt = localMessageRespository.updateToSending(message.getId());
            if(updateCnt > 0){
                int index = (int)(message.getId() % threadPools.length);
                log.info("异步消息更新为待发送 order_sn={}, index={}", message.getBusinessKey(), index);
                threadPools[index].submit(() -> {
                   sendPayMessage(message.getId());
                });
            }
        });
    }

    private void sendPayMessage(long messageId){

        Optional<Local_Message> localMessageOptional = localMessageRespository.findById(messageId);

        if(localMessageOptional.isPresent()){
            Local_Message localMessage = localMessageOptional.get();

            rabbitTemplate.convertAndSend("order.exchange", "order.pay", localMessage.getContent());
            log.info("异步消息已发送 order_sn={}", localMessage.getBusinessKey());

            int updateCnt = localMessageRespository.updateToSent(messageId);
            if(updateCnt <= 0){
                log.error("异步消息更新已发送更新失败 order_sn={}", localMessage.getBusinessKey());
            }
        }else{
            log.error("找不到本地异步消息,messageId={}", messageId);
        }
    }
}

order-service消息监听:

package com.decotest.orderservice.listeners;

import com.alibaba.fastjson.JSON;
import com.alibaba.nacos.shaded.com.google.gson.Gson;
import com.decotest.orderservice.entity.dto.DelayMessage;
import com.decotest.orderservice.entity.dto.PayResultMessage;
import com.decotest.orderservice.service.OrderService;
import com.decotest.orderservice.service.local_message.OrderLMService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

@Slf4j
@Component
public class PayResultListener {
    @Autowired
    private OrderLMService orderLMService;

    @RabbitListener(queues = "pay.result.queue")
    public void onPayResult(String message){
        try {
            PayResultMessage payResultMessage = JSON.parseObject(message, PayResultMessage.class);
            orderLMService.handlePayResultMessage(payResultMessage);

        } catch (Exception e) {
            log.error("解析支付结果消息失败,message:{}",message);
        }
    }

    @RabbitListener(queues = "order.delay.process.queue")
    public void onOrderDelayProcess(String message){
        try{
            DelayMessage delayMessage = JSON.parseObject(message, DelayMessage.class);
            orderLMService.handleOrderDelayMessage(delayMessage);
        }catch (Exception e){
            log.error("解析延迟报文出错,message:{}",message);
        }
    }
}

order-service主要服务:

package com.decotest.orderservice.service.local_message;

import com.alibaba.fastjson.JSON;
import com.decotest.orderservice.common.ApiResponse;
import com.decotest.orderservice.entity.Goods;
import com.decotest.orderservice.entity.Local_Message;
import com.decotest.orderservice.entity.Order;
import com.decotest.orderservice.entity.dto.DelayMessage;
import com.decotest.orderservice.entity.dto.PayMessage;
import com.decotest.orderservice.entity.dto.PayResultMessage;
import com.decotest.orderservice.repository.GoodsRepository;
import com.decotest.orderservice.repository.LocalMessageRespository;
import com.decotest.orderservice.repository.OrderRepository;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
import org.springframework.transaction.support.TransactionSynchronization;
import org.springframework.transaction.support.TransactionSynchronizationAdapter;
import org.springframework.transaction.support.TransactionSynchronizationManager;

import javax.swing.text.html.Option;
import java.math.BigDecimal;
import java.util.Optional;
import java.util.UUID;

@Service
@Slf4j
public class OrderLMService {
    @Autowired
    private GoodsRepository goodsRepos;

    @Autowired
    private OrderRepository orderRepository;

    @Autowired
    private LocalMessageRespository localMessageRespository;

    @Autowired
    private RabbitTemplate rabbitTemplate;

    @Transactional
    public void handleOrderDelayMessage(DelayMessage delayMessage) {
        String order_sn = delayMessage.getOrderSn();

        log.info("处理延期通知消息,order_sn={}",order_sn);

        Optional<Order> optionalOrder = orderRepository.findByOrderSn(order_sn);
        if(optionalOrder.isPresent()){
            Order order = optionalOrder.get();
            char orderStatus = order.getStatus();


            if(orderStatus!='1'){  //非终态
                order.setStatus('3');   //自动关闭(过期)
                orderRepository.save(order);
            }

            long goodsId = order.getGoodsId();

            Goods goods = goodsRepos.findById(goodsId);
            if(goods != null){
                goods.setFrozen_stock(goods.getFrozen_stock()-1);
                goods.setStock(goods.getStock()+1);
                goodsRepos.save(goods);
            }
        }

    }

    @Transactional
    public void handlePayResultMessage(PayResultMessage  payResultMessage){
        String order_sn = payResultMessage.getOrderSn();
        int status = payResultMessage.getStatus();

        log.info("处理支付结果消息,order_sn={}",order_sn);

        Optional<Local_Message> optionalLocalMessage = localMessageRespository.findByBusinessKey(order_sn);

        if(optionalLocalMessage.isPresent()){
            Local_Message localMessage = optionalLocalMessage.get();
            if(status==0){
                localMessage.setStatus(2);  //处理完成-成功
            }else if(status==1){
                localMessage.setStatus(3);  //处理完成-失败
            }
            localMessageRespository.save(localMessage);
        }else{
            log.error("handlePayResultMessage.未找到本地消息,order_sn={}",order_sn);
        }

        Optional<Order> optionalOrder = orderRepository.findByOrderSn(order_sn);
        if(optionalOrder.isPresent()){
            Order order = optionalOrder.get();

            if(status==0){
                order.setStatus('1'); //支付成功
                Goods goods = goodsRepos.findById(order.getGoodsId());
                goods.setFrozen_stock(goods.getFrozen_stock()-1);
                goodsRepos.save(goods);
            }else{
                order.setStatus('2'); //支付失败
                sendDelayMessage(order_sn);
            }
            orderRepository.save(order);
        }else{
            log.error("handlePayResultMessage.未找到Order,order_sn={}",order_sn);
        }
    }

    @Transactional
    public ApiResponse<?> createOrder(long userId, long goodsId){

        log.info("createOrder 开始, userId={}, goodsId={}", userId, goodsId);

        Optional<Goods> goodsOptional = goodsRepos.findByIdForUpdate(goodsId);
        if(goodsOptional.isPresent()){
            Goods goods = goodsOptional.get();

            if(goods.getStock()<1){
                return ApiResponse.badRequest("库存不足");
            }

            BigDecimal price = goods.getPrice();

            String order_sn = UUID.randomUUID().toString();
            Order order = new Order();
            order.setOrderSn(order_sn);
            order.setStatus('0'); //0-待支付
            order.setGoodsId(goodsId);
            order.setUserid(userId);

            orderRepository.save(order);

            goods.setStock(goods.getStock()-1);
            goods.setFrozen_stock(goods.getFrozen_stock()+1);
            goodsRepos.save(goods);

            createLocalMessage(order_sn, userId, price);

            //由异步任务发送,提升处理效率
            /*
            TransactionSynchronizationManager.registerSynchronization(
                    new TransactionSynchronization() {
                    @Override
                    public void afterCommit(){
                        sendPayMessage(order_sn, userId, price);
                    }
            });
             */

            log.info("订单创建成功, order_sn={}", order_sn);

            return ApiResponse.success("已提交成功,请等待");
        }else{
            return ApiResponse.badRequest("商品不存在");
        }
    }

    private void createLocalMessage(String order_sn, long userId, BigDecimal amount){
        PayMessage payMessage = new PayMessage();
        payMessage.setOrderSn(order_sn);
        payMessage.setUserId(userId);
        payMessage.setAmount(amount);
        payMessage.setTimestamp(System.currentTimeMillis());

        Local_Message localMessage = new Local_Message();
        localMessage.setMessageId(UUID.randomUUID().toString());
        localMessage.setBusinessKey(order_sn);
        localMessage.setContent(JSON.toJSONString(payMessage));
        localMessage.setStatus(0);    //0-待发送
        localMessage.setRetryCount(0);
        localMessage.setBusinessType("pay");  //pay-支付
        localMessageRespository.save(localMessage);
    }

    private void sendPayMessage(String order_sn, long userId, BigDecimal amount){

        Optional<Local_Message> localMessageOptional = localMessageRespository.findByBusinessKey(order_sn);

        if(localMessageOptional.isPresent()){
            Local_Message localMessage = localMessageOptional.get();

            rabbitTemplate.convertAndSend("order.exchange", "order.pay", localMessage.getContent());
            log.info("支付消息已发送 order_sn={}", order_sn);
            localMessage.setStatus(1);  //已发送
            localMessageRespository.save(localMessage);
        }else{
            log.error("找不到本地消息,order_sn={}", order_sn);
        }
    }

    private void sendDelayMessage(String order_sn){
        DelayMessage delayMessage = new DelayMessage();
        delayMessage.setOrderSn(order_sn);
        delayMessage.setType('0');  //0-PAY-CHECK
        rabbitTemplate.convertAndSend("order.delay.exchange", "order.delay", JSON.toJSONString(delayMessage));
        log.info("延迟消息已发送 order_sn={}", order_sn);
    }


}

user-service队列设置:

package com.decotest.userservice.config;

import org.springframework.amqp.core.*;
import org.springframework.amqp.rabbit.core.RabbitAdmin;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

// User-Service配置
@Configuration
public class UserRabbitMQConfig {
    // 支付结果交换机
    @Bean
    public DirectExchange payResultExchange() {
        return new DirectExchange("pay.result.exchange", true, false);
    }

    // 支付结果队列
    @Bean
    public Queue payResultQueue() {
        Queue queue = new Queue("pay.result.queue", true);
        return queue;
    }


    @Bean
    public Binding bindingPayResult() {
        return BindingBuilder.bind(payResultQueue())
                .to(payResultExchange())
                .with("pay.result");
    }

    @Bean
    public RabbitAdmin rabbitAdmin(org.springframework.amqp.rabbit.connection.ConnectionFactory connectionFactory) {
        RabbitAdmin admin = new RabbitAdmin(connectionFactory);
        // 确保RabbitAdmin能自动处理本配置类中声明的组件
        admin.setAutoStartup(true);
        return admin;
    }
}

user-service消息监听:

package com.decotest.userservice.lsn;

import com.decotest.userservice.entity.dto.PayMessage;
import com.alibaba.fastjson.JSON;
import com.decotest.userservice.service.UserService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

@Slf4j
// User-Service 消息监听器
@Component
public class UserMessageListener {

    @Autowired
    private UserService userService;

    @RabbitListener(queues = "order.pay.queue")
    public void handlePayMessage(String message) {
        try {
            PayMessage payMessage = JSON.parseObject(message, PayMessage.class);
            userService.handlePayMessage(payMessage);
        } catch (Exception e) {
            log.error("处理支付消息失败", e);
        }
    }
}

user-service主要服务:

package com.decotest.userservice.service;

import com.alibaba.fastjson.JSON;
import com.decotest.userservice.common.ApiResponse;
import com.decotest.userservice.entity.Pay;
import com.decotest.userservice.entity.User;
import com.decotest.userservice.entity.dto.*;
import com.decotest.userservice.repository.PayRepository;
import com.decotest.userservice.repository.UserRepository;
import io.seata.core.context.RootContext;
import jakarta.transaction.Transactional;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

import java.math.BigDecimal;
import java.time.LocalDate;
import java.time.LocalDateTime;
import java.time.LocalTime;
import java.util.List;
import java.util.Optional;

@Slf4j
@Service
public class UserService {
    @Autowired
    private UserRepository userRepos;

    @Autowired
    private PayRepository payRepos;

    @Autowired
    private RabbitTemplate  rabbitTemplate;

    @Transactional
    public void handlePayMessage(PayMessage payMessage) {
        String order_sn = payMessage.getOrderSn();
        log.info("handlePayMessage()处理支付消息,order_sn:{}", order_sn);

        if(payRepos.existsByXid(order_sn)){
            log.info("handlePayMessage 该笔订单已支付,order_sn:{}", order_sn);
            return;
        }

        long userId = payMessage.getUserId();
        User user=userRepos.findUserById(userId);
        if(user==null){
            log.error("handlePayMessage user not found,userId:{}",userId);
            sendPayResult(order_sn, 1, "用户不存在");
            return;
        }

        BigDecimal amount = payMessage.getAmount();

        if(user.getBalance().compareTo(amount)<=0){
            log.error("handlePayMessage balance not enough,userId:{}",userId);
            sendPayResult(order_sn, 1, "余额不足");
            return;
        }

        user.setBalance(user.getBalance().subtract(amount));
        userRepos.save(user);

        Pay pay=new Pay();
        pay.setXid(order_sn);
        pay.setAmount(amount);
        pay.setStatus('1');
        pay.setUserid(userId);
        pay.setTrantype('1');
        payRepos.save(pay);
        sendPayResult(order_sn, 0, "");
    }


    /**
     * 发送支付结果
     */
    //status:0-success, 1:fail
    private void sendPayResult(String orderSn, int status, String errorMsg) {
        PayResultMessage result = new PayResultMessage();
        result.setOrderSn(orderSn);
        result.setStatus(status);
        result.setErrorMessage(errorMsg);
        result.setProcessTime(LocalDateTime.now());

        rabbitTemplate.convertAndSend(
                "pay.result.exchange",
                "pay.result",
                JSON.toJSONString(result)
        );

        log.info("发送支付结果: {} - {}", orderSn, status);
    }

}

Logo

更多推荐