前言

工作后一直在思考怎么实现一个永不崩溃的服务,各大厂商也会强调自己的服务达到了多少的可用性。然而在近两年互联网出现了许多P0级的服务中断事件,其中滴滴打车的故障更是造成了几十小时级的不可用。了解中得知故障原因大概率是由于 Kubernetes的升级,

根据滴滴技术公众号升级方案,有这样一句话1

集群体量大,最大集群规模已经远远超出了社区推荐的 5 千个node上限,有问题的爆炸半径大

Kubernetes已经是目前云原生的基础组件,底层存储使用的则是叫 etcd的 KV存储,因为 etcd的性能限制,一般建议 5000 个节点。笔者工作与大数据相关,开发中也经常遇到 KCM(Kubernetes Controller Manager)挂掉导致spark任务大量失败的场景,底层问题原因一般也和 etcd有关。这里不讨论事故的真实原因,只是感慨作为开发者,如果对底层系统内部原理不了解,那么出现事故产生的后果是不可估量的,特别是在互联网已经遍布生活的中国。带着这样的想法学习了 etcd,本文作为学习总结,欢迎讨论、拍砖。

简介

etcd is a strongly consistent, distributed key-value store that provides a reliable way to store data that needs to be accessed by a distributed system or cluster of machines. It gracefully handles leader elections during network partitions and can tolerate machine failure, even in the leader node

etcd目标是构建一个强一致性、具有容错性分布式kv存储框架,基于易于理解的 Raft协议实现强一致性,使用了简单的 KV存储模型,支持grpc和 http调用。大名鼎鼎的容器编排系统Kubernetes 就使用了etcd作为底层存储。Google的 Spanner和微信使用的 PaxosStore都是基于 Paxos协议实现的 KV存储,早期 Paxos也是一致性协议的唯一标准,不过近年来 Raft协议热度主键上升,在 Google Trends中 2024 年的 Raft algorithm的热度已经是Paxos algorithm的2~3 倍,近年来的开源数据库包括 etcd、TiDB、CockroachDB也都基于 Raft协议。Paxos和 Raft各有优劣,一般认为 Raft工程实现简单,但是有租约不可用的问题,微信的 KV基于无租约 Paxos协议,没有 Leader节点,可以做到无痕换机。

本文主要介绍etcd的一致性原理和存储机制,最后会简单介绍etcd的一些应用与优化。

etcd架构

整体架构

etcd整体架构如下图所示
在这里插入图片描述

实现上可以将etcd分为以下部分:

  1. client层: 包括 client V2和V3这两个客户端 API库,同时支持负载均衡和节点故障转移。
  2. API层: API层负责对外提供 HTTP和gRPC两种类型的访问接口,etcd Raft层进行内部节点选举和心跳通信使用 HTTP API。
  3. Raft层: Raft层实现了包括 Leader选举和日志复制等Raft算法核心功能,并且通过wal机制持久化日志条目,保障 etcd多节点的一致性和可用性,同时使用 Read Index机制实现读请求的强一致性。
  4. 通信层: 集群多节点通信实现,使用 gRPC在服务器peer间和client间通信。
  5. 存储层: 包括 KeyIndex和BoltDB,KeyIndex基于B-Tree,存储每个 kv对的历史版本,在内存中;BoltDB基于 B+Tree,存储在磁盘中,也是 KV引擎,结合 KeyIndex和 BoltDB可以实现 MVCC机制。

Raft协议理论:etcd如何实现高可用、强一致

Raft算法背景

在 Raft之前 Paxos算法是一致性算法的代名词,Google之前的分布式系统基本都由Paxos实现,知名的包括 Chubby、MegaStore、Spanner,微信使用的PaxosStore启发自 MegaStore,也是基于 Paxos算法。单对于实际的工程实现而言,Paxos太过晦涩,作者写的论文《Paxos Made Simple》更是一点也不 Simple,在学习过微信 PaxosStore的 Paxos工程实现以后才稍微入门。开发者亟需一个易于理解实现的一致性算法,Raft算法就是在这个背景下诞生的。

Raft不难理解,学习 Raft的最好方法是看对应的论文2,这里简单阐述 Raft的一些要点和个人对 Raft以及相关分布式共识协议算法的一些理解。

Raft实现原理

一致性的解释

  1. CAP中的一致性,也对应本文提到的线性一致性
    (1)这也叫可线性化或者强一致性,与此容易混淆的是可串行化。代表的是客户端读取要的一定是最新数据,只要这个请求之前有请求读到过新数据,当前请求也一定能读到。
    (2)而可串行化是对于事务而言的,代表事务执行的结果存在一个线性化的排列,但不代表每条指令一定读到最新结果。
    (3)可串行化是对应多指令,可线性化对应单指令。
  2. ACID中的一致性
    (1)主要指数据库是否处于应用程序所期待的正确状态。事务执行前后数据库应该是“一致”的,或者说是正确的。
    (2)这听着其实有点玄学,啥叫正确性。其实这是应用层的事情,严格来说这个一致性不属于数据库的 ACID范畴3
  3. 副本一致性
    这个很好理解了,就是副本间的数据是一样的,实际中可以使用主从同步复制达到副本一致性,但是效率不高。
  4. 一致性哈希
    虽然也叫一致性(Consistent),但是和上面完全是两码事,只是一种动态分区再平衡的做法。

下文涉及讨论的一致性线性一致性强一致性线性化均属于 CAP中的一致性

复制状态机(replicated state machines)
对于所有的一致性算法包括 Raft、Paxos都可以套用下面Raft论文的这张图,一致性算法作为共识模块(Consensus Module),比如 etcd/raft(基本按论文实现,是非常好的学习源码),客户端发起请求到共识模块后将请求存在 WAL日志(Write-ahead logging,持久化在磁盘),共识模块将日志复制到大多数节点后提交日志,由一个异步的线程将已经提交的日志应用到状态机(State Machine)

每台机器都由保存这样的顺序日志,下图的例子给的是kv赋值语句,状态机按顺序执行这些日志,共识算法负责保证集群多台服务器间日志的一致,通过多机的方式提供容灾性,共识算法和状态机通过日志联系在一起。
在这里插入图片描述

