首页IT科技synchronized 公平锁(Redisson源码解读-公平锁)

synchronized 公平锁(Redisson源码解读-公平锁)

时间2025-06-13 20:47:34分类IT科技浏览7058
导读:前言 我在上一篇文章聊了Redisson的可重入锁,这次继续来聊聊Redisson的公平锁。下面是官方原话:...

前言

我在上一篇文章聊了Redisson的可重入锁              ,这次继续来聊聊Redisson的公平锁              。下面是官方原话:

它保证了当多个Redisson客户端线程同时请求加锁时                    ,优先分配给先发出请求的线程                      。所有请求线程会在一个队列中排队        ,当某个线程出现宕机时           ,Redisson会等待5秒后继续下一个线程                   ,也就是说如果前面有5个线程都处于等待状态            ,那么后面的线程会等待至少25秒      。

源码版本:3.17.7

这是我 fork 的分支        ,添加了自己理解的中文注释:https://github.com/xiaoguyu/redisson

公平锁

先上官方例子:

RLock fairLock = redisson.getFairLock("anyLock"); // 尝试加锁                   ,最多等待100秒               ,上锁以后10秒自动解锁 boolean res = fairLock.tryLock(100, 10, TimeUnit.SECONDS); ... fairLock.unlock();

因为在Redisson中    ,公平锁和普通可重入锁的逻辑大体上一样                    ,我在上一篇文章都介绍了                  ,这里就不再赘述          。下面开始介绍合理逻辑                       。

加锁

加锁的 lua 脚本在 RedissonFairLock#tryLockInnerAsync方法中

