TCC模式

TCC:即Try、Confirm、Cancel

一个服务的执行分成try尝试、confrim确认、cancel回退三个部分,try先占用资源,confirm确认执行,cancel执行回退。同样需要一个事务协调者来组织各个参与者的行为,保证最终一致性。

TCC模式需要实现步骤的拆分,同时由于步骤拆分需要协调者与参与者之间的互动,导致需要考虑由于网络问题导致的协调消息重发、接收延迟等问题。主要场景有:1)参与者在还没收到try消息的情况下就收到了cancel消息,如果还正常执行回滚,就会导致空回滚,将并未占用的资源“释放”回去,导致虚假的资源增加。2)就算预防了空回滚,参与者也可能在收到cancel消息后又收到try消息,如果正常执行资源占用,就会导致资源悬挂,不会回滚也不会确认执行。

幸运的是较新的SEATA版本已经支持预防空回滚和悬挂,只需要加上响应的注解。但其他如资源占用、确认、回退还是需要自己编写,整体上对业务代码的侵入非常严重,编写、测试都是大工作量。

主要改造代码如下:

微服务上的SEATA配置(application.yml),启用TCC模式,同时启用防空滚防悬挂。

seata:
    enabled: true
    application-id: order-service
    tx-service-group: default_tx_group

    client:
        rm:
            async-commit-buffer-limit: 1000 # 适当增大
            report-retry-count: 3 # 增加报告重试次数
        tm:
            commit-retry-count: 3
            rollback-retry-count: 3

    # 使用注册中心(Nacos)发现 Seata Server
    registry:
        type: nacos
        nacos:
            server-addr: 192.168.31.33:8848
            group: SEATA_GROUP
            namespace: public

    # 配置中心(使用本地配置)
    config:
        type: file

    # 服务配置
    service:
        vgroup-mapping:
            default_tx_group: default

    # 数据源代理 - 关键!
    enable-auto-data-source-proxy: true

    tcc:
        fence:
            log-table-name: tcc_fence_log  # TCC防悬挂日志表
            clean-period: 1h               # 清理周期

定义TCC接口:

package com.decotest.userservice.service.tcc; // 放在同一包下

import io.seata.rm.tcc.api.BusinessActionContext;
import io.seata.rm.tcc.api.BusinessActionContextParameter;
import io.seata.rm.tcc.api.TwoPhaseBusinessAction;
import io.seata.rm.tcc.api.LocalTCC;

@LocalTCC // 必须添加此注解
public interface UserTccAction { // 可以命名为UserTccAction

    @TwoPhaseBusinessAction(
            name = "userTccService", // 与Service中的name一致
            commitMethod = "commitDeduct",
            rollbackMethod = "rollbackDeduct",
            useTCCFence = true
    )
    boolean tryDeduct(@BusinessActionContextParameter(paramName = "userId") Long userId,
                      @BusinessActionContextParameter(paramName = "amount") java.math.BigDecimal amount);

    boolean commitDeduct(BusinessActionContext actionContext);

    boolean rollbackDeduct(BusinessActionContext actionContext);
}

实现TCC方法:

package com.decotest.userservice.service.tcc;

import com.decotest.userservice.common.ApiResponse;
import com.decotest.userservice.entity.Pay;
import com.decotest.userservice.entity.User;
import com.decotest.userservice.repository.PayRepository;
import com.decotest.userservice.repository.UserRepository;
import io.seata.core.context.RootContext;
import io.seata.rm.tcc.api.BusinessActionContext;
import io.seata.rm.tcc.api.BusinessActionContextParameter;
import io.seata.rm.tcc.api.LocalTCC;
import io.seata.rm.tcc.api.TwoPhaseBusinessAction;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;

import java.math.BigDecimal;

@Slf4j
@Service
public class UserTccService implements UserTccAction {
    @Autowired
    private UserRepository userRepository;

    @Autowired
    private PayRepository payRepository;

    /**
     * TCC Try阶段 - 冻结金额
     */
    @Transactional(rollbackFor = Exception.class)
    public boolean tryDeduct(
            @BusinessActionContextParameter(paramName = "userId") Long userId,
            @BusinessActionContextParameter(paramName = "amount") BigDecimal amount) {
        // 1. 获取当前TCC事务的XID (由Seata框架管理)
        String xid = RootContext.getXID();
        if (xid == null) {
            throw new RuntimeException("不在TCC事务上下文中");
        }

        log.info("userTccService tryDeduct xid="+xid);

        try{
            User user = userRepository.findByIdForUpdate(userId).get();
            if(user.getBalance().compareTo(amount)<0){
                throw new RuntimeException("Insufficient balance");
            }
            user.setBalance(user.getBalance().subtract(amount));
            user.setFrozen_balance(user.getFrozen_balance().add(amount));
            userRepository.save(user);

            log.info("userTccService tryDeduct success xid="+xid);
            return true;
        }catch(Exception e){
            log.error("userTccService exception tryDeduct xid="+xid);
            throw e;
        }
    }