Raft子问题
为了便于理解和工程化实现,Raft将一致性问题分为 3 个子问题:选举、日志复制、安全性。接下来会简单介绍这三个子问题的含义,以及为什么通过这三个子问题可以达成强一致性。

  1. 选举(Leader Election):
    Raft协议将节点分为3 种类型:Leader(领导者)、Condidat(候选者)、Follower(追随者),正常情况下一个集群内只可能有一个Leader节点,由 Leader处理接收客户端发来的日志,并复制到集群所有节点上,单 Leader节点的设计极大简化了实现。选举阶段的关键是:(1)如何保证至多只有一个 Leader(2)Leader节点崩溃时快速选举出新的 Leader
    在这里插入图片描述

如上图所示:

  • 所有节点一开始都是 Follower状态。
  • Leader会定时给所有节点发送心跳包,收到心跳包后会刷新选举超时时间。
  • 如果长时间没有收到心跳包,超过选举超时时间,Follower节点会变成 Condidate节点,将 Term(选举任期)加一,发起竞选。
  • 选举过程:收到大多数节点投票则变成 Leader,为了防止一直出现 condidate从而永远无法选举出正确 Leader,会设置随机超时时间。

由此回答之前的问题:
(1)如何保证至多只有一个 Leader: Follower变成 Condidate仅需选举阶段时会增加 Term,Follower会拒绝掉 Term小于自己已知最大 Term的Condidate选票或者 Leader请求,由此只有 Term最大的节点会成为 Leader。选举需要集群多数派(这里多数派)节点的同意,那么根据抽屉原理,同一 Term内两个节点不可能同时选举成功,只可能有一个 Leader。
(2)Leader节点崩溃时快速选举出新的 Leader: Raft通过心跳包和随记超时时间的机制避免活锁,在超过选举时间的时候 Follower会变成 Condidate节点进行竞选。不过这里选举超时时间也可能成为系统瓶颈,如果 Leader崩溃那么在这个时间内系统是不可用的状态。要优化掉这个不可用时间就得选用无Leader租约的一致性算法,比如 Paxos。

  1. 日志复制
    在这里插入图片描述
  • 如上图所示,客户端的每一条请求都包含一条能被复制状态机执行的指令。Leader将这些指令作为日志条目进行追加,并复制到所有的 Follower节点。日志包含对应的日志索引、创建时的任期号、状态机对应的执行指令。
  • Leader节点负责将日志复制到 Follower,当日志复制到大多数节点上时日志称为已提交状态,Follower在接受日志的时候会检查当前日志前一条的索引是否与 Leader相同。日志被提交时,之前所有的日志条目也会被提交,包括其他Leader创建的日志。同时如果没有最新的日志条目是不可能被选举成 Leader。
  • 简单证明:Leader当选需要多数派节点的同意,日志提交需要多数派节点同意,同时如果日志没有比 Follower更新无法获得选票,根据抽屉原理,最新日志节点和选举一定有交集,可证Leader节点一定拥有最新日志;日志提交的前提是Leader需要将日志复制到多数派节点,Follower会校验收到的日志上一个节点索引是否能和自己的日志索引对上,如果对不上不会接收,根据数学归纳法可知,可证Leader和 Follower的节点日志一定有一样的顺序
  • 日志复制有一个重要特性:日志匹配特性(Log Matching Property),根据上面证明也可知其正确性
    (1)如果在不同的日志中的两个条目拥有相同的索引和任期号,那么他们存储了相同的指令。
    (2)如果在不同的日志中的两个条目拥有相同的索引和任期号,那么他们之前的所有日志条目也全部相同。
  • 通过以上限制,Raft算法保证所有已提交的日志都是持久化并且最终在所有的节点上都会按相同顺序进入状态机执行。
  1. 安全性(Safety)
    这个子问题主要是一些细节上的限制,在讨论上面两个子问题的时候其实已经涉及到了。包括两点:
    (1) Leader一定要拥有最新节点日志: 这很好理解,Raft协议中 Leader负责日志复制和对外的请求,如果Leader节点崩溃后拥有旧日志的节点选为 Leader,那么之前提交的日志就会被覆盖,数据会丢失!解决方法就是限制 Leader一定要拥有最新日志,如果日志不是最新无法被选举为 Leader。两份日志比较,任期更大的更新,如果任期一致则索引更大的更新。
    (2)只有当前任期的日志可以通过多数派提交: 在日志复制部分提到当日志复制到多数派节点时可以认为日志被提交,但这里有个隐藏的坑点,只有当前任期的日志可以通过这种计数的方法提交,对之前任期的节点这样做可能导致数据被覆盖。Raft的做法是当前任期日志复制到多数节点时提交,提交的同时将之前任期节点也提交,根据日志匹配的特性可以证明其正确性。下图展示了如果 Leader如果复制前任期的节点并且通过计数的方法提交,此时复制到多数机器上的日志也可能被覆盖。

在这里插入图片描述

子问题的正确性
通过实现上述三个子问题,得到了一个完整的一致性算法,根据数学归纳法和抽屉原理可以证明其充分性。那么思考一下,这是必要条件吗?很容易想到除了 Raft还有 Paxos协议,Paxos甚至都不需要 Leader,因此这些子问题当然不会是一致性实现的必要条件。不过 Raft和 Paxos实际上有共通性,都依赖集群多数派节点达成的一致。在阅读Raft论文的时候经常会看到**“majority”**这个单词,代表“多数派”的含义,Raft的正确性严重依赖通过多数派选举出来的 Leader。事实上,包括 Raft和 Paxos在内的一致性算法都是基于 Quorum协议,Raft中的多数派实现是 Quorum协议的一个特例,接下来介绍 Raft中多数派和Quorum协议的区别。

多数派与Quorum协议
前面提到了当共识协议将日志复制到多数派(majority) 机器时可以认为日志被成功提交,怎么定义多数派节点,为什么提交到多数派节点就能保证集群的一致性?

对于基于 Raft协议的etcd,是通过一次多数派的选举选举出Leader,通过 Leader节点进行写请求,而强一致性读也需要通过 Leader获取多数派的 ReadIndex,这里“”意味着大于集群数目一半,比如集群数目为 n,那么多数派就对应:⌊n/2⌋+1,根据抽屉原理,两次多数派读写一定有交集,这样能保证一定能读到最新数据。与 Raft相对的 Paxos协议同样是根据两次多数派请求完成一次强一致性读写。根据定义,这样的系统最多能容忍⌊(n-1)/2⌋个节点的崩溃,举例有 5 个节点的集群,最多容忍 2 个机器崩溃4

