分布式事务入门2——使用seata的TCC模式实现分布式事务
TCC(Try-Confirm-Cancel)是一种分布式事务解决方案,通过将业务操作拆分为try(资源预留)、confirm(确认执行)和cancel(回退)三个阶段实现最终一致性。该模式需要事务协调者管理参与者行为,并处理网络问题导致的消息重发、延迟等问题,可能引发空回滚和资源悬挂问题。较新的SEATA框架已支持空回滚和悬挂防护,但仍需开发者自行实现资源占用、确认和回退逻辑,对业务代码侵入较大
TCC模式
TCC:即Try、Confirm、Cancel
一个服务的执行分成try尝试、confirm确认、cancel回退三个部分,try先占用资源,confirm确认执行,cancel执行回退。同样需要一个事务协调者来组织各个参与者的行为,保证最终一致性。
TCC模式需要实现步骤的拆分,同时由于步骤拆分需要协调者与参与者之间的互动,导致需要考虑由于网络问题导致的协调消息重发、接收延迟等问题。主要场景有:1)参与者在还没收到try消息的情况下就收到了cancel消息,如果还正常执行回滚,就会导致空回滚,将并未占用的资源“释放”回去,导致虚假的资源增加。2)就算预防了空回滚,参与者也可能在收到cancel消息后又收到try消息,如果正常执行资源占用,就会导致资源悬挂,不会回滚也不会确认执行。
幸运的是较新的SEATA版本已经支持预防空回滚和悬挂,只需要加上相应的注解。但其他如资源占用、确认、回退还是需要自己编写,整体上对业务代码的侵入非常严重,编写、测试都是大工作量。
主要改造代码如下:
微服务上的SEATA配置(application.yml),启用TCC模式,同时为防空回滚、防悬挂配置fence日志表(防护本身由 @TwoPhaseBusinessAction 的 useTCCFence = true 开启)。
# Seata client configuration for the microservice (application.yml).
# Nesting is significant: every key below lives under the `seata` root.
seata:
  enabled: true
  application-id: order-service
  tx-service-group: default_tx_group
  client:
    rm:
      async-commit-buffer-limit: 1000 # increase as appropriate
      report-retry-count: 3 # more retries when reporting
    tm:
      commit-retry-count: 3
      rollback-retry-count: 3
  # Discover the Seata Server through the registry (Nacos)
  registry:
    type: nacos
    nacos:
      server-addr: 192.168.31.33:8848
      group: SEATA_GROUP
      namespace: public
  # Configuration center (uses local file configuration)
  config:
    type: file
  # Service configuration
  service:
    vgroup-mapping:
      # Maps the tx-service-group declared above to a Seata Server cluster name
      default_tx_group: default
  # Data source proxy - critical!
  enable-auto-data-source-proxy: true
  tcc:
    fence:
      log-table-name: tcc_fence_log # TCC anti-suspension (fence) log table
      clean-period: 1h # cleanup interval
定义TCC接口:
package com.decotest.userservice.service.tcc; // 放在同一包下
import io.seata.rm.tcc.api.BusinessActionContext;
import io.seata.rm.tcc.api.BusinessActionContextParameter;
import io.seata.rm.tcc.api.TwoPhaseBusinessAction;
import io.seata.rm.tcc.api.LocalTCC;
/**
 * TCC participant contract for deducting an amount from a user.
 *
 * <p>{@code tryDeduct} is the first-phase method; its annotation names
 * {@code commitDeduct} and {@code rollbackDeduct} as the second-phase
 * commit and rollback methods.
 */
