一、CAP理论

二、基于跳表实现的轻量级键值型存储引擎

1、跳表节点Node的完整实现

//Class template to implement node
template<typename K, typename V> 
class Node {

public:
    
    Node() {} 
    Node(K k, V v, int); 
    ~Node();
    K get_key() const;
    V get_value() const;
    void set_value(V);
    // Linear array to hold pointers to next node of different level
    Node<K, V> **forward;
    int node_level;

private:
    K key;
    V value;
};

template<typename K, typename V> 
Node<K, V>::Node(const K k, const V v, int level) {
    this->key = k;
    this->value = v;
    this->node_level = level; 

    // level + 1, because array index is from 0 - level
    this->forward = new Node<K, V>*[level+1];
    
	// Fill forward array with 0(NULL) 
    memset(this->forward, 0, sizeof(Node<K, V>*)*(level+1));
};

template<typename K, typename V> 
Node<K, V>::~Node() {
    delete []forward;
};

template<typename K, typename V> 
K Node<K, V>::get_key() const {
    return key;
};

template<typename K, typename V> 
V Node<K, V>::get_value() const {
    return value;
};
template<typename K, typename V> 
void Node<K, V>::set_value(V value) {
    this->value=value;
};

关键:this->forward = new Node<K,V>*[level+1];在堆上分配指针数组,数组长度为 level+1,每个元素都是 Node<K,V>*;forward[i] 保存 节点在第 i 层的下一个节点,节点出现在多层,每层的 forward 指向不同,forward 数组就是跳表多层索引结构的核心。

2、跳表构造函数

// construct skip list
template<typename K, typename V> 
SkipList<K, V>::SkipList(int max_level) {

    this->_max_level = max_level;
    this->_skip_list_level = 0;
    this->_element_count = 0;

    // create header node and initialize key and value to null
    K k;
    V v;
    this->_header = new Node<K, V>(k, v, _max_level);
};
  • _max_level → 保存跳表最大层数

  • _skip_list_level → 当前跳表的最高层数(初始为空表,所以是 0)

  • _element_count → 当前跳表元素数量(初始 0)

根据跳表最大层数创建了跳表的虚拟头节点。

3、创建节点

// create new node 
template<typename K, typename V>
Node<K, V>* SkipList<K, V>::create_node(const K k, const V v, int level) {
    Node<K, V> *n = new Node<K, V>(k, v, level);
    return n;
}

总结作用:

 creat_node只是一个 封装的工厂函数,功能是:

  1. 在堆上创建一个新的跳表节点

  2. 初始化节点数据(key、value、level、forward)

  3. 返回节点指针给跳表,用于插入操作

4、插入节点函数

// Insert given key and value in skip list 
// return 1 means element exists  
// return 0 means insert successfully
/* 
                           +------------+
                           |  insert 50 |
                           +------------+
level 4     +-->1+                                                      100
                 |
                 |                      insert +----+
level 3         1+-------->10+---------------> | 50 |          70       100
                                               |    |
                                               |    |
level 2         1          10         30       | 50 |          70       100
                                               |    |
                                               |    |
level 1         1    4     10         30       | 50 |          70       100
                                               |    |
                                               |    |
level 0         1    4   9 10         30   40  | 50 |  60      70       100
                                               +----+

*/
template<typename K, typename V>
int SkipList<K, V>::insert_element(const K key, const V value) {
    
    mtx.lock();
    Node<K, V> *current = this->_header;

    // create update array and initialize it 
    // update is array which put node that the node->forward[i] should be operated later
    Node<K, V> *update[_max_level+1];
    memset(update, 0, sizeof(Node<K, V>*)*(_max_level+1));  

    // start form highest level of skip list 
    for(int i = _skip_list_level; i >= 0; i--) {
        while(current->forward[i] != NULL && current->forward[i]->get_key() < key) {
            current = current->forward[i]; 
        }
        update[i] = current;
    }

    // reached level 0 and forward pointer to right node, which is desired to insert key.
    current = current->forward[0];

    // if current node have key equal to searched key, we get it
    if (current != NULL && current->get_key() == key) {
        std::cout << "key: " << key << ", exists" << std::endl;
        mtx.unlock();
        return 1;
    }

    // if current is NULL that means we have reached to end of the level 
    // if current's key is not equal to key that means we have to insert node between update[0] and current node 
    if (current == NULL || current->get_key() != key ) {
        
        // Generate a random level for node
        int random_level = get_random_level();

        // If random level is greater thar skip list's current level, initialize update value with pointer to header
        if (random_level > _skip_list_level) {
            for (int i = _skip_list_level+1; i < random_level+1; i++) {
                update[i] = _header;
            }
            _skip_list_level = random_level;
        }

        // create new node with random level generated 
        Node<K, V>* inserted_node = create_node(key, value, random_level);
        
        // insert node 
        for (int i = 0; i <= random_level; i++) {
            inserted_node->forward[i] = update[i]->forward[i];
            update[i]->forward[i] = inserted_node;
        }
        std::cout << "Successfully inserted key:" << key << ", value:" << value << std::endl;
        _element_count ++;
    }
    mtx.unlock();
    return 0;
}