而 Quorum协议,,根据wiki定义如下:

  1. V r + V w > V Vr + Vw > V Vr+Vw>V
  2. V w > V / 2 Vw > V/2 Vw>V/2
  • Vw(write quorum)代表每个写操作的最少获票数,Vr(read quorum)代表每个读操作做的最少获票数
  • 规则 1 保证第一个数据不会被同事读写
  • 规则 2 则保证数据串行化修改,一份数据的冗余拷贝不可能同时被两个写请求修改。

了解了Quorum的定义可以发现,Raft的多数派实际上只是 Quorum协议的子集,要保证读写的一致性,只需要保证读请求和写请求有交集,写请求和写请求有交集即可。任何一种符合上述 Quorum定义的协议都可以替代基于 多数派(majority) Raft或者 Paxos协议,并且保证强一致性,其性能和可用性也会有所不同,可以基于Quorum设计新的一致性协议用以优化系统延迟,当然,除了多数派以外的任何一种Quorum实现可用性都不如多数派实现。

Raft基于Quorum定义完成成员变更,基于hierarchical quorum的选主实现了joint consensus集群变更算法。

etcd如何实现线性一致性读

etcd底层使用了 B-Tree和 B+Tree,写入会伴随节点的分裂、平衡,是典型的读多写少 KV(与之相对,使用了 LSM存储引擎的 KV则适用于写多读少场景)。

串行读与线性一致性读:

etcd基于 Raft算法,为了保证高可用集群中会部署多台机器,etcd提供了两种读模式:串行读(Serializable)和线性一致性读(linearizable)。这两个概念似乎都在表达“按顺序读”,但实际上完全不一样,简单理解可以看作串行读不保证最新但是效率高,线性一致性读保证最新但是效率低。

串行读:
适用于数据敏感性低的场景,Raft写请求完成只是代表WAL日志被复制到大部分,此时处于提交状态(committed),真正要能查到数据需要等状态机将日志应用(applied),而这是一个异步过程。串行读直接读取状态机数据返回,不需要通过 Raft协议与集群进行交互,属于最终一致性

线性一致性读:
适用于数据敏感性高的场景,对应线性一致性,或者叫可线性化、强一致性等。线性一致性的定义比较微妙,基本的想法是让集群看起来只有一个对象,读到过的数据下次读到的版本一定比现在更新,且所有操作都是原子的。在这个场景下可以认为是读请求只会读到最新已经commit的数据。

线性一致性读和 ReadIndex
有两种情况可能返回旧数据:

  1. 通过串行读已经知道日志从提交(committed)到应用(applied)是个异步过程,而且 Leader和 Follower节点都能处理读请求,此时读到的数据可能是未被状态机应用的旧数据。
  2. 网络分区,俗称“脑裂”,此时如果 Leader认为自己拥有最新日志并且继续响应读请求就可能返回旧数据造成不一致。

实现线性一致性读依赖于ReadIndex,服务节点收到线性读请求后会向Leader节点**获取集群已经提交最新日志索引(commited Index),Leader收到ReadIndex后会向Follower节点发起确认,获得大多数请求后才将日志索引返回。服务节点等待状态机已经应用的日志索引大于之前得到的最新日志索引时,才会将数据返回给客户端5

Raft和 Paxos、ZAB协议以及相关产品比较

工程优化后的 Paxos一般也要尽量保证执行请求的节点保持稳定在一个上面(类似于 Raft的Leader了),否则每次提交都要进行两阶段的确认效率实在太低。Raft其实是Paxos的一个变种,易于理解和实现的简化版本,要求强 Leader和日志顺序写。

Paxos算法并不需要Leader,那么 Leader崩溃的时候也就没有切换时间,而 Raft在 Leader崩溃时则有租约不可用(注意:这里的租约是指 Raft的选主过期时间,要与 etcd的 lease租约功能区分开来)的问题,一般5~10s。而且 Paxos支持日志空洞,可能有随机写,实现上更复杂一点,而 Raft强制要求日志顺序写,不允许日志空洞,对存储系统更低,可以直接上机器硬盘。从性能角度来说 Paxos更好,而且允许日志空洞更加灵活;从实现成本角度 Raft更优,实际工程中实现算法的是人,Paxos理解的难度会造成项目风险,能 hold住才是第一要义,后面再考虑优化,或许这也是目前越来越多项目使用 Raft的原因。

ZAB协议是另一个分布式系统 Zookeeper的基础算法,也被认为是一种类 Paxos协议,写请求可以认为是强一致性,读请求只是最终一致性。

etcd zookeeper paxosstore NewSql产品(Spanner、TiDB、CockroachDB等)
一致性协议 Raft ZAB 无租约Basic Paxos multi-paxos/Raft/multi-Raft
数据模型 kv 目录 kv sql
Leader租约
MVCC机制 X X Sometimes
强一致性读 X Sometimes
成员变更
鉴权 基于 TLS访问 简单的 ACL(Access Control List) 多种

MVCC:etcd如何实现存储服务

什么是 MVCC

MVCC(Multiversion concurrency control)全称多版本并发控制机制,核心是保留数据的多个版本,以此实现对事务的并发管理,实现事务的各类隔离级别。数据库领域避免冲突主要有两种机制:

  1. 避免冲突,需要保证数据不可能出现同时操作的情况。加悲观锁保证同一时刻只有一个人对数据进行修改,常见有读写锁、两阶段锁
  2. 允许冲突,但是要求出现冲突时能够检测到,这样下游直接失败重试就好了,这种方法叫做乐观锁,常见实现有逻辑时钟、MVCC等。

MVCC实际上是一种乐观锁机制,只有在事务提交时才检测是否发生了冲突。etcd基于 MVCC实现了可靠的 Watch功能和事务机制。

etcd为什么要引入MVCC

