• 有效的解决单点问题,不可重入问题,非阻塞问题以及锁无法释放的问题。实现起来较为简单。
  • zookeeper的锁天生是公平锁 根据创建临时节点的顺序。

2> 缺点?

  • 性能上不如使用缓存实现分布式锁。
    • 因为每次在创建锁和释放锁的过程中,都要动态创建、销毁瞬时节点来实现锁功能。
    • ZK中创建和删除节点只能通过Leader服务器来执行,然后将数据同不到所有的Follower机器上。
  • 加锁不管成功还是失败的第一步是先创建临时节点 这样如果加锁的过多 会对zookeeper的存储压力过大。

四、InterProcessMute实现分布式锁原理

InterProcessMute首先是一个互斥锁,其次是依赖Zookeeper临时顺序节点实现的分布式锁;对于锁而言,最重要的是保护临界区,让多个线程对临界区的访问互斥;InterProcessMute依赖Zookeeper临时顺序节点的有序性实现分布式环境下互斥,依赖JVM层面的synchronized实现节点监听的互斥(防止羊群效应)。

InterProcessMute的acquire()方法用于获取锁,release()方法用于释放锁。

以如下测试类为例,展开源码分析:

public class LockTest {
    public static void main(String[] args) {
        //重试策略,定义初试时间3s,重试3次
        ExponentialBackoffRetry exponentialBackoffRetry = new ExponentialBackoffRetry(3000, 3);

        //初始化客户端
        CuratorFramework client = CuratorFrameworkFactory.builder()
                .connectString("127.0.0.1:2181")
                .sessionTimeoutMs(3000)
                .connectionTimeoutMs(3000)
                .retryPolicy(exponentialBackoffRetry)
                .build();
        // start()开始连接,没有此会报错
        client.start();
        //利用zookeeper的类似于文件系统的特性进行加锁 第二个参数指定锁的路径
        InterProcessMutex interProcessMutex = new InterProcessMutex(client, "/lock");

        try {
            //加锁
            interProcessMutex.acquire();
            System.out.println(Thread.currentThread().getName() + "获取锁成功");
            Thread.sleep(60\_000);
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            try {
                //释放锁
                interProcessMutex.release();
                System.out.println(Thread.currentThread().getName() + "释放锁成功");
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }
}

1、加锁流程(acquire()方法)

InterProcessMutex#acquire()方法:

在这里插入图片描述

acquire()方法中直接调用internalLock()方法以不加锁成功就一直等待的方式加锁;
如果加锁出现异常,则直接抛出IOException。

0)加锁流程图

在这里插入图片描述

1)internalLock()
private boolean internalLock(long time, TimeUnit unit) throws Exception
{
    /\*
 Note on concurrency: a given lockData instance
 can be only acted on by a single thread so locking isn't necessary
 \*/

    // 当前线程
    Thread currentThread = Thread.currentThread();

    // 当前线程持有的锁信息
    LockData lockData = threadData.get(currentThread);
    if ( lockData != null )
    {
        // 可重入,lockCount +1;
        // 此处只在本地变量变化了,没发生任何网络请求;对比redisson的分布式锁可重入的实现是需要操作redis的
        lockData.lockCount.incrementAndGet();
        return true;
    }

    // 进行加锁,继续往里跟
    String lockPath = internals.attemptLock(time, unit, getLockNodeBytes());
    if ( lockPath != null )
    {
        // 加锁成功
        LockData newLockData = new LockData(currentThread, lockPath);
        // 放入map
        threadData.put(currentThread, newLockData);
        return true;
    }

    return false;
}

internalLock()方法有两个入参:long类型的time 和 TimeUnit类型的 unit 共同表示加锁的超时时间。

一个InterProcessMutex在同一个JVM中可以由多个线程共同操作,因为其可重入性体现在JVM的线程层面,所以其维护了一个Map类型的变量threadData

