目录

一、前言

二、分布式协调服务Zookeeper

1、什么是Zookeeper

2、Zookeeper的核心机制

2.1文件系统

2.1.1 创建节点操作命令 

2.2通知机制

2.2.1监听器Watcher

2.2.2 Watcher 设置

2.2.3 Watcher 触发流程

2.2.4 注意事项

心跳检测:

三、分布式系统的问题

四、封装zookeeper的客户端类

五、节点注册到zookeeper

六、一键编译脚本

七、总结


一、前言

在前边的文章中我们已经基本实现了一个rpc远程调用框架,并且给它也设计了日志系统的模块,现在我们还差最后一步,那就是分布式绕不开的一个东西--分布式协调服务

那我们的rpc远程调用框架为什么需要用到分布式协调服务呢?到现在为止还有什么不足之处呢?

我们现在实现的是一个简单的rpc远程调用服务,我们只实现了rpc调用端调用服务端一台服务器上某一个方法的情况,但实际上,在rpc服务端也许存在着多台机器,如果调用端上想调用哪种某种方法,它实际上是并不知道该方法在哪一台机器上运行着,所以我们就需要一个分布式服务配置中心,即在这个分布式网络系统上的所有提供远程调用的rpc节点都要向该配置中心注册它们的服务,当调用者想去调用某个服务的时候它就会先去配置中心查找一下但是分布式协调服务的功能远不止这些,如分布式锁,在多个rpc请求的情况下限制只有一个rpc请求去访问共享资源(方法)

二、分布式协调服务Zookeeper

我们先来简单认识一下什么是分布式协调服务,分布式协调服务有很多,常见的有ZooKeeper、Etcd、Consul、Nacos 等,本项目中用的是zookeeper,所以我们着重来认识一下Zookeeper。

1、什么是Zookeeper

ZooKeeper 是一个开源的分布式协调服务,是Google的Chubby一个开源的实现,它提供了一种可靠的机制来管理大量的分布式节点。它是集群的管理者,监视着集群中各个节点的状态根据节点提交的反馈进行下一步合理操作。最终,将简单易用的接口和性能高效、功能稳定的系统提供给用户。简单来说,ZooKeeper 就像是一个分布式系统的“大脑”或“管理员”,帮助管理和协调分布在多个服务器上的应用程序和服务

2、Zookeeper的核心机制

2.1文件系统

ZooKeeper 的数据模型被设计成一个层次化的命名空间,类似于文件系统的目录树结构。这个命名空间由一系列被称为 znode(ZooKeeper Node)的节点组成,与文件系统不同的是,每个 znode 可以包含关联的数据,并且可以有子节点。每个znode节点在Zookeeper的命名空间中都有一个唯一的路径,类似于文件系统中的路径。

如下

Zookeeper为了保证高吞吐和低延迟,在内存中维护了这个树状的目录结构,这种特性使得Zookeeper不能用于存放大量的数据,每个节点的存放数据上限为1M。 

Zookeeper中的znode节点有以下几种类型:

  • 永久节点(Persistent znode):永久节点在创建后将一直存在于Zookeeper中,直到被显示删除。它们用于存储持久化数据,并且能够拥有子节点。
  • 临时节点(Ephemeral znode):临时节点在创建它的客户端断开连接后将自动被删除。它们用于临时性的数据存储,例如客户端的临时状态或会话信息。
  • 顺序节点(Sequential znode):顺序节点在创建后会自动分配一个唯一的递增编号。这些编号被用于在有序的集合中进行排序,以便在分布式环境中实现有序操作。(本项目中用不到)
  • 容器节点 (Container Nodes):是一种特殊类型的持久节点,它允许在其子节点全部被删除后自动删除自身。用于当某个“容器”内不再有任何活动时,自动清理这个容器。

  • TTL 节点 (Time-To-Live Nodes):是一种带有生存时间(TTL, Time-To-Live)属性的临时节点。这意味着如果在指定的时间段内没有客户端与之交互(例如读取或写入),该节点将会被自动删除。

znode节点的其他特性:

  • 数据(Data):每个znode节点可以存储一些数据,它们以字节数组的形式存在。
  • 版本号(Version):每个znode节点都有一个与之关联的版本号,用于实现乐观并发控制。
  • 访问控制列表(ACL):Zookeeper使用ACL来控制对znode节点的访问权限。
  • 监听(Watcher):Zookeeper允许客户端对特定的znode节点注册监听器,以便在节点发生变化