etcd早期的v2版本将所有数据保存在内存中,随着Kubernetes项目的发展v2版本的瓶颈和缺陷也逐渐暴露。其一便是Watch机制的可靠性问题,v2版本不支持保存历史版本,只是在内存中使用滑动窗口存储了最近 1000 条变更日志,当出现请求堆积、网络波动时很容易出现事件丢失,而Kubernetes controller机制严重依赖 Watch功能。其次是内存问题,etcd v2在内存中维护了一棵树保存所有的key和value,会导致大量的内存开销,同时etcd v2需要定时将内存数据持久化到磁盘,会消耗 CPU和磁盘 IO,影响系统稳定性。于是 etcd推出了v3版本,基于 treeIndex(B-Tree)和 backend(BoltDB)实现支持MVCC的KV数据库。

etcd如何实现多版本数据

key-value:基本存储单位

kv对是etcd的最小存储单元,底层存储在 BoltDB中的key-value结构如下所示,包含key、value,lease结构信息(租约,用于数据过期、探活),版本号信息(MVCC和事务机制)6

message KeyValue {
  bytes key = 1;	
  int64 create_revision = 2;
  int64 mod_revision = 3;
  int64 version = 4;
  bytes value = 5;
  int64 lease = 6;
}
  • key:请求键,不允许空值
  • value:请求值
  • version:key的全局版本号,如果删除那么版本号会变成 0,同时每次操作版本号会加一。
  • create_revision:key最后一次创建的revision
  • mod_revision:key最后一次变更的revision
  • lease:key对应的lease id

,api里面出现了3 个“version”比较容易混淆,特别是version和reversion这两个概念,其中version就是简单的全局修改次数版本号,改一次加一,create_revision代表key创建时的版本号,mod_revision代表最后一次修改的对应版本号。

what is different about Revision, ModRevision and Version? #6518

revision:key的逻辑时钟

type revision struct {
   main int64    // 随着事务递增,同一个事务内main相同
   sub int64    // 事务内单调递增,每发生一次写操作sub增加
}

作为 kv数据库,etcd却并不是直接将 key作为键存储,而是将revision作为键,key和value作为键对应的数据存放。其中revision分为两部分:第一部分称为main revision,随着事务递增;第二部分叫做sub revision,两者结合保证revision是递增的。

比如通过Batch接口更新键值对,写入<key1,value1>和<key2,value2>两个键值对,在 etcd看来是这样的:

  • 第一次更新
revision={1,0},key=key1,value=value1
revision={1,1},key=key2,value=value2
  • 第一次更新
revision={2,0},key=key1,value=new_value1
revision={2,1},key=key2,value=new_value2

treeIndex:内存中的版本号B-Tree

etcd在内存中维护了一个 B-Tree保存了 key到revision的映射关系,叫作 treeIndex,其中每一个节点就是一个keyIndex。客户端查询的时候首先在内存找那个查询key对应的keyIndex拿到对应的 revision,用revision作键从 BoltDB中获取value。

与treeIndex相关的数据结构包括 keyIndex和 generation,其中 keyIndex包含 key对应的最新版本号和历史修改版本号,generation则代表key从创建到删除的过程,当创建了一个key其generation是 0,后续每次修改会想revs里面追加修改后的信版本号,删除了以后generation会变成 1。key经过多次创建删除,那么就会有多个generation。

保存 key和版本号的映射关系

type keyIndex struct {
    key         []byte
    modified    revision  // 最后一次修改key对应的版本号
    generations []generation // 保存key历史的版本号信息
}

generation保存了 key从创建到删除的历史版本号

type generation struct {
    ver     int64	// key的修改次数
    created revision // generation创建时的版本号
    revs    []revision // 每次修改key时的版本号
}

treeIndex结构,其中 BTree基于 Google开源的代码

type treeIndex struct {
	sync.RWMutex
	tree *btree.BTree
	lg   *zap.Logger
}

具体查询流程入下图所示:
yingyon

为什么treeIndex要使用 B-Tree?
从etcd的功能特性上分析,因为需要支持范围查询,因此哈希结构不适合。
从性能上分析,B-Tree相对于平衡树树高更低,而且每个节点支持存储多个数据,查询次数更少。这里etcd使用的是google实现的一个 go版本 B-Tree

BackEnd(BoltDB):底层存储架构
BackEnd是etcd的底层存储结构,设计上BackEnd支持多种backend实现,目前的实现使用的是 BoltDB。BoltDB是一个基于 B+Tree实现的基于事务的key-value数据库。当发起get key1操作时,会先向treeIndex询问key1对应的keyIndex,从中获取对应的revision版本号,再用revsion从BoltDB中获取 value7

BackEnd模块主要由ReatTx、BatchTx和Buffer组成,ReadTx定义了抽象的读事务模块,BatchTx在ReadTx的基础上定义了抽象的写事务接口,Buffer是内存中的数据缓冲区。为了优化写入的性能,etcd只有在写事务堆积到一定程度的时候才会刷盘(默认写事务堆积程度大于 1万),数据持久化的过程又backend异步goroutine处理,他通过事务批量提交,定时将boltdb页缓存中的数据持久化到磁盘中。

发生写事务的流程如下所示:

  1. 向treeIndex获取当前key的 keyIndex信息
  2. 将key-value数据存入buffer和boltdb中
  3. 创建/更新keyIndex,存入treeIndex
  4. backend异步协程提交事务

通过 generation了解到,key删除的时候并不会马上把数据从treeIndex和BoltDB中删除,而是创建一个新的generation,此时数据并没有真的删除,此时会在BoltDB中增加一个墓碑标记,例如正常通过revision:{2,0}可以从 BoltDB中查询key1对应的value1,如果key1被删除,那么BoltDB中的键就变成了{3,0,t},t就代表tombstone。

Watch机制:高效获取数据变化

Watch机制有什么用

etcd主要用于服务发现、分布式配置等功能,同时Kubernetes的控制器也依赖于etcd。除了存储,这里最重要的功能就是监听,比较前后的数据是否一致,是否发生了更新事件。

基于前文的 MVCC机制,实现了多版本的数据存储,有了历史版本,那么只要在数据更新的时候将历史版本推送出去就好了,如何判断要推送给哪些 client呢?总不能用一个个遍历监听事件判断要推送给谁吧,这样效率太低了。而且如果出现网络延迟推送事件阻塞了怎么办?etcd基于 MVCC机制和区间树实现了 Watch功能。

