前言

在我们开发中,会遇到某些场景下前端页面重复提交数据的问题(例如:网络延迟),还有一些就是在分布式环境下,多个进程同时请求,就会造成数据的幂等问题,所以今天想分享使用一个注解解决分布式环境下的幂等问题和防重复提交问题,最后有总结!

组件版本

  • jdk:1.8
  • springboot:2.5.4
  • mysql:8
  • redisson:3.12.0
  • mybatis-plus:3.4.3.2

主要代码分析

  • redisson的properties文件配置
j.redis.database=8
j.redis.host=10.0.16.2
j.redis.port=6378
j.redis.password=Fuzasuiredis123@wtb
  • redisson的配置映射文件
import lombok.Data;

/**
 * Plain settings holder for the Redisson single-server connection.
 * Lombok {@code @Data} generates the getters and setters used to populate and read it.
 */
@Data
public class RedissonConfig {
    // On/off switch carried alongside the connection settings
    private boolean enable;
    // Redis server host name or IP
    private String host;
    // Redis server port, default 6379
    private int port = 6379;
    // Redis password
    private String password;
    // Redis database index, default 3
    private int database = 3;
    // Connection pool size, default 64
    private int connectionPoolSize = 64;
    // Minimum number of idle connections to keep, default 24
    private int connectionMinimumIdleSize = 24;
    // Timeout for establishing a connection (presumably milliseconds - confirm against Redisson's setConnectTimeout)
    private int connectTimeout = 20000;
    // Timeout for executing a command, counted from the moment the command was sent successfully
    private int timeout = 5000;
}
  • redisson的分布式锁的实现
/**
 * Outcome of a distributed lock acquisition attempt.
 */
public enum LockResultStatus {
    /**
     * Communication was normal and the lock was acquired.
     */
    SUCCESS,
    /**
     * Communication was normal but the lock could not be acquired.
     */
    FAILURE,
    /**
     * Communication failure or internal error; the state of the lock is unknown.
     */
    EXCEPTION;
}
import com.saman.common.redisson.enums.LockResultStatus;
import lombok.Getter;
import lombok.Setter;
import org.redisson.api.RLock;

/**
 * Result of a lock attempt: the outcome status plus, when available, the lock handle and its key.
 */
@Setter
@Getter
public class LockResult {

    // Outcome of the acquisition attempt
    private LockResultStatus lockResultStatus;

    // Redisson lock handle; may be null when no lock was obtained
    private RLock rLock;

    // Key the lock was requested for; may be null when no lock was obtained
    private String key;
}
import com.saman.common.base.exception.BusinessException;
import com.saman.common.base.result.SCode;
import com.saman.common.redisson.enums.LockResultStatus;
import com.saman.common.redisson.vo.LockResult;
import lombok.NoArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.redisson.api.RLock;
import org.redisson.api.RedissonClient;

import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;

/**
 * @author deke
 * @description 手动加锁
 * @date 2024/9/20
 */
@Slf4j
@NoArgsConstructor
public class LockHelper {

    /**
     * 最小锁等待时间 1s
     */
    private static final int MIN_WAIT_TIME = 1000;

    private RedissonClient redisson;

    public LockHelper(RedissonClient redisson) {
        this.redisson = redisson;
    }

    /**
     * 加锁,加锁失败抛默认异常 - 操作频繁, 请稍后再试
     *
     * @param key        加锁唯一key
     * @param expireTime 锁超时时间(毫秒)
     * @param waitTime   加锁最长等待时间(毫秒:默认1ms)
     * @return LockResult  加锁结果
     */
    public LockResult lock(String key, long expireTime, long waitTime) {
        return lock(key, expireTime, waitTime, () -> new BusinessException(SCode.COMMON_FREQUENT_OPERATION_ERROR));
    }

