前言

zookeeper的设计初衷,就是为了协调分布式服务,因此利用zookeeper来解决分布式锁的问题是一种最为简单的实现

1 原理

利用zookeeper的顺序临时节点的特性来实现

1.1 获取锁

  • 首先,在zookeeper当中创建一个父节点 /testLock
  • 当第一个客户端C1想要获取锁时,会先在父节点下创建一个临时顺序节点N1; 之后,C1会查找父节点下的所有的临时顺序节点并排序,判断自己所创建的节点N1是不是最小的(最靠前的); 如果是,则成功获得锁;
  • 这时候,如果再有一个客户端C2前来获取锁,也会在父节点下再创建一个临时顺序节点N2; 之后,C2也会查找父节点下面所有的临时顺序节点并排序,判断自己所创建的节点N2是不是最小的; 结果发现节点N2不是最小的。 于是,C2向排序仅在它前面的一个节点N1注册一个watcher,用于监听N1节点是否存在; C2抢锁失败,进入了等待监听状态;
  • 这时候,如果再有客户端C3前来获取锁,也会在父节点下再创建一个临时顺序节点N3; 之后,C3也会查找父节点下面所有的临时顺序节点并排序,判断自己所创建的节点N3是不是最小的; 结果发现节点N3不是最小的。 于是,C3向排序仅在它前面的一个节点N2注册一个watcher,用于监听N2节点是否存在; C3抢锁失败,进入了等待监听状态;
  • 最终各个节点形成一个类似于队列的模型来有序地监听并获取锁。

1.2 释放锁

客户端释放锁的两种情况:

任务执行完,客户端主动释放锁

当前获取锁的客户端C1在任务执行完成后,主动调用delete删除临时节点N1。

任务执行中,客户端崩溃被动解锁

当前获取锁的客户端C1在任务执行过程中崩溃,则会断开与zookeeper服务端的链接;因为是临时节点,所以与该客户端相关联的节点N1也会随之被自动删除;
由于C2注册有watcher一直监听着N1的存在,当N1节点被删除后,C2客户端会立刻收到通知;这时C2会再次查询父节点下面的所有节点并排序,确认自己当前的节点N2是不是最小的,如果是最小,则C2成功获取锁;
同理,之后排队等待的客户端也依次获取锁执行任务。

2 原生API实现

使用zookeeper的客户端api进行原始代码实现

2.1 zk连接工具类

public class ZKUtils {
    private static ZooKeeper zooKeeper;
    // 这里使用的zk集群,也可以使用单个,在连接后可添加一个路径作为父目录(该目录需要在测试之前手动在zk中创建)
    private static String address = "127.0.0.1:2181,127.0.0.1:2182,127.0.0.1:2183/testLock";
    // 默认的watcher
    private static DefaultWatch defaultWatch = new DefaultWatch();
    // 线程锁(阻塞程序,连接成功后释放)
    private static CountDownLatch countDownLatch = new CountDownLatch(1);

    public static ZooKeeper getZooKeeper() {
        try {
            zooKeeper = new ZooKeeper(address, 1000, defaultWatch);
            // 传递countDownLatch到defaultWatch
            defaultWatch.setCountDownLatch(countDownLatch);
            // 堵塞等待,成功连接后释放
            countDownLatch.await();
        } catch (Exception e) {
            e.printStackTrace();
        }
        return zooKeeper;
    }
}

2.2 默认监听watcher

在zookeeper客户端建立连接时使用的默认监听,实现自Watcher接口

public class DefaultWatch implements Watcher {
	// 在zk连接下进行入参初始化countDownLatch
    private CountDownLatch countDownLatch;

    public void setCountDownLatch(CountDownLatch countDownLatch) {
        this.countDownLatch = countDownLatch;
    }
    // 监听
    @Override
    public void process(WatchedEvent event) {
        System.out.println(event.toString());
        switch (event.getState()) {
            case Unknown:
                break;
            case Disconnected:
                break;
            case NoSyncConnected:
                break;
            case SyncConnected:
                // 建立连接后释放
                countDownLatch.countDown();
                break;
            case AuthFailed:
                break;
            case ConnectedReadOnly:
                break;
            case SaslAuthenticated:
                break;
            case Expired:
                break;
        }
    }
}

2.3 锁具体实现

加锁、解锁的具体操作实现,需要实现的回调接口:

  • Watcher:用于节点间的监听,监听事件类型NodeDeleted;
  • AsyncCallback.StringCallback:创建节点时的异步回调create;
  • AsyncCallback.Children2Callback:获取子节点时的异步回调getChildren;
