使用分布式锁的背景:

分布式锁是一种在分布式系统中协调多个节点访问共享资源的机制,意思就是说不管这些节点是部署的相同服务还是不同服务只要共用了某个数据就需要使用锁防止数据被多节点同时修改,那么防止共享数据被修改就必须用分布式锁吗?这个还是具体要看服务的架构是什么样的,如果多节点修改的共享数据是在一个支持事务的数据库中,比如多个节点访问的共享数据在一个mysql中,或者是一个单主节点的mysql集群,而恰好修改和插入数据都只在主节点进行那么使用mysql本身的锁机制也可以防止共享数据被多节点同时修改,由此可以看出分布式锁的实现是需要有一层在各节点访问共享数据时可以保证各节点能够感知到其它节点在修改共享数据,那么用多线程使用锁去类比理解分布式锁或许会好理解些,但是这就需要了解锁的细节了,比如在多线程中出现的死锁,锁升级,锁竞争,分布式锁是否也会出现这个问题。

使用redis实现分布式锁,此时需要两个节点:

导入依赖:

<dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-data-redis</artifactId>
        </dependency>

 以RedisTemplate为例,设置分布式锁代码如下:

 @Autowired
    private RedisTemplate redisTemplate;

    int defendThread01 = 0;

    int defendThread02 = 0;

    int thread01 = 1;

    int thread02 = 1;

    @Test
    public void multiLock() {
        redisTemplate.setKeySerializer(RedisSerializer.string());
        redisTemplate.setValueSerializer(RedisSerializer.string());
        ValueOperations valueOperations = redisTemplate.opsForValue();

        //枪锁线程01
        Thread threada = new Thread(() -> {
            System.out.println("线程01尝试执行任务");
            while (thread01 == 1) {
                if (valueOperations.setIfAbsent("set", "2", 6, TimeUnit.SECONDS)) {
                    System.out.println("线程01抢到锁了");
                    defendThread01=1;
                    //守护线程,给锁续期
                    Thread daemonThread = new Thread(() -> {
                        while (defendThread01 == 1) {
                            try {
                                System.out.println(Thread.currentThread().getName()+"---thread01的守护线程延长失效时间");
                                Thread.sleep(3000);
                                valueOperations.setIfPresent("set", "2", 4, TimeUnit.SECONDS);
                                Long set = valueOperations.getOperations().getExpire("set");
                                System.out.println("thread01的守护线程延长失效时间为:--"+set+"秒");
                            } catch (InterruptedException e) {
                                throw new RuntimeException(e);
                            }
                        }
                    });
                    daemonThread.start();

                    try {
                        System.out.println("thread01进入主程序");
                        //设置线程休眠时间为4秒,模仿实际任务执行了4秒
                        Thread.sleep(4000);
                        System.out.println("6秒钟线程01执行完任务了");
                    } catch (InterruptedException e) {
                        throw new RuntimeException(e);
                    }
                    defendThread01 = 0;
                    thread01=0;
                }
            }
        });

        //枪锁线程02
        Thread threadb = new Thread(() -> {
            System.out.println("线程02尝试执行任务");
            while (thread02 == 1) {
                if (valueOperations.setIfAbsent("set", "2", 6, TimeUnit.SECONDS)) {
                    System.out.println("线程02抢到锁了");
                    defendThread02=1;
                    //守护线程,给锁续期
                    Thread daemonThread =  new Thread(() -> {
                        while (defendThread02 == 1) {
                            try {
                                System.out.println(Thread.currentThread().getName()+"---thread02的守护线程延长失效时间");
                                Thread.sleep(3000);
                                valueOperations.setIfPresent("set", "2", 4, TimeUnit.SECONDS);
                                Long set = valueOperations.getOperations().getExpire("set");
                                System.out.println("thread02的守护线程延长失效时间为:--"+set+"秒");
                            } catch (InterruptedException e) {
                                throw new RuntimeException(e);
                            }
                        }
                    });
                    daemonThread.start();
                    try {
                        System.out.println("thread02进入主程序");
                        //设置线程休眠时间为4秒,模仿实际任务执行了4秒
                        Thread.sleep(4000);
                        defendThread02 = 0;
                        thread02=0;
                        System.out.println("6秒钟线程02执行完任务了");
                    } catch (InterruptedException e) {
                        throw new RuntimeException(e);
                    }

                }
            }
        });
        threadb.start();
        threada.start();

        try {
            threadb.join();-------------------------01
            threada.join();-------------------------02
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        }
    }

执行结果为:

线程01尝试执行任务
线程02尝试执行任务
线程01抢到锁了
thread01进入主程序
Thread-4---thread01的守护线程延长失效时间
thread01的守护线程延长失效时间为:--4秒
Thread-4---thread01的守护线程延长失效时间
6秒钟线程01执行完任务了
thread01的守护线程延长失效时间为:--4秒
线程02抢到锁了
thread02进入主程序
Thread-5---thread02的守护线程延长失效时间
thread02的守护线程延长失效时间为:--4秒
Thread-5---thread02的守护线程延长失效时间
6秒钟线程02执行完任务了