    /**
     * 加锁,加锁失败抛异常 - 自定义异常
     *
     * @param key        加锁唯一key
     * @param expireTime 锁超时时间(毫秒)
     * @param waitTime   加锁最长等待时间(毫秒:默认1ms)
     * @param exec       加锁失败时抛该异常,传null时加锁失败不抛异常
     * @return LockResult  加锁结果
     */
    private LockResult lock(String key, long expireTime, long waitTime, Supplier<BusinessException> exec) {
        if (waitTime < MIN_WAIT_TIME) {
            waitTime = MIN_WAIT_TIME;
        }

        LockResult result = new LockResult();
        try {
            RLock rLock = redisson.getLock(key);
            try {
                if (rLock.tryLock(waitTime, expireTime, TimeUnit.MILLISECONDS)) {
                    result.setLockResultStatus(LockResultStatus.SUCCESS);
                    result.setRLock(rLock);
                    result.setKey(key);
                } else {
                    result.setLockResultStatus(LockResultStatus.FAILURE);
                }
            } catch (InterruptedException e) {
                log.error("Redis 获取分布式锁失败, key: {}, e: {}", key, e.getMessage());
                result.setLockResultStatus(LockResultStatus.EXCEPTION);
                rLock.unlock();
            }
        } catch (Exception e) {
            log.error("Redis 获取分布式锁失败, key: {}, e: {}", key, e.getMessage());
            result.setLockResultStatus(LockResultStatus.EXCEPTION);
        }

        if (exec != null && LockResultStatus.FAILURE.equals(result.getLockResultStatus())) {
            log.warn("Redis 加锁失败, key: {}, message:{}", key, exec.get().getMessage());
            throw new BusinessException(SCode.REPEAT_FAST_SUBMIT);
        }

        log.info("Redis 加锁结果:{}, key: {}", result.getLockResultStatus(), key);

        return result;
    }

    /**
     * 解锁
     */
    public void unlock(LockResult lockResult) {
        if (lockResult == null || lockResult.getRLock() == null) {
            log.warn("Redis 解锁失败, lockResult 或 RLock 为空");
            return;
        }

        RLock rLock = lockResult.getRLock();
        try {
            if (rLock.isHeldByCurrentThread()) {
                rLock.unlock();
                log.info("Redis 解锁成功, key: {}", lockResult.getKey());
            } else {
                log.warn("Redis 解锁失败, 当前线程未持有锁, key: {}", lockResult.getKey());
            }
        } catch (IllegalMonitorStateException e) {
            log.error("Redis 解锁失败, key: {}, e: {}", lockResult.getKey(), e.getMessage());
        }
    }
}
  • redisson客户端操作指令

import lombok.NoArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.redisson.api.*;
import org.redisson.client.codec.StringCodec;

import java.util.concurrent.TimeUnit;

/**
 * Convenience wrapper around {@link RedissonClient}. Every Redisson object handed out by this
 * class is created with {@link StringCodec}.
 *
 * @author deke
 * @date 2024/9/20
 */
@Slf4j
@NoArgsConstructor
public class RedissonHelper {

    /**
     * Fallback time-to-live of 2000 ms, used by {@link #setEx} when the caller passes a non-positive value.
     */
    private static final Long DEFAULT_EXPIRED = 2000L;

    private RedissonClient redissonClient;

    public RedissonHelper(RedissonClient redisson) {
        this.redissonClient = redisson;
    }

    /**
     * Callback that works on a bucket and returns nothing.
     */
    @FunctionalInterface
    public interface RedissonCall {
        void run(RBucket<Object> bucket);
    }

    /**
     * Callback that works on a bucket and returns a value.
     */
    @FunctionalInterface
    public interface RedissonExec<T> {
        T run(RBucket<T> bucket);
    }

    /**
     * Looks up the string-encoded bucket stored under {@code key}.
     */
    private <V> RBucket<V> bucketOf(String key) {
        return redissonClient.getBucket(key, StringCodec.INSTANCE);
    }

    /**
     * Runs {@code exec} against the bucket for {@code key} and returns whatever it produces.
     *
     * @param key  bucket key
     * @param exec callback to run
     * @return the callback's result
     */
    public <T> T exec(String key, RedissonExec<T> exec) {
        return exec.run(bucketOf(key));
    }

    /**
     * Runs {@code call} against the bucket for {@code key}.
     *
     * @param key  bucket key
     * @param call callback to run
     */
    public void call(String key, RedissonCall call) {
        call.run(bucketOf(key));
    }

    /**
     * Reads the string stored under {@code key}.
     *
     * @param key cache key
     * @return the stored string
     */
    public String get(String key) {
        RBucket<String> holder = bucketOf(key);
        return holder.get();
    }

    /**
     * Stores a string without an expiry.
     *
     * @param key   cache key
     * @param value value to store
     */
    public void set(String key, String value) {
        RBucket<String> holder = bucketOf(key);
        holder.set(value);
    }