public class LockWatchCallBack implements Watcher, AsyncCallback.StringCallback, AsyncCallback.Children2Callback, AsyncCallback.StatCallback {
	// 入参初始化
    private ZooKeeper zooKeeper;
    // 入参初始化,当前操作的线程名
    private String threadName;
    // 阻塞操作
    private CountDownLatch countDownLatch = new CountDownLatch(1);
    // 创建的临时节点路径,类似于  /lock0000000001
    private String pathName = "";

    public String getPathName() {
        return pathName;
    }

    public void setPathName(String pathName) {
        this.pathName = pathName;
    }

    public String getThreadName() {
        return threadName;
    }

    public void setThreadName(String threadName) {
        this.threadName = threadName;
    }

    public ZooKeeper getZooKeeper() {
        return zooKeeper;
    }

    public void setZooKeeper(ZooKeeper zooKeeper) {
        this.zooKeeper = zooKeeper;
    }

    // 获取锁操作tryLock
    public void tryLock() {
        // 判断:根数据==线程名,进行锁的重入(每重入一次,锁标志位加1)
        String[] str = getRootData().split("#");
        if (str.length == 2 && threadName.equals(str[0])) {
            try {
                // 获取当前加锁的次数
                int i = Integer.parseInt(str[1]);
                i++;
                // 设置根节点数据:当前线程名+“#”+锁个数
                zooKeeper.setData("/", (threadName + "#" + i).getBytes(), -1);
            } catch (KeeperException e) {
                e.printStackTrace();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        } else {
            // 客户端连接后创建的临时有序节点,回调AsyncCallback.StringCallback
            zooKeeper.create("/lock", (threadName + "#1").getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL, this, "tryLock");
        }

        try {
        	// 堵塞线程,节点创建成功后释放
            countDownLatch.await();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

    // 获取锁的根目录数据:当前线程名+“#”+锁个数
    public String getRootData() {
        byte[] data = new byte[0];
        try {
            data = zooKeeper.getData("/", false, new Stat());
        } catch (KeeperException e) {
            e.printStackTrace();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        return new String(data);
    }

    //释放锁
    public void unLock() {
        try {
            // 可重入锁:解锁,每调用一次解锁操作,锁标志减一,为0删除节点
            String[] str = getRootData().split("#");
            // 获取当前加锁的次数
            int i = Integer.parseInt(str[1]);
            System.out.println(threadName + " stop "+i+" ... ");
            i--;
            if (i == 0) {
                // i=0,删除节点,并置空根节点数据
                zooKeeper.setData("/", "".getBytes(), -1);
                zooKeeper.delete(pathName, -1);
            } else {
                // i!=0,锁标志位减一
                zooKeeper.setData("/", (threadName + "#" + i).getBytes(), -1);
            }
        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (KeeperException e) {
            e.printStackTrace();
        }
    }

    // AsyncCallback.StringCallback:创建节点操作时的回调create
    @Override
    public void processResult(int rc, String path, Object ctx, String name) {
    	// 创建的节点名
        if (name != null) {
            // 创建节点的路径
            pathName = name;
            System.out.println(threadName + "===" + pathName);// Thread-0===/lock0000000001
            //获取当前锁在根目录下创建的子节点 ,回调AsyncCallback.Children2Callback
            zooKeeper.getChildren("/", false, this, "AsyncCallback.StringCallback ...");
        }
    }

    // AsyncCallback.Children2Callback:获取子节点的回调getChildren
    @Override
    public void processResult(int rc, String path, Object ctx, List<String> children, Stat stat) {
        // 查看在当前线程之前已经创建的子节点,存放于参数list中
        /*
        System.out.println("在当前线程:"+threadName+",之前创建成功的节点");
        for (String child:children) {
            System.out.print(child+" , "); // lock0000000066 , lock0000000065 ,
        }
        System.out.println();
        */
        // 当前节点集合list是无序的,进行排序操作
        Collections.sort(children);

        // 判断当前回调的节点是不是第一个节点,list中获取的节点不带‘/’,pathName中的节点带‘/’,需要进行截取
        int i = children.indexOf(pathName.substring(1));
        if (i == 0) {
            // 是,获取锁成功,阻塞结束
            System.out.println(threadName + " i am coming ...");
            try {
                // 将自己的线程名设置给根目录,并设置一个锁的标志位,根据判断该值进行锁的重入操作
                zooKeeper.setData("/", (threadName + "#1").getBytes(), -1);
            } catch (KeeperException e) {
                e.printStackTrace();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            countDownLatch.countDown();
        } else {
            // 否,watcher监控当前回调节点的前一个节点的删除,回调AsyncCallback.StatCallback
            zooKeeper.exists("/" + children.get(i - 1), this, this, "监控当前节点的前一个节点");
        }
    }

    // AsyncCallback.StatCallback
    @Override
    public void processResult(int rc, String path, Object ctx, Stat stat) {
        // 监控失效后的再处理回调。。。
    }

    // Watcher
    @Override
    public void process(WatchedEvent event) {
        // 监听所发生的事件类型
        switch (event.getType()) {
            case None:
                break;
            case NodeCreated:
                break;
            case NodeDeleted:
                // 监听节点删除事件,删除后再次获取子节点,触发之后节点获取锁的操作
                zooKeeper.getChildren("/", false, this, "AsyncCallback.StringCallback ... NodeDeleted ");
                break;
            case NodeDataChanged:
                break;
            case NodeChildrenChanged:
                break;
            case DataWatchRemoved:
                break;
            case ChildWatchRemoved:
                break;
        }
    }
}

2.4 锁测试类

在测试之前需要在zk中先创建连接时定义的根节点/testLock,并设值为 ’ ’ ;

public class TestLock {

    private ZooKeeper zooKeeper;
    // 开启的线程数,模拟多客户端操作
    private static final int CLIENTS_NUM = 3;

    @Before
    public void connection() {
        zooKeeper = ZKUtils.getZooKeeper();
    }

    @After
    public void close() {
        try {
            zooKeeper.close();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

    @Test
    public void myLock() {
        //线程计数器控制业务的执行
        final CountDownLatch countDownLatch = new CountDownLatch(CLIENTS_NUM);
        for (int i = 0; i < CLIENTS_NUM; i++) {
            new Thread() {
                @Override
                public void run() {
                	// 锁回调程序是在每个客户端中都单独存在的
                    LockWatchCallBack lockWatchCallBack = new LockWatchCallBack();
                    // 入参赋值zooKeeper
                    lockWatchCallBack.setZooKeeper(zooKeeper);
                    String threadName = Thread.currentThread().getName();                                  		                 
                    // 入参赋值threadName
                    lockWatchCallBack.setThreadName(threadName);

                    try {
                        // 加锁
                        lockWatchCallBack.tryLock();
                        // 模拟执行业务
                        Thread.sleep(10);
                        System.out.println(threadName + " do 1 ... " + lockWatchCallBack.getRootData());

                        // 锁重入
                        lockWatchCallBack.tryLock();
                        Thread.sleep(10);
                        System.out.println(threadName + " do 2 ... " + lockWatchCallBack.getRootData());
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    } finally {
                        // 释放锁(加几次锁就要释放几次锁)
                        lockWatchCallBack.unLock();
                        lockWatchCallBack.unLock();

                        // 线程数减1
                        countDownLatch.countDown();
                    }
                }
            }.start();
        }

        try {
        	// 堵塞线程,任务执行完后释放
            countDownLatch.await();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println("end ...");
    }
// 测试结果
/*
WatchedEvent state:SyncConnected type:None path:null
Thread-0===/lock0000000000
Thread-2===/lock0000000001
Thread-1===/lock0000000002
Thread-0 i am coming ...
Thread-0 do 1 ... Thread-0#1
Thread-0 do 2 ... Thread-0#2
Thread-0 stop 2 ...
Thread-0 stop 1 ...
Thread-2 i am coming ...
Thread-2 do 1 ... Thread-2#1
Thread-2 do 2 ... Thread-2#2
Thread-2 stop 2 ...
Thread-2 stop 1 ...
Thread-1 i am coming ...
Thread-1 do 1 ... Thread-1#1
Thread-1 do 2 ... Thread-1#2
Thread-1 stop 2 ...
Thread-1 stop 1 ...
end ...
WatchedEvent state:Closed type:None path:null
*/
}

3 Curator框架实现

框架curator中封装了一整套zookeeper的相关业务实现,使用非常简单

3.1 pom依赖

在这里要注意一下,这里pom中使用的zk版本最好与zk连接中使用的版本一致

<!--zookeeper的客户端依赖-->
<dependency>
    <groupId>org.apache.zookeeper</groupId>
    <artifactId>zookeeper</artifactId>
    <version>3.5.9</version>
</dependency>
<!--curator-->
<dependency>
   <groupId>org.apache.curator</groupId>
    <artifactId>curator-recipes</artifactId>
    <version>4.0.1</version>
</dependency>

3.2 具体实现

使用curator实现分布式锁主要有三个步骤:

  • 创建锁对象:new InterProcessMutex(),通过这个类获取的锁对像是可重入的;
  • 获取锁对象:执行acquire()方法获取锁;
  • 释放锁对象:调用release()方法释放锁;
public class TestLock2 {
    // zk临时节点的父目录
    private static final String LOCK_PATH = "/testLock";
    // 客户端的数量
    private static final int CLIENT_NUMS = 3;
    // 线程堵塞计数器
    private static CountDownLatch countDownLatch = new CountDownLatch(CLIENT_NUMS);

    @Test
    public void myTest2() {
        for (int i = 0; i < CLIENT_NUMS; i++) {
            new Thread(new Runnable() {
                @Override
                public void run() {
                    // 重试策略(重试时间,重试次数)
                    RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
                    // 获取连接
                    CuratorFramework client = CuratorFrameworkFactory.newClient("127.0.0.1:2181", retryPolicy);
                    client.start();

                    try {
                        // 可重入锁,可以加入重入操作执行
                        final InterProcessMutex lock = new InterProcessMutex(client, LOCK_PATH);
                        // 可让每个客户端请求多次锁资源进行测试
                        for (int j = 1; j <= 1; j++) {
                            try {
                                // 调用acquire获取锁成功后会在/testLock下创建临时节点,节点名称类似与:_c_723c6b99-f0ec-4539-ada2-3e17a48746ed-lock-0000000309
                                if (lock.acquire(10, TimeUnit.SECONDS)) {
                                    System.out.println(Thread.currentThread().getName() + " get lock 1 ...");
                                    // 模拟业务操作
                                    Thread.sleep(10);
                                    System.out.println(Thread.currentThread().getName() + " do 1 ... ");
                                } else {
                                    throw new IllegalStateException(Thread.currentThread().getName() + " get lock 1 timeout ");
                                }

                                // 获取可重入锁
                                if (lock.acquire(10, TimeUnit.SECONDS)) {
                                    System.out.println(Thread.currentThread().getName() + " get lock 2 ...");
                                    Thread.sleep(10);
                                    System.out.println(Thread.currentThread().getName() + " do 2 ... ");
                                } else {
                                    throw new IllegalStateException(Thread.currentThread().getName() + " get lock 2 timeout");
                                }
                            } finally {
                                // 在finally中释放锁,申请几次释放几次,调用release释放锁删除acquire产生的临时节点
                                System.out.println(Thread.currentThread().getName() + " release lock 2");
                                lock.release();
                                System.out.println(Thread.currentThread().getName() + " release lock 1");
                                lock.release();
                            }
                        }
                    } catch (Exception e) {
                        e.printStackTrace();
                    } finally {
                        CloseableUtils.closeQuietly(client);
                        countDownLatch.countDown();
                    }
                }
            }).start();
        }
        // 阻塞等待线程计数器归零
        try {
            countDownLatch.await();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println("end ...");
    }
// 测试结果
/*
Thread-2 get lock 1 ...
Thread-2 do 1 ...
Thread-2 get lock 2 ...
Thread-2 do 2 ...
Thread-2 release lock 2
Thread-2 release lock 1
2021-01-25 13:36:31,874 [myid:] - INFO  [Curator-Framework-0:CuratorFrameworkImpl@937] - backgroundOperationsLoop exiting
Thread-1 get lock 1 ...
Thread-1 do 1 ...
Thread-1 get lock 2 ...
Thread-1 do 2 ...
Thread-1 release lock 2
Thread-1 release lock 1
2021-01-25 13:36:31,914 [myid:] - INFO  [Curator-Framework-0:CuratorFrameworkImpl@937] - backgroundOperationsLoop exiting
Thread-0 get lock 1 ...
Thread-0 do 1 ...
Thread-0 get lock 2 ...
Thread-0 do 2 ...
Thread-0 release lock 2
Thread-0 release lock 1
2021-01-25 13:36:31,963 [myid:] - INFO  [Curator-Framework-0:CuratorFrameworkImpl@937] - backgroundOperationsLoop exiting
2021-01-25 13:36:32,010 [myid:] - INFO  [Thread-2:ZooKeeper@1422] - Session: 0x1008587bbf70003 closed
2021-01-25 13:36:32,010 [myid:] - INFO  [Thread-2-EventThread:ClientCnxn$EventThread@524] - EventThread shut down for session: 0x1008587bbf70003
2021-01-25 13:36:32,109 [myid:] - INFO  [Thread-0:ZooKeeper@1422] - Session: 0x1008587bbf70004 closed
2021-01-25 13:36:32,109 [myid:] - INFO  [Thread-0-EventThread:ClientCnxn$EventThread@524] - EventThread shut down for session: 0x1008587bbf70004
2021-01-25 13:36:32,209 [myid:] - INFO  [Thread-1:ZooKeeper@1422] - Session: 0x1008587bbf70005 closed
end ...
*/
}

4 对比Redis实现

Redis实现分布式锁

  • 在实现逻辑上,zookeeper就是为了协调分布式服务的,有成熟的框架落地,实现极为简单,而redis实现逻辑比较复杂;
  • 在执行效率上,虽然zookeeper的数据也是存在于内存,但是redis更是以快著称,所以在效率上redis要快一些。

注:此内容仅供参考,如有问题欢迎指出,谢谢!

Logo

更多推荐