2.1.1 创建节点操作命令 
create -c 节点路径          //创建容器节点
create  节点路径            //像这样默认创建出来的就是永久节点
create 节点路径 数据        //创建节点且保存数据
get 节点路径                //获取节点中的数据
ctreate -s 节点路径         //创建永久序号节点
create -e 节点路径          //创建临时节点,为什么是-e?因为取得是ephemeral的首字母

//要创建一个 TTL 节点,首先需要为 ZooKeeper 设置 authProvider 和 quota 等
//相关配置来支持 TTL 功能。然后可以通过以下命令创建:
create -t <ttl_in_milliseconds> /ttl_node "data"

ls 节点路径                  //查询节点下的信息
ls -r 节点路径                //查询节点下的所有节点信息

get -s 节点路径                //查看Znode节点详细信息(stat元数据)
delete 节点路径                //删除一层节点
deleteall 目标节点路径          //想要删除的节点还有子节点

set 节点路径 数据                  //添加/修改数据命令
get -w 节点路径                   //以watch机制监听节点

需要注意的是 创建和获取数据,节点前都必须加上路径符号/,否则就会报错

2.2通知机制

2.2.1监听器Watcher

ZooKeeper 的通知机制是其核心特性之一,它允许客户端监听(Watch)特定 znode 的变化,并在这些变化发生时得到通知。这种机制对于实现分布式系统中的事件驱动架构非常有用,比如服务发现、配置管理等场景。

  • Watcher:一个监听器,可以附加到读操作(如 getDatagetChildrenexists)上。当被监听的 znode 发生变化时,ZooKeeper 会触发这个监听器,并向客户端发送通知。
  • 一次性监听:默认情况下,ZooKeeper 的 Watcher 是一次性的。一旦某个事件触发了 Watcher,该 Watcher 就会被移除。如果需要持续监听,必须在接收到通知后重新设置监听器。

Watcher 可以针对以下几种类型的事件进行监听:

  • 节点数据变更:znode 的数据发生变化。
  • 节点删除:znode 被删除。
  • 子节点列表变更 :znode 的子节点列表发生变化。
  • 节点创建 :znode 被创建。
  • 会话状态变化:与客户端会话相关的事件,如会话过期等。
2.2.2 Watcher 设置

exists(path,watcher)

  • 检查指定路径的 znode 是否存在。
  • 如果设置了 watcher,则在 znode 被创建、删除或数据改变时触发。

getData(path,watcher)

  • 获取指定路径 znode 的数据。
  • 如果设置了 watcher,则在 znode 数据改变或被删除时触发。

getChildren(path,watcher)

  • 获取指定路径 znode 的所有直接子节点列表。
  • 如果设置了 watcher,则在子节点列表发生变化时触发。
2.2.3 Watcher 触发流程
  1. 设置监听器:客户端调用上述方法之一并传递一个 watcher 对象。
  2. 等待事件发生:ZooKeeper 监听 znode 的状态变化。
  3. 事件发生:当某个事件触发时,ZooKeeper 向客户端发送通知。
  4. 处理通知:客户端接收到通知后执行相应的回调函数。
  5. 重置监听器:由于 Watcher 是一次性的,通常需要在处理完通知后重新设置监听器。
2.2.4 注意事项
  • 一次性监听:如前所述,Watcher 是一次性的。这意味着每当事件触发后,你需要重新设置监听器才能继续监听未来的事件。
  • 网络问题:如果客户端与 ZooKeeper 服务器之间的连接中断,可能会丢失一些事件通知。为此,ZooKeeper 提供了会话恢复机制,但你仍需考虑如何处理潜在的通知丢失情况。
  • 性能影响:虽然 Watcher 非常有用,但过度使用可能会影响性能。每个 Watcher 都会在 ZooKeeper 服务器上占用一定的资源,因此应谨慎设计监听策略。
  • Znode节点只存储简单的byte字节数组,如果存储对象,我们需要自己转换对象生成字节数组。
心跳检测:

Zookeeper 的心跳检测机制是其分布式协调服务的重要组成部分,主要用于确保客户端与服务器之间以及集群节点之间的连接正常,并且在发生故障时能够及时发现并进行恢复。

  1. 客户端到服务器的心跳:当一个客户端连接到 Zookeeper 集群中的某个服务器时,它会开启一个会话。在这个会话中,客户端会定期发送 PING 请求(也可以理解为心跳包)给服务器。这个PING请求不仅用于维持会话的活跃状态,还用于让客户端确认它所连接的服务器是否仍然存活。如果服务器在指定时间内没有收到客户端的心跳请求,或者客户端没有收到服务器的响应,则认为该连接断开。

  2. 服务器到客户端的心跳:从服务器到客户端的心跳主要体现在服务器对客户端请求的响应上。每次服务器接收到客户端的请求后都会返回相应的响应。虽然这不是传统意义上的“心跳”,但它的作用和意义是一样的,即确认双方的连接状态。

三、分布式系统的问题

  • 服务的动态注册和发现,当我们的框架中没有这个服务配置中心的时候,为了支持高并发,OrderService被部署了4份,每个客户端都保存了一份服务提供者的列表,但这个列表是静态的(在配置文件中写死的),如果服务的提供者发生了变化,例如有些机器down了,或者又新增了OrderService的实例,客户端根本不知道,想要得到最新的服务提供者的URL列表,必须手工更新配置文件,很不方便。

问题:客户端和服务提供者的紧耦合

解决方案: 解除耦合,增加一个中间层 -- 注册中心它保存了能提供的服务的名称,以及URL。首先这些服务会在注册中心进行注册,当客户端来查询的时候,只需要给出名称,注册中心就会给出一个URL。所有的客户端在访问服务前,都需要向这个注册中心进行询问,以获得最新的地址。

注册中心可以是树形结构,每个服务下面有若干节点,每个节点表示服务的实例。

注册中心和各个服务实例(rpc服务方法所在的机器)直接建立Session,要求实例们定期发送心跳,一旦特定时间收不到心跳,则认为实例挂了,删除该实例。 

上面就是分布式服务在本项目的应用,下面我们就来实现它。

四、封装zookeeper的客户端类

        我们这个项目用到的zookeeper的API接口不多,所以我们这里先对本项目需要用到的zookeeper的API接口进行封装以方便使用。

        首先我们明确要干什么,通过上面的对zookeeper的了解,我们要知道在项目中哪里用到zookeeper,在我们的框架中,对于rpc服务的提供端,在rpc节点启动之前,要将它上面的所发布服务向zookeeper上注册,服务(service_name)作为永久性节点,方法(method_name)作为临时性节点,在临时性节点上存储的就是这个服务及方法的IP和Port,当调用发想要调用对应的服务及方法的时候只需要在zookeeper配置中心上查找就行了。

//zookeeperutil.h
#pragma once
#include <semaphore.h> //信号量头文件
#include <zookeeper/zookeeper.h>
#include <string>

// 封装zk的客户端类
class zkClient
{
public:
    zkClient();
    ~zkClient();
    // zkClient的启动连接zkServer
    void Start();
    // 在zkserver上根据指定的path创建znode节点
    // 节点的路径、数据、数据长度、节点的分类永久or临时
    void Create(const char *path, const char *data, int datalen, int state = 0);
    // 根据参数指定的znode节点路径,获取节点的值
    std::string GetData(const char *path);

private:
    //zk的客户端句柄,根据这个句柄就可以操作zkServer
    //是 ZooKeeper C API 中的客户端句柄,用于与 ZooKeeper 服务进行交互。
    //可以把它理解为“连接到 ZooKeeper 服务器的通道”或“客户端连接标识”。
    //当建立连接成功后就会生成一个句柄,里面记录了各种的信息
    zhandle_t *m_zhandle;
};

实现 

//zookeeperutil.cc
#include "zookeeperutil.h"
#include "mprpcapplication.h"
#include <semaphore.h>
#include <iostream>