总结插入流程:

  1. 从最高层查找每层前驱节点(update 数组)

  2. 到底层 Level 0,检查 key 是否存在

  3. 不存在 → 随机生成新节点层数

  4. 创建新节点 → 更新 forward 指针插入到每一层

  5. 更新跳表最高层和元素计数

5、持久化与数据恢复功能

// Dump data in memory to file 
template<typename K, typename V> 
void SkipList<K, V>::dump_file() {

    std::cout << "dump_file-----------------" << std::endl;
    _file_writer.open(STORE_FILE);
    Node<K, V> *node = this->_header->forward[0]; 

    while (node != NULL) {
        _file_writer << node->get_key() << ":" << node->get_value() << "\n";
        std::cout << node->get_key() << ":" << node->get_value() << ";\n";
        node = node->forward[0];
    }

    _file_writer.flush();
    _file_writer.close();
    return ;
}

// Load data from disk
template<typename K, typename V> 
void SkipList<K, V>::load_file() {

    _file_reader.open(STORE_FILE);
    std::cout << "load_file-----------------" << std::endl;
    std::string line;
    std::string* key = new std::string();
    std::string* value = new std::string();
    while (getline(_file_reader, line)) {
        get_key_value_from_string(line, key, value);
        if (key->empty() || value->empty()) {
            continue;
        }
        // Define key as int type
        insert_element(stoi(*key), *value);
        std::cout << "key:" << *key << "value:" << *value << std::endl;
    }
    delete key;
    delete value;
    _file_reader.close();
}

// Get current SkipList size
template<typename K, typename V> 
int SkipList<K, V>::size() { 
    return _element_count;
}

template<typename K, typename V>
void SkipList<K, V>::get_key_value_from_string(const std::string& str, std::string* key, std::string* value) {

    if(!is_valid_string(str)) {
        return;
    }
    *key = str.substr(0, str.find(delimiter));
    *value = str.substr(str.find(delimiter)+1, str.length());
}

template<typename K, typename V>
bool SkipList<K, V>::is_valid_string(const std::string& str) {

    if (str.empty()) {
        return false;
    }
    if (str.find(delimiter) == std::string::npos) {
        return false;
    }
    return true;
}

6、删除节点

// Delete element from skip list 
template<typename K, typename V> 
void SkipList<K, V>::delete_element(K key) {

    mtx.lock();
    Node<K, V> *current = this->_header; 
    Node<K, V> *update[_max_level+1];
    memset(update, 0, sizeof(Node<K, V>*)*(_max_level+1));

    // start from highest level of skip list
    for (int i = _skip_list_level; i >= 0; i--) {
        while (current->forward[i] !=NULL && current->forward[i]->get_key() < key) {
            current = current->forward[i];
        }
        update[i] = current;
    }

    current = current->forward[0];
    if (current != NULL && current->get_key() == key) {
       
        // start for lowest level and delete the current node of each level
        for (int i = 0; i <= _skip_list_level; i++) {

            // if at level i, next node is not target node, break the loop.
            if (update[i]->forward[i] != current) 
                break;

            update[i]->forward[i] = current->forward[i];
        }

        // Remove levels which have no elements
        while (_skip_list_level > 0 && _header->forward[_skip_list_level] == 0) {
            _skip_list_level --; 
        }

        std::cout << "Successfully deleted key "<< key << std::endl;
        delete current;
        _element_count --;
    }
    mtx.unlock();
    return;
}
  • 功能:从跳表中删除指定 key 的节点

  • 包含三件事:

    1. 找到要删除的节点

    2. 修改跳表 forward 指针,将节点从链表中断开

    3. 释放节点内存,并更新跳表状态

7、查找节点