    /**
     * Stores a string with an expiry.
     *
     * @param key     cache key
     * @param expired time-to-live in milliseconds; a value of zero or less falls back to 2000 ms
     * @param value   value to store
     */
    public void setEx(String key, long expired, String value) {
        long ttlMillis = expired > 0L ? expired : DEFAULT_EXPIRED;
        RBucket<String> holder = bucketOf(key);
        holder.set(value, ttlMillis, TimeUnit.MILLISECONDS);
    }

    /**
     * Tells whether a bucket exists for {@code key}.
     *
     * @param key cache key
     * @return true when it exists
     */
    public Boolean isExists(String key) {
        return bucketOf(key).isExists();
    }

    /**
     * Deletes the bucket stored under {@code key}.
     *
     * @param key cache key
     */
    public void delete(String key) {
        bucketOf(key).delete();
    }

    /**
     * Returns the RList stored under {@code key}.
     *
     * @param key key of the RList
     * @return the RList handle
     */
    public <T> RList<T> getList(String key) {
        return redissonClient.getList(key, StringCodec.INSTANCE);
    }

    /**
     * Returns the RMapCache stored under {@code key}.
     *
     * @param key key of the map
     * @return the RMapCache handle
     */
    public <K, V> RMapCache<K, V> getMap(String key) {
        return redissonClient.getMapCache(key, StringCodec.INSTANCE);
    }

    /**
     * Returns the RSet stored under {@code key}.
     *
     * @param key key of the set
     * @return the RSet handle
     */
    public <T> RSet<T> getSet(String key) {
        return redissonClient.getSet(key, StringCodec.INSTANCE);
    }

    /**
     * Returns the RScoredSortedSet stored under {@code key}.
     *
     * @param key key of the sorted set
     * @param <T> element type
     * @return the RScoredSortedSet handle
     */
    public <T> RScoredSortedSet<T> getScoredSortedSet(String key) {
        return redissonClient.getScoredSortedSet(key, StringCodec.INSTANCE);
    }
}
  • redisson配置加载、@Bean 的注册
import com.saman.common.redis.RedissonConfig;
import lombok.extern.slf4j.Slf4j;
import org.redisson.Redisson;
import org.redisson.api.RBlockingQueue;
import org.redisson.api.RDelayedQueue;
import org.redisson.api.RedissonClient;
import org.redisson.config.Config;
import org.springframework.beans.factory.DisposableBean;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Lazy;

import java.util.Optional;

/**
 * Registers the Redisson beans: the bound settings object, the client itself and the two helpers
 * built on top of it. Active unless {@code j.redis.enable} is set to something other than "true".
 */
@Slf4j
@Configuration
@ConditionalOnProperty(value = "j.redis.enable", havingValue = "true", matchIfMissing = true)
public class RedissonAutoConfig implements DisposableBean {

    /**
     * Settings object populated from the properties under the {@code j.redis} prefix.
     */
    @Bean
    @ConfigurationProperties(prefix = "j.redis")
    RedissonConfig redissonConfig() {
        return new RedissonConfig();
    }

    /**
     * Builds a single-server Redisson client from the bound settings.
     * The container calls {@code shutdown()} on it when the bean is destroyed.
     *
     * @param rConfig bound connection settings
     * @return the Redisson client
     */
    @Bean(destroyMethod = "shutdown")
    RedissonClient redissonClient(@Qualifier("redissonConfig") RedissonConfig rConfig) {
        Config config = new Config();
        config.useSingleServer().setAddress(String.format("redis://%s:%s", rConfig.getHost(), rConfig.getPort()))
                .setPassword(rConfig.getPassword())
                .setDatabase(rConfig.getDatabase())
                .setConnectionPoolSize(rConfig.getConnectionPoolSize())
                .setConnectionMinimumIdleSize(rConfig.getConnectionMinimumIdleSize())
                .setConnectTimeout(rConfig.getConnectTimeout())
                .setTimeout(rConfig.getTimeout());

        return Redisson.create(config);
    }

    /**
     * Lazily created helper for manual distributed locking.
     */
    @Lazy
    @Bean
    public LockHelper lockHelper(@Qualifier("redissonClient") RedissonClient redissonClient) {
        return new LockHelper(redissonClient);
    }

    /**
     * Lazily created helper for common Redisson read/write operations.
     */
    @Lazy
    @Bean
    public RedissonHelper redissonHelper(@Qualifier("redissonClient") RedissonClient redissonClient) {
        return new RedissonHelper(redissonClient);
    }