整体架构

基于MVCC的历史版本推送机制

etcd的v2版本使用滑动窗口将历史版本保存在内存中,默认只会保存最近 1000 条数记录。这样的缺陷显而易见,固定时间窗口只能保存有限的历史版本,当写请求较多或者出现网络波动时client将不得不发起expensive查询请求以获取最新数据以及版本号来持续监听数据。

比如Kubernetes的控制器依赖 Watch机制监听Pod的变化,如果Watch机制失败或者断开将发起List Pod请求,导致apiserver出现高负载,严重影响系统稳定性。

而 MVCC机制正式为了解决 Watch机制不可靠而诞生的,相比于v2版本将事件直接保存在内存环形数组中,MVCC机制将key的历史版本保存着BoltDB里。BoltDB数据持久化在磁盘中,因此保存的历史版本数上限提升,而且也不受重启影响。不过随着历史版本增加磁盘占用也会增加,需要通过压缩策略清理历史版本。同时当出现异常情况比如client因网络连接断开时,也可以通过MVCC的版本号机制从BoltDB中获取错过的历史时间,无需全量同步。

etcd提供了基于事件的异步监听接口,watch机制会不断的监听key当前或者历史的revision版本,获取对应变更,并且以流的形式传给client。key的每次变更都对应一个Event数据结构,Event提供了对应变化的数据和变化类型,包括 PUT和 DELETE。

message Event {
  enum EventType {
    PUT = 0;
    DELETE = 1;
  }
  EventType type = 1;
  KeyValue kv = 2;
  KeyValue prev_kv = 3;
}

基于 gRPC的流式推送机制

etcd v2版本使用轮询实现 Watch功能,每个watcher对应一个 TCP连接,为了简单实现,使用 http+json方式,client通过 HTTP/1.1长连接定时轮询server,当 watcher太多时轮询将消耗etcd-server大量的资源8

为了解决v2版本的缺陷,v3版本使用基于 HTTP/2的gRPC协议代替了HTTP/1.1,protobuf代替了Json。

首先是 HTTP/2协议,HTTP消息被分解为独立的帧(Frame),交错发送,帧是最小数据单位,会标识属于哪个流(Stream),流由多个数据帧组成,每个流对应唯一 ID,一个数据流对应一个请求或者一个包。通过流的机制HTTP/2实现了多路复用和乱序发送,并且能够双向通信。

通过 Watch流 api可以看到,每个 Watch流会对应一个 watchid,一个watchid对应一个 gRPC流:

创建 Watch流

message WatchCreateRequest {
  bytes key = 1;
  bytes range_end = 2;
  int64 start_revision = 3;
  bool progress_notify = 4;

  enum FilterType {
    NOPUT = 0;
    NODELETE = 1;
  }
  repeated FilterType filters = 5;
  bool prev_kv = 6;
}
message WatchResponse {
  ResponseHeader header = 1;
  int64 watch_id = 2;
  bool created = 3;
  bool canceled = 4;
  int64 compact_revision = 5;

  repeated mvccpb.Event events = 11;
}

删除 Watch流

message WatchCancelRequest {
   int64 watch_id = 1;
}

在这里插入图片描述

同时protobuf比 json效率更高,这个很好理解,json是自解释协议,数据里就带了类型,而protobuf需要proto文件解释,底层存储来说肯定比json效率高。这里不展开了,详细参考这篇文章:
理解 gRPC 协议

整体推送流程

在这里插入图片描述

  1. 上图是 etcd watch流程图,当通过 etcd client发起 watch请求的时候,首先会通过gRPC Proxy,这一部分会将多个 Watch请求合并,减少etcd server负担。
  2. 接下来会创建一个serverWatchStream,当收到 watch key请求会创建两个协程 recvLoop和 sendLoop,recvLoop负责接收client create/cancel watcher的请求、并将从管道中收到的请求转发给sendLoop最终发给client
  3. serverWatchStream收到 client创建 watcher的请求后会创建一个WatchStream并分配一个watcher id,每个 Watcher对应唯一的 watcherid
  4. 一旦 watcher创建成功就会存储在watchableStore的 synced watcher中,如果监听到版本号变化,则会将watcher放入 unsyned watcher中。版本号变化通过 MVCC得知,通过 watcher.ch管道进行信息中转。同时synced watcher和unsynced watcher底层有 map和 inerval tree两个数据结构,分别对应单个Key的监控和区间 Key监控。
  5. etcd启动时会创建syncWatchersLoop和 syncVictimsLoop协程,进行 watcher的同步操作
  6. 用户通过 Raft机制写入一致性数据,通过 MVCC机制产生 Event事件,并写入到管道中。

如何推送最新数据

watch机制将待同步的数据分为两类:
synced watcher: 顾名思义,代表已经同步完成的 watcher。如果创建 watcher未指定版本号或者指定版本号大于目前最新的版本号,那么就会保存着
unsynced watcher: 标识此类数据还未同步完成,落后最新数据版本。

synced watcher和unsynced watcher底层结构对应watcherGroup,一个watcherGroup对应多个wacher,可以看到有一个 map和一个IntervalTree,可以根据key快速寻找一个或者多个watcher。

watcherGroup结构

type watcherGroup struct {
	// 通过key寻找watcher
	keyWatchers watcherSetByKey
	// 通过区间寻找wacther
	ranges adt.IntervalTree
	// 监听的watcher列表
	watchers watcherSet
}

watchableStore结构

type watchableStore struct {
	*store

	// 读写锁
	mu sync.RWMutex

	// victims 是被阻塞在 watch channel 上的 watcher batches,channel满时会加入到victims
	victims []watcherBatch
	victimc chan struct{}

	// unsynced包含所有需要同步事件但未同步的watchers
	unsynced watcherGroup

	// 包含与 store进度一致已经同步的watcher
	synced watcherGroup

	stopc chan struct{}
	// 用于等待所有goroutine结束的 waitgroup
	wg    sync.WaitGroup
}

当创建完 watcher后,再执行 put/delete操作,通过Raft模块 apply到状态机,其中MVCC的 put事务结束后会将修改信息保存在 change数组中。