<T> RFuture<T> tryLockInnerAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) { long wait = threadWaitTime; if (waitTime > 0) { wait = unit.toMillis(waitTime); } long currentTime = System.currentTimeMillis(); if (command == RedisCommands.EVAL_NULL_BOOLEAN) { ...... } if (command == RedisCommands.EVAL_LONG) { return evalWriteAsync(getRawName(), LongCodec.INSTANCE, command, // remove stale threads "while true do " + // list为空,证明没有人排队                 ,退出循环 "local firstThreadId2 = redis.call(lindex, KEYS[2], 0);" + "if firstThreadId2 == false then " + "break;" + "end;" + // 能到这里                     ,证明有人排队    ,拿出在排队的第一个人的超时时间              ,如果超时了                    ,则移除相应数据 "local timeout = tonumber(redis.call(zscore, KEYS[3], firstThreadId2));" + "if timeout <= tonumber(ARGV[4]) then " + // remove the item from the queue and timeout set // NOTE we do not alter any other timeout "redis.call(zrem, KEYS[3], firstThreadId2);" + "redis.call(lpop, KEYS[2]);" + "else " + "break;" + "end;" + "end;" + // check if the lock can be acquired now // 检查是否可以获取锁         。如果hash和list都不存在        ,或者线程队列的第一个是当前线程           ,则可以获取锁 "if (redis.call(exists, KEYS[1]) == 0) " + "and ((redis.call(exists, KEYS[2]) == 0) " + "or (redis.call(lindex, KEYS[2], 0) == ARGV[2])) then " + // remove this thread from the queue and timeout set // 都获取锁了                   ,当然要从线程队列和时间队列中移除 "redis.call(lpop, KEYS[2]);" + "redis.call(zrem, KEYS[3], ARGV[2]);" + // decrease timeouts for all waiting in the queue // 刷新时间集合中的时间 "local keys = redis.call(zrange, KEYS[3], 0, -1);" + "for i = 1, #keys, 1 do " + "redis.call(zincrby, KEYS[3], -tonumber(ARGV[3]), keys[i]);" + "end;" + // acquire the lock and set the TTL for the lease // 和公平锁的设置一样            ,值加1并且设置过期时间 "redis.call(hset, KEYS[1], ARGV[2], 1);" + "redis.call(pexpire, KEYS[1], ARGV[1]);" + "return nil;" + "end;" + // check if the lock is already held, and this is a re-entry // 能到这里        ,证明前面拿不到锁                   ,但是也要做可重入锁的处理 "if redis.call(hexists, KEYS[1], ARGV[2]) == 1 then " + "redis.call(hincrby, KEYS[1], ARGV[2],1);" + "redis.call(pexpire, KEYS[1], ARGV[1]);" + "return nil;" + "end;" + // the lock cannot be acquired // check if the thread is already in the queue // 时间集合中有值               ,证明线程已经在队列中    ,不需要往后执行逻辑了 "local timeout = redis.call(zscore, KEYS[3], ARGV[2]);" + "if timeout ~= false then " + // the real timeout is the timeout of the prior thread // in the queue, but this is approximately correct, and // avoids having to traverse the queue // 因为下面的timeout = ttl + tonumber(ARGV[3]) + tonumber(ARGV[4]) // 所以这里的ttl = timeout - tonumber(ARGV[3]) - tonumber(ARGV[4]) "return timeout - tonumber(ARGV[3]) - tonumber(ARGV[4]);" + "end;" + // add the thread to the queue at the end, and set its timeout in the timeout set to the timeout of // the prior thread in the queue (or the timeout of the lock if the queue is empty) plus the // threadWaitTime "local lastThreadId = redis.call(lindex, KEYS[2], -1);" + "local ttl;" + // 如果最后一个线程不是当前线程                    ,则从时间集合取出(举例:线程1/2/3按顺序获取锁                  ,此时pttl得到的是线程1的锁过期时间,zscore拿到的是线程2的锁的过期时间                 ,此时线程3应该以线程2的为准) "if lastThreadId ~= false and lastThreadId ~= ARGV[2] then " + "ttl = tonumber(redis.call(zscore, KEYS[3], lastThreadId)) - tonumber(ARGV[4]);" + "else " + // 否则直接获取锁的存活时间 "ttl = redis.call(pttl, KEYS[1]);" + "end;" + // 过期时间 = 锁存活时间 + 等待时间 + 当前时间戳 "local timeout = ttl + tonumber(ARGV[3]) + tonumber(ARGV[4]);" + // 如果添加到时间集合成功                     ,则同时添加线程集合 "if redis.call(zadd, KEYS[3], timeout, ARGV[2]) == 1 then " + "redis.call(rpush, KEYS[2], ARGV[2]);" + "end;" + "return ttl;", Arrays.asList(getRawName(), threadsQueueName, timeoutSetName), unit.toMillis(leaseTime), getLockName(threadId), wait, currentTime); } throw new IllegalArgumentException(); }

公平锁总共用了Redis的三种数据类型    ,对应着 lua 脚本里面的keys1              、2                      、3的参数:

KEYS[1]

锁的名字              ,使用 Hash 数据类型                    ,是可重入锁的基础        ,结构为 {              ”threadId1                      ”: 1, “thread2      ”: 1}           ,key为线程id                   ,value是锁的次数

KEYS[2]

线程队列的名字            ,使用 List 数据类型        ,结构为 [ “threadId1          ”, “threadId2                       ” ]                   ,按顺序存放需要获取锁的线程的id

KEYS[3]

时间队列的名字               ,使用 sorted set 数据类型    ,结构为 {         ”threadId2      ”:123, “threadId1                       ”:190}                    ,key为线程id                  ,value为获取锁的超时时间戳

我下面会用 锁      、线程队列          、时间队列 来表示这3个数据结构,需要注意下我的表述      。

同样的                 ,介绍下参数:

ARGV[1]:leaseTime 锁的持有时间 ARGV[2]:线程id(描述不太准确                     ,暂时按这样理解) ARGV[3]:waitTime 尝试获取锁的最大等待时间 ARGV[4]:currentTime 当前时间戳

接下来    ,我们一段一段分析 lua 脚本              ,首先看最开始的 while 循环

"while true do " + // list为空                    ,证明没有人排队        ,退出循环 "local firstThreadId2 = redis.call(lindex, KEYS[2], 0);" + "if firstThreadId2 == false then " + "break;" + "end;" + // 能到这里           ,证明有人排队                   ,拿出在排队的第一个人的超时时间            ,如果超时了        ,则移除相应数据 "local timeout = tonumber(redis.call(zscore, KEYS[3], firstThreadId2));" + "if timeout <= tonumber(ARGV[4]) then " + // 从时间队列和线程队列中移除 "redis.call(zrem, KEYS[3], firstThreadId2);" + "redis.call(lpop, KEYS[2]);" + "else " + "break;" + "end;" + "end;" +

具体的逻辑我在注释中写的很清楚了                   ,看的时候记住 KEYS[2]                       、KEYS[3] 对应着线程队列和时间队列接口                       。主要注意的是               ,线程队列只有当一个线程持有锁    ,另一个线程获取不到锁时                    ,才会有值(前面有人才排队                  ,没人排什么队)             。接着看第二段

// 检查是否可以获取锁   。当锁不存在,并且线程队列不存在或者线程队列第一位是当前线程                 ,则可以获取锁 "if (redis.call(exists, KEYS[1]) == 0) " + "and ((redis.call(exists, KEYS[2]) == 0) or (redis.call(lindex, KEYS[2], 0) == ARGV[2])) then " + // remove this thread from the queue and timeout set // 都获取锁了                     ,当然要从线程队列和时间队列中移除 "redis.call(lpop, KEYS[2]);" + "redis.call(zrem, KEYS[3], ARGV[2]);" + // decrease timeouts for all waiting in the queue // 刷新时间队列中的时间 "local keys = redis.call(zrange, KEYS[3], 0, -1);" + "for i = 1, #keys, 1 do " + "redis.call(zincrby, KEYS[3], -tonumber(ARGV[3]), keys[i]);" + "end;" + // acquire the lock and set the TTL for the lease // 和公平锁的设置一样    ,值加1并且设置过期时间 "redis.call(hset, KEYS[1], ARGV[2], 1);" + "redis.call(pexpire, KEYS[1], ARGV[1]);" + "return nil;" + "end;" +

翻译翻译就是              ,锁不存在(别人没有持有锁)并且线程队列不存在或者线程队列第一位是当前线程(不用排队或者自己排第一)才能获得锁                      。因为时间队列中存放的是各个线程等待锁的超时时间戳                    ,所以每次都需要刷新下                 。继续下一段逻辑

// 能到这里        ,证明前面拿不到锁           ,但是也要做可重入锁的处理 "if redis.call(hexists, KEYS[1], ARGV[2]) == 1 then " + "redis.call(hincrby, KEYS[1], ARGV[2],1);" + "redis.call(pexpire, KEYS[1], ARGV[1]);" + "return nil;" + "end;" +

这是可重入锁的处理                   ,继续下一段

// 时间队列中有值            ,证明线程已经在队列中        ,不需要往后执行逻辑了 "local timeout = redis.call(zscore, KEYS[3], ARGV[2]);" + "if timeout ~= false then " + // the real timeout is the timeout of the prior thread // in the queue, but this is approximately correct, and // avoids having to traverse the queue // 因为下面的timeout = ttl + tonumber(ARGV[3]) + tonumber(ARGV[4]) // 所以这里的ttl = timeout - tonumber(ARGV[3]) - tonumber(ARGV[4]) "return timeout - tonumber(ARGV[3]) - tonumber(ARGV[4]);" + "end;" +

举例子:线程1持有锁                   ,线程2尝试第一次获取锁(不进入这段if)               ,线程2第二次获取锁(进入了这段if)。继续下一段

"local lastThreadId = redis.call(lindex, KEYS[2], -1);" + "local ttl;" + // 如果最后一个线程不是当前线程    ,则从时间集合取出(举例:线程1/2/3按顺序获取锁                    ,此时pttl得到的是线程1的锁过期时间                  ,zscore拿到的是线程2的锁的过期时间,此时线程3应该以线程2的为准) "if lastThreadId ~= false and lastThreadId ~= ARGV[2] then " + "ttl = tonumber(redis.call(zscore, KEYS[3], lastThreadId)) - tonumber(ARGV[4]);" + "else " + // 否则直接获取锁的存活时间 "ttl = redis.call(pttl, KEYS[1]);" + "end;" + // 过期时间 = 锁存活时间 + 等待时间 + 当前时间戳 "local timeout = ttl + tonumber(ARGV[3]) + tonumber(ARGV[4]);" + // 如果添加到时间集合成功                 ,则同时添加线程集合 "if redis.call(zadd, KEYS[3], timeout, ARGV[2]) == 1 then " + "redis.call(rpush, KEYS[2], ARGV[2]);" + "end;" + "return ttl;",

ttl 这段的获取逻辑                     ,翻译翻译就是    ,如果前面有人排队              ,就以前面的超时时间为准                    ,如果没人排队        ,就拿锁的超时时间                  。获取到 ttl            ,就对添加到线程集合和时间集合                     。

以上就是公平锁的加锁 lua 脚本的全部逻辑   。讲的有点乱                   ,但是只要能搞清楚keys1         、2      、3对应着哪种数据类型            ,理解整个逻辑应该问题不大              。

解锁

解锁的核心 lua 脚本是下面这段RedissonFairLock#unlockInnerAsync

protected RFuture<Boolean> unlockInnerAsync(long threadId) { return evalWriteAsync(getRawName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN, // remove stale threads "while true do " // 线程队列为空        ,证明没有人排队                   ,退出循环 + "local firstThreadId2 = redis.call(lindex, KEYS[2], 0);" + "if firstThreadId2 == false then " + "break;" + "end; " // 能到这里               ,证明有人排队    ,拿出在排队的第一个人的超时时间                    ,如果超时了                  ,则移除相应数据 + "local timeout = tonumber(redis.call(zscore, KEYS[3], firstThreadId2));" + "if timeout <= tonumber(ARGV[4]) then " + "redis.call(zrem, KEYS[3], firstThreadId2); " + "redis.call(lpop, KEYS[2]); " + "else " + "break;" + "end; " + "end;" // 如果锁不存在,则通过订阅发布机制通知下一个等待中的线程 + "if (redis.call(exists, KEYS[1]) == 0) then " + "local nextThreadId = redis.call(lindex, KEYS[2], 0); " + "if nextThreadId ~= false then " + "redis.call(publish, KEYS[4] .. : .. nextThreadId, ARGV[1]); " + "end; " + "return 1; " + "end;" + // 如果当前线程已经不存在锁里面                 ,直接返回null "if (redis.call(hexists, KEYS[1], ARGV[3]) == 0) then " + "return nil;" + "end; " + // 可重入锁处理逻辑                     ,对当前线程的锁次数减1 "local counter = redis.call(hincrby, KEYS[1], ARGV[3], -1); " + "if (counter > 0) then " + // 锁次数仍然大于0    ,则刷新锁的存活时间 "redis.call(pexpire, KEYS[1], ARGV[2]); " + "return 0; " + "end; " + // 删除锁 "redis.call(del, KEYS[1]); " + // 订阅发布机制通知下一个等待中的线程 "local nextThreadId = redis.call(lindex, KEYS[2], 0); " + "if nextThreadId ~= false then " + "redis.call(publish, KEYS[4] .. : .. nextThreadId, ARGV[1]); " + "end; " + "return 1; ", Arrays.asList(getRawName(), threadsQueueName, timeoutSetName, getChannelName()), LockPubSub.UNLOCK_MESSAGE, internalLockLeaseTime, getLockName(threadId), System.currentTimeMillis()); }

算了              ,不想写了                    ,看注释吧                      。

总结

本文介绍了Redisson的公平锁        ,逻辑大体上和普通可重入锁一致           ,核心在于 lua 脚本                   ,运用了Redis的3种数据类型      。

创心域SEO版权声明:以上内容作者已申请原创保护,未经允许不得转载,侵权必究!授权事宜、对本内容有异议或投诉,敬请联系网站管理员,我们将尽快回复您,谢谢合作!

展开全文READ MORE
利用网络可以做什么事情用网络可以做哪些任务-腾讯SkillNet | NLU任务全能网络,对Pathways架构的初步尝试 新丰简介(新丰是啥)