分布式架构-ZK分布式锁中死锁和羊群效应解决方案
分布式架构-ZK分布式锁解决方案一、解决思路获取锁方法:多个jvm同时在zk上创建一个临时节点,最终只能够有一个jvm创建临时节点成功,如果能够创建临时节点成功jvm 表示获取锁成功能够正常执行业务逻辑,如果没有创建临时节点成功的jvm,则表示获取锁失败。获取锁失败之后,可以采用不断重试策略,重试多次获取锁失败之后,当前的jvm就进入到阻塞状态,并监听节点事件。释放锁方法:直接调用.close()
分布式架构-ZK分布式锁中死锁和羊群效应解决方案
一、效果演示
有两个接口,GetTest2和GetTest3,分别都加上了@BxcZkLock注解,其中value 代表是否包含事物,如果是,则会在方法执行完自动提交事物,否则yic回滚,或者当前分布式锁超时回滚,isOrder代表锁的类型是公平锁还是非公平锁。非公平锁由于会唤醒所有等待中的锁出现争抢线程的现象也就是羊群效应,但公平锁加了顺序性,一次只唤醒一个等待中的锁,大大提高了效率。此处的公平锁,非公平锁分别基于zk的有序临时节点和普通临时节点实现。
下面将接口部署在两个不同的jvm中端口为8080、8081,快速请求两个接口。