// 全局的watcher观察器 zkserver给client的通知
void globle_watcher(zhandle_t *zh, int type,
                    int state, const char *path,
                    void *watcherCtx)
{
    if (type == ZOO_SESSION_EVENT) // 回调消息类型是和会话相关的消息类型
    {
        if (state == ZOO_CONNECTED_STATE) // zkclient和zkserver连接成功
        {
            sem_t *sem = (sem_t *)zoo_get_context(zh); // 从指定的句柄上获取信号量
            sem_post(sem);                             // 信号量资源加一
        }
    }
}
zkClient::zkClient()
    : m_zhandle(nullptr)
{}
zkClient::~zkClient()
{
    if (m_zhandle != nullptr)
    {
        zookeeper_close(m_zhandle); // 关闭句柄,释放锁资源
    }
}
void zkClient::Start()
{
    std::string host = MprpcApplication::GetInstance().GetConfig().Load("zookeeperip");
    std::string port = MprpcApplication::GetInstance().GetConfig().Load("zookeeperport"); 
    
    std::string connstr = host + ":" + port; // 给定了固定的格式 host:port
    std::cout << "Connecting to: " << connstr << std::endl;

    /*
    zookeeper_mt多线程版本,即它的API客户端程序提供了三个线程,是异步的额
    API调用线程
    网络i/o线程 是在zookeeper_init接口中使用pthread_create创建的线程 客户端程序不需要网络性能所以是poll模型
    watcher回调线程
    */
    // zookeeper_init是zookeeper中自带的接口,30000是会话的超时时间
   
    m_zhandle = zookeeper_init(connstr.c_str(), globle_watcher, 30000, nullptr, nullptr, 0);
    if (nullptr == m_zhandle)
    {
        std::cout << "zookeeper_init error" << std::endl;
        exit(EXIT_FAILURE);
    }
    //走到这里成功了是连接m_zhandle句柄成功了,并不是连接zk_server成功了
    sem_t sem;
    sem_init(&sem, 0, 0);
    zoo_set_context(m_zhandle, &sem); // 给指定的句柄上绑定信号量,只有当上面的watcher那里信号量加一这里才能继续向下走
    sem_wait(&sem);
    std::cout << "zookeeper_init success!" << std::endl;
}
void zkClient::Create(const char *path, const char *data, int datalen, int state)
{
    char path_buffer[128];
    int buffer_len = sizeof(path_buffer);
    int flag;
    // 同步的先判断想要创建的节点是否存在,如果在就不要重复创建
    flag = zoo_exists(m_zhandle, path, 0, nullptr);
    if (ZNONODE == flag) // 需要创建的节点不存在
    {
        flag = zoo_create(m_zhandle, path, data, datalen,
                          &ZOO_OPEN_ACL_UNSAFE, state, path_buffer, buffer_len);
        if (flag == ZOK)
        {
            std::cout << "znode create success... path:" << path << std::endl;
        }
        else
        {
            std::cout << "flag:" << flag << std::endl;
            std::cout << "znode create error... path" << path << std::endl;
            exit(EXIT_FAILURE);
        }
    }
}

//根据指定指定节点路径获取值
std::string zkClient::GetData(const char *path)
{
    char buffer[64];
    int buffer_len = sizeof(buffer);
    int flag = zoo_get(m_zhandle, path, 0, buffer, &buffer_len, nullptr);
    if (flag != ZOK) // ZOK表示操作成功
    {
        std::cout << "Get znode error... path:" << path << std::endl;
        return "";
    }
    else
    {
        return buffer;
    }
}

 代码解析:

 m_zhandle = zookeeper_init(connstr.c_str(), globle_watcher, 30000, nullptr, nullptr, 0);

我们连接的时候用的是zookeeper提供的API:zookeeper_init()   

  • 它里面的第一个参数的格式是给定的,如下图,所以我们在最开始定义了 connstr 的格式  std::string connstr = host + ":" + port; 

  •  第二个参数是设置全局默认的 Watcher 回调函数,当 ZooKeeper 客户端与服务端之间的连接状态发生变化时(比如断开、重连等),或者某些 znode 发生变化时,这个回调会被触发。
  • 第三个参数是会话的超时时间(以毫秒为单位),客户端与 ZooKeeper 服务器之间通信的最大等待时间。如果在这个时间内没有收到心跳响应,则认为会话失效。这里是30s
  • 第四个参数用于恢复之前会话的客户端 ID。如果你想让新连接尝试恢复之前的会话(比如在崩溃后重启),你可以传入之前保存的 clientid 和对应的密码。如果传入 nullptr,则表示创建一个新的会话。
  • 第五个参数控制客户端行为的标志位。目前只支持一个标志:ZOO_READONLY,允许客户端在只读模式下连接到 ZooKeeper 集群。(项目中用不到)