// Search for element in skip list 
/*
                           +------------+
                           |  select 60 |
                           +------------+
level 4     +-->1+                                                      100
                 |
                 |
level 3         1+-------->10+------------------>50+           70       100
                                                   |
                                                   |
level 2         1          10         30         50|           70       100
                                                   |
                                                   |
level 1         1    4     10         30         50|           70       100
                                                   |
                                                   |
level 0         1    4   9 10         30   40    50+-->60      70       100
*/
template<typename K, typename V> 
bool SkipList<K, V>::search_element(K key) {

    std::cout << "search_element-----------------" << std::endl;
    Node<K, V> *current = _header;

    // start from highest level of skip list
    for (int i = _skip_list_level; i >= 0; i--) {
        while (current->forward[i] && current->forward[i]->get_key() < key) {
            current = current->forward[i];
        }
    }

    //reached level 0 and advance pointer to right node, which we search
    current = current->forward[0];

    // if current node have key equal to searched key, we get it
    if (current and current->get_key() == key) {
        std::cout << "Found key: " << key << ", value: " << current->get_value() << std::endl;
        return true;
    }

    std::cout << "Not Found Key:" << key << std::endl;
    return false;
}
  • 功能:在跳表中查找指定 key

  • 返回 true → 找到节点

  • 返回 false → 没有找到

三、raft算法的kv存储

3、主要流程

1、raft类的定义

2、启动初始化

3、竞选leader

LogEntry {
    index       // 日志索引
    term        // 任期号
    command {   // 客户端命令封装(应用层)
        clientId
        commandId
        operation  // 例如 Set(key,value)
    }
}

4、日志复制、心跳

leaderHearBeatTicker:负责查看是否该发送心跳了,如果该发起就执行doHeartBeat。
doHeartBeat:实际发送心跳,判断到底是构造需要发送的rpc,并多线程调用sendAppendentries处理rpc及其响应。
sendAppendEntries:负责发送日志的RPC,在发送完rpc后还需要负责接收并处理对端发送回来的响应。
leaderSendSnapShot:负责发送快照的RPC,在发送完rpc后还需要负责接收并处理对端发送回来的响应。
AppendEntries:接收leader发来的日志请求,主要检验用于检查当前日志是否匹配并同步leader的日志到本机。
InstallSnapshot:接收leader发来的快照请求,同步快照到本机。

leaderHearBeatTicker

void Raft::leaderHearBeatTicker() {
  while (true) {
    while (m_status != Leader) {
      usleep(1000 * HeartBeatTimeout);
    }

   auto suitableSleepTime{};
   auto wakeTime{};
    {
      std::lock_guard<std::mutex> lock(m_mtx);
      wakeTime = now();
      suitableSleepTime = std::chrono::milliseconds(HeartBeatTimeout) + m_lastResetHearBeatTime - wakeTime;
    }

    if (std::chrono::duration<double, std::milli>(suitableSleepTime).count() > 1) {
      usleep(std::chrono::duration_cast<std::chrono::microseconds>(suitableSleepTime).count());
    }

    if (std::chrono::duration<double, std::milli>(m_lastResetHearBeatTime - wakeTime).count() > 0) {
      //睡眠的这段时间有重置定时器,没有超时,再次睡眠
      continue;
    }
    
    doHeartBeat();
  }
}

基本逻辑与选举超时定时器相似,只不过一个是随机获取超时时间,一个固定获取心跳超时时间。

doHeartBeat

实际发送心跳,判断是Leader则构造需要发送的rpc,并多线程调用sendRequestVote处理rpc及其响应。

