Zookeeper从单机到集群分布式及源码分析


场景

ZooKeeper是一个分布式的,开放源码的分布式应用程序协调服务,是Google的Chubby一个开源的实现,是Hadoop和Hbase的重要组件。它是一个为分布式应用提供一致性服务的软件,提供的功能包括:配置维护、域名服务、分布式同步、组服务等。

Zookeeper可以理解为一个文件系统+一个通知机制

特点

- 是一个leader和多个follower来组成的集群
-  集群中只要有半数以上的节点存活,Zookeeper就能正常工作
- 全局数据一致性,每台服务器都保存一份相同的数据副本,无论client连接哪台server,数据都是一致的
-  数据更新原子性,一次数据要么成功,要么失败
-  实时性,在一定时间范围内,client能读取到最新数据

数据结构

在这里插入图片描述

- Zookeeper的文件结构看起来像一棵树,和Linux的文件结构类似
- 每一个节点上可以存储1MB的容量,可以看成k-v的键值对形式

负载均衡

Zookeeper会记录每一台服务器的访问数量,争取做到雨露均沾

选举机制

  • Zookeeper只要超过半数以上的服务器正常启动着,整个集群就会存活,因此推荐集群中搭建的服务器数量是单数
  • 整个集群中会有一个Leader服务器,通过选举投票产生,获得半数以上的票则该服务器为Leader

在这里插入图片描述

1. 五台服务器,每台服务器只有一票,从左到右启动,每一台服务器会把票第一时间投给自己
2. 第一台服务器把票投给自己得到的票数不超过半数以上,第二台也是一样
3. 第一台和第二台互相告知,第一台发现第二台的id比自己大,于是把自己的票转移给第二台
4. 第二台得到两票后,仍不满足,继续等待其他服务器启动
5. 第三台服务器把票投给自己发现不满足leader条件,但第二台发现id没有第三台大,于是把票都给了第三台
6. 第三台拿到三票后已经超过了半数投票,成为集群中的Leader
7. 第四台和第五台尽管自己的id比第三台大,但是第三台已经成为Leader,则无可奈何只能选择成为Follwer
8. 如果第三台服务器挂掉,则重新选举,谁的id大谁就最有可能成为Leader
9. 影响成为Leader的三个因素:id、启动顺序、服务器的数量

节点类型

  • 持久性
    • 持久性节点:客户端与Zookeeper断开连接后,由此客户端创建的节点依然存在
      • 持久性带编号节点:客户端与Zookeeper断开连接后,不仅由此客户端创建的节点依然存在,创建的节点还会编排好编号类似于mysql的auto_increment
  • 短暂性
    • 短暂性节点:客户端与Zookeeper断开连接后,由此客户端创建的节点删除
      • 短暂性带编号节点:客户端与Zookeeper断开连接后,不仅由此客户端创建的节点删除,创建的节点还会编排好编号类似于mysql的auto_increment

监听原理

在这里插入图片描述

- Zookeeper的客户端启动后会创建两个线程,一个用来做数据的通信传输,一个用来监听某节点变化
- Zookeeper可以通过命令addWatch选择监听的节点
- 如果节点改变则会告知客户端中用于监听节点的线程
- 此线程无论是节点的数据还是节点的路径发生变化都会被告知
- 被告知后线程会在内部调用process方法

事务操作流程

在这里插入图片描述

- 如果客户端直接找的Leader服务器,则Leader服务器广播给每个Follwer
- 每个Follwer需要确定写入之前的数据和Leader一致,如果一致则告知Leader
- Leader如果收到一半以上的告知(包括Leader本身),则会同意本次写入操作
- Leader自身写入后会告知客户端,并通知其余Follwer
- 其余Follwer通过Leader保持数据的一致性

在这里插入图片描述

- 如果客户端找到是Follwer服务器,则此Follwer会先告知Leader
- Leader服务器广播给每个Follwer需要进行写入操作
- 每个Follwer需要确定写入之前的数据和Leader一致,如果一致则告知Leader
  • Leader如果收到一半以上的告知(包括Leader本身),则会同意本次写入操作,并告知与客户端联系的Follwer
  • 与客户端联系的Follwer会将消息告诉客户端,此时Leader会通知其余Follwer
  • 其余Follwer通过Leader保持数据的一致性

