分布式事务产生的背景

1.分布式事务产生的背景
2.分布式事务Base和CAP理论
3.分布式事务解决方案有哪些
4.单体项目如何基于jta+ Atomikos
5.哪些场景下不建议使用seata
6.如何基于MQ解决分布式事务
7.Seata解决分布式事务的原理
7.RocketMQ解决分布式事务
8.跨语言的方式如何解决分布式事务问题
9.TCC 解决分布式事务原理  

概念

分情况来定。

  1. 在单体的项目中,多个不同业务逻辑都是在同一个数据源中实现事务管理,是不存在分布式事务的问题,因为同一数据源的情况下都是采用事务管理器,相当于每个事务管理器对应一个数据源。

img

  1. 在单体的项目中,有多个不同的数据源,每个数据源中都有自己独立的事务管理器,互不影响,那么这时候也会存在多数据源事务管理分布式事务问题:解决方案jta+ Atomikos

img

3.在分布式/微服务架构中,每个服务都有自己的本地的事务,每个服务本地事务互不影响,那么这时候也会存在分布式事务的问题。

在分布式/微服务架构中,订单服务调用派单服务接口成功之后,可能会引发错误。2PC3PC思想实际上都是解决我们在分布式系统中,每个节点保证数据一致性问题。

img

事务的定义

支付服务调用完积分服务接口之后,后续
支付服务发生了报错,只会回滚支付服务事务,不会回滚积分服务事务。
最终导致数据就一致。

对我们的业务逻辑可以实现提交或者回滚,保证数据的一致性的情况。

所以要么提交,要么回滚。

原子性a 要么提交 要么回滚

一致性c

隔离性i 多个事务在一起执行的时候,互不影响;

持久性d 事务一旦提交或者回滚后,不会在对该结果有任何影响

传播行为与事务隔离级别

传播行为----spring 方法之间方法

分布式事务理论

Base与CAP理论

CAP****理论

1 CAP定律和BASE理论

1.1 CAP定律#

这个定理的内容是指的是在一个分布式系统中、Consistency(一致性)、 Availability(可用性)、Partition tolerance(分区容错性),三者不可得兼。

(一)一致性(C)

在分布式系统中的所有数据备份,在同一时刻是否同样的值。(等同于所有节点访问同一份最新的数据副本)

(二)可用性(A)

在集群中一部分节点故障后,集群整体是否还能响应客户端的读写请求。(对数据更新具备高可用性)

(三)分区容错性(P) 形成脑裂问题

以实际效果而言,分区相当于对通信的时限要求。系统如果不能在时限内达成数据一致性,就意味着发生了分区的情况,必须就当前操作在C和A之间做出选择。

https://baike.baidu.com/item/%E5%AE%B9%E9%94%99%E7%8E%87/9967698?fr=aladdin

(四)总结一下

以上可以知道分区容错性(P)主要代表网络波动产生的错误,这是不可避免的,且这个三个模式不可兼得,所以目前就只有两种模式:CP和AP模式。

其中CP表示遵循一致性原则,但不能保证高可用性,其中zookeeper作为注册中心就是采用CP模式,因为zookeeper有过半节点不可以用的话整个zookeeper将不可用。

AP表示遵循于可用性原则,例如Eureka作为注册中心用的是AP模式,因为其为去中心化,采用你中有我我中有你的相互注册方式,只要集群中有一个节点可以使用,整个eureka服务就是可用的,但可能会出现短暂的数据不一致问题。

AP保证可用性:但是不能保证每个副本数据数据一致性,但是可以保证可用性;

CP保证数据一致性:如果有过半的zk节点宕机的情况下,不能保证可用性,但是必须保证每个副本节点之间数据一致性, 比如ZK;Nacos从1.0版本开始支持CP/AP混合模式集群 默认的情况下 Ap模式

注册中心 追求是可用性 建议采用AP模式。

Base理论

BASE是Basically Available(基本可用)、Soft state(软状态)和 Eventually consistent(最终一致性)三个短语的缩写。BASE理论是对CAP中一致性和可用性权衡的结果,其来源于对大规模互联网系统分布式实践的总结, 是基于CAP定理逐步演化而来的。BASE理论的核心思想是:即使无法做到强一致性,但每个应用都可以根据自身业务特点,采用适当的方式来使系统达到最终一致性。