    /**
     * Intentionally does nothing. The client is shut down through
     * {@code @Bean(destroyMethod = "shutdown")} on {@link #redissonClient(RedissonConfig)}.
     * The previous implementation guarded on a field that was never assigned, so it never ran;
     * wiring it up would only shut the client down a second time.
     */
    @Override
    public void destroy() throws Exception {
        // no-op, see Javadoc
    }
}
  • 注解代码

import java.lang.annotation.*;

/**
 * Marks a method for duplicate-submission protection.
 *
 * @author deke
 * @date 2022/1/15
 */
@Target({ElementType.METHOD})
@Retention(RetentionPolicy.RUNTIME)
@Documented
public @interface ReSubmit {

    /**
     * Names of the parameters/fields whose values form the unique identifier of a submission.
     *
     * @author deke
     * @date 2023/1/15
     */
    String[] keys() default {};

    /**
     * Time in milliseconds after which the recorded submission data expires (default 1000 ms).
     *
     * @author deke
     * @date 2023/1/15
     */
    long invalidMilliseconds() default 1000L;

    /**
     * Distributed lock: lease time once the lock has been acquired, in milliseconds (default 1000 ms).
     */
    long expireTime() default 1000L;

    /**
     * Distributed lock: maximum time to wait for the lock, in milliseconds (default 1000 ms).
     */
    long waitTime() default 1000L;

    /**
     * Number of submissions tolerated before the request is rejected (default 1);
     * -1 means no limit.
     *
     * @author deke
     * @date 2023/1/15
     */
    int defaultLimitNum() default 1;    
}

注解参数:

  • keys:根据参数数组作为防重复提交的唯一标识
  • invalidMilliseconds:数据失效时间(毫秒,默认 1000ms)
  • expireTime:分布式锁-获得锁后超时时间(毫秒,默认 1000ms)
  • waitTime:分布式锁-获取锁最长等待时间(毫秒,默认 1000ms,小于 1000ms 时按 1000ms 处理)
  • defaultLimitNum:默认 超过1次重复提交则 触发限流,-1:代表没有限制
  • 主要切面代码
import cn.hutool.json.JSONUtil;
import com.saman.common.annotation.ReSubmit;
import com.saman.common.base.exception.BusinessException;
import com.saman.common.base.result.RestBody;
import com.saman.common.base.result.SCode;
import com.saman.common.base.utils.DataUtils;
import com.saman.common.base.utils.MD5Util;
import com.saman.common.base.utils.ReflectionUtils;
import com.saman.common.redisson.LockHelper;
import com.saman.common.redisson.RedissonHelper;
import com.saman.common.redisson.enums.LockResultStatus;
import com.saman.common.redisson.vo.LockResult;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.aspectj.lang.ProceedingJoinPoint;
import org.aspectj.lang.annotation.Around;
import org.aspectj.lang.annotation.Aspect;
import org.aspectj.lang.annotation.Pointcut;
import org.aspectj.lang.reflect.MethodSignature;
import org.redisson.api.RBucket;
import org.springframework.context.annotation.Lazy;

import javax.annotation.Resource;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import java.io.File;
import java.lang.reflect.Field;
import java.lang.reflect.Method;
import java.lang.reflect.Parameter;
import java.util.Objects;
import java.util.concurrent.TimeUnit;

@Slf4j
@Aspect
public class ReSubmitAdvice {
    private final static String KEY_PREFIX = "REPEAT_SUBMIT:";
    private final static String RES_KEY_PREFIX = "REPEAT_SUBMIT:RES";
    private final static String REDS_KEY_PREFIX = "REPEAT_SUBMIT:REDS";

    @Lazy
    @Resource
    private RedissonHelper redissonHelper;

    @Lazy
    @Resource
    private LockHelper lockHelper;


    @Pointcut("@annotation(com.saman.common.annotation.ReSubmit)")
    private void entityPointcut() {
    }