安装及配置

  1. 解压

    tar -zxvf apache-zookeeper-3.6.0-bin.tar.gz
    
  2. 重命名

    mv apache-zookeeper-3.6.0-bin zookeeper
    
  3. 在/opt/zookeeper/这个目录上创建zkData和zkLog目录

    mkdir zkData
    mkdir zkLog
    
  4. 进入/opt/zookeeper/conf这个路径,复制一份 zoo_sample.cfg 文件并命名为 zoo.cfg

    cd conf
    cp zoo_sample.cfg zoo.cfg
    
  5. 编辑zoo.cfg文件,修改dataDir路径:

    dataDir=/opt/zookeeper/zkData
    dataLogDir=/opt/zookeeper/zkLog
    
  6. zoo.cfg配置文件中的参数详解

    tickTime =2000:通信心跳数,Zookeeper服务器与客户端心跳时间,单位毫秒Zookeeper使用的基本时间,服务器之间或客户端与服务器之间维持心跳的时间间隔,也就是每个tickTime时间就会发送一个心跳,时间单位为毫秒。
    
    initLimit =10:LF初始通信时限,集群中的Follower跟随者服务器与Leader领导者服务器之间,启动时能容忍的最多心跳数 10 * tickTime(10个心跳时间)如果领导和跟随者没有发出心跳通信,就视为失效的连接,领导和跟随者彻底断开。
    
    syncLimit =5:LF同步通信时限,集群启动后,Leader与Follower之间的最大响应时间单位,假如响应超过syncLimit * tickTime->10秒,Leader就认为Follwer已经死掉,会将Follwer从服务器列表中删除。
    
    dataDir:数据文件目录+数据持久化路径,主要用于保存Zookeeper中的数据。
    
    dataLogDir:日志文件目录。
    
    clientPort =2181:客户端连接端口,监听客户端连接的端口。
    

常用命令

  1. 启动Zookeeper

    ./zkServer.sh start
    
  2. 查看进程是否启动(QuorumPeerMain:是zookeeper集群的启动入口类,是用来加载配置启动QuorumPeer线程的)

    jps
    
  3. 查看状态

    ./zkServer.sh status
    
  4. 启动客户端

    ./zkCli.sh
    
  5. 退出客户端

     quit
    
  6. 查看根节点下的一级节点

    ls /
    
  7. 查看节点的详细数据

    老版本:ls2 /
    新版本:ls -s /
    
    • cZxid:创建节点的事务
      每次修改ZooKeeper状态都会收到一个zxid形式的时间戳,也就是ZooKeeper事务ID。
      事务ID是ZooKeeper中所有修改总的次序。
      每个修改都有唯一的zxid,如果zxid1小于zxid2,那么zxid1在zxid2之前发生。
    • ctime:被创建的毫秒数(从1970年开始)
    • mZxid:最后更新的事务zxid
    • mtime:最后修改的毫秒数(从1970年开始)
    • pZxid:最后更新的子节点zxid
    • cversion:创建版本号,子节点修改次数
    • dataVersion:数据变化版本号
    • aclVersion:权限版本号
    • ephemeralOwner:如果是临时节点,这个是znode拥有者的session id。如果不是临时节点
      则是0。
    • dataLength:数据长度
    • numChildren:子节点数
  8. 创建节点

    创建节点:create /xx
    创建节点并赋值:create /xx xxx
    创建多级节点:create /xx/xx
    创建短暂节点:create -e /xx
    创建排序节点:create -s /xx
    创建短暂并排序节点:create -e -s /xx
    
  9. 查看节点的值

    get /xx
    
  10. 修改节点的值

    set /xx xxx
    
  11. 删除节点

    删除节点:delete /xx
    递归删除存在子节点的节点:deleteall /xx
    
  12. 监听节点变化

    addwatch /xxx
    如果被监听的节点修改内容后则:WatchedEvent state:SyncConnected type:NodeDataChanged path:/xxx
    如果被监听的节点创建子节点后则:WatchedEvent state:SyncConnected type:NodeCreated path:/xxx/yyy
    

集群部署

​ 假设有三台服务器,集群部署Zookeeper(先搞定一台,再克隆两台)

  1. 在/zkData目录下创建myid文件

    vim myid
    1(编号,其余两台机器分别写2和3)
    
  2. 配置zoo.cfg文件

    vim zoo.cfg
    #######################cluster##########################
    server.1=192.168.126.134:2888:3888
    server.2=192.168.126.137:2888:3888
    server.3=192.168.126.131:2888:3888
    
    • 解读配置的内容 server.A=B:C:D
      • A:一个数字,表示第几号服务器
        集群模式下配置的/opt/zookeeper/zkData/myid文件里面的数据就是A的值
      • B:服务器的ip地址
      • C:与集群中Leader服务器交换信息的端口
      • D:选举时专用端口,万一集群中的Leader服务器挂了,需要一个端口来重新进行选举,选
        出一个新的Leader,而这个端口就是用来执行选举时服务器相互通信的端口
  3. 克隆其余两台服务器
    - 在虚拟机数据目录vms下,创建zk02
    - 将本台服务器数据目录下的.vmx文件和所有的.vmdk文件分别拷贝zk02下
    - 虚拟机->文件->打开 (选择zk02下的.vmx文件)
    - 开启此虚拟机,弹出对话框,选择“我已复制该虚拟机”
    - 进入系统后,修改linux中的ip,修改/opt/zookeeper/zkData/myid中的数值为2
    - 第三台服务器重复如上步骤

  4. 关闭每台机器的防火墙