(一)基本可用

基本可用是指分布式系统在出现不可预知故障的时候,允许损失部分可用性,注意,这绝不等价于系统不可用。

比如:响应时间上的损失。正常情况下,一个在线搜索引擎需要在0.5秒之内返回给用户相应的查询结果,但由于出现故障,查询结果的响应时间增加了1~2秒。

系统功能上的损失:正常情况下,在一个电子商务网站上进行购物的时候,消费者几乎能够顺利完成每一笔订单,但是在一些节日大促购物高峰的时候,由于消费者的购物行为激增,为了保护购物系统的稳定性,部分消费者可能会被引导到一个降级页面。

(二)软状态

软状态指允许系统中的数据存在中间状态,并认为该中间状态的存在不会影响系统的整体可用性,即允许系统在不同节点的数据副本之间进行数据同步的过程存在延时

在分布式架构中 数据同步 是需要经过网络,网络的传输是需要时间的,

短暂的延迟是允许(几毫秒),但是最终数据是需要一致性。

(三)最终一致性

最终一致性强调的是所有的数据副本,在经过一段时间的同步之后,最终都能够达到一个一致的状态。因此,最终一致性的本质是需要系统保证最终数据能够达到一致,而不需要实时保证系统数据的强一致性。

强一致性、弱一致性 最终一致性概念

分布式事务解决方案

环境准备

创建两个不同的数据库

mayikt_integral 积分数据库

