Redis分布式锁实现

下面是一个基于Redis的分布式锁实现,封装成一个类,默认10分钟自动释放锁。

import time
import uuid
import redis
from functools import wraps


class RedisDistributedLock:
    def __init__(self, redis_client, lock_name, expire_time=600):
        """
        初始化分布式锁
        
        :param redis_client: Redis客户端实例
        :param lock_name: 锁的名称
        :param expire_time: 锁的过期时间(秒),默认10分钟(600秒)
        """
        self.redis_client = redis_client
        self.lock_name = lock_name
        self.expire_time = expire_time
        self.identifier = str(uuid.uuid4())  # 唯一标识,用于释放锁时验证
    
    def acquire(self, blocking=True, timeout=None):
        """
        获取锁
        
        :param blocking: 是否阻塞等待
        :param timeout: 阻塞等待的超时时间(秒)
        :return: 是否成功获取锁
        """
        if blocking:
            if timeout is None:
                # 无限等待
                while True:
                    if self._try_acquire():
                        return True
                    time.sleep(0.001)
            else:
                # 有限等待
                end_time = time.time() + timeout
                while time.time() < end_time:
                    if self._try_acquire():
                        return True
                    time.sleep(0.001)
                return False
        else:
            # 非阻塞模式,尝试一次
            return self._try_acquire()
    
    def _try_acquire(self):
        """
        尝试获取锁的内部方法
        """
        # 使用SET命令的NX和EX参数实现原子操作
        result = self.redis_client.set(
            self.lock_name, 
            self.identifier, 
            ex=self.expire_time, 
            nx=True
        )
        return result is True
    
    def release(self):
        """
        释放锁
        
        使用Lua脚本确保原子性操作:
        1. 检查锁的值是否匹配
        2. 如果匹配则删除键
        """
        lua_script = """
        if redis.call("get", KEYS[1]) == ARGV[1] then
            return redis.call("del", KEYS[1])
        else
            return 0
        end
        """
        return self.redis_client.eval(lua_script, 1, self.lock_name, self.identifier)
    
    def __enter__(self):
        self.acquire()
        return self
    
    def __exit__(self, exc_type, exc_val, exc_tb):
        self.release()


def distributed_lock(redis_client, lock_name, expire_time=600):
    """
    分布式锁装饰器
    
    :param redis_client: Redis客户端实例
    :param lock_name: 锁的名称
    :param expire_time: 锁的过期时间(秒),默认10分钟(600秒)
    """
    def decorator(func):
        @wraps(func)
        def wrapper(*args, **kwargs):
            lock = RedisDistributedLock(redis_client, lock_name, expire_time)
            try:
                with lock:
                    return func(*args, **kwargs)
            except Exception as e:
                raise e
        return wrapper
    return decorator

调用示例

方式1:使用上下文管理器

import redis

# 创建Redis客户端
redis_client = redis.Redis(host='localhost', port=6379, db=0)

# 使用上下文管理器
lock_name = "order:update:123"
try:
    with RedisDistributedLock(redis_client, lock_name) as lock:
        # 在这里执行需要加锁的操作
        print("成功获取锁,执行关键代码...")
        # 模拟业务操作
        time.sleep(5)
        print("操作完成")
except Exception as e:
    print(f"发生错误: {e}")

方式2:使用装饰器

@distributed_lock(redis_client, "user:update:456")
def update_user_info(user_id, new_info):
    # 这个函数会自动加锁
    print(f"更新用户 {user_id} 的信息: {new_info}")
    # 模拟耗时操作
    time.sleep(3)
    print("更新完成")

# 调用被装饰的函数
update_user_info(456, {"name": "张三", "age": 30})

方式3:手动获取和释放锁

lock = RedisDistributedLock(redis_client, "product:inventory:789")

# 尝试获取锁,等待最多5秒
if lock.acquire(blocking=True, timeout=5):
    try:
        print("成功获取锁,执行业务逻辑...")
        # 模拟业务操作
        time.sleep(2)
    finally:
        # 确保锁被释放
        lock.release()
else:
    print("获取锁失败,可能有其他进程持有锁")

实现说明

  1. 唯一标识:使用UUID作为锁的值,确保只有锁的持有者才能释放锁
  2. 原子性操作
    • 获取锁使用SET命令的NX和EX参数实现原子操作
    • 释放锁使用Lua脚本确保检查值和删除键的原子性
  3. 自动释放:默认10分钟(600秒)后锁会自动释放,防止死锁
  4. 多种使用方式:支持上下文管理器、装饰器和手动获取/释放三种使用方式
  5. 阻塞/非阻塞:支持阻塞和非阻塞两种获取锁的方式

这个实现遵循了Redis官方推荐的分布式锁模式(Redlock算法的一部分),能够满足大多数分布式锁的需求。

Logo

更多推荐