用 Redis 实现分布式锁——C++ 高并发编程必备指南
要说分布式系统里最让人头大的事儿,保证资源不被瞎抢肯定算一个!而 Redis 这货作为高性能的键值存储,在分布式锁这块可是老熟人了,用得特别广。咱都知道,分布式环境下想控住并发,那必须得靠分布式锁。今天就来唠唠,用 Redis 搞分布式锁时,有哪些坑得避开,哪些关键点得拿捏住。
要说分布式系统里最让人头大的事儿,保证资源不被瞎抢肯定算一个!而 Redis 这货作为高性能的键值存储,在分布式锁这块可是老熟人了,用得特别广。
咱都知道,分布式环境下想控住并发,那必须得靠分布式锁。今天就来唠唠,用 Redis 搞分布式锁时,有哪些坑得避开,哪些关键点得拿捏住。
Part1 啥是分布式锁?
说白了,分布式锁就是用来管着分布式系统里那些共享资源的。多台机器、多个节点要是同时想访问或改同一个资源,这锁就能站出来说 “停”,保证同一时间只有一个能操作,免得搞出数据乱序、不一致的烂摊子。简单说,就是防止大家 “抢同一盘菜” 时打架。
Part2 为啥非得用分布式锁?
跟分布式锁对着干的,是单机锁。咱写多线程程序时,怕多个线程瞎改共享变量,一把单机锁就能搞定 —— 但这锁只能管同一个进程里的线程,出了这个进程就不好使了。
可现在的业务不都搞微服务吗?
一个应用恨不得部署好多个进程。
举个例子哈:要是好几个进程都想改 MySQL 里的同一条记录,没个规矩管着,改来改去准出脏数据。
这时候就轮到分布式锁登场了 —— 它能跨进程、跨机器,让所有进程都得按规矩来 “申请锁”。
想搞分布式锁,得找个 “公证人”—— 所有进程都去它那申请锁,而且这 “公证人” 必须能做到:两个请求同时来,只能让一个成功拿到锁,另一个乖乖等着。这 “公证人” 可以是数据库,也可以是 Redis、Zookeeper,但论性能,Redis 和 Zookeeper 显然更靠谱,所以大家更爱用它们。
Redis 本身就能被多个客户端共享访问,天生适合存分布式锁;而且读写速度贼快,高并发场景下也扛得住。
Part3 分布式锁得有啥本事?
想让分布式锁靠谱,这几个特点必须拿捏住:
- 独占性:不管啥情况,同一时间只能有一个线程拿着锁,别人想抢也抢不到。
- 高可用:Redis 集群里哪怕某个节点挂了,加锁、解锁也不能掉链子 —— 总不能一个节点崩了,整个锁系统就歇菜了吧?
- 防死锁:必须有超时机制,或者能主动撤销。不然某个线程拿了锁后挂了,别人永远拿不到锁,这不就卡死了?
- 不乱抢:自己加的锁,只能自己释放,不能手贱去解别人的锁。
- 重入性:同一个线程可以多次加锁。比如一个线程拿了锁之后,进了个子程序还想加锁,这时候得让它能加上,不然自己卡自己就搞笑了。
Part4 分布式锁啥时候用?
只要程序想保证操作不出错,又有多个资源不能被多个进程同时用,就得靠它。
比如一个文件,总不能让好几个进程同时改吧?改着改着内容就乱了;
再比如打印机,同一时间只能给一个进程用,不然打出来的东西都混在一起了
这种必须 “一人用,其他人等” 的程序部分,叫 “临界区”。想让临界区里的操作不出岔子,分布式锁就是管这事的。
Part5 分布式锁有哪些搞法?
很多分布式锁都是靠 “分布式共识算法” 搞出来的,比如 Paxos、Raft 这些。常见的有这么几种:
- Chubby 分布式锁:谷歌搞的,跟 Zookeeper 有点像,但脾气不一样。它靠 “Sequencer 机制” 解决请求延迟导致的锁失效问题,属于粗粒度的锁服务。
- Zookeeper 分布式锁:靠 Zookeeper 的 “顺序临时节点” 搞事儿。Zookeeper 天生就为分布式应用服务,比如临时节点会自动删除,还有 Watch 机制 —— 加锁失败了就乖乖等着,跟本地锁似的方便,挺多人用。
- Consul 分布式锁:靠它的 Key/Value 存储里的 Acquire 和 Release 操作。简单说:
- Acquire:只有锁没人持有时,才能加锁成功,还会记录是谁拿的锁;不然就失败。
- Release:只有拿锁的 “人” 才能解锁,不是你拿的,想解也解不了。
- Redis 分布式锁:
- 单机版的话,靠 Redis 的 SETNX 命令 —— 这命令是原子操作,只有 Key 不存在的时候才能设成功,正好用来抢锁。
- 集群环境下,有个叫 Redlock 的玩法,是 Redis 作者 antirez 搞出来的规范,想让 Redis 分布式锁更安全靠谱。步骤大概是这样:
- 先看当前时间;
- 挨个给 N 个节点发请求加锁,每个节点都设个超时,别等太久;
- 算一下 “锁有效时间”= 锁本身的过期时间 - 加锁花的时间。如果超过一半的节点(N/2+1 个)都加锁成功,而且有效时间还大于 0,那就算抢锁成功;
- 要是没抢到,就赶紧给所有节点发消息解锁。
说白了就是:只要在锁过期前,超过一半的节点都认你拿了锁,那你就真拿到了 —— 这有点像 Zookeeper 选 leader 的路数。
Part6 分布式锁应用场景
Redis 分布式锁作为分布式系统中的核心协调机制,咱来瞅瞅哪些地方常用它:
- 秒杀抢购、优惠券领取:电商平台搞秒杀、发优惠券的时候,最怕的就是一堆人同时抢,结果东西超卖了,或者有人领了好几张。这时候一把分布式锁就能镇住场子,保证同一时间只有一个用户能操作成功,把这些糟心事挡在门外。
- 订单处理:电商系统不是都分布式部署嘛,用户下单前,得先拿个分布式锁,查查库存够不够。确认没问题了,才能让他下单,完事儿再把锁放了。这样就不会出现多个人同时下单,库存算错的情况。
- 实时统计:像统计在线人数、PV、UV 这些实时数据的时候,并发一高就容易乱套。用分布式锁把统计操作框住,能避免冲突,让数据统计得准准的。
- 任务调度:分布式系统里跑任务调度,要是多个任务之间得错开执行,不能撞车,就可以用分布式锁来管着 —— 同一时间只让一个任务跑起来,谁也别打扰谁。
- 分布式爬虫:好几台机器一起爬同一个网站,要是都往同一个资源使劲抓,很容易被封 IP,或者把人服务器搞崩了。这时候分布式锁就能发挥作用,让爬虫们错开,别扎堆儿抢同一东西。
- 消息队列幂等性:用消息队列的时候,最怕消息被重复处理,搞出乱子。分布式锁能帮忙把这事按住,确保同一条消息只被处理一次,实现幂等性。
Part7 Redis分布式锁常见坑点
Redis 分布式锁的坑,大多绕不开 “锁的租期”“身份验证”“可重入性” 这几个点,接下来我们具体来唠唠。
1、任务还没干完,锁先过期了
这情况虽说不常见,但架不住意外多啊!比如任务调用第三方接口卡壳了、数据库查询突然变龟速了,结果任务还没跑完,锁的 “租期” 就到了,直接 “下班” 了。这时候别的线程就可能趁虚而入,搞出数据乱子。
那咋对付这事儿呢?咱来唠唠几种思路:
1.1、暴论:直接把锁的过期时间设大点儿不行吗?
有人可能会说:“既然怕超时,一开始就把锁的过期时间设长点不就完了?” 想法是挺好,但问题在于 —— 任务执行时间这东西,咱说了不算啊!可能一万次都快得很,但就有一次因为外部接口卡了、服务器负载高了,超时了。咱总不能拍脑袋定个 “绝对够用” 的时间吧?而且不管是设长过期时间,还是后面续期,结果都是别的线程得等着,那为啥不直接设长点?其实关键在于 “不确定性”,任务执行时间可能忽快忽慢,硬设长了反而可能让锁 “占着茅坑不拉屎”,影响效率。
1.2、不设过期时间,任务不完锁不丢?
还有人想:“干脆不给锁设过期时间,任务啥时候干完,啥时候手动释放,这不就不会提前过期了?” 听着挺靠谱,正常情况下确实行 —— 就像用try-finally,加锁放try里,释放放finally里,稳得很。
但架不住极端情况啊!比如服务器突然断电了、程序被硬生生杀掉了、网络断了,这时候finally都没机会执行,锁就成了 “永久占坑” 的状态,别的线程永远拿不到锁,直接卡死。
那咋补救?可以这么干:加锁的时候先不给明确过期时间,但偷偷设个默认 “兜底租期”(比如 30 秒),同时启动个定时任务 “看门狗”。这 “看门狗” 每隔一段时间(比如 10 秒,也就是兜底租期的 1/3)就瞅一眼:如果锁还在,而且剩余时间不到兜底租期的一半了,就自动 “续租”,把租期再设成 30 秒。这样一来,只要任务还在跑,锁就一直续期;万一任务挂了,“看门狗” 也跟着歇菜,30 秒后锁自动过期,不耽误事儿。
1.3、设了过期时间,再加个 “续租” 机制
这方案其实跟上面的思路差不多,只不过一开始就给锁设个过期时间(比如 30 秒),同时启动 “看门狗” 定时续期。比如每隔 10 秒检查一次,锁快过期了就续到 30 秒。
这么做的好处是更符合咱的使用习惯 —— 锁总得有个 “租期” 才踏实。本质上和 1.2 的方案核心一样,都是靠 “续租” 保证任务没干完时锁不掉线,任务挂了锁也能自动过期。
综合来看,1.3 方案更靠谱,既符合咱们对锁的 “租期” 认知,又能应对各种意外超时的情况。
2、线程 A 瞎操心,把线程 B 的锁给释放了
有老铁可能担心:“万一线程 B 正拿着锁干活呢,线程 A 不知咋地把它的锁给删了,这不就乱套了?”
其实啊,这事儿大概率是自己吓自己 —— 说白了,要是线程 A 能释放线程 B 的锁,那说明你的分布式锁从一开始就写错了!
关键在哪儿?
给每个线程发个 “身份证”(请求 ID)就行。加锁的时候,把这个请求 ID 当成锁的值存起来;释放锁的时候,先查一下锁当前的值是不是自己的请求 ID,对上了才允许删,对不上就不搭理。就像你去开门,得先核对钥匙上的编号是不是自己家的,不是就别瞎拧。
3、线程自己把自己锁门外了(可重入性问题)
这情况也挺常见:比如一个线程拿到锁后,执行方法 A,结果方法 A 里又要申请同一把锁 —— 得,自己把自己堵门外了,这就是 “不可重入” 的坑。
举个例子:
你进了家门(拿到锁),想进卧室(调用方法 A),结果卧室门还得重新拿钥匙(再申请同一把锁),但钥匙已经在你手里了,却不让进,这不就离谱?
咋解决?两种思路:
- 简单点:改改代码,方法 A 里别用同一把锁,换个锁的 key 就行,相当于 “卧室门用另一把钥匙”。
- 彻底点:实现 “可重入”—— 还是用请求 ID,同一个线程第二次申请同一把锁时,先查一下锁的值是不是自己的请求 ID,是的话直接 “放行”,不用重新竞争。就像你拿着家门钥匙,进卧室直接推门就行,不用再找钥匙。
Part8 基于Redis的C++分布式锁实现
main.cpp
#include "redis_distributed_lock.h"
#include <iostream>
int main() {
try {
// 初始化分布式锁(连接Redis)
RedisDistributedLock locker("127.0.0.1", 6379);
const std::string lockKey = "my_resource_lock";
const std::string requestId = "client-123"; // 建议使用UUID确保唯一性
const int expireTimeMs = 10000; // 锁过期时间10秒
const int renewIntervalMs = 3000; // 每3秒续期一次
// 示例1:使用非重入锁
if (locker.tryLock(lockKey, requestId, expireTimeMs)) {
std::cout << "Non-reentrant lock acquired" << std::endl;
// 启动看门狗续期
if (locker.startWatchdog(lockKey, requestId, expireTimeMs, renewIntervalMs)) {
std::cout << "Watchdog started" << std::endl;
}
// 执行临界区操作(模拟业务逻辑)
std::this_thread::sleep_for(std::chrono::seconds(5));
// 释放锁
if (locker.unlock(lockKey, requestId)) {
std::cout << "Non-reentrant lock released" << std::endl;
}
// 停止看门狗
locker.stopWatchdog();
} else {
std::cout << "Failed to acquire non-reentrant lock" << std::endl;
}
// 示例2:使用可重入锁
if (locker.tryReentrantLock(lockKey, requestId, expireTimeMs)) {
std::cout << "First reentrant lock acquired" << std::endl;
// 重入一次
if (locker.tryReentrantLock(lockKey, requestId, expireTimeMs)) {
std::cout << "Second reentrant lock acquired" << std::endl;
// 执行临界区操作
std::this_thread::sleep_for(std::chrono::seconds(3));
// 第一次释放(重入次数-1,仍持有锁)
locker.unlockReentrant(lockKey, requestId);
std::cout << "First reentrant unlock (still holding lock)" << std::endl;
}
// 第二次释放(完全释放锁)
if (locker.unlockReentrant(lockKey, requestId)) {
std::cout << "Second reentrant unlock (lock released)" << std::endl;
}
} else {
std::cout << "Failed to acquire reentrant lock" << std::endl;
}
} catch (const std::exception& e) {
std::cerr << "Error: " << e.what() << std::endl;
return 1;
}
return 0;
}
redis_distributed_lock.cpp
#include "redis_distributed_lock.h"
#include <cstdarg>
#include <stdexcept>
// 构造函数:初始化Redis连接
RedisDistributedLock::RedisDistributedLock(const std::string& host, int port, int connectTimeoutMs)
: context_(nullptr), watchdogRunning_(false) {
// 设置连接超时
struct timeval timeout = {connectTimeoutMs / 1000, (connectTimeoutMs % 1000) * 1000};
context_ = redisConnectWithTimeout(host.c_str(), port, timeout);
if (!checkAndReconnect()) {
throw std::runtime_error("Failed to initialize Redis connection");
}
}
// 析构函数:释放资源
RedisDistributedLock::~RedisDistributedLock() {
stopWatchdog(); // 确保看门狗线程退出
if (context_) {
redisFree(context_);
context_ = nullptr;
}
}
// 检查连接有效性,无效则尝试重连
bool RedisDistributedLock::checkAndReconnect() {
std::lock_guard<std::mutex> lock(contextMutex_);
if (context_ && !context_->err) {
return true; // 连接有效
}
// 连接无效,尝试重新初始化(假设构造时的host和port不变,实际可优化为成员变量)
// 注:实际应用中应将host和port保存为成员变量
if (context_) {
redisFree(context_);
context_ = nullptr;
}
// 此处简化处理,实际应从成员变量获取host和port
context_ = redisConnect("127.0.0.1", 6379);
if (!context_ || context_->err) {
std::cerr << "Redis reconnection failed: "
<< (context_ ? context_->errstr : "Out of memory") << std::endl;
return false;
}
return true;
}
// 执行Redis命令(线程安全)
redisReply* RedisDistributedLock::executeCommand(const char* format, ...) {
if (!checkAndReconnect()) {
return nullptr;
}
va_list args;
va_start(args, format);
std::lock_guard<std::mutex> lock(contextMutex_);
redisReply* reply = (redisReply*)redisvCommand(context_, format, args);
va_end(args);
return reply;
}
// 尝试获取锁(非重入)
bool RedisDistributedLock::tryLock(const std::string& lockKey, const std::string& requestId, int expireTimeMs) {
if (expireTimeMs <= 0) {
std::cerr << "Expire time must be positive" << std::endl;
return false;
}
// 使用SET NX PX实现原子加锁:仅当key不存在时设置,并指定过期时间
redisReply* reply = executeCommand(
"SET %s %s NX PX %d",
lockKey.c_str(),
requestId.c_str(),
expireTimeMs
);
if (!reply) {
std::cerr << "Failed to execute tryLock command" << std::endl;
return false;
}
// 加锁成功的标志是返回"OK"
bool result = (reply->type == REDIS_REPLY_STATUS &&
strcmp(reply->str, "OK") == 0);
freeReplyObject(reply);
return result;
}
// 释放锁(非重入)
bool RedisDistributedLock::unlock(const std::string& lockKey, const std::string& requestId) {
// Lua脚本:确保仅释放自己持有的锁(原子操作)
const char* luaScript =
"if redis.call('get', KEYS[1]) == ARGV[1] then "
" return redis.call('del', KEYS[1]) "
"else "
" return 0 "
"end";
redisReply* reply = executeCommand(
"EVAL %s 1 %s %s",
luaScript,
lockKey.c_str(),
requestId.c_str()
);
if (!reply) {
std::cerr << "Failed to execute unlock script" << std::endl;
return false;
}
// 释放成功返回1,否则返回0
bool result = (reply->type == REDIS_REPLY_INTEGER && reply->integer == 1);
freeReplyObject(reply);
return result;
}
// 尝试获取可重入锁
bool RedisDistributedLock::tryReentrantLock(const std::string& lockKey, const std::string& requestId, int expireTimeMs) {
if (expireTimeMs <= 0) {
std::cerr << "Expire time must be positive" << std::endl;
return false;
}
// Lua脚本:使用哈希表存储重入次数
const char* luaScript =
"local current = redis.call('HGET', KEYS[1], ARGV[1])\n"
"if current then\n"
" -- 已持有锁,重入次数+1\n"
" redis.call('HINCRBY', KEYS[1], ARGV[1], 1)\n"
" redis.call('PEXPIRE', KEYS[1], ARGV[2])\n"
" return 1\n"
"else\n"
" -- 未持有锁,尝试获取(检查是否有其他持有者)\n"
" local holders = redis.call('HLEN', KEYS[1])\n"
" if holders == 0 then\n"
" redis.call('HSET', KEYS[1], ARGV[1], 1)\n"
" redis.call('PEXPIRE', KEYS[1], ARGV[2])\n"
" return 1\n"
" else\n"
" return 0\n"
" end\n"
"end";
redisReply* reply = executeCommand(
"EVAL %s 1 %s %s %d",
luaScript,
lockKey.c_str(),
requestId.c_str(),
expireTimeMs
);
if (!reply) {
std::cerr << "Failed to execute reentrant lock script" << std::endl;
return false;
}
bool result = (reply->type == REDIS_REPLY_INTEGER && reply->integer == 1);
freeReplyObject(reply);
return result;
}
// 释放可重入锁(需与加锁次数匹配)
bool RedisDistributedLock::unlockReentrant(const std::string& lockKey, const std::string& requestId) {
// Lua脚本:减少重入次数,次数为0时删除锁
const char* luaScript =
"local current = redis.call('HGET', KEYS[1], ARGV[1])\n"
"if not current then\n"
" return -1 -- 未持有锁\n"
"end\n"
"local count = tonumber(current) - 1\n"
"if count == 0 then\n"
" redis.call('HDEL', KEYS[1], ARGV[1])\n"
" -- 若哈希表为空,删除整个key\n"
" if redis.call('HLEN', KEYS[1]) == 0 then\n"
" redis.call('DEL', KEYS[1])\n"
" end\n"
" return 1 -- 完全释放\n"
"else\n"
" redis.call('HSET', KEYS[1], ARGV[1], count)\n"
" return 0 -- 部分释放(仍持有锁)\n"
"end";
redisReply* reply = executeCommand(
"EVAL %s 1 %s %s",
luaScript,
lockKey.c_str(),
requestId.c_str()
);
if (!reply) {
std::cerr << "Failed to execute reentrant unlock script" << std::endl;
return false;
}
// 返回1表示完全释放,0表示部分释放,-1表示未持有锁
bool result = (reply->type == REDIS_REPLY_INTEGER && reply->integer == 1);
freeReplyObject(reply);
return result;
}
// 启动看门狗自动续期
bool RedisDistributedLock::startWatchdog(const std::string& lockKey, const std::string& requestId,
int expireTimeMs, int renewIntervalMs) {
if (watchdogRunning_.load()) {
std::cerr << "Watchdog is already running" << std::endl;
return false;
}
if (renewIntervalMs <= 0 || expireTimeMs <= renewIntervalMs) {
std::cerr << "Invalid renew interval or expire time" << std::endl;
return false;
}
watchdogRunning_.store(true);
watchdogThread_ = std::thread([=]() {
while (watchdogRunning_.load()) {
// 等待续期间隔(避免频繁续期)
std::this_thread::sleep_for(std::chrono::milliseconds(renewIntervalMs));
// Lua脚本:仅当锁仍被当前请求持有,才续期
const char* luaScript =
"if redis.call('get', KEYS[1]) == ARGV[1] then "
" return redis.call('PEXPIRE', KEYS[1], ARGV[2]) "
"else "
" return 0 "
"end";
// 重入锁的续期脚本(使用哈希表)
const char* reentrantScript =
"if redis.call('HGET', KEYS[1], ARGV[1]) then "
" return redis.call('PEXPIRE', KEYS[1], ARGV[2]) "
"else "
" return 0 "
"end";
// 执行续期命令(根据锁类型选择脚本,此处简化为普通锁)
redisReply* reply = executeCommand(
"EVAL %s 1 %s %s %d",
luaScript,
lockKey.c_str(),
requestId.c_str(),
expireTimeMs // 续期到原始过期时间,而非间隔时间
);
if (reply) {
freeReplyObject(reply);
} else {
std::cerr << "Watchdog renew failed" << std::endl;
}
}
});
return true;
}
// 停止看门狗线程
void RedisDistributedLock::stopWatchdog() {
if (watchdogRunning_.load()) {
watchdogRunning_.store(false);
if (watchdogThread_.joinable()) {
watchdogThread_.join(); // 等待线程退出,避免资源泄漏
}
}
}
redis_distributed_lock.h
#include <hiredis/hiredis.h>
#include <string>
#include <thread>
#include <chrono>
#include <iostream>
#include <functional>
#include <mutex>
#include <atomic>
// 分布式锁类,封装Redis分布式锁功能
class RedisDistributedLock {
public:
// 构造函数:初始化Redis连接
RedisDistributedLock(const std::string& host, int port, int connectTimeoutMs = 5000);
// 析构函数:释放资源
~RedisDistributedLock();
// 禁用拷贝构造和赋值(避免Redis连接重复释放)
RedisDistributedLock(const RedisDistributedLock&) = delete;
RedisDistributedLock& operator=(const RedisDistributedLock&) = delete;
// 尝试获取锁(非重入)
// 返回true表示获取成功,false表示失败
bool tryLock(const std::string& lockKey, const std::string& requestId, int expireTimeMs);
// 释放锁(非重入)
bool unlock(const std::string& lockKey, const std::string& requestId);
// 尝试获取可重入锁
bool tryReentrantLock(const std::string& lockKey, const std::string& requestId, int expireTimeMs);
// 释放可重入锁(需与加锁次数匹配)
bool unlockReentrant(const std::string& lockKey, const std::string& requestId);
// 启动看门狗自动续期(需在获取锁后调用)
// 返回true表示启动成功
bool startWatchdog(const std::string& lockKey, const std::string& requestId, int expireTimeMs, int renewIntervalMs);
// 停止看门狗
void stopWatchdog();
private:
redisContext* context_; // Redis连接上下文
std::thread watchdogThread_; // 看门狗线程
std::atomic<bool> watchdogRunning_; // 看门狗运行标志
std::mutex contextMutex_; // 保护Redis连接的互斥锁
// 执行Redis命令(带锁保护)
redisReply* executeCommand(const char* format, ...);
// 检查Redis连接是否有效,无效则尝试重连
bool checkAndReconnect();
};
往期推荐
B站 C++ 一面:互斥锁和自旋锁的区别,使用场景分别是什么?
小米C++校招二面:epoll和poll还有select区别,底层方式?
顺时针螺旋移动法 | 彻底弄懂复杂C/C++嵌套声明、const常量声明!!!
更多推荐


所有评论(0)