CREATE TABLE `mayikt_integral_info` (
  `id` int(20) NOT NULL AUTO_INCREMENT,
  `value` int(11) DEFAULT NULL,
  `user_id` int(11) DEFAULT NULL,
  `pay_id` varchar(50) DEFAULT NULL,
  PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=8 DEFAULT CHARSET=utf8;

mayikt_pay 支付数据库

CREATE TABLE `mayikt_payinfo` (
  `id` int(11) DEFAULT NULL,
  `pay_id` int(11) DEFAULT NULL,
  `pay_name` varchar(255) DEFAULT NULL,
  `pay_state` int(11) DEFAULT NULL,
  `pay_money` decimal(10,0) DEFAULT NULL,
  `pay_userid` int(11) DEFAULT NULL
) ENGINE=InnoDB DEFAULT CHARSET=utf8;

支付服务接口

1.单体项目多数据源 可以jta+ Atomikos;

2.基于RabbitMQ的形式解决 最终一致性的思想;

3.基于RocketMQ解决分布式事务 采用事务消息;

4.LCN采用LCN模式 假关闭连接 (目前已经被淘汰) 官网已经无法访问; (基于2PC)

5.Alibaba的Seata 背景非常强大,已经成为了主流 但是性能一般;(基于2PC)

以上适合于在微服务架构中,不适合于和外部接口保证分布式事务问题。

\6. 跨语言的方式实现解决分布式事务问题 类似于支付宝回调方式

如果项目是追求快速响应 建议采用MQ最终一致性方案 实现解决分布式事务问题。

基于RabbitMQ的形式解决分布式事务

RabbitMQ是实现了高级消息队列协议(AMQP)的开源消息代理软件(亦称面向消息的中间件),RabbitMQ服务器是用Erlang语言编写的。

RabitMQ官方网站:

https://www.rabbitmq.com/

img

流程:

1.采用MQ异步的形式增加积分。

1.先修改支付表状态为已经支付

2.向mq服务器端投递一个增加积分的msg

3.单独启动消费者 增加积分。

设计点:mq 如何保证 数据一致性?

支付表状态为已经支付 但是积分没有增加—影响还好 补偿

思考点:

生产者:如果生产者投递消息失败呢?

向mq服务器端投递msg,采用同步的形式

如果投递失败,则直接回滚当前我们的支付的事务。

方式1:

优点:确保消息必须落地到mq服务器端

缺点:同步过程—发生阻塞 影响整体接口 吞吐量。

方式2:采用异步—消息确认机制

采用异步的形式 获取到,投递消息的结果。

如果消息投递失败,生产者可以补偿(重试 继续投递。)

写在日志表中,后续定时任务 补偿 投递消息到mq中。

1.补偿 重试策略

mq服务器端:mq如果宕机呢?

消费者:消费者失败呢?

生产者: 确保我们msg 必须投递到mq服务器端。

消费者: 必须确保消费者 消费成功?

消费者必须消费成功之后,才可以将该消息删除(手动ack)

注意消费者 重复消费的问题 -----幂等性问题。

1.提前查询(重试过程中都是间隔) 如果已经增加过积分,

重试的请求过来 就不会重复增加积分。----在 业务层面保证数据 唯一性

2.需要在db层面保证业务 幂等性。 根据支付的id+userid 唯一约束

1.在kafka中 消费消息 根据offset 提交offset值 消费成功 失败 是不会删除msg

后期通过 定时任务或者 淘汰策略 删除日主。

2.rabbitmq 消费者消费成功 主动告诉给 mq服务器端 将该msg消息删除。

使用mq解决分布式事务问题?如何减少延迟呢?

1s 生产者 向mq中投递几十万条msg

1s 消费者每次只会取出一条msg 消费

生产者与消费者速率完全不匹配的。

消费者速率 不匹配导致 数据延迟 很久。

提高消费者速率:

1.消费者集群 在同一个组消费消息 不会重复消费

2.消费者批量获取msg 批量获取100或者1000条

用户投诉 已经支付呢?积分没有增加

用户向客服反馈,人工补偿的形式增加积分。

1.生产者:必须确保消息投递到MQ成功;

Ack 消息确认机制

同步或者异步的形式

方式1:Confirms

方式2:事务消息

2.MQ服务器端:需要将消息持久化,避免MQ宕机之后消息丢失; Mq服务器端 在默认的情况下 都会对队列中的消息实现持久化

持久化硬盘。

3.消费者:必须确保消息消费成功(同时需要注意幂等性问题);

​ 3.1在rabbitmq情况下:

​ 必须要将消息消费成功之后,才会将该消息从mq服务器端中移除。

​ 3.2在kafka中的情况下:

​ 不管是消费成功还是消费失败,该消息都不会立即从mq服务器端移除。

4.延迟问题:提高消费的速率

1.MQ消费者批量消费

2.MQ消费者集群消费

docker 安装 RabbitMQ
version: '2'
services:
  rabbitmq:
    #setup host name
    hostname: myrabbitmq
    #use docker images
    image: rabbitmq:management
    #ports setting
    ports:
      - 15673:15672
      - 5673:5672
    restart: always
    #volumes setting
    volumes:
      - ./data:/var/lib/rabbitmq

docker-compose up

img

默认账户和密码:guest/guest

http://127.0.0.1:8070/updatePaySuccess?payId=1 测试接口

http://192.168.75.145:15673/

img

支付服务
public interface PayService {

    /**
     * 根据支付id 修改支付状态 是为 已经支付
     *
     * @param payId
     * @return
     */
    @RequestMapping("/updatePaySuccess")
    ResponseEntity updatePaySuccess(Long payId);
}
  @Autowired
    private IntegralServiceFeign integralServiceFeign;

    @Override
    @Transactional
    public ResponseEntity updatePaySuccess(Long payId) {
        // 1.先查询 该支付id 对应的数据是否存在
        // 2.如果存在的情况下 状态是为 未支付---
        int result = payServiceMapper.updatePaySuccess(payId);
        if (result <= 0) {
            return ResponseEntity.status(500).body("fail");
        }
        // 使用feign调用积分服务接口增加 积分。
        IntegralDto integralDto = new IntegralDto();
        integralDto.setValue(1000l); // 积分的值1000
        integralDto.setUserId(123l); // 123
        integralDto.setPayId(payId); // 支付的id
        ResponseEntity integralResult = integralServiceFeign.addIntegral(integralDto);
//        // 判断如果调用积分服务接口失败了, 则或回滚当前事务
//        if (RpcConstant.RPC_500_CODE.equals(integralResult.getStatusCode())) {
//            // 手动回滚当前事务
//            TransactionAspectSupport.currentTransactionStatus().setRollbackOnly();
//            return ResponseEntity.status(500).body("fail");
//
//        }
        // 程序执行到 48行 调用完积分服务接口之后 -----积分服务事务提交了。积分服务 对应 积分数据库 积分表 是可以查询到 增加积分数据
        // 模拟代码报错   支付服务调用完积分服务接口之后 突然返回呢?
        if (payId == 1) {
            int j = 1 / 0;  // 支付服务事务回滚了,支付表中状态 未支付状态 但是 积分服务中 积分数据已经提交了。
            // 积分增加---查询到 显示未支付。--解决分布事务问题
        }
        return ResponseEntity.status(200).body("ok");
    }
积分服务
public interface IntegralService {

    /**
     * 增加积分服务接口
     * @param integralDto
     * @return
     */
    @RequestMapping("/addIntegral")
    ResponseEntity addIntegral(@RequestBody IntegralDto integralDto);
}
@Data
public class IntegralDto {
    private Long value;
    private Long userId;
    private Long payId;
}

@RestController
@Slf4j
public class IntegralServiceImpl implements IntegralService {
    @Autowired
    private IntegralMapper integralMapper;

    public ResponseEntity addIntegral(@RequestBody IntegralDto integralDto) {
        try {
            int result = integralMapper.insertIntegral
                    (integralDto.getValue(), integralDto.getUserId(), integralDto.getPayId());
            return result > 0 ? ResponseEntity.status(200).body("ok") : ResponseEntity.status(500).body("fail");
        } catch (Exception e) {
            log.error("<e:{}>", e);
            return ResponseEntity.status(500).body("fail");
        }
    }
}

2PC/3PC

1.LCN采用LCN模式 假关闭连接 (目前已经被淘汰) 官网已经无法访问; (基于2PC)

2.Alibaba的Seata 背景非常强大,已经成为了主流 但是性能一般;(基于2PC)

2PC(两阶段提交协议)

1.两阶段提交协议可以理解为2pc,也就是分为参与者和协调者,协调者会通过两次阶段实现数据最终的一致性的。

2.2PC和3pc的区别就是解决参与者超时的问题和多加了一层询问,保证数据的传输可靠性。

两段提交顾名思义就是要进行两个阶段的提交:第一阶段,准备阶段(投票阶段) ; 第二阶段,提交阶段(执行阶段),中间必须要有一个协调者。

第一节阶段:

协调者会给每个参与者发送一封邮件,如果所有的参与者回复 ok,

如果中间有一个参与者 回复fail。

第二阶段:

就会给每个参与者发送一封确认的邮件,所有参与者发送fail。

生活中例子

例如在我们团队中有 小军、小安、小薇 三个人,打算明天想去聚餐?那如何达成一致呢?

我们就必须选出一名 组长 假设选举 小军为组长 小安和小薇就是为 组员

第一阶段:

小军在给小安和小薇 发一封邮件说“发送邮件:明天晚上7点准时聚餐,你们有时间吗?”

小安和小薇 如果回复“ok”

第二阶段:

小军收到了 小安哥小薇回复的“ok”,就会给 小安和小薇在发一封邮件 说明天晚上7点准时聚餐

小军收到了 小安回复的"ok",小薇回复的“fail”,就会给 小安和小薇在发一封邮件 说明天晚上7点不能够聚餐

小薇没有时间。

第一阶段

我们协调者,会给每一个参与者发送Prepare(预备)消息,执行本地数据脚本但不提交事务;

如果协调者收到了参与者成功的消息,则会给每个参与者发送提交(Commit)消息 , 如果协调者收到了参与者的失败消息或者超时,直接给每个参与者发送回滚(Rollback)消息;参与者根据协调者的指令执行提交或者回滚操作,释放所有事务处理过程中被占用的资源,显然2PC做到了所有操作要么全部成功、要么全部失败。

第二阶段

如果协调者收到了参与者成功的消息,则会给每个参与者发送提交(Commit)消息 , 如果协调者收到了参与者的失败消息或者超时,直接给每个参与者发送回滚(Rollback)消息;参与者根据协调者的指令执行提交或者回滚操作,释放所有事务处理过程中被占用的资源,显然2PC做到了所有操作要么全部成功、要么全部失败。

img

img

2PC缺陷

1、协调者节点挂了此场景比较主流的方案是选举机制,首次即通过选举决定谁是协调者,当协调者挂掉后重新选举。

2、commit阶段出现部分失败

LCN解决分布式事务问题

概述

http://www.txlcn.org/zh-cn/ LCN并不生产事务,LCN只是本地事务的协调工

现在官网已经无法访问。

可以参考:GitEE https://gitee.com/wangliang1991/tx-lcn?_from=gitee_search

默认密码为:codingapi

LCN官方的文档:https://www.cnblogs.com/lmyupupblogs/p/12499815.html

LCN 基于代理数据源 假关闭(没有提交事务)----引发行锁问题,导致其他

线程无法对该行数据做写的操作。

seata模式 采用 undo log日志 做逆向回滚----mysql事务隔离级别知识点 mvcc

版本链。

整体实现思想 是一样。

设计到

1.aop技术

2.ThreadLocal

3.全局id

4.netty技术

5.feign客户端代理拦截器技术

面试官?2PC/3pc 哪些地方有使用?

lcn模式 、seata模式

原理

  1. 首先我们的lcn协调者(TM)会和lcn客户端(TC)通过引入的netty一直保持着长连接(持续监听)。

  2. 当请求的发起方(调用方)进入接口业务之前,会通过AOP技术进到@LcnTransaction注解中去LCN协调者那边生成注册一个全局的事务组Id(groupId)。

  3. 当发起方(调用方)通过rpc调用参与方(被调用方)的时候,lcn重写了Feign客户端,会从ThreadLocal中拿到该事务组Id(groupId),并将该事务组Id设置到请求头中。

  4. 参与方(被调用方)在请求头中获取到了这个groupId的时候,lcn会标识该服务为参与方并加入到该事务组,并会被lcn代理数据源,当该服务业务逻辑执行完成后,进行数据源的假关闭,并不会真正的提交或回滚当前服务的事务。

  5. 当发起方执行完全部业务逻辑的时候,如果无异常会告知lcn协调者,lcn协调者再分别告诉该请求链上的所有参与方可以提交了,再进行真正的提交。若发起方调用完参与方后报错了,也会告知lcn协调者,lcn协调者再告知所有的参与方进行真正的回滚操作,这样就解决了分布式事务的问题。

通俗易懂翻译:

\1. 发起方与参与方与我们的LCN管理器全局事务协调者一直保持长连接;

\2. 发起方在调用接口之前会使用Aop生成一个全局的事务分组id;

\3. 发起方在调用之后的时候会在请求头中传递该全局事务分组id;

\4. 参与方从请求头中获取该事务分组id,当前业务执行完毕之后不会提交该事务,则会使用假关闭。

\5. 发起方调用接口完之后,如果出现异常的情况下,会通知给协调者回滚该事务,协调者在通知给参与方实现回滚事务。

参入方—该接口被人调用

发起方----入口接口

协调者--------组长 2PC 协调者 负责通知 参与方 提交还是回滚的

AOP技术

环绕通知

目标方法之前----提前生成全局事务id

目标方法 Object result = dtxLocalControl.doBusinessCode(info);

目标方法之后

@LcnTransaction

public ResponseEntity updatePaySuccess(Long payId) {

​ integralServiceFeign.addIntegral

}

LCN Feign客户端拦截器 获取 到 最开始在aop中生成的

全局事务id?

角色划分:

\1. 全局事务协调者 (组长);

\2. 发起方-----调用接口者

\3. 参与方----被别人调用接口

LCN实现分布式事务方案:有可能会引发行锁问题。

img

环境构建

  1. txlcn-tc:TXLCN分布式事务客户端
  2. txlcn-common:公共模块
  3. txlcn-logger:日志模块。(默认提供日志持久化到MySQL的支持)
  4. txlcn-tm:TXLCN事务管理器
  5. txlcn-txmsg:事务消息扩展接口
  6. txlcn-txmsg-netty:事务消息接口的Netty实现
  7. txlcn-tracing:分布式事务追踪工具

启动事务协调者

1.启动事务协调者

A.修改db和redis配置

执行该sql语句脚本。

📎tx-manager.sql

http://127.0.0.1:7970/admin/index.html#/login

img

密码:codingapi

启动发起方和参与方

Maven依赖

  <dependency>
            <groupId>com.codingapi.txlcn</groupId>
            <artifactId>txlcn-tc</artifactId>
            <version>5.0.2.RELEASE</version>
        </dependency>
        <dependency>
            <groupId>com.codingapi.txlcn</groupId>
            <artifactId>txlcn-txmsg-netty</artifactId>
            <version>5.0.2.RELEASE</version>
        </dependency>

发起方和参与方接口上加上 @LcnTransaction

发起方:

    @Override
    @Transactional
    @LcnTransaction
    public ResponseEntity updatePaySuccess(Long payId) {
        // 1.先查询 该支付id 对应的数据是否存在
        // 2.如果存在的情况下 状态是为 未支付---
        int result = payServiceMapper.updatePaySuccess(payId);
        if (result <= 0) {
            return ResponseEntity.status(500).body("fail");
        }
        // 使用feign调用积分服务接口增加 积分。
        IntegralDto integralDto = new IntegralDto();
        integralDto.setValue(1000l); // 积分的值1000
        integralDto.setUserId(123l); // 123
        integralDto.setPayId(payId); // 支付的id
        ResponseEntity integralResult = integralServiceFeign.addIntegral(integralDto);
//        // 判断如果调用积分服务接口失败了, 则或回滚当前事务
        if (RpcConstant.RPC_500_CODE.equals(integralResult.getStatusCode())) {
            // 手动回滚当前事务
            TransactionAspectSupport.currentTransactionStatus().setRollbackOnly();
            return ResponseEntity.status(500).body("fail");
        }
        // 程序执行到 48行 调用完积分服务接口之后 -----积分服务事务提交了。积分服务 对应 积分数据库 积分表 是可以查询到 增加积分数据
        // 模拟代码报错   支付服务调用完积分服务接口之后 突然返回呢?
        if (payId == 1) {
            int j = 1 / 0;  // 支付服务事务回滚了,支付表中状态 未支付状态 但是 积分服务中 积分数据已经提交了。
            // 积分增加---查询到 显示未支付。--解决分布事务问题
        }
        return ResponseEntity.status(200).body("ok");
    }

参与方:

   @Transactional
    @LcnTransaction
    public ResponseEntity addIntegral(@RequestBody IntegralDto integralDto) {
        try {
//            int result = integralMapper.insertIntegral
//                    (integralDto.getValue(), integralDto.getUserId(), integralDto.getPayId());
            // update操作
            int result = integralMapper.updateIntegral(integralDto.getUserId());
            return result > 0 ? ResponseEntity.status(200).body("ok") : ResponseEntity.status(500).body("fail");
        } catch (Exception e) {
            log.error("<e:{}>", e);
            // 积分服务如果报错了 积分服务就已经回滚了 响应状态500
            return ResponseEntity.status(500).body("fail");
        }
    }

需要在配置文件中 新增:

tx-lcn:
  client:
    manager-address: 127.0.0.1:8070
  logger:
    enabled: true

启动项目报错:

|  scopedTarget.dataSource defined in class path resource [org/springframework/boot/autoconfigure/jdbc/DataSourceConfiguration$Hikari.class] ↑     ↓ |  org.springframework.boot.autoconfigure.jdbc.DataSourceInitializerInvoker

需要在配置文件中 新增:

spring:
  application:
    name: mayikt-pay  #服务名称 在 注册中心展示服务名称 --
  cloud:
    nacos:
      discovery:
        server-addr: 127.0.0.1:8848 # nacos服务注册中心Server端 地址
    refresh:
      refreshable: none

​ refresh:

​ refreshable: none

img

img

触发行锁策略,导致该行数据无法被其他线程做写操作 只能够读。

LCN模式存在一个非常大的缺点? 容易触发行锁机制。

发起方调用完参与方接口之后,发起方发送请求给协调者时

,协调者宕机呢?参与方 一直没有收到 提交或者回滚的结果

就会导致 该行数据一直被锁住 其他的线程一直无法修改。

LCN源码解读

发起方

发起方生成全局id

1.Aop的重写的入口TransactionAspect

判断请求方法上是否有加上LcnTransaction注解,如果有

加上该注解,则开始生成全局id;

    @Around("lcnTransactionPointcut() && !txcTransactionPointcut()" +
            "&& !tccTransactionPointcut() && !txTransactionPointcut()")
    public Object runWithLcnTransaction(ProceedingJoinPoint point) throws Throwable {
        DTXInfo dtxInfo = DTXInfo.getFromCache(point);
        LcnTransaction lcnTransaction = dtxInfo.getBusinessMethod().getAnnotation(LcnTransaction.class);
        dtxInfo.setTransactionType(Transactions.LCN);
        dtxInfo.setTransactionPropagation(lcnTransaction.propagation());
        return dtxLogicWeaver.runTransaction(dtxInfo, point::proceed);
    }

2.执行:com.codingapi.txlcn.tc.aspect.weave.DTXLogicWeaver#runTransaction

从当前threadLocal中获取到对应的 DTXLocalContext 如果没有获取到则创建DTXLocalContext

放入到threadLocal中。

3.执行目标方法 ,当目标方法中有使用feign 实现RPC远程调用接口 会走重写feign客户端拦截器RequestInterceptor 通过threadLocal获取对应的全局id,放入到请求头中。

img

 log.debug("tracing transmit group:{}", TracingContext.tracing().groupId());
            tracingSetter.set(TracingConstants.HEADER_KEY_GROUP_ID, TracingContext.tracing().groupId());
            tracingSetter.set(TracingConstants.HEADER_KEY_APP_MAP,
                    Base64Utils.encodeToString(TracingContext.tracing().appMapString().getBytes(StandardCharsets.UTF_8)));

参与方获取全局id

1.SpringTracingApplier

拦截器 获取feign客户端请求头中的参数全局事务分组id,缓存到threadlocal中。

     String groupId = Optional.ofNullable(tracingGetter.get(TracingConstants.HEADER_KEY_GROUP_ID)).orElse("");
        String appList = Optional.ofNullable(tracingGetter.get(TracingConstants.HEADER_KEY_APP_MAP)).orElse("");
        TracingContext.init(Maps.newHashMap(TracingConstants.GROUP_ID, groupId, TracingConstants.APP_MAP,
                StringUtils.isEmpty(appList) ? appList : new String(Base64Utils.decodeFromString(appList), StandardCharsets.UTF_8)));
        if (TracingContext.tracing().hasGroup()) {
            log.debug("tracing apply group:{}, app map:{}", groupId, appList);
        }

\2. Aop的重写的入口TransactionAspect

\3. 事务不会提交 暂时假关闭, DataSourceAspect##LcnConnectionProxy

基于代理数据源的形式 直接重写了原生的了sql连接 注释掉 commit方法

img

该方案存在一个非常大的缺陷,如果事务协调者没有及时通知 参与方 回滚或者提交 ,

在mysql innodb 存储引擎中 该行数据一致被锁住 无法被其他线程做写的操作。

LCN 发起方调用参与方接口,如果发起方响应超时呢?

如果协调者宕机呢?会发生哪些问题?

LCN 底层基于数据源 代理

协调者发送通知给我们参与方?

mysql中 每行数据在做写操作的时候 会触发行锁机制,

该行数据已经被其他线程占用在做写操作 没有提交事务或者回滚

事务,该行数据无法被其他的线程做写操作。

lcn 解决分布式事务 引发 行锁、表锁问题。

支付服务(发起方)调用积分服务接口(没有提交事务 意味着

该行数据一直被锁住其他的线程无法修改该行数据)完成之后

支付服务不发送请求给事务协调者,意味着 积分服务事务一直

没有提交。

lcn 超时机制?

aop?

支付服务(发起方)

@LcnTransaction----
public ResponseEntity updatePaySuccess(Long payId) {

经过aop技术 aop提前生成好 就是我们的 全局事务id

存放在我们ThreadLocal

执行目标方法----updatePaySuccess(Long payId) {

ResponseEntity integralResult = integralServiceFeign.addIntegral(integralDto);

----feign代理 设置 aop中生成好的全局id

}

发起方: 入口 支付服务–发起方 全局事务id true

参与方:接口被别人调用 false

判断请求头是否可以获取全局事务id。

aop 拦截 目标方法

try {

Object result = dtxLocalControl.doBusinessCode(info); // 目标方法

dtxLocalControl.onBusinessCodeSuccess(info, result);

// 发送请求给我们事务协调者 提交事务,事务协调者在发送请求给所有

参与方 进行提交事务。

}catch (Throwable e) {

dtxLocalControl.onBusinessCodeError(info, e);

/ 发送请求给我们事务协调者 回滚事务,事务协调者在发送请求给所有

参与方 进行回滚事务。

}

aop 代理数据源 不让 提交事务

@Override
public void commit() throws SQLException {
*//connection.commit();
*}

@Override
public void rollback() throws SQLException {
*//connection.rollback();
*}

@Override
public void close() throws SQLException {
*//connection.close();
*}

Logo

更多推荐