private final ConcurrentMap<Thread, LockData> threadData = Maps.newConcurrentMap();

用于记录每个线程持有的锁信息;锁信息采用LockData表示;

LockData

LockData是InterProcessMutex的静态内部类,其仅有三个变量:持有锁的线程、锁路径、锁重入的次数;

private static class LockData {
    // 持有锁的线程
    final Thread owningThread;
    // 锁的路径
    final String lockPath;
    // 重入锁的次数
    final AtomicInteger lockCount = new AtomicInteger(1);

    private LockData(Thread owningThread, String lockPath)
    {
        this.owningThread = owningThread;
        this.lockPath = lockPath;
    }
}

internalLock()方法逻辑
  1. 根据当前线程从InterProcessMutex的threadData变量中获取当前线程持有的锁信息;
  • 如果已经持有锁,说明是JVM层面的锁重入,则直接对LockData.lockCount + 1,然后返回加锁成功。
  • 锁重入的过程是没有产生任何网络请求的;而Redisson分布式锁可重入的实现是需要每次都操作Redis的。
  1. 如果未持有锁,则尝试加锁;
  • 加锁逻辑体现在LockInternals#attemptLock()方法中;
  • 加锁成功,则将加锁的路径和当前线程一起封装为锁数据LockData,以线程为key,LockData为value,作为键值对加入到threadData中;并返回加锁成功
  • 加锁失败,则直接返回加锁失败。
2)LockInternals#attemptLock() --> 尝试加锁
String attemptLock(long time, TimeUnit unit, byte[] lockNodeBytes) throws Exception {
    final long startMillis = System.currentTimeMillis();
    // 将时间统一格式化ms
    final Long millisToWait = (unit != null) ? unit.toMillis(time) : null;
    final byte[] localLockNodeBytes = (revocable.get() != null) ? new byte[0] : lockNodeBytes;
    int retryCount = 0;

    String ourPath = null;
    boolean hasTheLock = false;
    boolean isDone = false;
    while (!isDone) {
        isDone = true;

        try {
            // 创建临时有序节点
            ourPath = driver.createsTheLock(client, path, localLockNodeBytes);
            // 判断是否为第一个节点 如果是表明加锁成功。跟进去
            hasTheLock = internalLockLoop(startMillis, millisToWait, ourPath);
        } catch (KeeperException.NoNodeException e) {
            // 重试机制
            // gets thrown by StandardLockInternalsDriver when it can't find the lock node
            // this can happen when the session expires, etc. So, if the retry allows, just try it all again
            if (client.getZookeeperClient().getRetryPolicy().allowRetry(retryCount++, System.currentTimeMillis() - startMillis, RetryLoop.getDefaultRetrySleeper())) {
                isDone = false;
            } else {
                throw e;
            }
        }
    }

    if (hasTheLock) {
        return ourPath;
    }

    return null;
}

attemptLock()方法有三个入参:long类型的time 和 TimeUnit类型的 unit 共同表示尝试加锁的超时时间,字节数组类型的lockNodeBytes表示锁路径对应的节点值。

通过InterProcessMutex#internalLock()方法进入到attemptLock()方法时,lockNodeBytes为null,即:不给锁路径对应的节点赋值。
在这里插入图片描述

尝试加锁逻辑:

  1. 首先尝试加锁是支持重试机制的;尝试加锁的返回值为加锁成功的锁路径,如果加锁未成功则返回null。
  2. 通过锁驱动器LockInternalsDriver直接创建Zookeeper的临时有序节点,并返回节点路径;
  • 具体逻辑体现在StandardLockInternalsDriver#createsTheLock()方法中;
  1. 判断节点路径是否为第一个节点,如果是,表明加锁成功;否则等待Watcher唤醒。
  • 具体逻辑体现在internalLockLoop()方法中;
1> StandardLockInternalsDriver#createsTheLock() --> 创建临时有序节点