@LocalTCC // this annotation is required
public interface UserTccAction { // may be named UserTccAction
/**
 * First phase (Try). The two arguments are annotated so that they are stored
 * in the action context under the keys {@code "userId"} and {@code "amount"}.
 * {@code useTCCFence = true} turns on Seata's TCC fence, i.e. the framework-side
 * protection against empty rollback and suspension described in the article.
 */
@TwoPhaseBusinessAction(
name = "userTccService", // same as the name used in the Service
commitMethod = "commitDeduct",
rollbackMethod = "rollbackDeduct",
useTCCFence = true
)
boolean tryDeduct(@BusinessActionContextParameter(paramName = "userId") Long userId,
@BusinessActionContextParameter(paramName = "amount") java.math.BigDecimal amount);
/**
 * Second phase (Confirm), referenced by {@code commitMethod} above.
 *
 * @param actionContext context of the branch transaction being confirmed
 * @return whether the confirm step succeeded
 */
boolean commitDeduct(BusinessActionContext actionContext);
/**
 * Second phase (Cancel), referenced by {@code rollbackMethod} above.
 *
 * @param actionContext context of the branch transaction being cancelled
 * @return whether the cancel step succeeded
 */
boolean rollbackDeduct(BusinessActionContext actionContext);
}
实现TCC方法:
package com.decotest.userservice.service.tcc;
import com.decotest.userservice.common.ApiResponse;
import com.decotest.userservice.entity.Pay;
import com.decotest.userservice.entity.User;
import com.decotest.userservice.repository.PayRepository;
import com.decotest.userservice.repository.UserRepository;
import io.seata.core.context.RootContext;
import io.seata.rm.tcc.api.BusinessActionContext;
import io.seata.rm.tcc.api.BusinessActionContextParameter;
import io.seata.rm.tcc.api.LocalTCC;
import io.seata.rm.tcc.api.TwoPhaseBusinessAction;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
import java.math.BigDecimal;
@Slf4j
@Service
public class UserTccService implements UserTccAction {
@Autowired
private UserRepository userRepository;
@Autowired
private PayRepository payRepository;
/**
* TCC Try阶段 - 冻结金额
*/
@Transactional(rollbackFor = Exception.class)
public boolean tryDeduct(
@BusinessActionContextParameter(paramName = "userId") Long userId,
@BusinessActionContextParameter(paramName = "amount") BigDecimal amount) {
// 1. 获取当前TCC事务的XID (由Seata框架管理)
String xid = RootContext.getXID();
if (xid == null) {
throw new RuntimeException("不在TCC事务上下文中");
}
log.info("userTccService tryDeduct xid="+xid);
try{
User user = userRepository.findByIdForUpdate(userId).get();
if(user.getBalance().compareTo(amount)<0){
throw new RuntimeException("Insufficient balance");
}
user.setBalance(user.getBalance().subtract(amount));
user.setFrozen_balance(user.getFrozen_balance().add(amount));
userRepository.save(user);
log.info("userTccService tryDeduct success xid="+xid);
return true;
}catch(Exception e){
log.error("userTccService exception tryDeduct xid="+xid);
throw e;
}
}
@Transactional(rollbackFor = Exception.class)
public boolean commitDeduct(BusinessActionContext actionContext) {
log.info("userTccService commitDeduct xid="+actionContext.getXid());
try{
BigDecimal amount = new BigDecimal(actionContext.getActionContext("amount").toString());
long userId = Long.parseLong(actionContext.getActionContext("userId").toString());
User user = userRepository.findByIdForUpdate(userId).get();
if(user.getFrozen_balance().compareTo(amount)<0){
log.error("userTccService commitDeduct Insufficient frozen balance xid="+actionContext.getXid());
return false;
}
user.setFrozen_balance(user.getFrozen_balance().subtract(amount));
userRepository.save(user);
Pay pay = new Pay();
pay.setAmount(amount);
pay.setXid(actionContext.getXid());
pay.setStatus('1'); //提交
pay.setUserid(userId);
pay.setTrantype('1'); //debt
payRepository.save(pay);
log.info("userTccService commitDeduct success xid="+actionContext.getXid());
return true;
}catch(Exception e){
log.error("userTccService commitDeduct exception xid="+actionContext.getXid());
return false;
}
}
@Transactional(rollbackFor = Exception.class)
public boolean rollbackDeduct(BusinessActionContext actionContext) {
log.info("userTccService rollbackDeduct xid=" + actionContext.getXid());
try{
BigDecimal amount = new BigDecimal(actionContext.getActionContext("amount").toString());
long userId = Long.parseLong(actionContext.getActionContext("userId").toString());
User user = userRepository.findByIdForUpdate(userId).get();
if(user.getFrozen_balance().compareTo(amount)>=0){
user.setFrozen_balance(user.getFrozen_balance().subtract(amount));
user.setBalance(user.getBalance().add(amount));
userRepository.save(user);
log.info("userTccService rollbackDeduct success xid=" + actionContext.getXid());
return true;
}else{
log.error("userTccService frozen balance insufficient rollbackDeduct xid=" + actionContext.getXid());
return false;
}
}catch(Exception e){
log.error("userTccService exception rollbackDeduct xid=" + actionContext.getXid());
return false;
}
}
}
调用处需要执行接口方法:
package com.decotest.userservice;
import com.decotest.userservice.common.ApiResponse;
import com.decotest.userservice.entity.User;
import com.decotest.userservice.service.UserService;
import com.decotest.userservice.service.tcc.UserTccAction;
import com.decotest.userservice.service.tcc.UserTccService;
import io.seata.rm.tcc.api.BusinessActionContext;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.http.HttpStatus;
import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.annotation.*;
import java.math.BigDecimal;
import java.time.LocalDate;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.time.format.DateTimeParseException;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
/**
 * REST endpoint of the user service that exposes the TCC deduction.
 *
 * <p>The controller depends on the {@link UserTccAction} interface, not on the
 * implementing class.
 */
