首页IT科技redisson分布式锁问题(Redisson源码解读-分布式锁)

redisson分布式锁问题(Redisson源码解读-分布式锁)

时间2025-05-05 01:11:27分类IT科技浏览3810
导读:前言 Redisson是一个在Redis的基础上实现的Java驻内存数据网格(In-Memory Data Grid)。Redisson有一样功能是可重入的分布式锁。本文来讨论一下这个功能的特点以及源码分析。...

前言

Redisson是一个在Redis的基础上实现的Java驻内存数据网格(In-Memory Data Grid)            。Redisson有一样功能是可重入的分布式锁                   。本文来讨论一下这个功能的特点以及源码分析      。

前置知识

在讲Redisson             ,咱们先来聊聊分布式锁的特点以及Redis的发布/订阅机制                  ,磨刀不误砍柴工         。

分布式锁的思考

首先思考下      ,如果我们自己去实现一个分布式锁          ,这个锁需要具备哪些功能?

互斥(这是一个锁最基本的功能) 锁失效机制(也就是可以设置锁定时长                   ,防止死锁) 高性能             、高可用 阻塞                  、非阻塞 可重入      、公平锁                    。          。      。

可见         ,实现一个分布式锁      ,需要考虑的东西有很多                  。那么                   ,如果用Redis来实现分布式锁呢?如果只需要具备上面说的1          、2点功能            ,要怎么写?(ps:我就不写了   ,自己想去)

Redis订阅/发布机制

Redisson中用到了Redis的订阅/发布机制                   ,下面简单介绍下             。

简单来说就是如果client2                   、 client5 和 client1 订阅了 channel1               ,当有消息发布到 channel1 的时候,client2          、 client5 和 client1 都会收到这个消息   。

图片来自 菜鸟教程-Redis发布订阅

Redisson

源码版本:3.17.7

下面以Redisson官方的可重入同步锁例子为入口                ,解读下源码                  。

RLock lock = redisson.getLock("anyLock"); // 尝试加锁                  ,最多等待100秒   ,上锁以后10秒自动解锁 boolean res = lock.tryLock(100, 10, TimeUnit.SECONDS); if (res) { try { ... } finally { lock.unlock(); } }

加锁

我用时序图来表示加锁和订阅的过程                。时序图中括号后面的c1      、c2代表client1             ,client2

当线程2获取了锁但还没释放锁时                  ,如果线程1去获取锁      ,会阻塞等待          ,直到线程2解锁                   ,通过Redis的发布订阅机制唤醒线程1,再次去获取锁。

加锁方法是 lock.tryLock(100, 10, TimeUnit.SECONDS)         ,对应着就是RedissonLock#tryLock