为什么LockInternalsDriver接口的实现是StandardLockInternalsDriver?

  • 因为在LockInternals构造器被调用时,传入的LockInternalsDriver是StandardLockInternalsDriver。
    在这里插入图片描述

createsTheLock()方法:

@Override
public String createsTheLock(CuratorFramework client, String path, byte[] lockNodeBytes) throws Exception
{
    String ourPath;
    if ( lockNodeBytes != null )
    {
        ourPath = client.create().creatingParentContainersIfNeeded().withProtection().withMode(CreateMode.EPHEMERAL\_SEQUENTIAL).forPath(path, lockNodeBytes);
    }
    else
    {
        ourPath = client.create().creatingParentContainersIfNeeded().withProtection().withMode(CreateMode.EPHEMERAL\_SEQUENTIAL).forPath(path);
    }
    return ourPath;
}

方法中会级联创建锁路径,即:锁路径的父路径不存在时,会一级一级的创建,而不是像原生的zookeeper create命令一样报错–父路径不存在。

2> 判断刚创建的锁路径是否为第一个节点

LockInternals#internalLockLoop()方法:

private boolean internalLockLoop(long startMillis, Long millisToWait, String ourPath) throws Exception {
    boolean haveTheLock = false;
    boolean doDelete = false;
    try {
        // debug进不去,暂时忽略
        if (revocable.get() != null) {
            client.getData().usingWatcher(revocableWatcher).forPath(ourPath);
        }

        // 获取锁成功才会退出这个while 或者客户端状态不正常
        while ((client.getState() == CuratorFrameworkState.STARTED) && !haveTheLock) {
            // 获取所有子节点(网络IO访问Zookeeper) 并排好序
            List<String> children = getSortedChildren();
            String sequenceNodeName = ourPath.substring(basePath.length() + 1); // +1 to include the slash

            /\*\*
 \* 判断是否为第一个节点
 \* 返回值predicateResults中的getsTheLock()表示加锁是否成功;
 \* pathToWatch()表示加锁失败后监听的节点
 \*/
            PredicateResults predicateResults = driver.getsTheLock(client, children, sequenceNodeName, maxLeases);
            if (predicateResults.getsTheLock()) {
                // 当前节点是第一个节点 加锁成功 退出while循环
                haveTheLock = true;
            } else {
                // 监听上一个节点 getPathToWatch()返回的就是自己前面的节点
                String previousSequencePath = basePath + "/" + predicateResults.getPathToWatch();

                // 这里加互斥锁的对象和Watcher唤醒的对象是一样的
                synchronized (this) {
                    try {
                        /\*\*
 \* 监听前一个节点,watcher里面会进行唤醒;
 \* 这里只会监听前一个节点,防止羊群效应。这块对比redisson是使用pubsub 唤醒全部节点
 \*/
                        // use getData() instead of exists() to avoid leaving unneeded watchers which is a type of resource leak
                        client.getData().usingWatcher(watcher).forPath(previousSequencePath);
                        if (millisToWait != null) {
                            millisToWait -= (System.currentTimeMillis() - startMillis);
                            startMillis = System.currentTimeMillis();
                            if (millisToWait <= 0) {
                                doDelete = true;    // timed out - delete our node
                                break;
                            }

                            // 加锁的时间限制
                            wait(millisToWait);
                        } else {
                            // 加锁没有时间限制,则一直等待
                            wait();
                        }
                    } catch (KeeperException.NoNodeException e) {
                        // it has been deleted (i.e. lock released). Try to acquire again
                    }
                }
            }
        }
    } catch (Exception e) {
        ThreadUtils.checkInterrupted(e);
        doDelete = true;
        throw e;
    } finally {
        // 加锁超时 或 加锁异常 后删除当前节点
        if (doDelete) {
            deleteOurPath(ourPath);
        }
    }
    return haveTheLock;
}