void Raft::doHeartBeat() {
  std::lock_guard<std::mutex> g(m_mtx);

  if (m_status == Leader) {
    auto appendNums = std::make_shared<int>(1);  //正确返回的节点的数量

    for (int i = 0; i < m_peers.size(); i++) {
      if (i == m_me) {
        continue;
      }
      
      //构造发送值
      int preLogIndex = -1;
      int PrevLogTerm = -1;
      getPrevLogInfo(i, &preLogIndex, &PrevLogTerm);
      std::shared_ptr<raftRpcProctoc::AppendEntriesArgs> appendEntriesArgs =
          std::make_shared<raftRpcProctoc::AppendEntriesArgs>();
      appendEntriesArgs->set_term(m_currentTerm);
      appendEntriesArgs->set_leaderid(m_me);
      appendEntriesArgs->set_prevlogindex(preLogIndex);
      appendEntriesArgs->set_prevlogterm(PrevLogTerm);
      appendEntriesArgs->clear_entries();
      appendEntriesArgs->set_leadercommit(m_commitIndex);
      if (preLogIndex != m_lastSnapshotIncludeIndex) {
        for (int j = getSlicesIndexFromLogIndex(preLogIndex) + 1; j < m_logs.size(); ++j) {
          raftRpcProctoc::LogEntry* sendEntryPtr = appendEntriesArgs->add_entries();
          *sendEntryPtr = m_logs[j];  //=是可以点进去的,可以点进去看下protobuf如何重写这个的
        }
      } else {
        for (const auto& item : m_logs) {
          raftRpcProctoc::LogEntry* sendEntryPtr = appendEntriesArgs->add_entries();
          *sendEntryPtr = item;  //=是可以点进去的,可以点进去看下protobuf如何重写这个的
        }
      }
      int lastLogIndex = getLastLogIndex();
      // leader对每个节点发送的日志长短不一,但是都保证从prevIndex发送直到最后
      myAssert(appendEntriesArgs->prevlogindex() + appendEntriesArgs->entries_size() == lastLogIndex,
               format("appendEntriesArgs.PrevLogIndex{%d}+len(appendEntriesArgs.Entries){%d} != lastLogIndex{%d}",
                      appendEntriesArgs->prevlogindex(), appendEntriesArgs->entries_size(), lastLogIndex));
      //构造返回值
      const std::shared_ptr<raftRpcProctoc::AppendEntriesReply> appendEntriesReply =
          std::make_shared<raftRpcProctoc::AppendEntriesReply>();
      appendEntriesReply->set_appstate(Disconnected);

      std::thread t(&Raft::sendAppendEntries, this, i, appendEntriesArgs, appendEntriesReply,
                    appendNums);  // 创建新线程并执行b函数,并传递参数
      t.detach();
    }
    m_lastResetHearBeatTime = now();  // leader发送心跳,就不是随机时间了
  }
}

sendAppendEntries

负责发送日志的RPC,在发送完rpc后还需要负责接收并处理对端发送回来的响应。

bool
Raft::sendAppendEntries(int server, std::shared_ptr<mprrpc::AppendEntriesArgs> args, std::shared_ptr<mprrpc::AppendEntriesReply> reply,
                        std::shared_ptr<int> appendNums) {

    // todo: paper中5.3节第一段末尾提到,如果append失败应该不断的retries ,直到这个log成功的被store

    bool ok = m_peers[server]->AppendEntries(args.get(), reply.get());

    if (!ok) {
        return ok;
    }

    lock_guard<mutex> lg1(m_mtx);

    //对reply进行处理
    // 对于rpc通信,无论什么时候都要检查term
    if(reply->term() > m_currentTerm){
        m_status = Follower;
        m_currentTerm = reply->term();
        m_votedFor = -1;
        return ok;
    } else if (reply->term() < m_currentTerm) {//正常不会发生
        return ok;
    }

    if (m_status != Leader) { //如果不是leader,那么就不要对返回的情况进行处理了
        return ok;
    }
    //term相等

    if (!reply->success()){
        //日志不匹配,正常来说就是index要往前-1,既然能到这里,第一个日志(idnex = 1)发送后肯定是匹配的,因此不用考虑变成负数
        //因为真正的环境不会知道是服务器宕机还是发生网络分区了
        if (reply->updatenextindex()  != -100) {  //-100只是一个特殊标记而已,没有太具体的含义,这里表示任期落后了
            // 优化日志匹配,让follower决定到底应该下一次从哪一个开始尝试发送
            m_nextIndex[server] = reply->updatenextindex();  
        }
        //	如果感觉rf.nextIndex数组是冗余的,看下论文fig2,其实不是冗余的
    } else {
        *appendNums = *appendNums +1;   //到这里代表同意接收了本次心跳或者日志
        
        m_matchIndex[server] = std::max(m_matchIndex[server],args->prevlogindex()+args->entries_size()   );  //同意了日志,就更新对应的m_matchIndex和m_nextIndex
        m_nextIndex[server] = m_matchIndex[server]+1;
        int lastLogIndex = getLastLogIndex();

        if (*appendNums >= 1 + m_peers.size()/2) { //可以commit了
            //两种方法保证幂等性,1.赋值为0 	2.上面≥改为==

            *appendNums = 0;  //置0

            //日志的安全性保证!!!!! leader只有在当前term有日志提交的时候才更新commitIndex,因为raft无法保证之前term的Index是否提交
            //只有当前term有日志提交,之前term的log才可以被提交,只有这样才能保证“领导人完备性{当选领导人的节点拥有之前被提交的所有log,当然也可能有一些没有被提交的}”
            //说白了就是只有当前term有日志提交才会提交
            if(args->entries_size() >0 && args->entries(args->entries_size()-1).logterm() == m_currentTerm){
            
                m_commitIndex = std::max(m_commitIndex,args->prevlogindex() + args->entries_size());
            }

        }
    }
    return ok;
}