    @Transactional(rollbackFor = Exception.class)
    public boolean commitDeduct(BusinessActionContext actionContext) {
        log.info("userTccService commitDeduct xid="+actionContext.getXid());


        try{
            BigDecimal amount = new BigDecimal(actionContext.getActionContext("amount").toString());
            long userId = Long.parseLong(actionContext.getActionContext("userId").toString());
            User user = userRepository.findByIdForUpdate(userId).get();

            if(user.getFrozen_balance().compareTo(amount)<0){
                log.error("userTccService commitDeduct Insufficient frozen balance xid="+actionContext.getXid());
                return false;
            }
            user.setFrozen_balance(user.getFrozen_balance().subtract(amount));
            userRepository.save(user);

            Pay pay = new Pay();
            pay.setAmount(amount);
            pay.setXid(actionContext.getXid());
            pay.setStatus('1');     //提交
            pay.setUserid(userId);
            pay.setTrantype('1');   //debt
            payRepository.save(pay);

            log.info("userTccService commitDeduct success xid="+actionContext.getXid());
            return true;
        }catch(Exception e){
            log.error("userTccService commitDeduct exception xid="+actionContext.getXid());
            return false;
        }
    }


    @Transactional(rollbackFor = Exception.class)
    public boolean rollbackDeduct(BusinessActionContext actionContext) {
        log.info("userTccService rollbackDeduct xid=" + actionContext.getXid());

        try{
            BigDecimal amount = new BigDecimal(actionContext.getActionContext("amount").toString());
            long userId = Long.parseLong(actionContext.getActionContext("userId").toString());
            User user = userRepository.findByIdForUpdate(userId).get();

            if(user.getFrozen_balance().compareTo(amount)>=0){
                user.setFrozen_balance(user.getFrozen_balance().subtract(amount));
                user.setBalance(user.getBalance().add(amount));
                userRepository.save(user);

                log.info("userTccService rollbackDeduct success xid=" + actionContext.getXid());
                return true;
            }else{
                log.error("userTccService frozen balance insufficient rollbackDeduct xid=" + actionContext.getXid());
                return false;
            }
        }catch(Exception e){
            log.error("userTccService exception rollbackDeduct xid=" + actionContext.getXid());
            return false;
        }
    }
}

调用处需要执行接口方法:

package com.decotest.userservice;

import com.decotest.userservice.common.ApiResponse;
import com.decotest.userservice.entity.User;
import com.decotest.userservice.service.UserService;
import com.decotest.userservice.service.tcc.UserTccAction;
import com.decotest.userservice.service.tcc.UserTccService;
import io.seata.rm.tcc.api.BusinessActionContext;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.http.HttpStatus;
import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.annotation.*;

import java.math.BigDecimal;
import java.time.LocalDate;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.time.format.DateTimeParseException;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

@RestController
@RequestMapping("/user")
public class UserTccController {
    @Autowired
    private UserTccAction userTccAction;

    @GetMapping("/tcc/pay")
    public boolean pay(
            @RequestParam long userId,
            @RequestParam BigDecimal amount
            ){
        return userTccAction.tryDeduct(userId, amount);
    }
}

主事务调用的地方需要使用全局事务GlobalTransactional注解:

package com.decotest.orderservice;

import com.decotest.orderservice.common.ApiResponse;
import com.decotest.orderservice.service.OrderService;
import com.decotest.orderservice.service.tcc.OrderTccAction;
import com.decotest.orderservice.service.tcc.OrderTccService;
import io.seata.spring.annotation.GlobalTransactional;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;

@RestController
@RequestMapping("/order")
public class OrderTccController {
    @Autowired
    private OrderTccAction orderTccAction;

    @GetMapping("/tcc/place")
    @GlobalTransactional
    public ApiResponse<?> placeOrder(
            @RequestParam long userId, @RequestParam long goodsId
    ){
        if(orderTccAction.tryCreateOrder(userId, goodsId)){
            return ApiResponse.success();
        }else{
            return ApiResponse.error(404, "unknow error");
        }
    }
}
Logo

更多推荐