Redis分布式锁和Transactional共同实现微服务下余额的数据一致性
本文介绍了一种基于分布式锁和事务机制的用户余额维护方案。系统采用Redis分布式锁确保同一用户操作的串行化,通过@Transactional注解保证事务一致性。针对用户多种余额类型(如币余额、可提现金额、冻结金额等),设计了原子性更新方法,使用MyBatis的LambdaUpdateWrapper实现字段级别的增减操作,并自动校验余额充足性。方案还支持维护历史数据字段(如累计提现金额),避免了扫描
·
背景 :因为TOC业务和微服务的的使用场景下实现对用户多种余额以及多个历史性数据的维护,保证数据一致性
@Override
@Transactional(rollbackFor = Exception.class)
public Boolean updateUserAmount(TransactionBo bo) {
// 参数校验
if (Objects.isNull(bo.getUserId()) || Objects.isNull(bo.getAmount()) || Objects.isNull(bo.getAdd())) {
throw new TBServiceException(ErrorCodeEnum.USER_PARAM_MISSING);
}
if (bo.getAmount().compareTo(BigDecimal.ZERO) <= 0) {
throw new TBServiceException(ErrorCodeEnum.PARAM_NOT_POSITIVE);
}
// 构建分布式锁的key,基于用户ID确保同一用户的操作串行化
String lockKey = USER_AMOUNT_LOCK + bo.getUserId();
boolean tryLock = false;
try {
// 尝试获取分布式锁,设置合理的过期时间防止死锁
tryLock = redisLock.tryLock(lockKey, MIN_10);
if (!tryLock) {
log.warn("获取用户金额操作锁失败,用户ID: {}, 锁Key: {}", bo.getUserId(), lockKey);
throw new TBServiceException(ErrorCodeEnum.USER_OPERATION_BUSY);
}
//根据类型变更用户具体金额
return getSwitchTypeUpdateUserAccount(bo);
} finally {
// 确保释放锁
if (tryLock) {
try {
redisLock.unlock(lockKey);
} catch (Exception e) {
log.error("释放用户金额操作锁失败,用户ID: {}, 锁Key: {}", bo.getUserId(), lockKey, e);
// 这里只记录错误,不抛出异常,避免影响主业务逻辑
}
}
}
}
@Data
@Builder
public class TransactionBo {
/**
* 用户ID
*/
private Long userId;
/**
* 变动金额
*/
private BigDecimal amount;
/**
* 是否新增 true : 新增 false : 减少
*/
private Boolean add;
/**
* 关联订单号
*/
private String orderId;
/**
* 交易类型
* @see org.dromara.common.core.enums.TradeType
*/
private Integer transactionType;
/**
* 操作类型
* @see org.dromara.common.core.enums.OperationType
*/
private Integer operationType;
}
private Boolean getSwitchTypeUpdateUserAccount(TransactionBo bo) {
// 获取用户信息
SysUser user = userService.getById(bo.getUserId());
if (Objects.isNull(user)) {
throw new TBServiceException(ErrorCodeEnum.USER_NOT_EXIST);
}
// 根据操作类型执行不同的逻辑
if (Objects.equals(OperationType.FUTURE_COIN.getCode(), bo.getOperationType())) {
//处理币操作
return handleFutureCoinOperation(bo, user);
} else if (Objects.equals(OperationType.WITHDRAWABLE_AMOUNT.getCode(), bo.getOperationType())) {
//处理可提现金额操作
return handleWithdrawableAmountOperation(bo, user);
} else if (Objects.equals(OperationType.FREEZE_BALANCE.getCode(), bo.getOperationType())) {
//处理冻结余额变动
return handleFreezeBalanceOperation(bo, user);
} else if (Objects.equals(OperationType.UNFREEZE_TO_AVAILABLE.getCode(), bo.getOperationType())) {
//处理解冻余额到可提现(解冻流水号)
return handleUnfreezeToAvailableOperation(bo, user);
} else if (Objects.equals(OperationType.AVAILABLE_TO_FUTURE_COIN.getCode(), bo.getOperationType())) {
// 处理可提现余额转币
return handleAvailableToFutureCoinOperation(bo, user);
}else if (Objects.equals(OperationType.FUTURE_COIN_TO_AVAILABLE.getCode(), bo.getOperationType())) {
//处理币转可提现余额 - 新增
return handleFutureCoinToAvailableOperation(bo, user);
}else {
throw new TBServiceException(ErrorCodeEnum.ORDER_OPERATION_TYPE_NOT_SUPPORTED);
}
}
/**
* 交易类型枚举
*/
@Getter
@AllArgsConstructor
public enum OperationType {
/**
* 对币操作
*/
FUTURE_COIN(1, "对币操作"),
/**
* 对可提现金额操作
*/
WITHDRAWABLE_AMOUNT(2, "对可提现金额操作"),
/**
* 对冻结余额操作
*/
FREEZE_BALANCE(3, "对冻结余额操作"),
/**
* 解冻余额到可提现
*/
UNFREEZE_TO_AVAILABLE(4, "解冻余额到可提现金额"),
/**
* 可提现余额转币
*/
AVAILABLE_TO_FUTURE_COIN(5, "可提现余额转币"),
/**
* 币转可提现余额
*/
FUTURE_COIN_TO_AVAILABLE(6, "币转可提现余额");
private final Integer code;
private final String info;
}
下面举其中一个方法的逻辑 : 其余方法相当是一种拓展
private Boolean handleFutureCoinOperation(TransactionBo bo, SysUser user) {
// 原子性更新
int updateCount = bo.getAdd() ?
baseMapper.incrementAmount(bo.getUserId(), bo.getAmount()) :
baseMapper.decrementAmount(bo.getUserId(), bo.getAmount());
// 错误处理
if (updateCount <= 0) {
handleUpdateFailure(bo);
}
// 记录账户日志
createAccountLog(bo, user.getAmount(),
bo.getAdd() ? user.getAmount().add(bo.getAmount()) : user.getAmount().subtract(bo.getAmount()));
return Boolean.TRUE;
}
/**
* 处理更新失败的情况
*/
private void handleUpdateFailure(TransactionBo bo) {
Long exists = baseMapper.checkUserExists(bo.getUserId());
if (Objects.isNull(exists) || exists == 0L) {
throw new TBServiceException(ErrorCodeEnum.USER_NOT_EXIST);
} else {
throw new TBServiceException(ErrorCodeEnum.USER_BALANCE_NOT_ENOUGH);
}
}
这里是Mapper层的方法
default int incrementAmount(Long userId, BigDecimal amount){
return update(null, new LambdaUpdateWrapper<SysUser>()
.eq(SysUser::getUserId, userId)
.setSql("amount = amount + " + amount));
}
default int decrementAmount(Long userId, BigDecimal amount){
return update(null, new LambdaUpdateWrapper<SysUser>()
.eq(SysUser::getUserId, userId)
.ge(SysUser::getAmount, amount) // 关键:确保余额充足
.setSql("amount = amount - " + amount));
}
1.在RedisLock的作用下,确保同一时间,只有同一线程操作同一用户,保证同一性
2.在@Transactional注解的作用下,抛出系统异常或者业务异常会启用事务机制回滚数据,保证一致性
上面举的例子只是维护了用户的单一字段,如果维护多个字段,可以参考下面例子
1.维护用户可提现金额的字段
2.维护用户历史提现金额的字段
/**
* 处理可提现金额操作
*/
private Boolean handleWithdrawableAmountOperation(TransactionBo bo, SysUser user) {
// 原子性更新
int updateCount = bo.getAdd() ?
baseMapper.incrementAvailableAmount(bo.getUserId(), bo.getAmount()) :
baseMapper.decrementAvailableAmount(bo.getUserId(), bo.getAmount());
// 错误处理
if (updateCount == 0) {
handleUpdateFailure(bo);
}
//维护历史提现
if (!bo.getAdd()){
updateCount = baseMapper.incrementWithdrawnAmount(bo.getUserId(), bo.getAmount());
}
// 错误处理
if (updateCount == 0) {
handleUpdateFailure(bo);
}
// 记录账户日志
createAccountLog(bo, user.getAvailableAmount(),
bo.getAdd() ? user.getAvailableAmount().add(bo.getAmount()) : user.getAvailableAmount().subtract(bo.getAmount()));
return Boolean.TRUE;
}
Mapper层方法
default int incrementAvailableAmount(Long userId, BigDecimal amount){
return update(null, new LambdaUpdateWrapper<SysUser>()
.eq(SysUser::getUserId, userId)
.setSql("available_amount = available_amount + " + amount));
}
default int decrementAvailableAmount(Long userId, BigDecimal amount){
return update(null, new LambdaUpdateWrapper<SysUser>()
.eq(SysUser::getUserId, userId)
.ge(SysUser::getAvailableAmount, amount) // 关键:确保余额充足
.setSql("available_amount = available_amount - " + amount));
}
// Mapper接口方法
default int incrementWithdrawnAmount(Long userId, BigDecimal amount) {
return update(null, new LambdaUpdateWrapper<SysUser>()
.eq(SysUser::getUserId, userId)
.setSql("withdrawn_amount = withdrawn_amount + " + amount));
}
这样避免去扫描账单/订单/等大表计算逻辑,也等同于实时查询,维护单一字段,避免selcetCount()操作,减少数据库压力
更多推荐

所有评论(0)