m_nextIndex[server] = reply->updatenextindex(); 中涉及日志寻找匹配加速的优化
对于leader只有在当前term有日志提交的时候才更新commitIndex这个安全性保证,详情看参考公众号文章的:7.6是否一项提议只需要被多数派通过就可以提交?

AppendEntries

接收leader发来的日志请求,主要检验用于检查当前日志是否匹配并同步leader的日志到本机。

void Raft::AppendEntries1(const mprrpc:: AppendEntriesArgs *args,  mprrpc::AppendEntriesReply *reply) {
    std::lock_guard<std::mutex> locker(m_mtx);

//	不同的人收到AppendEntries的反应是不同的,要注意无论什么时候收到rpc请求和响应都要检查term


    if (args->term() < m_currentTerm) {
        reply->set_success(false);
        reply->set_term(m_currentTerm);
        reply->set_updatenextindex(-100); // 论文中:让领导人可以及时更新自己
         DPrintf("[func-AppendEntries-rf{%d}] 拒绝了 因为Leader{%d}的term{%v}< rf{%d}.term{%d}\n", m_me, args->leaderid(),args->term() , m_me, m_currentTerm) ;
        return; // 注意从过期的领导人收到消息不要重设定时器
    }
    Defer ec1([this]() -> void { this->persist(); });//由于这个局部变量创建在锁之后,因此执行persist的时候应该也是拿到锁的.    //本质上就是使用raii的思想让persist()函数执行完之后再执行
    if (args->term() > m_currentTerm) {
        // 三变 ,防止遗漏,无论什么时候都是三变

        m_status = Follower;
        m_currentTerm = args->term();
        m_votedFor = -1; // 这里设置成-1有意义,如果突然宕机然后上线理论上是可以投票的
        // 这里可不返回,应该改成让改节点尝试接收日志
        // 如果是领导人和candidate突然转到Follower好像也不用其他操作
        // 如果本来就是Follower,那么其term变化,相当于“不言自明”的换了追随的对象,因为原来的leader的term更小,是不会再接收其消息了
    }

    // 如果发生网络分区,那么candidate可能会收到同一个term的leader的消息,要转变为Follower,为了和上面,因此直接写
    m_status = Follower; // 这里是有必要的,因为如果candidate收到同一个term的leader的AE,需要变成follower
    // term相等
    m_lastResetElectionTime = now();    //重置选举超时定时器

    // 不能无脑的从prevlogIndex开始阶段日志,因为rpc可能会延迟,导致发过来的log是很久之前的

    //	那么就比较日志,日志有3种情况
    if (args->prevlogindex() > getLastLogIndex()) { //追随者的日志比领导者的要短。这种情况,追随者需要从领导者那里接收缺失的日志条目。
        reply->set_success(false);
        reply->set_term(m_currentTerm);
        reply->set_updatenextindex(getLastLogIndex() + 1);
        return;
    } else if (args->prevlogindex() < m_lastSnapshotIncludeIndex) { // 如果prevlogIndex还没有更上快照
    	//追随者可能已经通过快照机制截断了其日志 或 追随者接受了一个快照,丢弃了快照索引之前的日志 或 领导者的日志落后
        reply->set_success(false);
        reply->set_term(m_currentTerm);
        reply->set_updatenextindex(m_lastSnapshotIncludeIndex + 1); //不会浪费时间重试发送追随者已经用快照截断的日志条目
    }
    //	本机日志有那么长,冲突(same index,different term),截断日志
    // 注意:这里目前当args.PrevLogIndex == rf.lastSnapshotIncludeIndex与不等的时候要分开考虑,可以看看能不能优化这块
    if (matchLog(args->prevlogindex(), args->prevlogterm())) {
        //日志匹配,那么就复制日志
        for (int i = 0; i < args->entries_size(); i++) {
            auto log = args->entries(i);
            if (log.logindex() > getLastLogIndex()) { //超过就直接添加日志
                m_logs.push_back(log);
            } else {  //没超过就比较是否匹配,不匹配再更新,而不是直接截断  检查当前日志条目是否已经存在于追随者的日志中。
				//判断追随者日志中相应索引位置的日志条目的任期是否与请求中的日志条目的任期相同。如果任期不同,说明日志不匹配。
				//参考前面公众号文章 4.1 写 case3
                //follower日志超前
                if (m_logs[getSlicesIndexFromLogIndex(log.logindex())].logterm() != log.logterm()) { //不匹配就更新
                    m_logs[getSlicesIndexFromLogIndex(log.logindex())] = log;
                }
            }
        }

        if (args->leadercommit() > m_commitIndex) {
            m_commitIndex = std::min(args->leadercommit(), getLastLogIndex());// 这个地方不能无脑跟上getLastLogIndex(),因为可能存在args->leadercommit()落后于 getLastLogIndex()的情况
        }


        // 领导会一次发送完所有的日志
        reply->set_success(true);
        reply->set_term(m_currentTerm);


        return;
    } else {
        // 不匹配,不匹配不是一个一个往前,而是有优化加速
        // PrevLogIndex 长度合适,但是不匹配,因此往前寻找 矛盾的term的第一个元素
        // 为什么该term的日志都是矛盾的呢?也不一定都是矛盾的,只是这么优化减少rpc而已
        // ?什么时候term会矛盾呢?很多情况,比如leader接收了日志之后马上就崩溃等等
        reply->set_updatenextindex(args->prevlogindex());

        for (int index = args->prevlogindex(); index >= m_lastSnapshotIncludeIndex; --index) {
            //整个term回退
            if (getLogTermFromLogIndex(index) != getLogTermFromLogIndex(args->prevlogindex())) {
                reply->set_updatenextindex(index + 1);
                break;
            }
        }
        reply->set_success(false);
        reply->set_term(m_currentTerm);

        return;
    }

}