    @Around("entityPointcut()")
    public Object around(ProceedingJoinPoint joinPoint) throws Throwable {
        MethodSignature methodSignature = (MethodSignature) joinPoint.getSignature();
        Method method = methodSignature.getMethod();
        String key = method.getDeclaringClass().getName() + method.getName();

        ReSubmit anno = method.getAnnotation(ReSubmit.class);
        String[] params = anno.keys();
        long invalidTime = anno.invalidMilliseconds();
        long expireTime = anno.expireTime();
        long waitTime = anno.waitTime();
        int limit = anno.defaultLimitNum();
       
        String[] parameterNames = methodSignature.getParameterNames();
        Object[] args = joinPoint.getArgs();

        Class<?>[] parameterTypes = method.getParameterTypes();
        boolean repeatFlag = false;
        int allArgNone = 0;
        for (int i = 0, j = parameterNames.length; i < j; i++) {
            String parameterName = parameterNames[i];
            if (StringUtils.isBlank(parameterName)) {
                continue;
            }
            Class<?> parameterType = parameterTypes[i];
            Object arg = args[i];
            if (parameterType.equals(HttpServletResponse.class)
                    || parameterType.equals(HttpServletRequest.class)
                    || parameterType.equals(File.class)) {
                continue;
            }
            //TODO: 暂不处理list、map入参
            if (DataUtils.isList(arg)) {
                continue;
            } else if (DataUtils.isMap(arg)) {
                continue;
            } else if (DataUtils.isBaseDefaultType(parameterType)) {
                //如果没有指定特殊的 缓存字段的话,则应该缓存所有的入参
                key = assembleWithParamSingle(arg, parameterName, method, key);
                if ((Objects.nonNull(arg) && "" != arg)) {
                    allArgNone += 1;
                }
                repeatFlag = true;
            } else if (DataUtils.nonBaseDefaultType(parameterType)) {
                key = assembleWithParamBody(arg, params, key);
                repeatFlag = true;
            }
        }
        if (params.length < 1 && Objects.equals(0, allArgNone)) {
            throw new BusinessException(SCode.REPEAT_SUBMIT_NO_PARAM_ERROR);
        }

        Object proceed = null;

        String resKey = RES_KEY_PREFIX + MD5Util.MD5(key);
        String rdKey = KEY_PREFIX + MD5Util.MD5(key);
        String redsKey = REDS_KEY_PREFIX + MD5Util.MD5(key);

        // 利用redisson 实现分布式锁,避免第一次的并发请求
        LockResult lock = lockHelper.lock(redsKey, expireTime, waitTime);
        if (lock.getLockResultStatus().equals(LockResultStatus.SUCCESS)) {
            try {
                if (repeatFlag) {
                    redissonHelper.call(rdKey, t -> {
                        String val = (String) t.get();
                        if (StringUtils.isBlank(val)) {
                            t.set("1", invalidTime, TimeUnit.MILLISECONDS);
                            return;
                        }
                        int count = Integer.parseInt(val);
                        if (limit > 0 && count >= limit) {
                            throw new BusinessException(SCode.REPEAT_SUBMIT);
                        }
                        t.getAndSet(String.valueOf(count + 1), invalidTime, TimeUnit.MILLISECONDS);
                    });
                }
                String e = redissonHelper.exec(resKey, RBucket::get);
                if (StringUtils.isNotBlank(e)) {
                    log.info("<<ReSubmitAdvice>>-->从缓存中加载数据");
                    return JSONUtil.toBean(e, RestBody.class);
                }

                proceed = joinPoint.proceed();
                if (Objects.nonNull(proceed) && StringUtils.isNotBlank(resKey)) {
                    Object finalProceed = proceed;
                    redissonHelper.call(resKey, t ->
                            t.getAndSet(JSONUtil.toJsonStr(finalProceed), invalidTime, TimeUnit.MILLISECONDS));
                }
            } finally {
                lockHelper.unlock(lock);
            }
        } else if (lock.getLockResultStatus() == LockResultStatus.FAILURE) {
            log.error("Thread:{} ,failed to acquire the lock...", Thread.currentThread().getName());
            throw new BusinessException(SCode.REPEAT_FAST_SUBMIT);
        } else {
            log.error("Thread:{}, encountered an exception while trying to acquire the lock...",
                    Thread.currentThread().getName());
            throw new BusinessException(SCode.REPEAT_SOURCE_SUBMIT);
        }
        return proceed;
    }

