zookeeper实现分布式锁
前言zookeeper的设计初衷,就是为了协调分布式服务,因此利用zookeeper来解决分布式锁的问题也是一种较为简单的实现1 原理利用zookeeper的顺序临时节点的特性来实现1.1 获取锁首先,在zookeeper当中创建一个父节点 /testLock;当第一个客户端C1想要获取锁时,会先在父节点下创建一个临时顺序节点N1; 之后,C1会查找父节点下的所有的临时顺序节点并排序,判断自己所创
前言
zookeeper的设计初衷,就是为了协调分布式服务,因此利用zookeeper来解决分布式锁的问题是一种最为简单的实现
1 原理
利用zookeeper的顺序临时节点的特性来实现
1.1 获取锁
- 首先,在zookeeper当中创建一个父节点
/testLock
; - 当第一个客户端C1想要获取锁时,会先在父节点下创建一个临时顺序节点N1; 之后,C1会查找父节点下的所有的临时顺序节点并排序,判断自己所创建的节点N1是不是最小的(最靠前的); 如果是,则成功获得锁;
- 这时候,如果再有一个客户端C2前来获取锁,也会在父节点下再创建一个临时顺序节点N2; 之后,C2也会查找父节点下面所有的临时顺序节点并排序,判断自己所创建的节点N2是不是最小的; 结果发现节点N2不是最小的。 于是,C2向排序仅在它前面的一个节点N1注册一个watcher,用于监听N1节点是否存在; C2抢锁失败,进入了等待监听状态;
- 这时候,如果再有客户端C3前来获取锁,也会在父节点下再创建一个临时顺序节点N3; 之后,C3也会查找父节点下面所有的临时顺序节点并排序,判断自己所创建的节点N3是不是最小的; 结果发现节点N3不是最小的。 于是,C3向排序仅在它前面的一个节点N2注册一个watcher,用于监听N2节点是否存在; C3抢锁失败,进入了等待监听状态;
- 最终各个节点形成一个类似于队列的模型来有序地监听并获取锁。
1.2 释放锁
客户端释放锁的两种情况:
任务执行完,客户端主动释放锁
当前获取锁的客户端C1在任务执行完成后,主动调用delete删除临时节点N1。
任务执行中,客户端崩溃被动解锁
当前获取锁的客户端C1在任务执行过程中崩溃,则会断开与zookeeper服务端的链接;因为是临时节点,所以与该客户端相关联的节点N1也会随之被自动删除;
由于C2注册有watcher一直监听着N1的存在,当N1节点被删除后,C2客户端会立刻收到通知;这时C2会再次查询父节点下面的所有节点并排序,确认自己当前的节点N2是不是最小的,如果是最小,则C2成功获取锁;
同理,之后排队等待的客户端也依次获取锁执行任务。
2 原生API实现
使用zookeeper的客户端api进行原始代码实现
2.1 zk连接工具类
public class ZKUtils {
private static ZooKeeper zooKeeper;
// 这里使用的zk集群,也可以使用单个,在连接后可添加一个路径作为父目录(该目录需要在测试之前手动在zk中创建)
private static String address = "127.0.0.1:2181,127.0.0.1:2182,127.0.0.1:2183/testLock";
// 默认的watcher
private static DefaultWatch defaultWatch = new DefaultWatch();
// 线程锁(阻塞程序,连接成功后释放)
private static CountDownLatch countDownLatch = new CountDownLatch(1);
public static ZooKeeper getZooKeeper() {
try {
zooKeeper = new ZooKeeper(address, 1000, defaultWatch);
// 传递countDownLatch到defaultWatch
defaultWatch.setCountDownLatch(countDownLatch);
// 堵塞等待,成功连接后释放
countDownLatch.await();
} catch (Exception e) {
e.printStackTrace();
}
return zooKeeper;
}
}
2.2 默认监听watcher
在zookeeper客户端建立连接时使用的默认监听,实现自Watcher接口
public class DefaultWatch implements Watcher {
// 在zk连接下进行入参初始化countDownLatch
private CountDownLatch countDownLatch;
public void setCountDownLatch(CountDownLatch countDownLatch) {
this.countDownLatch = countDownLatch;
}
// 监听
@Override
public void process(WatchedEvent event) {
System.out.println(event.toString());
switch (event.getState()) {
case Unknown:
break;
case Disconnected:
break;
case NoSyncConnected:
break;
case SyncConnected:
// 建立连接后释放
countDownLatch.countDown();
break;
case AuthFailed:
break;
case ConnectedReadOnly:
break;
case SaslAuthenticated:
break;
case Expired:
break;
}
}
}
2.3 锁具体实现
加锁、解锁的具体操作实现,需要实现的回调接口:
- Watcher:用于节点间的监听,监听事件类型NodeDeleted;
- AsyncCallback.StringCallback:创建节点时的异步回调create;
- AsyncCallback.Children2Callback:获取子节点时的异步回调getChildren;
public class LockWatchCallBack implements Watcher, AsyncCallback.StringCallback, AsyncCallback.Children2Callback, AsyncCallback.StatCallback {
// 入参初始化
private ZooKeeper zooKeeper;
// 入参初始化,当前操作的线程名
private String threadName;
// 阻塞操作
private CountDownLatch countDownLatch = new CountDownLatch(1);
// 创建的临时节点路径,类似于 /lock0000000001
private String pathName = "";
public String getPathName() {
return pathName;
}
public void setPathName(String pathName) {
this.pathName = pathName;
}
public String getThreadName() {
return threadName;
}
public void setThreadName(String threadName) {
this.threadName = threadName;
}
public ZooKeeper getZooKeeper() {
return zooKeeper;
}
public void setZooKeeper(ZooKeeper zooKeeper) {
this.zooKeeper = zooKeeper;
}
// 获取锁操作tryLock
public void tryLock() {
// 判断:根数据==线程名,进行锁的重入(每重入一次,锁标志位加1)
String[] str = getRootData().split("#");
if (str.length == 2 && threadName.equals(str[0])) {
try {
// 获取当前加锁的次数
int i = Integer.parseInt(str[1]);
i++;
// 设置根节点数据:当前线程名+“#”+锁个数
zooKeeper.setData("/", (threadName + "#" + i).getBytes(), -1);
} catch (KeeperException e) {
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
}
} else {
// 客户端连接后创建的临时有序节点,回调AsyncCallback.StringCallback
zooKeeper.create("/lock", (threadName + "#1").getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL, this, "tryLock");
}
try {
// 堵塞线程,节点创建成功后释放
countDownLatch.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
// 获取锁的根目录数据:当前线程名+“#”+锁个数
public String getRootData() {
byte[] data = new byte[0];
try {
data = zooKeeper.getData("/", false, new Stat());
} catch (KeeperException e) {
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
}
return new String(data);
}
//释放锁
public void unLock() {
try {
// 可重入锁:解锁,每调用一次解锁操作,锁标志减一,为0删除节点
String[] str = getRootData().split("#");
// 获取当前加锁的次数
int i = Integer.parseInt(str[1]);
System.out.println(threadName + " stop "+i+" ... ");
i--;
if (i == 0) {
// i=0,删除节点,并置空根节点数据
zooKeeper.setData("/", "".getBytes(), -1);
zooKeeper.delete(pathName, -1);
} else {
// i!=0,锁标志位减一
zooKeeper.setData("/", (threadName + "#" + i).getBytes(), -1);
}
} catch (InterruptedException e) {
e.printStackTrace();
} catch (KeeperException e) {
e.printStackTrace();
}
}
// AsyncCallback.StringCallback:创建节点操作时的回调create
@Override
public void processResult(int rc, String path, Object ctx, String name) {
// 创建的节点名
if (name != null) {
// 创建节点的路径
pathName = name;
System.out.println(threadName + "===" + pathName);// Thread-0===/lock0000000001
//获取当前锁在根目录下创建的子节点 ,回调AsyncCallback.Children2Callback
zooKeeper.getChildren("/", false, this, "AsyncCallback.StringCallback ...");
}
}
// AsyncCallback.Children2Callback:获取子节点的回调getChildren
@Override
public void processResult(int rc, String path, Object ctx, List<String> children, Stat stat) {
// 查看在当前线程之前已经创建的子节点,存放于参数list中
/*
System.out.println("在当前线程:"+threadName+",之前创建成功的节点");
for (String child:children) {
System.out.print(child+" , "); // lock0000000066 , lock0000000065 ,
}
System.out.println();
*/
// 当前节点集合list是无序的,进行排序操作
Collections.sort(children);
// 判断当前回调的节点是不是第一个节点,list中获取的节点不带‘/’,pathName中的节点带‘/’,需要进行截取
int i = children.indexOf(pathName.substring(1));
if (i == 0) {
// 是,获取锁成功,阻塞结束
System.out.println(threadName + " i am coming ...");
try {
// 将自己的线程名设置给根目录,并设置一个锁的标志位,根据判断该值进行锁的重入操作
zooKeeper.setData("/", (threadName + "#1").getBytes(), -1);
} catch (KeeperException e) {
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
}
countDownLatch.countDown();
} else {
// 否,watcher监控当前回调节点的前一个节点的删除,回调AsyncCallback.StatCallback
zooKeeper.exists("/" + children.get(i - 1), this, this, "监控当前节点的前一个节点");
}
}
// AsyncCallback.StatCallback
@Override
public void processResult(int rc, String path, Object ctx, Stat stat) {
// 监控失效后的再处理回调。。。
}
// Watcher
@Override
public void process(WatchedEvent event) {
// 监听所发生的事件类型
switch (event.getType()) {
case None:
break;
case NodeCreated:
break;
case NodeDeleted:
// 监听节点删除事件,删除后再次获取子节点,触发之后节点获取锁的操作
zooKeeper.getChildren("/", false, this, "AsyncCallback.StringCallback ... NodeDeleted ");
break;
case NodeDataChanged:
break;
case NodeChildrenChanged:
break;
case DataWatchRemoved:
break;
case ChildWatchRemoved:
break;
}
}
}
2.4 锁测试类
在测试之前需要在zk中先创建连接时定义的根节点/testLock,并设值为 ’ ’ ;
public class TestLock {
private ZooKeeper zooKeeper;
// 开启的线程数,模拟多客户端操作
private static final int CLIENTS_NUM = 3;
@Before
public void connection() {
zooKeeper = ZKUtils.getZooKeeper();
}
@After
public void close() {
try {
zooKeeper.close();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
@Test
public void myLock() {
//线程计数器控制业务的执行
final CountDownLatch countDownLatch = new CountDownLatch(CLIENTS_NUM);
for (int i = 0; i < CLIENTS_NUM; i++) {
new Thread() {
@Override
public void run() {
// 锁回调程序是在每个客户端中都单独存在的
LockWatchCallBack lockWatchCallBack = new LockWatchCallBack();
// 入参赋值zooKeeper
lockWatchCallBack.setZooKeeper(zooKeeper);
String threadName = Thread.currentThread().getName();
// 入参赋值threadName
lockWatchCallBack.setThreadName(threadName);
try {
// 加锁
lockWatchCallBack.tryLock();
// 模拟执行业务
Thread.sleep(10);
System.out.println(threadName + " do 1 ... " + lockWatchCallBack.getRootData());
// 锁重入
lockWatchCallBack.tryLock();
Thread.sleep(10);
System.out.println(threadName + " do 2 ... " + lockWatchCallBack.getRootData());
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
// 释放锁(加几次锁就要释放几次锁)
lockWatchCallBack.unLock();
lockWatchCallBack.unLock();
// 线程数减1
countDownLatch.countDown();
}
}
}.start();
}
try {
// 堵塞线程,任务执行完后释放
countDownLatch.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("end ...");
}
// 测试结果
/*
WatchedEvent state:SyncConnected type:None path:null
Thread-0===/lock0000000000
Thread-2===/lock0000000001
Thread-1===/lock0000000002
Thread-0 i am coming ...
Thread-0 do 1 ... Thread-0#1
Thread-0 do 2 ... Thread-0#2
Thread-0 stop 2 ...
Thread-0 stop 1 ...
Thread-2 i am coming ...
Thread-2 do 1 ... Thread-2#1
Thread-2 do 2 ... Thread-2#2
Thread-2 stop 2 ...
Thread-2 stop 1 ...
Thread-1 i am coming ...
Thread-1 do 1 ... Thread-1#1
Thread-1 do 2 ... Thread-1#2
Thread-1 stop 2 ...
Thread-1 stop 1 ...
end ...
WatchedEvent state:Closed type:None path:null
*/
}
3 Curator框架实现
框架curator中封装了一整套zookeeper的相关业务实现,使用非常简单
3.1 pom依赖
在这里要注意一下,这里pom中使用的zk版本最好与zk连接中使用的版本一致
<!--zookeeper的客户端依赖-->
<dependency>
<groupId>org.apache.zookeeper</groupId>
<artifactId>zookeeper</artifactId>
<version>3.5.9</version>
</dependency>
<!--curator-->
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-recipes</artifactId>
<version>4.0.1</version>
</dependency>
3.2 具体实现
使用curator实现分布式锁主要有三个步骤:
- 创建锁对象:
new InterProcessMutex()
,通过这个类获取的锁对像是可重入的; - 获取锁对象:执行
acquire()
方法获取锁; - 释放锁对象:调用
release()
方法释放锁;
public class TestLock2 {
// zk临时节点的父目录
private static final String LOCK_PATH = "/testLock";
// 客户端的数量
private static final int CLIENT_NUMS = 3;
// 线程堵塞计数器
private static CountDownLatch countDownLatch = new CountDownLatch(CLIENT_NUMS);
@Test
public void myTest2() {
for (int i = 0; i < CLIENT_NUMS; i++) {
new Thread(new Runnable() {
@Override
public void run() {
// 重试策略(重试时间,重试次数)
RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
// 获取连接
CuratorFramework client = CuratorFrameworkFactory.newClient("127.0.0.1:2181", retryPolicy);
client.start();
try {
// 可重入锁,可以加入重入操作执行
final InterProcessMutex lock = new InterProcessMutex(client, LOCK_PATH);
// 可让每个客户端请求多次锁资源进行测试
for (int j = 1; j <= 1; j++) {
try {
// 调用acquire获取锁成功后会在/testLock下创建临时节点,节点名称类似与:_c_723c6b99-f0ec-4539-ada2-3e17a48746ed-lock-0000000309
if (lock.acquire(10, TimeUnit.SECONDS)) {
System.out.println(Thread.currentThread().getName() + " get lock 1 ...");
// 模拟业务操作
Thread.sleep(10);
System.out.println(Thread.currentThread().getName() + " do 1 ... ");
} else {
throw new IllegalStateException(Thread.currentThread().getName() + " get lock 1 timeout ");
}
// 获取可重入锁
if (lock.acquire(10, TimeUnit.SECONDS)) {
System.out.println(Thread.currentThread().getName() + " get lock 2 ...");
Thread.sleep(10);
System.out.println(Thread.currentThread().getName() + " do 2 ... ");
} else {
throw new IllegalStateException(Thread.currentThread().getName() + " get lock 2 timeout");
}
} finally {
// 在finally中释放锁,申请几次释放几次,调用release释放锁删除acquire产生的临时节点
System.out.println(Thread.currentThread().getName() + " release lock 2");
lock.release();
System.out.println(Thread.currentThread().getName() + " release lock 1");
lock.release();
}
}
} catch (Exception e) {
e.printStackTrace();
} finally {
CloseableUtils.closeQuietly(client);
countDownLatch.countDown();
}
}
}).start();
}
// 阻塞等待线程计数器归零
try {
countDownLatch.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("end ...");
}
// 测试结果
/*
Thread-2 get lock 1 ...
Thread-2 do 1 ...
Thread-2 get lock 2 ...
Thread-2 do 2 ...
Thread-2 release lock 2
Thread-2 release lock 1
2021-01-25 13:36:31,874 [myid:] - INFO [Curator-Framework-0:CuratorFrameworkImpl@937] - backgroundOperationsLoop exiting
Thread-1 get lock 1 ...
Thread-1 do 1 ...
Thread-1 get lock 2 ...
Thread-1 do 2 ...
Thread-1 release lock 2
Thread-1 release lock 1
2021-01-25 13:36:31,914 [myid:] - INFO [Curator-Framework-0:CuratorFrameworkImpl@937] - backgroundOperationsLoop exiting
Thread-0 get lock 1 ...
Thread-0 do 1 ...
Thread-0 get lock 2 ...
Thread-0 do 2 ...
Thread-0 release lock 2
Thread-0 release lock 1
2021-01-25 13:36:31,963 [myid:] - INFO [Curator-Framework-0:CuratorFrameworkImpl@937] - backgroundOperationsLoop exiting
2021-01-25 13:36:32,010 [myid:] - INFO [Thread-2:ZooKeeper@1422] - Session: 0x1008587bbf70003 closed
2021-01-25 13:36:32,010 [myid:] - INFO [Thread-2-EventThread:ClientCnxn$EventThread@524] - EventThread shut down for session: 0x1008587bbf70003
2021-01-25 13:36:32,109 [myid:] - INFO [Thread-0:ZooKeeper@1422] - Session: 0x1008587bbf70004 closed
2021-01-25 13:36:32,109 [myid:] - INFO [Thread-0-EventThread:ClientCnxn$EventThread@524] - EventThread shut down for session: 0x1008587bbf70004
2021-01-25 13:36:32,209 [myid:] - INFO [Thread-1:ZooKeeper@1422] - Session: 0x1008587bbf70005 closed
end ...
*/
}
4 对比Redis实现
- 在实现逻辑上,zookeeper就是为了协调分布式服务的,有成熟的框架落地,实现极为简单,而redis实现逻辑比较复杂;
- 在执行效率上,虽然zookeeper的数据也是存在于内存,但是redis更是以快著称,所以在效率上redis要快一些。
注:此内容仅供参考,如有问题欢迎指出,谢谢!
更多推荐
所有评论(0)