如下代码所示:End()函数会将mvccpb.KeyValue、操作类型、revision保存在 Event事件中,然后回调watchableStore.notify函数。watchableStore.notify函数会匹配出监听此 key并且处在synced watcherGroup中的watcher,并且要满足事件版本号大于等于watcher监听最小版本号才会将事件发送到 watcher对应的channel中。serverWatcherStream和sendLoop协程监听到 channel会推送给 client,完成一次最新数据的推送9

// MVCC事务结束时,处理键值对的变更,并通知所有的watcher,是实现etcd的watch功能的关键部分
func (tw *watchableStoreTxnWrite) End() {
	// 获取事务所有键值对变更
	changes := tw.Changes()
	if len(changes) == 0 {
		tw.TxnWrite.End()
		return
	}

	// 最后一次变更版本号,加一代表本次操作后版本号加一
	rev := tw.Rev() + 1
	evs := make([]mvccpb.Event, len(changes))
	// 遍历变更,根据事件类型创建 Event
	for i, change := range changes {
		evs[i].Kv = &changes[i]
		if change.CreateRevision == 0 {
			evs[i].Type = mvccpb.DELETE
			evs[i].Kv.ModRevision = rev
		} else {
			evs[i].Type = mvccpb.PUT
		}
	}

	tw.s.mu.Lock()
	// 回调,通知所有的观察者(watcher)这些变更事件
	tw.s.notify(rev, evs)
	tw.TxnWrite.End()
	tw.s.mu.Unlock()
}

如何推送异常事件

推送最新数据中有提到当写事务完成会将时间写入到管道,但是目前watcher的 channel管道的大小限制为128(原本是 1024,但是太过占资源就改成 128 了),如果因为负载等原因导致channel满了便进入异常事件推送机制,此时 etcd会将watcher从synced中删除加入到victims

整体推送流程中提到WatchableStore会启动两个协程,其中syncVictimsLoop就是负责阻塞watcher的推送,通过异步重试推送搜有的victim事件,moveVictims中推送失败会重新加入到victim watcherBatch等待下次推送。

如果推送成功,watcher监听的版本号minRev小于当前事件的版本号curRev,说明有历史事件没有已送完成,加入到unsynced watcherGroup,否则加入到synced watcherGroup。

// syncVictimsLoop尝试将堆积但计算好的watcher再推送到watcher channel中
func (s *watchableStore) syncVictimsLoop() {
	defer s.wg.Done()

	for {
		// 将 victims 中的数据发送出去
		// 不断循环更新所有的victim
		// 推送失败会重新加入到victim watcherBatch等待下次推送
		for s.moveVictims() != 0 {
		}
		s.mu.RLock()
		isEmpty := len(s.victims) == 0
		s.mu.RUnlock()

		var tickc <-chan time.Time
		if !isEmpty {
			// 如果victims不为空,则设置一个 10ms后的触发器
			tickc = time.After(10 * time.Millisecond) 
		}

		select {
		case <-tickc:
		case <-s.victimc:
		case <-s.stopc:
			return
		}
	}
}

如何推送历史事件

notify事件只会处理在synced watcher中的事件,如果watcher还有事件没有推送完成(也就是unsynced watcher)此时直接推送最新事件无法保证event推送的先后顺序,因此需要一个机制来保证unsynced watcher能够顺利的推送完成。syncWatchersLoop协程就负责处理unsynced中的 watcher,也就是历史事件推送。

具体流程是syncWatchersLoop每 100ms同步一次unsynced map 中的 watcher,先将这些watcher中的历史event事件推送到sendLoop,推送完成后就可以移动 synced watcher中,这样下次notify就可以直接处理最新事件。

// syncWatchersLoop 每100毫秒同步一次 unsynced map 中的 watcher
func (s *watchableStore) syncWatchersLoop() {
	defer s.wg.Done()

	// 创建一个每100毫秒触发一次的定时器
	waitDuration := 100 * time.Millisecond
	delayTicker := time.NewTicker(waitDuration)
	defer delayTicker.Stop()

	for {
		s.mu.RLock()
		st := time.Now()
		lastUnsyncedWatchers := s.unsynced.size()
		s.mu.RUnlock()

		unsyncedWatchers := 0
		if lastUnsyncedWatchers > 0 {
			// 如果 unsynced map 不为空,则同步 watcher,具体同步逻辑在syncWatchers中
			unsyncedWatchers = s.syncWatchers()
		}
		syncDuration := time.Since(st)

		delayTicker.Reset(waitDuration)
		// 如果还有未同步的 watcher,并且未同步的 watcher 的数量有所减少
		if unsyncedWatchers != 0 && lastUnsyncedWatchers > unsyncedWatchers {
			delayTicker.Reset(syncDuration)
		}

		select {
		case <-delayTicker.C:
		case <-s.stopc:
			return
		}
	}
}

有一个问题是,如何获取 watcher的历史版本呢?学习 MVCC机制可以知道 boltdb中存储了所有的历史版本,那么直接遍历 boltdb就可以得到 key对应的历史版本,在syncWatchers中有一个优化是每次批量选择一批watcher进行同步。

// kvsToEvents将KeyValue转为 Event事件
func kvsToEvents(lg *zap.Logger, wg *watcherGroup, revs, vals [][]byte) (evs []mvccpb.Event) {
	for i, v := range vals {
		var kv mvccpb.KeyValue
		if err := kv.Unmarshal(v); err != nil {
			lg.Panic("failed to unmarshal mvccpb.KeyValue", zap.Error(err))
		}

		if !wg.contains(string(kv.Key)) {
			continue
		}

		ty := mvccpb.PUT
		if isTombstone(revs[i]) {
			ty = mvccpb.DELETE
			kv.ModRevision = BytesToRev(revs[i]).Main
		}
		evs = append(evs, mvccpb.Event{Kv: &kv, Type: ty})
	}
	return evs
}

如何匹配变更事件

最后一个问题是watcher如何根据写请求快速匹配到对应的 watcher,在上文已经有所涉及,这里再总结一下。

最简单的事件匹配方法是一个个遍历 watcher匹配对应的 key,但这样复杂度是 O(N),当 watcher成千上万时这样的复杂度是不可接受的,更何况 etcd会在每个写事件结束后同步触发流程。