@RestController
@RequestMapping("/user")
public class UserTccController {

    @Autowired
    private UserTccAction userTccAction;

    /**
     * Handles {@code GET /user/tcc/pay} by delegating to {@link UserTccAction#tryDeduct}.
     *
     * @param userId id of the user to charge
     * @param amount amount to deduct
     * @return the value returned by {@code tryDeduct}
     */
    @GetMapping("/tcc/pay")
    public boolean pay(@RequestParam long userId, @RequestParam BigDecimal amount) {
        final boolean reserved = userTccAction.tryDeduct(userId, amount);
        return reserved;
    }
}
主事务调用的地方需要使用全局事务GlobalTransactional注解:
package com.decotest.orderservice;
import com.decotest.orderservice.common.ApiResponse;
import com.decotest.orderservice.service.OrderService;
import com.decotest.orderservice.service.tcc.OrderTccAction;
import com.decotest.orderservice.service.tcc.OrderTccService;
import io.seata.spring.annotation.GlobalTransactional;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;
/**
 * REST endpoint of the order service; {@link #placeOrder} is the method that
 * carries {@code @GlobalTransactional}.
 */
@RestController
@RequestMapping("/order")
public class OrderTccController {

    @Autowired
    private OrderTccAction orderTccAction;

    /**
     * Handles {@code GET /order/tcc/place} inside a Seata global transaction.
     *
     * <p>{@code @GlobalTransactional} commits when this method returns normally and
     * rolls back when it throws. A failed Try must therefore surface as an exception:
     * returning an error response would commit the global transaction and confirm
     * every branch that had already reserved resources.
     *
     * @param userId  id of the ordering user
     * @param goodsId id of the goods being ordered
     * @return a success response when the Try phase succeeded
     * @throws IllegalStateException if {@code tryCreateOrder} reports failure
     */
    @GetMapping("/tcc/place")
    @GlobalTransactional
    public ApiResponse<?> placeOrder(@RequestParam long userId, @RequestParam long goodsId) {
        if (!orderTccAction.tryCreateOrder(userId, goodsId)) {
            throw new IllegalStateException(
                    "tryCreateOrder failed, rolling back global transaction: userId="
                            + userId + ", goodsId=" + goodsId);
        }
        return ApiResponse.success();
    }
}
更多推荐

所有评论(0)