    public String assembleWithParamBody(Object arg, String[] params, String key) {
        Class<?> aClass = arg.getClass();
        Field[] fds = aClass.getDeclaredFields();

        StringBuilder keyBuilder = new StringBuilder(key);
        int allFiledNone = 0;
        for (String param : params) {
            String[] split = param.split("\\.");
            if (split.length < 1) {
                continue;
            }
            if (split.length > 1) {
                split = DataUtils.removeFromHeadElement(split, 1);
            }

            Object filedVal = ReflectionUtils.getFiledVal(fds, split, arg);
            if (Objects.isNull(filedVal) || filedVal == "") {
                allFiledNone += 1;
                continue;
            }
            keyBuilder.append(filedVal);
        }
        if (Objects.equals(allFiledNone, params.length)) {
            throw new BusinessException(SCode.REPEAT_SUBMIT_NO_PARAM_ERROR);
        }
        return keyBuilder.toString();
    }

    public String assembleWithParamSingle(Object arg, String parameterName, Method method, String key) {
        Parameter[] parameters = method.getParameters();
        StringBuilder keyBuilder = new StringBuilder(key);
        for (Parameter parameter : parameters) {
            log.info("<<Parameter的name>>:{}", parameter.getName());
            if (!parameterName.equals(parameter.getName())) {
                continue;
            }
            if ((Objects.nonNull(arg) && "" != arg)) {
                keyBuilder.append(arg);
            }
        }
        return keyBuilder.toString();
    }
}

开始使用

import com.baomidou.mybatisplus.core.conditions.update.UpdateWrapper;
import com.saman.common.annotation.ReSubmit;
import com.saman.common.base.result.RestBody;
import com.saman.common.base.utils.DataGatherFactory;
import com.sb.simple.domian.dao.UserMapper;
import com.sb.simple.domian.entity.User;
import lombok.extern.slf4j.Slf4j;
import org.springframework.web.bind.annotation.*;

import javax.annotation.Resource;

/**
 * Sample endpoints showing the two ways {@code @ReSubmit} keys can be declared: against a simple
 * request parameter and against a field of the request body.
 */
@Slf4j
@RestController
@RequestMapping("/user")
public class UserController {
    @Resource
    private UserMapper userMapper;

    /**
     * Increments the user's age by one, then returns the user. Keyed on the {@code userId} parameter.
     */
    @ReSubmit(keys = {"userId"}, invalidMilliseconds = 7000L, defaultLimitNum = 2)
    @GetMapping(value = "/getUserInfo")
    public RestBody<User> getUserInfo(@RequestParam(value = "userId") String userId) {
        UpdateWrapper<User> ageBump = new UpdateWrapper<>();
        ageBump.eq("user_id", userId);
        ageBump.setSql("age = age + 1");
        userMapper.update(null, ageBump);

        User refreshed = userMapper.getListByAttr(DataGatherFactory.getHashMap("userId", userId)).get(0);
        return RestBody.success(refreshed);
    }

    /**
     * Increments the user's age by one, then returns the user. Keyed on the body's {@code userId} field.
     */
    @ReSubmit(keys = {"u.userId"}, invalidMilliseconds = 10000L, expireTime = 12000L, waitTime = 13000L, defaultLimitNum = 2)
    @PostMapping(value = "/info")
    public RestBody<User> info(@RequestBody User u) {
        UpdateWrapper<User> ageBump = new UpdateWrapper<>();
        ageBump.eq("user_id", u.getUserId());
        ageBump.setSql("age = age + 1");
        userMapper.update(null, ageBump);

        User refreshed = userMapper.getListByAttr(DataGatherFactory.getHashMap("userId", u.getUserId())).get(0);
        return RestBody.success(refreshed);
    }
}

有2种使用的方式:
基于单个确定唯一的值
基于对象中某个确定唯一的值

JMeter 测试

  • 我们可以看到注解配置了 defaultLimitNum = 2,即最多放行 2 次请求。方法 getUserInfo 内先更新年龄(加 1),再查询对象数据;如果没有幂等保护,2 次请求都执行的话年龄应该是加 2 的。
    在这里插入图片描述
    在这里插入图片描述

使用 JMeter 模拟 1s 内有 5 个请求,可以看到结果如下:
在这里插入图片描述
符合我们上面的预期,只能通过2个请求。那我们在看看数据库的数据
在这里插入图片描述
age=13,也达到了幂等的要求。

总结

本次分享主要解决的问题是:幂等+重复提交

  • 利用了redisson的分布式锁实现对共享资源的请求加锁,
  • 额外扩展 defaultLimitNum ,达到一个方法可以被有限次数的请求,并在时间内返回相同的值。
Logo

更多推荐