我们现在使用的是zookeeper的多线程版本,在它的API客户端提供了三个线程

  1. API调用线程:当我们启动 Start() 之后,会执行 zookeeper_init(),当前的这个线程就是调用线程。
  2. 网络I/O线程:在zookeeper_init()这个函数会调用pthread_create()创建了一个线程专门发起网络I/O操作,底层用的是poll(客户端程序不需要做到高并发),所以说使用zookeeper_init()   并不是在这个函数中直接发起的I/O连接,不然调用线程和网络I/O线程就是一个线程了。
  3. watcher回调线程:也是在zookeeper_init()创建的线程,当客户端接收到zk_server的响应的时候,使用这个回调线程

所以说我们在 zookeeper_init() 后面判断是否成功,不是网络连接成功,而是创建m_zhandle成功。

接着我们创建了信号量,使用 zoo_set_context(m_zhandle, &sem); 给句柄m_zhandle添加一些额外的信息(信号量),然后主线程(API调用线程)等待(因为我们初始信号量设置0),只能等zk_server响应的时候才能向下走,所以我们该实现一个全局默认的 Watcher 回调函数了,在回调函数里判断客户端是否和zk_server连接成功,然后信号量++。

zk_client想要连接zk_server的时候会发通知,zk_server收到通知处理后会给zk_client也发通知,如何发通知呢?就是通过这个回调函数

当 ZooKeeper 客户端与服务端之间的连接时,回调函数触发,表示连接所有的工作都做完,连接成功。这就是三个线程的作用。

五、节点注册到zookeeper

接下来我们就在项目中实现如何将项目中发布的rpc节点注册到zookeeper中

// 把当前rpc节点上要发布的服务全部注册到zk上面,让rpc client可以从zk上发现服务
    zkClient zkCli;
    zkCli.Start();
    for (auto &sp : m_serviceMap)
    {
        std::string service_path = "/" + sp.first;
        zkCli.Create(service_path.c_str(), nullptr, 0);
        for (auto &mp : sp.second.m_methodMap)
        {
            std::string method_path = service_path + "/" + mp.first;
            char method_path_data[128] = {0};
            sprintf(method_path_data, "%s:%d", ip.c_str(), port);
            // 方法都是临时性节点
            zkCli.Create(method_path.c_str(), method_path_data, strlen(method_path_data), ZOO_EPHEMERAL);
        }
    }

然后在rpc的调用方,之前我们在还没有配置中心的时候,我们都是读取配置文件中的信息,现在我们可以直接在zookeeper上先查找想要调用服务及方法的ip和port

// std::string ip = MprpcApplication::GetInstance().GetConfig().Load("rpcserverip");
    // uint16_t port= atoi(MprpcApplication::GetInstance().GetConfig().Load("rpcserverport").c_str());
    // 之前都是使用上面的方法来获取配置文件中的参数,但是现在我们想调用什么,就去zk上查询该服务所在的host信息
    zkClient zkCli;
    zkCli.Start();
    std::string method_path = "/" + service_name + "/" + method_name;
    std::string host_data = zkCli.GetData(method_path.c_str());
    if (host_data == "")
    {
        controller->SetFailed(method_name + "is not exist!");
        return;
    }
    int idx = host_data.find(":");
    if (idx == -1)
    {
        controller->SetFailed(method_path + "address is invalid!");
         return;
    }

六、一键编译脚本

为了方便进行编译,我们最后再完成一个一件编译化脚本 如下

#!/bin/bash

#表示如果任何一行命令返回非零状态(即失败),脚本就会立即退出。
#如果 cmake 执行失败,make 不会执行,因为用了 &&,注意pwd那里是反引号不是单引号
set -e

rm -rf `pwd`/bulid/*
cd `pwd`/bulid &&
    cmake .. &&
    make
cd ..
cp -r `pwd`/src/include `pwd`/lib
  1. #!/bin/bash:这是脚本的起始标志,告诉系统使用  /bin/bash 来执行这个脚本。
  2. set -e:表示一旦某条命令执行失败(返回非零退出码),整个脚本就会立刻终止。
  3. rm -rf `pwd`/bulid/* :删除 bulid 目录下的所有文件。
  4. 后面就是进入bulid目录再编译,&& 链接命令,确保前面的命令成功后再执行下一条;
  5. 再回到上一级目录,将include文件夹拷贝到bin目录下

运行如图所示

七、总结

上面就是我们整个项目的文件夹了,本次在项目中并没有介绍cmake编译,cmake的使用请移步博主的其他文章CMake的快速入门上手和保姆级使用介绍。最后运行一下看看最终成果

 项目到此结束,完结散花!感谢阅读!

Logo

更多推荐