打印日志。
8081
8080
可以看到,8081先拿到锁并执行,8080未拿到锁等待8081执行完后获取到锁,并在执行完后提交了事物。
如果其中一个jvm持有锁因为某些原因解锁失败,则会导致另一个jvm死锁现象,所以有必要有个超时的处理。
打印日志。
程序中我默认设置的超时时间是五秒,超过五秒还未释放的锁,将会被强制释放,有事物的话会回滚事物。
二、解决思路
-
获取锁方法:
多个jvm同时在zk上创建一个临时节点,最终只能够有一个jvm创建临时节点成功,如果能够创建临时节点成功jvm 表示获取锁成功能够正常执行业务逻辑,如果没有创建临时节点成功的jvm,则表示获取锁失败。获取锁失败之后,可以采用不断重试策略,重试多次获取锁失败之后,当前的jvm就进入到阻塞状态,并监听节点事件。 -
释放锁方法:
直接调用.close()释放锁,因为采用临时节点,当我们调用close()方法的时候该临时节点会自动被删除。其他没有获取到锁的jvm会触发节点删除事件监听,然后唤起线程进入到获取锁的状态。 -
被唤醒的方法:
被阻塞的jvm(没有获取锁成功的jvm),采用事件监听的方式监听到节点已经被删除的情况下,则开始从新进入到获取锁的状态。 -
业务超时,一直不释放锁时:
记录每次获取锁的时间,并存在缓存中,采用定时任务定时去寻找已过期的锁,如果需要同时回滚事物,因为开启事物的线程和定时任务的线程不是同一个线程,因此通过句柄无法回滚事物,但可以设置事物超时时间和锁的超时时间一致,这样定时任务回去释放锁停止线程,事物同时也会回滚(注意:Spring 的事物超时时间是针对SQL的执行时间,比如设置事物超时为5s,先插入数据库后sleep 10s,是不会回滚的,而反过来先sleep 后执行sql则会回滚,所以如果不确定哪个可能执行时间较长,可以在方法最后写一个固定的查询或其他db操作语句来延长事物时间)。 -
如何避免分布式锁羊群效应
羊群效应:
当jvm释放锁的时候,会唤醒正在等待的jvm 从新进入到获取锁的状态。如果正在阻塞的等待获取锁的jvm,如果有几十个或者几百个、上千个的情况下ZkServer端唤醒所有正在等待的jvm,从新进入到获取锁的状态,唤醒的成本是非常高有可能会造成我们ZkServer端阻塞。
解决方案:
基于临时顺序编号节点实现 多个jvm同时创建一个临时顺序编号节点,如果当前jvm创建的临时顺序编号节点是最小的情况下,则表示获取说成功,如果不是最小的情况下,则表示获取锁失败,就会进入到阻塞状态;当前的jvm订阅到我们上一个节点
三、核心实现部分
- pom依赖
<dependency>
<groupId>com.101tec</groupId>
<artifactId>zkclient</artifactId>
<version>0.8</version>
</dependency>
- Lock接口定义
public interface Lock {
void getLock(); //获取锁
void unlock(); //释放锁
}
- 非公平锁实现逻辑
@Slf4j
public class ZookeeperZkLock implements ZkLock {
ZkClientUtils zkClientUtils;
private String lockPath = "/lockPath";
private CountDownLatch countDownLatch;
public ZookeeperZkLock() {
if (zkClientUtils == null) {
zkClientUtils = new ZkClientUtils();
}
}
@Override
public void getLock() {
for (int i = 0; i < 5; i++) {
if (tryLock()) {
return;
}
}
waitLock();
getLock();
}
@Override
public void unlock() {
if (zkClientUtils != null) {
zkClientUtils.getZkClient().delete(lockPath);
zkClientUtils.close();
zkClientUtils = null;
log.info("zk-----> 释放锁 ");
}
}
protected boolean tryLock() {
try {
zkClientUtils.getZkClient().createEphemeral(lockPath);
log.info("zk-----> 获取锁成功 ");
return true;
} catch (Exception e) {
return false;
}
}
protected void waitLock() {
try {
boolean exists = zkClientUtils.getZkClient().exists(lockPath);
if (exists) {
log.info("zk-----> 未获取到锁等待 ");
// 注册一个监听事件
IZkDataListener iZkDataListener = new IZkDataListener() {
@Override
public void handleDataChange(String s, Object o) throws Exception {
}
@Override
public void handleDataDeleted(String s) throws Exception {
log.info("zk-----> 临时节点删除:" + lockPath);
countDownLatch.countDown();
}
};
zkClientUtils.getZkClient().subscribeDataChanges(lockPath, iZkDataListener);
countDownLatch = new CountDownLatch(1);
countDownLatch.await();
// 唤醒成功之后,则移除该事件
zkClientUtils.getZkClient().unsubscribeDataChanges(lockPath, iZkDataListener);
}
} catch (Exception e) {
e.printStackTrace();
}
}
}
- 公平锁实现逻辑
@Slf4j
public class ZookeeperOrderZkLock implements ZkLock {
ZkClientUtils zkClientUtils;
private String lockParent = "/lock";
private String lockPath = "/lockPath";
private CountDownLatch countDownLatch;
// 上一个临时顺序编号节点
private String prevLockPath;
public ZookeeperOrderZkLock() {
if (zkClientUtils == null) {
zkClientUtils = new ZkClientUtils();
}
}
@Override
public void getLock() {
for (int i = 0; i < 5; i++) {
if (tryLock()) {
return;
}
}
waitLock();
getLock();
}
@Override
public void unlock() {
if (zkClientUtils != null) {
zkClientUtils.getZkClient().delete(lockPath);
zkClientUtils.close();
zkClientUtils = null;
log.info("zk-----> 释放锁 ");
}
}
protected boolean tryLock() {
try {
String tempNodeName = LockContext.get();
if (StringUtils.isEmpty(tempNodeName)) {
tempNodeName = zkClientUtils.getZkClient().createEphemeralSequential(lockParent + lockPath, "lock");
LockContext.set(tempNodeName);
}
List<String> childrens = zkClientUtils.getZkClient().getChildren(lockParent);
if (childrens == null) {
log.info("zk获取-->异常");
}
Collections.sort(childrens);
if (tempNodeName.equals(lockParent + "/" + childrens.get(0))) {
log.info("zk-----> 获取锁成功 ");
return true;
}
int index = Collections.binarySearch(childrens, tempNodeName.substring(lockParent.length() + 1));
prevLockPath = lockParent + "/" + childrens.get(index - 1);
} catch (Exception e) {
e.printStackTrace();
}
return false;
}
protected void waitLock() {
try {
boolean exists = zkClientUtils.getZkClient().exists(prevLockPath);
if (exists) {
log.info("zk-----> 未获取到锁等待 ");
// 注册一个监听事件
IZkDataListener iZkDataListener = new IZkDataListener() {
@Override
public void handleDataChange(String s, Object o) throws Exception {
}
@Override
public void handleDataDeleted(String s) throws Exception {
log.info("zk-----> 临时节点删除:" + prevLockPath);
countDownLatch.countDown();
}
};
zkClientUtils.getZkClient().subscribeDataChanges(prevLockPath, iZkDataListener);
countDownLatch = new CountDownLatch(1);
countDownLatch.await();
// 唤醒成功之后,则移除该事件
zkClientUtils.getZkClient().unsubscribeDataChanges(prevLockPath, iZkDataListener);
}
} catch (Exception e) {
e.printStackTrace();
}
}
}
- 利用AOP,在方法前后执行锁
@Component
@Slf4j
@Aspect
public class ExtAsyncAop {
@Autowired
private TransactionUtils transactionUtils;
@Pointcut("@annotation(com.bxc.zklock.lock.annotation.BxcZkLock)")
public void BrokerAspect(){
}
@AfterThrowing(value = "BrokerAspect()",throwing = "e")
public void doAfterThrowingGame(JoinPoint jp, Exception e) {
String name = jp.getSignature().getName();
System.out.println(name+"方法异常通知:"+e.toString());
}
@Around("BrokerAspect()")
public Object around(ProceedingJoinPoint pjp) throws Throwable {
BxcZkLock declaredAnnotation = ((MethodSignature) pjp.getSignature()).getMethod().getDeclaredAnnotation(BxcZkLock.class);
TransactionStatus transactionStatus = null;
ZkLock zkLock = null;
if (declaredAnnotation.isOrder()){
zkLock = new ZookeeperOrderZkLock();
}else {
zkLock = new ZookeeperZkLock();
}
zkLock.getLock();
String serviceId = UUID.randomUUID().toString();
if (isTransaction(declaredAnnotation)){
transactionStatus = transactionUtils.begin();
LockScheduled.lockInfoMap.put(serviceId, new LockInfo(serviceId, Thread.currentThread(), LockStatusEnum.START,System.currentTimeMillis()));
}else {
LockScheduled.lockInfoMap.put(serviceId, new LockInfo(serviceId, Thread.currentThread(), LockStatusEnum.START,zkLock,System.currentTimeMillis()));
}
try{
Object obj = pjp.proceed();
System.out.println("方法执行结束");
if (transactionStatus != null){
transactionUtils.commit(transactionStatus);
}
zkLock.unlock();
LockScheduled.lockInfoMap.remove(serviceId);
return obj;
}catch (Exception e){
if (isTransaction(declaredAnnotation)){
transactionUtils.rollback(transactionStatus);
}
zkLock.unlock();
LockScheduled.lockInfoMap.remove(serviceId);
throw e;
}
}
public boolean isTransaction(JoinPoint pjp){
BxcZkLock declaredAnnotation = ((MethodSignature) pjp.getSignature()).getMethod().getDeclaredAnnotation(BxcZkLock.class);
if (LockStatusEnum.Translation == declaredAnnotation.value()){
return true;
}
return false;
}
public boolean isTransaction(BxcZkLock bxcZkLock){
if (LockStatusEnum.Translation == bxcZkLock.value()){
return true;
}
return false;
}
}
- 利用定时任务关闭超时锁,避免死锁
@Slf4j
@Component
public class LockScheduled {
//锁的超时时间为5秒(可配置配置文件中)
public Long locktimeout = 5000L;
public static Map<String, LockInfo> lockInfoMap = new ConcurrentHashMap<>();
/**
* 检测超时未释放的锁
*/
@Scheduled(cron = "0/2 * * * * *")
public void taskService() {
try {
lockInfoMap.forEach((k, lockInfo) -> {
log.info("分布式锁超时检测");
if ((System.currentTimeMillis() - lockInfo.getCreateTime()) >= locktimeout) {
log.info("持有锁超时关闭-----> " + lockInfo.getLockId());
//释放锁
lockInfo.getZkLock().unlock();
//直接停止线程,事物超过时间没有提交会自动回滚
lockInfo.getLockThread().interrupt();
// 移除队列
lockInfoMap.remove(k);
}
});
} catch (Exception e) {
e.printStackTrace();
}
}
}
- 手动事物
@Component
@Scope("prototype")
public class TransactionUtils {
private TransactionStatus transactionStatus;
@Autowired
private DataSourceTransactionManager dataSourceTransactionManager;
public TransactionStatus begin() {
System.out.println("开启事物");
DefaultTransactionAttribute defaultTransactionAttribute = new DefaultTransactionAttribute();
//事物的超时时间,最好配置在配置文件中
defaultTransactionAttribute.setTimeout(5);
transactionStatus = dataSourceTransactionManager.getTransaction(defaultTransactionAttribute);
return transactionStatus;
}
public void commit(TransactionStatus transaction) {
System.out.println("提交事物");
dataSourceTransactionManager.commit(transaction);
}
public void rollback() {
System.out.println("回滚事物");
dataSourceTransactionManager.rollback(transactionStatus);
}
public void rollback(TransactionStatus transaction) {
System.out.println("回滚事物");
dataSourceTransactionManager.rollback(transaction);
}
}
以上为核心实现逻辑,重要的是思想,如果有疑问,可以留言讨论哦!
更多推荐

所有评论(0)