4、持久化

持久化就是把不能丢失的数据保存到磁盘。

1.持久化介绍
持久化的内容为两部分:
1.raft节点的部分信息;2.kvDb的快照

1.raft节点的部分信息
m_currentTerm :当前节点的Term,避免重复到一个Term,可能会遇到重复投票等问题。
m_votedFor :当前Term给谁投过票,避免故障后重复投票。
m_logs :raft节点保存的全部的日志信息。

不妨想一想,其他的信息为什么不用持久化,比如说:身份、commitIndex、applyIndex等等。
applyIndex不持久化是经典raft的实现,在一些工业实现上可能会优化,从而持久化。
即applyIndex不持久化不会影响“共识”。

2.kvDb的快照
m_lastSnapshotIncludeIndex :快照的信息,快照最新包含哪个日志Index
m_lastSnapshotIncludeTerm :快照的信息,快照最新包含哪个日志Term,与m_lastSnapshotIncludeIndex 是对应的。

Snapshot是kvDb的快照,也可以看成是日志,因此:全部的日志 = m_logs + snapshot

因为Snapshot是kvDB生成的,kvDB肯定不知道raft的存在,而什么term、什么日志Index都是raft才有的概念,因此snapshot中肯定没有term和index信息。所以需要raft自己来保存这些信息。
故,快照与m_logs联合起来理解即可。

2.为什么要持久化这些内容
两部分原因:共识安全、优化。
除了snapshot相关的部分,其他部分都是为了共识安全。
而snapshot是因为日志一个一个的叠加,会导致最后的存储非常大,因此使用snapshot来压缩日志。

为什么snashot可以压缩日志?
日志是追加写的,对于一个变量的重复修改可能会重复保存,理论上对一个变量的反复修改会导致日志不断增大。
而snapshot是原地写,即只保存一个变量最后的值,自然所需要的空间就小了。

3.什么时候持久化
需要持久化的内容发送改变的时候就要注意持久化。
比如term 增加,日志增加等等。
*具体查看代码中的void Raft::persist() 相关内容

4.谁来调用持久化
谁来调用都可以,只要能保证需要持久化的内容能正确持久化。
代码中选择的是raft类自己来完成持久化。因为raft类最方便感知自己的term之类的信息有没有变化。
注意,虽然持久化很耗时,但是持久化这些内容的时候不要放开锁,以防其他线程改变了这些值,导致其它异常。