临时关闭
systemctl stop firewalld.service
  1. 启动第一台并检查状态
./zkServer.sh start
./zkServer.sh status

得到结果:
ZooKeeper JMX enabled by default
Using config: /opt/zookeeper/bin/../conf/zoo.cfg
Client port found: 2181. Client address: localhost.
Error contacting service. It is probably not running.

因为没有超过半数以上的服务器,所以集群失败 (防火墙没有关闭也会导致失败)
  1. 当启动第2台服务器时
查看第1台的状态:Mode: follower
查看第2台的状态:Mode: leader

关联Java

  1. 创建Maven工程

  2. 添加pom文件

    <dependencies>
    	<dependency>
    		<groupId>org.apache.logging.log4j</groupId>
    		<artifactId>log4j-core</artifactId>
    		<version>2.8.2</version>
    	</dependency>
    	<dependency>
    		<groupId>org.apache.zookeeper</groupId>
    		<artifactId>zookeeper</artifactId>
    		<version>3.6.0</version>
    	</dependency>
    	<dependency>
    		<groupId>junit</groupId>
    		<artifactId>junit</artifactId>
    		<version>4.12</version>
    </dependency>
    </dependencies>
    
    1. 在resources下创建log4j.properties
    log4j.rootLogger=INFO, stdout
    log4j.appender.stdout=org.apache.log4j.ConsoleAppender
    log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
    log4j.appender.stdout.layout.ConversionPattern=%d %p [%c] - %m%n
    log4j.appender.logfile=org.apache.log4j.FileAppender
    log4j.appender.logfile.File=target/zk.log
    log4j.appender.logfile.layout=org.apache.log4j.PatternLayout
    log4j.appender.logfile.layout.ConversionPattern=%d %p [%c] - %m%n
    
    1. 创建Zookeeper客户端
    private String connStr = "192.168.126.131:2181,192.168.126.134:2181,192.168.126.137:2181";
        /**
         * session超时 60秒:一定不能太少,因为连接zookeeper和加载集群环境会因为性能原因延迟略高
         * 如果时间太少,还没有创建好客户端,就开始操作节点,会报错的
         */
        private int sessionTimeout = 10 * 1000;
    
        private ZooKeeper zkCli;
    
        @Before
        public void init() throws Exception {
            zkCli = new ZooKeeper(connStr, sessionTimeout, new Watcher() {
                public void process(WatchedEvent watchedEvent) {
                    System.out.println("监听到了");
                    System.out.println(watchedEvent.getType());
                }
            });
        }
    
        /**
         * 创建节点
         * @throws Exception
         */
        @Test
        public void create() throws Exception{
            String value = "zookeeperStudy";
            byte[] bytes = value.getBytes();
            String nodeCreated = zkCli.create("/lagou", bytes, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
            System.out.println("nodeCreated = " + nodeCreated);
        }
    
        @Test
        public void get() throws Exception{
            byte[] data = zkCli.getData("/lagou", false, new Stat());
            String str = new String(data);
            System.out.println("str = " + str);
        }
    
        @Test
        public void set() throws Exception{
            Stat lagou = zkCli.setData("/lagou", "zookeeperA".getBytes(), 0);
            System.out.println("lagou = " + lagou);
        }
    
        @Test
        public void delete() throws Exception{
            zkCli.delete("/test", 1);
            System.out.println("删除成功");
        }
    
        @Test
        public void childRen() throws Exception{
            List<String> children = zkCli.getChildren("/", true);
            for (String child : children) {
                System.out.println("child = " + child);
            }
            //顺便监听子节点变化
            System.in.read();
        }
    
        @Test
        public void exist() throws Exception{
            Stat exists = zkCli.exists("/lagouafds", true);
            System.out.println(exists == null ? "不存在" : "存在");
        }
    

商家消费者模型

需求

- 模拟美团服务平台,商家营业通知,商家打烊通知

商家类

/**
 * @author 张哲
 */
public class ShopServer {
    private String connectString = "192.168.126.131:2181,192.168.126.134:2181,192.168.126.137:2181";
    private static int sessionTimeout = 10 * 1000;
    private ZooKeeper zk = null;

    public void getConnect() throws IOException {
        zk = new ZooKeeper(connectString, sessionTimeout, new Watcher() {
            public void process(WatchedEvent event) {

            }
        });
    }

    // 注册到集群
    public void register(String ShopName) throws Exception {
        // 一定是"EPHEMERAL_SEQUENTIAL短暂有序型"的节点,才能给shop编号,shop1. shop2...”
        String create = zk.create("/meituan/Shop", ShopName.getBytes(),
                ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
        System.out.println("【" + ShopName + "】 开始营业! " + create);
    }

    // 业务功能
    public void business(String ShopName) throws Exception {
        System.out.println("【"+ShopName+"】 正在营业中 ...");
        System.in.read();
    }

    public static void main(String[] args) throws Exception {
        ShopServer shop = new ShopServer();
        // 1.连接zookeeper集群(和美团取得联系)
        shop.getConnect();
        // 2.将服务器节点注册(入住美团)
        shop.register(args[0]);
        // 3.业务逻辑处理(做生意)
        shop.business(args[0]);

    }
}

消费者

/**
 * @author 张哲
 */
public class Customers {
    private String connectString = "192.168.126.131:2181,192.168.126.134:2181,192.168.126.137:2181";
    private static int sessionTimeout = 10 * 1000;
    private ZooKeeper zk = null;

    // 创建到zk的客户端连接
    public void getConnect() throws IOException {
        zk = new ZooKeeper(connectString, sessionTimeout, new Watcher() {
            public void process(WatchedEvent event) {
                // 再次获取所有商家
                try {
                getShopList();
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        });
    }

    // 获取服务器列表信息
    public void getShopList() throws Exception {
    // 1获取服务器子节点信息,并且对父节点进行监听
        List<String> shops = zk.getChildren("/meituan", true);
    // 2存储服务器信息列表
        ArrayList<String> shoplist = new ArrayList();
    // 3遍历所有节点,获取节点中的主机名称信息
        for (String shop : shops) {
            System.out.println(zk);
            byte[] data = zk.getData("/meituan/" + shop, false, new Stat());
            shoplist.add(new String(data));
        }
        // 4打印服务器列表信息
        System.out.println(shoplist);
    }
    // 业务功能
    public void business() throws Exception {
        System.out.println("客户正在浏览商家 ...");
        System.in.read();
    }

    public static void main(String[] args) throws Exception{
        Customers client = new Customers();
        client.getConnect();
        // 2.获取/meituan的子节点信息,从中获取服务器信息列表(从美团中获取商家列表)
        client.getShopList();
        // 3.业务进程启动 (对比商家,点餐)
        client.business();

    }

}

运行流程

  • IDEA中配置args参数
    在这里插入图片描述

  • 首先在linux中添加一个商家,然后观察客户端的控制台输出(商家列表会立刻更新出最新商
    家),多添加几个,也会实时输出商家列表

    create /meituan/KFC "KFC"
    create /meituan/BKC "BurgerKing"
    create /meituan/baozi "baozi"
    
  • 在linux中删除商家,在客户端的控制台也会实时看到商家移除后的最新商家列表

delete /meituan/baozi

分布式锁(核心代码)

非锁写法:

	@Autowired
	private ProductService productService;
	@GetMapping("/product/reduce")
	@ResponseBody
	public Object reduceStock(int id) throws Exception{
		    productService.reduceStock(id);
	    return "ok";
	}

加锁写法:

	@Autowired
    private ProductService productService;
    private static String connectString = "192.168.126.131:2181,192.168.126.134:2181,192.168.126.137:2181";

    @GetMapping("/product/reduce")
    @ResponseBody
    public Object reduceStock(int id) throws Exception{
        // 重试策略 (1000毫秒试1次,最多试3次)
        RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
        //1.创建curator工具对象
        CuratorFramework client = CuratorFrameworkFactory.newClient(connectString, retryPolicy);
        client.start();
        //2.根据工具对象创建“内部互斥锁”
        InterProcessMutex lock = new InterProcessMutex(client, "/product_"+id);
        try {
            //3.加锁
            lock.acquire();
            productService.reduceStock(id);
        }catch(Exception e){
            if(e instanceof RuntimeException){
                throw e;
            }
        }finally{
            //4.释放锁
            lock.release();
        }
        return "ok";
    }
Logo

更多推荐