整体推送流程架构图可以看到每个watcher都由对应的 map和 intervalTree组成,etcd通过这两个数据结构实现匹配机制,这里 map是基础数据结构,go的 map用哈希表实现,平均O(1)可以匹配到单个 Key事件。不过 etcd不仅支持单 key 监听,还支持区间、前缀监听,watcher使用 map匹配单 key的变更事件。如果是区间事件则需要使用到intervalTree,称为区间树(有点类似算法竞赛中常见的线段树,在线段树的基础上加上lazy标记也可以实现区间修改查询,但这里的区间树其实是个二叉平衡树),可以快速找到与 key相交的区间,复杂度 O(logn)。区间树底层是一个红黑树,支持区间修改、查询、合并等操作,相比于线段树更适合区间相关操作,并且红黑树能够自平衡,保证最坏情况下的插入查询。

https://github.com/etcd-io/etcd/blob/main/pkg/adt/interval_tree.go

etcd事务机制

背景

事务将应用程序的多个读写操作合并成一个逻辑操作单元,即事务中的操作要么成功提交,要么失败回滚,即使失败也可以在应用层进行重试。几乎所有的关系型都会宣传支持事务处理,然而当非关系型(NoSQL,etcd属于 NoSQL的一种)数据库兴起后,事务逐渐开始被放弃,或者替换成了弱的多的保证。非关系型数据库主要通过复制、分区的方式来提升系统的可扩展性和可用性,有更灵活的数据格式,而且一般是分布式设计。一方面,实现事务意味着降低性能,如果是跨 IDC部署,那么就得用 2PC等方式实现分布式事务,实现难度变得很大;另一方面,很多场景下事务又是不可或缺的,比如支付转账的场景,对异常极其敏感,不使用事务实现会需要在应用层做更多的兜底逻辑。

事务有其优势和保证,也有其局限性,分析事务的保证和各种问题可以更好帮助权衡实际系统中的事务设计。好在 etcd支持事务,而且在应用层只用了 391 行(etcd v3.4.33)就实现了一个极简的stm(Software Transactional Memory)事务框架,得以从源码层面分析一种事务的实现方式。

etcd事务使用

etcd基于 MVCC机制实现了事务,许多使用了 MVCC的数据库都不会暴露底层 MVCC信息,比如 mysql的事务操作基于 MVCC实现,基于START TRANSACTION、COMMIT、ROLLBACK就可以实现事务的创建、提交、回滚操作,这个过程感受不到 MVCC的存在。而 etcd中版本号信息是公开的,这给了很大的灵活性,比如官方的 clientV3就提供了concurrency库,封装了几种场景的分布式场景操作:

  • mutex.go:一个分布式锁实现
  • election.go:一个分布式选举的实现,依靠分布式锁和租约实现选举,当节点想成为 Leader时会尝试获取锁,当Leader租约过期时其他节点就可以竞争锁。
  • stm.go:客户端实现的事务框架,支持 ACID事务并且能够指定不同隔离级别,最强大的是代码极其精简。

事务 api
etcd v3引入了微事务(mini-transaction)的概念,允许用户在一次修改中批量执行多次操作,这意味着这一组操作被绑定成原子操作并共享同一个mod_revision(key最后一次修改的revision),同时基于 MVCC版本号可以实现各种隔离机制。

client.Txn(ctx).If(cmp1, cmp2, ...).Then(op1, op2, ...,).Else(op1’, op2’,).commit()
message TxnRequest {
  repeated Compare compare = 1;  // Compare列表,包含一个谓词的合取(所有谓词都只能为真),所有谓词为真时才能执行事务
  repeated RequestOp success = 2;  // RequestOp列表,compare为真时要执行的操作
  repeated RequestOp failure = 3;  // RequestOp列表,compare为假时要执行的操作
}

事务 API由 If语句、Then语句、Else语句组成,如果 If语句为真则执行 Then语句的操作,否则执行 Else语句的操作,这有点像 CAS操作。If语句对应TxnRequest中的compare,Then和 Else则分别对应 success、failure。If操作可以比较一个键的当前值或者版本号和预期值与语气版本号是否一致,success/failure对应 etcd中要执行的操作,比如 put、delete、range。这里重点讨论下 If操作检查项。

If操作和版本号
在这里插入图片描述

If操作对应一个Compare Message列表:

message Compare {
  enum CompareResult {
    EQUAL = 0;
    GREATER = 1;
    LESS = 2;
    NOT_EQUAL = 3;
  }
  enum CompareTarget {
    VERSION = 0;
    CREATE = 1;
    MOD = 2;
    VALUE= 3;
  }
  CompareResult result = 1;
  // target is the key-value field to inspect for the comparison.
  CompareTarget target = 2;
  // key is the subject key for the comparison operation.
  bytes key = 3;
  oneof target_union {
    int64 version = 4;
    int64 create_revision = 5;
    int64 mod_revision = 6;
    bytes value = 7;
  }
}

从其 proto定义可以清晰的看出来If语句支持的检查操作包括=、>、<、≠,支持的检查项包括mod_revision、create_revision、version、value,这里value代表值,其他三个是版本号,在 MVCC中有介绍过,很容易混淆,这里结合事务背景再重提一遍:

  • mod_revision: key最近一次修改的版本号,用于检查最近一次修改是否符合预期。比如Tom查询账户余额此时大于 100¥,此时的mod_revision是v1,然后再进行转账操作,此时要保证着 100¥还在,那么就要保证转账时候的mod(“Tom”)=v1。
  • create_revision: key的创建版本号,用于检查 key是否还存在,key不存在时create_revision就是 0。
  • version: key的修改次数,用于检验修改次数是否符合预期。

ACID:事务的安全性保证

事务提供的保证也即 ACID,分别代表原子性(Atomicity)、一致性(Consistency)、隔离性(Isolation)、持久性(Durability)。实现事务的系统都声称兼容 ACID,但其实现方式不尽相同,围绕隔离性也有许多模糊的定义。不符合 ACID的系统有时候被称为 BASE:(基本可用:Basically Available, 软状态:Soft state, 最终一致性:Eventually consistent),听起来更加模棱两可。BASE唯一能确定的是它不是 ACID,此外没有承诺任何东西 3

原子性和持久性