/** * 获取锁 * @param waitTime 尝试获取锁的最大等待时间      ,超过这个值                   ,则认为获取锁失败 * @param leaseTime 锁的持有时间,超过这个时间锁会自动失效(值应设置为大于业务处理的时间            ,确保在锁有效期内业务能处理完) * @param unit 时间单位 * @return 获取锁成功返回true   ,失败返回false */ @Override public boolean tryLock(long waitTime, long leaseTime, TimeUnit unit) throws InterruptedException { long time = unit.toMillis(waitTime); long current = System.currentTimeMillis();// 当前时间 long threadId = Thread.currentThread().getId();// 当前线程id // 尝试加锁                   ,加锁成功返回null               ,失败返回锁的剩余超时时间 Long ttl = tryAcquire(waitTime, leaseTime, unit, threadId); // 获取锁成功 if (ttl == null) { return true; } // time小于0代表此时已经超过获取锁的等待时间,直接返回false time -= System.currentTimeMillis() - current; if (time <= 0) { // 没看懂这个方法                ,里面里面空运行                  ,有知道的大神还请不吝赐教 acquireFailed(waitTime, unit, threadId); return false; } current = System.currentTimeMillis(); CompletableFuture<RedissonLockEntry> subscribeFuture = subscribe(threadId); try { subscribeFuture.get(time, TimeUnit.MILLISECONDS); } catch (TimeoutException e) { if (!subscribeFuture.cancel(false)) { subscribeFuture.whenComplete((res, ex) -> { // 出现异常   ,取消订阅 if (ex == null) { unsubscribe(res, threadId); } }); } acquireFailed(waitTime, unit, threadId); return false; } catch (ExecutionException e) { acquireFailed(waitTime, unit, threadId); return false; } try { // 判断是否超时(超过了waitTime) time -= System.currentTimeMillis() - current; if (time <= 0) { acquireFailed(waitTime, unit, threadId); return false; } while (true) { // 再次获取锁             ,成功则返回 long currentTime = System.currentTimeMillis(); ttl = tryAcquire(waitTime, leaseTime, unit, threadId); // lock acquired if (ttl == null) { return true; } time -= System.currentTimeMillis() - currentTime; if (time <= 0) { acquireFailed(waitTime, unit, threadId); return false; } // 阻塞等待信号量唤醒或者超时                  ,接收到订阅时唤醒 // 使用的是Semaphore#tryAcquire() currentTime = System.currentTimeMillis(); if (ttl >= 0 && ttl < time) { commandExecutor.getNow(subscribeFuture).getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS); } else { commandExecutor.getNow(subscribeFuture).getLatch().tryAcquire(time, TimeUnit.MILLISECONDS); } time -= System.currentTimeMillis() - currentTime; if (time <= 0) { acquireFailed(waitTime, unit, threadId); return false; } } } finally { // 因为是同步操作      ,所以无论加锁成功或失败          ,都取消订阅 unsubscribe(commandExecutor.getNow(subscribeFuture), threadId); } // return get(tryLockAsync(waitTime, leaseTime, unit)); }

先看一下整体逻辑:

尝试加锁                   ,成功直接返回true 判断超时 订阅 判断超时 循环 ( 尝试获取锁 → 判断超时 → 阻塞等待 )

tryLock方法看着很长         ,但是有很多代码都是重复的      ,本小节重点说一下尝试加锁的方法tryAcquire

private Long tryAcquire(long waitTime, long leaseTime, TimeUnit unit, long threadId) { return get(tryAcquireAsync(waitTime, leaseTime, unit, threadId)); } private <T> RFuture<Long> tryAcquireAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId) { RFuture<Long> ttlRemainingFuture; if (leaseTime > 0) { // 调用lua脚本                   ,尝试加锁 ttlRemainingFuture = tryLockInnerAsync(waitTime, leaseTime, unit, threadId, RedisCommands.EVAL_LONG); } else { // 这里的if                   、else的区别就在于            ,如果没有设置leaseTime   ,就使用默认的internalLockLeaseTime(默认30秒) ttlRemainingFuture = tryLockInnerAsync(waitTime, internalLockLeaseTime, TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_LONG); } CompletionStage<Long> f = ttlRemainingFuture.thenApply(ttlRemaining -> { // lock acquired // 如果ttlRemaining为空                   ,也就是tryLockInnerAsync方法中的lua执行结果返回空               ,证明获取锁成功 if (ttlRemaining == null) { if (leaseTime > 0) { internalLockLeaseTime = unit.toMillis(leaseTime); } else { // 如果没有设置锁的持有时间(leaseTime),则启动看门狗                ,定时给锁续期                  ,防止业务逻辑未执行完成锁就过期了 scheduleExpirationRenewal(threadId); } } return ttlRemaining; }); return new CompletableFutureWrapper<>(f); }

在tryAcquireAsync方法中   ,主要分为两段逻辑:

调用lua脚本加锁:tryLockInnerAsync 看门狗:scheduleExpirationRenewal

看门狗在后面讲             ,本小节重点还是在加锁

// RedissonLock#tryLockInnerAsync <T> RFuture<T> tryLockInnerAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) { return evalWriteAsync(getRawName(), LongCodec.INSTANCE, command, "if (redis.call(exists, KEYS[1]) == 0) then " + "redis.call(hincrby, KEYS[1], ARGV[2], 1); " + "redis.call(pexpire, KEYS[1], ARGV[1]); " + "return nil; " + "end; " + "if (redis.call(hexists, KEYS[1], ARGV[2]) == 1) then " + "redis.call(hincrby, KEYS[1], ARGV[2], 1); " + "redis.call(pexpire, KEYS[1], ARGV[1]); " + "return nil; " + "end; " + "return redis.call(pttl, KEYS[1]);", Collections.singletonList(getRawName()), unit.toMillis(leaseTime), getLockName(threadId)); }

Redisson使用了 Hash 结构来表示一个锁                  ,这样 Hash 里面的 key 为线程id      ,value 为锁的次数               。这样巧妙地解决了可重入锁的问题                   。

下面我们来分析下这段 lua 脚本的逻辑(下面说的threadId都是指变量          ,不是说key就叫’threadId’):

如果锁(hash结构)不存在                   ,则创建         ,并添加一个键值对 (threadId : 1)      ,并设置锁的过期时间 如果锁存在                   ,则将键值对 threadId 对应的值 + 1            ,并设置锁的过期时间 如果不如何1,2点   ,则返回锁的剩余过期时间

订阅

让我们把视线重新回到RedissonLock#tryLock中                   ,当经过一些尝试获取锁               ,超时判断之后,代码来到while循环中   。这个while循环是个死循环                ,只有成功获取锁或者超时                  ,才会退出            。一般死循环的设计中   ,都会有阻塞等待的代码             ,否则如果循环中的逻辑短时间拿不到结果                  ,会造成资源抢占和浪费                   。阻塞代码就是下面这段

if (ttl >= 0 && ttl < time) { commandExecutor.getNow(subscribeFuture).getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS); } else { commandExecutor.getNow(subscribeFuture).getLatch().tryAcquire(time, TimeUnit.MILLISECONDS); }

commandExecutor.getNow(subscribeFuture).getLatch() 得到的是一个Semaphore信号量对象      ,这是jdk的内置对象          ,Semaphore#tryAcquire表示阻塞并等待唤醒      。那么信号量什么时候被唤醒呢?在订阅方法中RedissonLock#subscribe         。订阅方法的逻辑也不少                   ,咱们直接讲其最终调用的处理方法

// LockPubSub#onMessage protected void onMessage(RedissonLockEntry value, Long message) { // 普通的解锁走的是这个 if (message.equals(UNLOCK_MESSAGE)) { Runnable runnableToExecute = value.getListeners().poll(); if (runnableToExecute != null) { runnableToExecute.run(); } // 这里就是唤醒信号量的地方 value.getLatch().release(); // 这个是读写锁用的 } else if (message.equals(READ_UNLOCK_MESSAGE)) { while (true) { Runnable runnableToExecute = value.getListeners().poll(); if (runnableToExecute == null) { break; } runnableToExecute.run(); } value.getLatch().release(value.getLatch().getQueueLength()); } }

value.getLatch().release() 也就是Semaphore#release          ,会唤醒Semaphore#tryAcquire阻塞的线程

解锁

上面我们聊了加锁      ,本小节来聊下解锁                   。调用路径如下

// RedissonLock#unlock // RedissonBaseLock#unlockAsync(long threadId) public RFuture<Void> unlockAsync(long threadId) { // 调用lua解锁 RFuture<Boolean> future = unlockInnerAsync(threadId); CompletionStage<Void> f = future.handle((opStatus, e) -> { // 取消看门狗 cancelExpirationRenewal(threadId); if (e != null) { throw new CompletionException(e); } if (opStatus == null) { IllegalMonitorStateException cause = new IllegalMonitorStateException("attempt to unlock lock, not locked by current thread by node id: " + id + " thread-id: " + threadId); throw new CompletionException(cause); } return null; }); return new CompletableFutureWrapper<>(f); }

解锁的逻辑不复杂                   ,调用lua脚本解锁以及取消看门狗          。看门狗晚点说            ,先说下lua解锁

// RedissonLock#unlockInnerAsync protected RFuture<Boolean> unlockInnerAsync(long threadId) { return evalWriteAsync(getRawName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN, "if (redis.call(hexists, KEYS[1], ARGV[3]) == 0) then " + "return nil;" + "end; " + "local counter = redis.call(hincrby, KEYS[1], ARGV[3], -1); " + "if (counter > 0) then " + "redis.call(pexpire, KEYS[1], ARGV[2]); " + "return 0; " + "else " + "redis.call(del, KEYS[1]); " + "redis.call(publish, KEYS[2], ARGV[1]); " + "return 1; " + "end; " + "return nil;", Arrays.asList(getRawName(), getChannelName()), LockPubSub.UNLOCK_MESSAGE, internalLockLeaseTime, getLockName(threadId)); }

老规矩   ,分析下这段lua:

如果锁不存在                   ,返回null 锁的值减1               ,如果锁的值大于0(也就是可重入锁仍然有加锁次数),则重新设置过期时间 如果锁的值小于等于0                ,这说明可以真正解锁了                  ,删除锁并通过发布订阅机制发布解锁消息

从 lua 中可以看到   ,解锁时会发布消息到 channel 中             ,加锁方法RedissonLock#tryLock中有相对应的订阅操作      。

看门狗

试想一个场景:程序执行需要10秒                  ,程序执行完成才去解锁      ,而锁的存活时间只有5秒          ,也就是程序执行到一半的时候锁就可以被其他程序获取了                   ,这显然不合适                  。那么怎么解决呢?

方式一:锁永远存在         ,直到解锁             。不设置存活时间   。

这种方法的弊端在于      ,如果程序没解锁就挂了                   ,锁就成了死锁

方式二:依然设置锁存活时间            ,但是监控程序的执行   ,如果程序还没有执行完成                   ,则定期给锁续期                  。

方式二就是Redisson的看门狗机制                。看门狗只有在没有显示指定锁的持有时间(leaseTime)时才会生效。

// RedissonLock#tryAcquireAsync // RedissonBaseLock#scheduleExpirationRenewal protected void scheduleExpirationRenewal(long threadId) { // 创建ExpirationEntry               ,并放入EXPIRATION_RENEWAL_MAP中,下面的renewExpiration()方法会从map中再拿出来用 ExpirationEntry entry = new ExpirationEntry(); ExpirationEntry oldEntry = EXPIRATION_RENEWAL_MAP.putIfAbsent(getEntryName(), entry); if (oldEntry != null) { oldEntry.addThreadId(threadId); } else { entry.addThreadId(threadId); try { // 看门狗的具体逻辑 renewExpiration(); } finally { // 如果线程被中断了                ,就取消看门狗 if (Thread.currentThread().isInterrupted()) { // 取消看门狗 cancelExpirationRenewal(threadId); } } } }

scheduleExpirationRenewal 方法处理了ExpirationEntry和如果出现异常则取消看门狗                  ,具体看门狗逻辑在 renewExpiration 方法中

private void renewExpiration() { ExpirationEntry ee = EXPIRATION_RENEWAL_MAP.get(getEntryName()); if (ee == null) { return; } // 创建延时任务   ,延时时间是internalLockLeaseTime / 3 Timeout task = commandExecutor.getConnectionManager().newTimeout(new TimerTask() { @Override public void run(Timeout timeout) throws Exception { ExpirationEntry ent = EXPIRATION_RENEWAL_MAP.get(getEntryName()); if (ent == null) { return; } Long threadId = ent.getFirstThreadId(); if (threadId == null) { return; } // lua脚本判断             ,如果锁存在                  ,则续期并返回true      ,不存在则返回false CompletionStage<Boolean> future = renewExpirationAsync(threadId); future.whenComplete((res, e) -> { if (e != null) { log.error("Cant update lock " + getRawName() + " expiration", e); EXPIRATION_RENEWAL_MAP.remove(getEntryName()); return; } if (res) { // 锁续期成功          ,则再启动一个延时任务                   ,继续监测 renewExpiration(); } else { // 取消看门狗 cancelExpirationRenewal(null); } }); } }, internalLockLeaseTime / 3, TimeUnit.MILLISECONDS); ee.setTimeout(task); }

Timeout 是一个延时任务         ,延时 internalLockLeaseTime / 3 时间执行               。任务的内容主要是通过 renewExpirationAsync 方法对锁进行续期      ,如果续期失败(解锁了            、锁到期等)                   ,则取消看门狗            ,如果续期成功   ,则递归 renewExpiration 方法                   ,继续创建延时任务                   。

internalLockLeaseTime 也就是 lockWatchdogTimeout 参数               ,默认是 30 秒   。

总结

本文介绍了Redisson的加锁   、解锁                   、看门狗机制,以及对Redis发布订阅机制的应用            。因为篇幅有限                ,很多细节聊得不够深入                   。此外Redisson的异步机制               、对Netty的使用等都是很值得水文章的      。

参考资料

万字长文带你解读Redisson分布式锁的源码 - 知乎 (zhihu.com)

Redis分布式锁-这一篇全了解(Redission实现分布式锁完美方案)

创心域SEO版权声明:以上内容作者已申请原创保护,未经允许不得转载,侵权必究!授权事宜、对本内容有异议或投诉,敬请联系网站管理员,我们将尽快回复您,谢谢合作!

展开全文READ MORE
win11最新功能(Windows11新功能是什么 win11系统新功能详解) 深拷贝和浅拷贝的区别及实现方式(死磕Java面试系列:深拷贝与浅拷贝的实现原理)