internalLockLoop()方法逻辑:

  1. 当zookeeper Client状态正常时,while循环中尝试获取锁。
  2. 获取所有子节点(网络IO访问Zookeeper) 并排好序,然后减去Znode有序节点的前缀,判断当前节点是否为第一节点;
  • 如果是,则获取锁成功,直接返回。
  • 如果不是,则说明获取锁失败,进而在synchronized中互斥的监听当前节点的前一个节点,防止羊群效应、JVM中多个加锁失败的节点监听同一个节点。
    如果加锁有时间限制,则等待指定时间,超时后删除当前节点,加锁出现异常也会删除当前节点。如果加锁没有时间限制,则一直等待,直到加锁成功。

此外,StandardLockInternalsDriver#getsTheLock()方法负责判断当前节点是否为第一个节点、如果当前节点不是第一个节点,当前节点应该监听哪一个节点;

@Override
public PredicateResults getsTheLock(CuratorFramework client, List<String> children, String sequenceNodeName, int maxLeases) throws Exception {
    // 查看当前节点在所有有序节点中的位置
    int ourIndex = children.indexOf(sequenceNodeName);
    // 校验节点位置不能 < 0
    validateOurIndex(sequenceNodeName, ourIndex);

    // maxLeases为1,如果节点是顺序节点中的第一个,表示可以获取到锁,maxLeases为1
    boolean getsTheLock = ourIndex < maxLeases;
    // 如果获取不到锁,pathToWatch赋值为当前节点的前一个节点,即:Watcher去监听当前节点的前一个节点
    String pathToWatch = getsTheLock ? null : children.get(ourIndex - maxLeases);

    return new PredicateResults(pathToWatch, getsTheLock);
}

3)监听器的运作

获取锁失败监听当前节点的前一个节点时,会针对LockInternals类加互斥锁,然后挂起线程;等到相应事件触发监听器时,需要调用notify()唤醒这个线程;

private final Watcher watcher = new Watcher() {
    @Override
    public void process(WatchedEvent event) {
        // 收到监听的事件之后进行唤醒,唤醒的对象和synchronized的对象是同一个
        client.postSafeNotify(LockInternals.this);
    }
};

// CuratorFramework类的方法
default CompletableFuture<Void> postSafeNotify(Object monitorHolder)
    {
        return runSafe(() -> {


![img](https://img-blog.csdnimg.cn/img_convert/d51e98f95e1a0a6717e454fb7742114d.png)
![img](https://img-blog.csdnimg.cn/img_convert/5bb5fcbcf8315ba9296f2425cc49a2d8.png)
![img](https://img-blog.csdnimg.cn/img_convert/6596699d0a732a12b409874931d76ece.png)

**既有适合小白学习的零基础资料,也有适合3年以上经验的小伙伴深入学习提升的进阶课程,涵盖了95%以上大数据知识点,真正体系化!**

**由于文件比较多,这里只是将部分目录截图出来,全套包含大厂面经、学习笔记、源码讲义、实战项目、大纲路线、讲解视频,并且后续会持续更新**

**[需要这份系统化资料的朋友,可以戳这里获取](https://bbs.csdn.net/topics/618545628)**

ableFuture<Void> postSafeNotify(Object monitorHolder)
    {
        return runSafe(() -> {


[外链图片转存中...(img-GPKCdlnq-1714675500082)]
[外链图片转存中...(img-qXjTvLY2-1714675500082)]
[外链图片转存中...(img-tKqkXZoa-1714675500082)]

**既有适合小白学习的零基础资料,也有适合3年以上经验的小伙伴深入学习提升的进阶课程,涵盖了95%以上大数据知识点,真正体系化!**

**由于文件比较多,这里只是将部分目录截图出来,全套包含大厂面经、学习笔记、源码讲义、实战项目、大纲路线、讲解视频,并且后续会持续更新**

**[需要这份系统化资料的朋友,可以戳这里获取](https://bbs.csdn.net/topics/618545628)**

Logo

更多推荐