代码里用线程模仿进程来测试redis实现分布式锁。其中通过setIfAbsent方法来进行加锁设置,通过setIfPresent方法来给锁续期防止任务未执行完锁就失效了。由于setIfAbsent方法是原子性的所以也可以避免必须通过lua来书写分布式锁。

这里需要注意的是:01和02行加入后子线程中的setIfAbsent和setIfPresent方法才会生效。为什么会这样呢?需要后续了解。(可能和redis在主线程执行完后会释放redis连接,而分支线程没有redis连接)

使用redisson开启分布式锁,用多线程模拟,代码如下:

导入依赖

  <dependency>
            <groupId>org.redisson</groupId>
            <artifactId>redisson-spring-boot-starter</artifactId>
            <version>3.16.1</version>
        </dependency>

添加客户端:

@Configuration
public class RedissonConfiguration {
    @Bean
    public RedissonClient redissonClient() {
        Config config = new Config();
        //设置redis的地址,这里是单机模式
        config.useSingleServer().setAddress("redis://127.0.0.1:6379");
        //设置Redisson存储数据的格式,这里是使用的Json格式
        config.setCodec(new JsonJacksonCodec());
        return Redisson.create(config);
    }
}

 测试代码:

  @Autowired
    private RedissonClient redissonClient;

    @Test
    public void redissonLock() {
        Thread threada =
                new Thread(() -> {
                    try {
                        Thread.sleep(10000);
                    } catch (InterruptedException e) {
                        throw new RuntimeException(e);
                    }
                    //1.获取锁对象,指定锁名称
                    RLock lock = redissonClient.getLock("xiao");
                    try {
                        //2.尝试获取锁,参数:waitTime – 获取锁的最长时间 leaseTime – 租赁时间 unit – 时间单位
                        boolean isLock = lock.tryLock(1, 10, TimeUnit.SECONDS);
                        System.out.println(isLock);
                        if (!isLock) {
                            //获取锁失败处理
                            System.out.println("01获取锁失败");
                            Thread.sleep(2000);
                        } else {
                            //获取锁成功处理
                            System.out.println("01获取锁成功");
                            Thread.sleep(2000);
                            lock.unlock();
                        }
                    } catch (InterruptedException e) {
                        throw new RuntimeException(e);
                    }
                });
        Thread threadb =
                new Thread(() -> {
                    try {
                        Thread.sleep(10000);
                    } catch (InterruptedException e) {
                        throw new RuntimeException(e);
                    }
                    //1.获取锁对象,指定锁名称
                    RLock lock = redissonClient.getLock("xiao");
                    try {
                        //2.尝试获取锁,参数:waitTime – 获取锁的最长时间 leaseTime – 租赁时间 unit – 时间单位
                        boolean isLock = lock.tryLock(1, 10, TimeUnit.SECONDS);
                        System.out.println(isLock);
                        if (!isLock) {
                            //获取锁失败处理
                            System.out.println("02获取锁失败");
                            Thread.sleep(2000);
                        } else {
                            //获取锁成功处理
                            System.out.println("02获取锁成功");
                            Thread.sleep(2000);
                            lock.unlock();
                        }
                    } catch (InterruptedException e) {
                        throw new RuntimeException(e);
                    }
                });

        threada.start();
        threadb.start();
        try {
            threada.join();
            threadb.join();
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        }
    }

运行代码结果为

true
02获取锁成功
false
01获取锁失败

例中以new Thread开启线程,也可以尝试线程池开启。

以下为线程池方式:

  @Test
    public void mutilThread() {
        ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(4, 5, 30, TimeUnit.SECONDS, new LinkedBlockingDeque());
        CompletableFuture<Void> voidCompletableFuture = CompletableFuture.runAsync(() -> {
            try {

                RLock lock = redissonClient.getLock("li");
                boolean b = lock.tryLock(1, 10, TimeUnit.SECONDS);
                if (b) {
                    System.out.println("01加锁成功");
                    Thread.sleep(1000);
                    lock.unlock();
                } else {
                    System.out.println("01加锁失败");
                }

            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            }
        }, threadPoolExecutor);
        CompletableFuture<Object> objectCompletableFuture = CompletableFuture.supplyAsync(() -> {
            try {

                RLock lock = redissonClient.getLock("li");
                boolean b = lock.tryLock(1, 10, TimeUnit.SECONDS);
                if (b) {
                    System.out.println("02加锁成功");
                    Thread.sleep(1000);
                    lock.unlock();
                } else {
                    System.out.println("02加锁失败");
                }
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            }
            return 1;
        }, threadPoolExecutor);

        try {
            Object o = objectCompletableFuture.get();
            System.out.println(o);
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        } catch (ExecutionException e) {
            throw new RuntimeException(e);
        }
    }

 执行结果为:

02加锁成功
01加锁失败
1

Logo

更多推荐