在多线程编程语义中,原子性代表一个线程执行的原子操作,其他线程看不到其中间状态,只能处于操作前和操作后状态。在事务中,原子性指在一个事务中的操作要么全部成功要么全部失败,发生错误时中止事务,并且可以安全的将部分写入丢弃,应用层保证没有发生数据更改,可以安全重试。从这个角度来看,原子性叫做可终止性更为准确。

ACID的原子性不关心多个线程并发操作一个数据会发生什么,多线程的并发安全性是由隔离性来提供。

持久性则是代表事务提交后会永久保存在非易失的存储设备比如磁盘,对于etcd这种复制数据库,还得保证数据复制到大多数节点并且持久化,etcd通过 Raft将 WAL日志复制到其他节点,最终状态机将数据持久化在BoltDB中。

一致性

在讨论 Raft算法的时候已经对各种一致性进行过讨论,Raft的一致性属于CAP中的一致性,而 ACID的一致性(Consistency)指的是更改前后数据库保证恒等状态。这个定义听起来非常模糊,查阅资料的时候也觉得这个一致性其实并没有什么用,比如一种恒等状态是要求转账前后总账户不能为负数,这个限制其实应该是应用层的限制,就算数据库实现完美,应用层实现有 BUG也会导致不一致,任何产品都保证不了实现没有 BUG。参阅了《DDIA》3 后才算解开了疑问,很有意思:

i. Joe Hellerstein has remarked that the C in ACID was “tossed in to make the acronym work” in Härder and Reuter’s paper [7], and that it wasn’t considered important at the time.

总之,一致性应该是应用层的保证,其实并不应该属于数据库的 “ACID”保证。

隔离性

在这里插入图片描述

前文原子性针对的是单对象,单对象的操作要么成功要么失败,而隔离性针对多对象,要保证多对象并发操作的安全性。隔离性有不同隔离级别,分别提供了不同的安全性保证。最强的隔离级别称为“可串行化”,事务虽然并发执行,但看起来像一个一个串行执行,数据库要保证提交时的结果和一个一个串行执行完全相同。由于性能原因实际场景中很少用到串行化,实际上用的都是“快照隔离”,通过快照实现串行化的效果。

隔离级别分为以下几类:

  • 未提交读(Read Uncommitted): 能读取其他事务未提交数据,会导致脏读
  • 已提交读(Read Committed): 只能读到已经提交的事务,其他事务一经提交,当前事务就可以读到,会导致不可重复读问题
  • 可重复读(Repeated Read): 一个事务中,同一个读操作任何时刻都只能读到同样的结果
  • 串行化(Serializable): 最高的隔离级别,实现串行化有两种方法:悲观锁方法,直接加读写锁实现事务的串行提交,这种方法牺牲了并发性能太差一般不会使用;另一种方法是快照隔离方法,基于 MVCC机制,每次读取都是当前的版本号快照数据,通过版本号快照的机制方式实现事务隔离,提交的时候检测是否发生冲突,冲突则失败重试,相当于乐观锁机制。

etcd STM机制

概念
为了保证事务安全,要么通过悲观锁避免冲突,要么使用乐观锁检测冲突并重试。etcd实现了 MVCC和mini-transaction,很容易想到通过这个方式实现乐观锁。不过 etcd对这部分逻辑进行了封装,抽象出了一个公共的事务处理框架,也就是 STM机制。

STM,全称Software Transactional Memory,即软件事务内存,允许多个线程同时对内存进行读写操作,不需要使用锁这类的同步机制。有两个要点,“乐观”和“内存”,乐观指的是执行过程假定没有冲突,提交时检测冲突,内存指的是一次事务包含一系列内存操作,这些操作要么成功要么失败。

使用
先看看 etcd给的示例程序里面是如何使用 stm的,如下的ExampleSTM_apply函数,描述了一个并发转账场景,在exchange中实现了 STM需要的操作代码,fromK代表转账账户,toK代表收款账户,随机从fromK中提取一些值转给toK;接下来开启 10 个协程并发执行,最后

// ExampleSTM_apply shows how to use STM with a transactional
// transfer between balances.
func ExampleSTM_apply() {
	...
	exchange := func(stm concurrency.STM) error {
		// 读取需要修改的值
		from, to := rand.Intn(totalAccounts), rand.Intn(totalAccounts)
		if from == to {
			// nothing to do
			return nil
		}
		// 读取原始值
		fromK, toK := fmt.Sprintf("accts/%d", from), fmt.Sprintf("accts/%d", to)
		fromV, toV := stm.Get(fromK), stm.Get(toK)
		fromInt, toInt := 0, 0
		fmt.Sscanf(fromV, "%d", &fromInt)
		fmt.Sscanf(toV, "%d", &toInt)

		// 修改
		xfer := fromInt / 2
		fromInt, toInt = fromInt-xfer, toInt+xfer

		// 写入
		stm.Put(fromK, fmt.Sprintf("%d", fromInt))
		stm.Put(toK, fmt.Sprintf("%d", toInt))
		return nil
	}

	// 10 个线程并发执行,模拟真实场景
	var wg sync.WaitGroup
	wg.Add(10)
	for i := 0; i < 10; i++ {
		go func() {
			defer wg.Done()
			if _, serr := concurrency.NewSTM(cli, exchange); serr != nil {
				log.Fatal(serr)
			}
		}()
	}
	wg.Wait()
	
	// 校验账户总和是否符合预期
	sum := 0
	accts, err := cli.Get(context.TODO(), "accts/", clientv3.WithPrefix())
	if err != nil {
		log.Fatal(err)
	}
	for _, kv := range accts.Kvs {
		v := 0
		fmt.Sscanf(string(kv.Value), "%d", &v)
		sum += v
	}

}

参考


  1. 滴滴升级事故 ↩︎

  2. In Search of an Understandable Consensus Algorithm ↩︎

  3. 《Designing Data-Intensive Application》 ↩︎ ↩︎ ↩︎

  4. 后分布式时代: 多数派读写的’少数派’实现 ↩︎

  5. etcd-raft的线性一致读方法一:ReadIndex ↩︎

  6. https://etcd.io/docs/v3.5/learning/ ↩︎

  7. Etcd存储的实现 ↩︎

  8. etcd 实战课 ↩︎

  9. etcd学习(2)-etcd中watch源码解读 ↩︎

Logo

更多推荐