5.具体怎么实现持久化/使用哪个函数持久化
其实持久化是一个非常难的事情,因为持久化需要考虑:速度、大小、二进制安全。
因此代码中目前采用的是使用boost库中的持久化实现,将需要持久化的数据序列化转成std::string 类型再写入磁盘。
当然其他的序列化方式也少可行的,可以看到这一块还是有优化空间的。

Raft 日志 + 快照流程

  1. 写入日志(所有日志持久化)

    • Leader 写入的每条日志(未提交或已提交)都会持久化到磁盘

    • 保证崩溃重启后日志不丢失

  2. 日志提交(应用到状态机)

    • 一旦大多数节点确认日志,Leader 可以认为该条日志已提交

    • 提交日志会应用到状态机

  3. 生成快照(日志压缩)

    • 已提交日志对应的状态机状态 做快照

    • 持久化快照 + 元信息(lastIncludedIndexlastIncludedTerm

  4. 截断旧日志

    • 快照包含的日志可以删除

    • 只保留快照之后的未提交日志(继续持久化)

换句话说:快照生成后,磁盘上不再保存快照之前的日志条目,只保留快照和快照之后的日志

5、KvServer

KVServer 写请求完整流程(Put/Append)

  1. 客户端发起请求

    • PutAppend

    • 带上 key/value、操作类型、clientIdrequestId

  2. KVServer 接收请求

    • 调用 PutAppend(args, reply)

    • 封装成 Raft 命令(包含 clientId、requestId、操作)

    • 调用 m_raftNode->Start(command)

  3. Raft Start 函数处理

    • 将命令封装成 LogEntry(含 index、term、command)

    • 添加到 m_logs 日志数组中

    • 返回该条日志的 index 给 KVServer(用于追踪提交情况)

  4. 日志复制(doHeartbeat / AppendEntries)

    • Leader 定期通过 心跳 / AppendEntries RPC 将新日志条目发送给 followers

    • followers 接收日志并存入自己的日志数组

    • 大多数节点复制成功 → Leader 更新 commitIndex

  5. 日志提交到状态机(applierTicker)

    • applierTicker 后台线程/协程循环执行

    • 检查 lastApplied < commitIndex

    • 将可以提交的日志封装成 ApplyMsg 放入 applyChan 队列

    • KVServer 从 applyChan 取出日志 → 执行状态机(更新 KV 数据)

  6. 客户端得到执行结果

    • KVServer 检查 applyChan 是否有对应 index 的日志

    • 如果有且 clientId + requestId 匹配 → 命令执行成功 → reply->set_err(OK)

    • 如果超时或日志不匹配 → reply->set_err(ErrWrongLeader) → 客户端重试

1、kvserver介绍

图中是raftServer,这里叫成kvServer,是一样的。
kvServer其实是个中间组件,负责沟通kvDB和raft节点。

那么外部请求是Server来负责,加入后变成了:

 2.kvServer怎么和上层kvDB沟通,怎么和下层raft节点沟通

std::shared_ptr<LockQueue<ApplyMsg> > applyChan; //kvServer和raft节点的通信管道    
std::unordered_map<std::string, std::string> m_kvDB; //kvDB,用unordered_map来替代

kvDB:使用的是unordered_map来代替上层的kvDB,因此没啥好说的。
raft节点:其中LockQueue 是一个并发安全的队列,这种方式其实是模仿的go中的channel机制。
在raft类中这里可以看到,raft类中也拥有一个applyChan,kvSever和raft类都持有同一个applyChan,来完成相互的通信

3.kvServer怎么处理外部请求


从上面的结构图中可以看到kvServer负责与外部clerk通信。
那么一个外部请求的处理可以简单的看成:

接收外部请求。
本机内部与raft和kvDB协商如何处理该请求。
返回外部响应。

4.接收与响应外部请求

对于1和3,请求和返回的操作我们可以通过http、自定义协议等等方式实现,但是既然我们已经写出了rpc通信的一个简单的实现,就使用rpc来实现
而且rpc可以直接完成请求和响应这一步,后面就不用考虑外部通信的问题了,好好处理好本机的流程即可。
相关函数是:

void PutAppend(google::protobuf::RpcController *controller,
                   const ::raftKVRpcProctoc::PutAppendArgs *request,
                   ::raftKVRpcProctoc::PutAppendReply *response,
                   ::google::protobuf::Closure *done) override;

void Get(google::protobuf::RpcController *controller,
             const ::raftKVRpcProctoc::GetArgs *request,
             ::raftKVRpcProctoc::GetReply *response,
             ::google::protobuf::Closure *done) override;

见名知意,请求分成两种:get和put(也就是set)。
如果是putAppend,clerk中就调用PutAppend 的rpc。
如果是Get,clerk中就调用Get 的rpc。

5.与raft节点沟通

线性一致性
参考: 线性一致性和 Raft
一个系统的执行历史是一系列的客户端请求,或许这是来自多个客户端的多个请求。如果执行历史整体可以按照一个顺序排列,且排列顺序与客户端请求的实际时间相符合,那么它是线性一致的。当一个客户端发出一个请求,得到一个响应,之后另一个客户端发出了一个请求,也得到了响应,那么这两个请求之间是有顺序的,因为一个在另一个完成之后才开始。一个线性一致的执行历史中的操作是非并发的,也就是时间上不重合的客户端请求与实际执行时间匹配。并且,每一个读操作都看到的是最近一次写入的值。
一个稍微通俗一点的理解为:
如果一个操作在另一个操作开始前就结束了,那么这个操作必须在执行历史中出现在另一个操作前面。

要理解这个你需要首先明白:
对于一个操作来说,从请求发出到收到回复,是一个时间段。因为操作中包含很多步骤,至少包含:网络传输、数据处理、数据真正写入数据库、数据处理、网络传输。
那么操作真正完成(数据真正写入数据库)可以看成是一个时间点。
操作真正完成 可能在操作时间段的任何一个时间点完成。

raft如何做的
每个 client 都需要一个唯一的标识符,它的每个不同命令需要有一个顺序递增的 commandId,clientId 和这个 commandId,clientId 可以唯一确定一个不同的命令,从而使得各个 raft 节点可以记录保存各命令是否已应用以及应用以后的结果。
即对于每个client,都有一个唯一标识,对于每个client,只执行递增的命令。

在保证线性一致性的情况下如何写kv
具体的思想在上面已经讲过,这里展示一下关键的代码实现:

	if (!chForRaftIndex->timeOutPop(CONSENSUS_TIMEOUT, &raftCommitOp)) {//通过超时pop来限定命令执行时间,如果超过时间还没拿到消息说明命令执行超时了。

        if (ifRequestDuplicate(op.ClientId, op.RequestId)) {
            reply->set_err(OK);// 超时了,但因为是重复的请求,返回ok,实际上就算没有超时,在真正执行的时候也要判断是否重复
        } else {
            reply->set_err(ErrWrongLeader);   ///这里返回这个的目的让clerk重新尝试
        }
    } else {
        //没超时,命令可能真正的在raft集群执行成功了。
        if (raftCommitOp.ClientId == op.ClientId &&
            raftCommitOp.RequestId == op.RequestId) {   //可能发生leader的变更导致日志被覆盖,因此必须检查
            reply->set_err(OK);
        } else {
            reply->set_err(ErrWrongLeader);
        }
    }

  在保证线性一致性的情况下如何读kv

if (!chForRaftIndex->timeOutPop(CONSENSUS_TIMEOUT, &raftCommitOp)) {
    int _ = -1;
    bool isLeader = false;
    m_raftNode->GetState(&_, &isLeader);

    if (ifRequestDuplicate(op.ClientId, op.RequestId) && isLeader) {
        //如果超时,代表raft集群不保证已经commitIndex该日志,但是如果是已经提交过的get请求,是可以再执行的。
        // 不会违反线性一致性
        std::string value;
        bool exist = false;
        ExecuteGetOpOnKVDB(op, &value, &exist);
        if (exist) {
            reply->set_err(OK);
            reply->set_value(value);
        } else {
            reply->set_err(ErrNoKey);
            reply->set_value("");
        }
    } else {
        reply->set_err(ErrWrongLeader);  //返回这个,其实就是让clerk换一个节点重试
    }
 } else {
    //raft已经提交了该command(op),可以正式开始执行了
    //todo 这里感觉不用检验,因为leader只要正确的提交了,那么这些肯定是符合的
    if (raftCommitOp.ClientId == op.ClientId && raftCommitOp.RequestId == op.RequestId) {
        std::string value;
        bool exist = false;
        ExecuteGetOpOnKVDB(op, &value, &exist);
        if (exist) {
            reply->set_err(OK);
            reply->set_value(value);
        } else {
            reply->set_err(ErrNoKey);
            reply->set_value("");
        }
    } else {

    }
}

Logo

更多推荐