Redisson深度解析:高效分布式锁解决方案详解

创始人
2024-12-14 22:38:39
0 次浏览
0 评论

最强分布式锁工具:Redisson

Redisson概述

Redisson是一个基于Redis的Java内存数据网格。
它提供了一系列通用的分布式Java对象和分布式服务,旨在简化分布式操作,提高开发效率。

Redisson不仅提供基本的分布式对象,还提供高级的分布式服务,可以解决许多常见的分布式问题。

Redisson与Jedis和Lettuce的区别在于,它提供了更高层次的抽象,并且更容易使用Redis。
Jedis和Lettuce主要提供Redis命令的封装,而Redisson则基于Redis、Lua和Netty构建成熟的分布式解决方案。
官方甚至推荐使用Redisson作为Redis工具集。

分布式锁

分布式锁是并发业务中的关键组件,用于保证多个进程或线程对共享资源的互斥访问。
分布式锁的实现通常包括顺序节点、表级锁、悲观锁、乐观锁以及Redis的setNx命令。

Redisson提供了简单的实现方法,通过Lua脚本实现原子操作,保证锁获取和释放过程的原子性,从而避免并发问题。
Lua脚本通过Rediseval命令执行,提供一次性原子操作,保证锁的互斥性。

为了保证可重入性,Redisson在Lua脚本中引入了一个计数器来跟踪同一个线程获取同一个锁的次数。
解锁时,计数器递减直至为零,并执行删除操作以确保锁被正确释放。

此外,Redisson还支持分布式锁的续约功能,允许在锁超时前自动延长锁的时长,保证业务操作能够顺利完成。
该功能在高并发场景下尤为重要,可以有效防止锁丢失或超时导致的并发冲突。

RLock

RLock是Redisson实现的分布式锁的核心接口。
它继承于Java并发包中的Lock接口和Redisson自定义的RLockAsync接口。
RLock提供加锁和解锁方法,通过Netty实现异步操作的底层通信。

加锁过程通常涉及以下步骤:

依赖导入配置Redisson实例,使用RLock接口调用tryLock方法尝试获取锁

尝试获取时对于锁,Redisson会使用Redis操作中的Lua脚本来实现锁加锁和计数逻辑。
当锁超时不为-1时,会执行定时任务(watchDog)自动续订锁,保证业务执行完成之前锁不会过期。

解锁过程包括从Redis中删除锁对应的哈希表或键,并更新计数器以确保锁被正确释放。

公平锁

Redisson也提供了公平锁的实现。
它利用Redis的List和ZSet数据结构实现公平队列,按照线程请求的顺序分配锁,保证线程获取锁的公平性。
公平锁通过Lua脚本维护Redis中设置的等待队列和超时时间,并实现锁的获取、释放和计数逻辑。

总结

Redisson作为一个强大的分布式工具,通过提供丰富的分布式对象和高级服务,简化了分布式编程。
提供分布式锁、多机联锁、红锁等场景的解决方案,适合需要分布式锁功能的项目。
通过Redisson,开发者可以更加专注于业务逻辑的实现,分布式问题的解决交给Redisson。

Redisson解决Redis分布式锁提前释放问题

前言:

在分布式场景中,相信大家或多或少都需要使用分布式锁来访问重要资源或者控制耗时操作的并发。

当然,实现分布式锁的方案有很多,比如数据库、Redis、ZooKeeper等。
本文主要结合网上的一个案例来讲解一下涉及到的redis分发key的实现。

1.问题描述:

在某个天线上处理数据复制时出现问题。
经过排查,发现单次处理时间较长,过早释放了redis分配锁,导致并发处理。
同样的要求。

实际上,这是一个关键的更新问题。
对于分布式锁,我们需要考虑设置锁多久会过期,如果出现异常如何解锁?

以上问题就是本文要讨论的主题。

2、分析原因:

项目?使用比较简单的自定义redis分发锁,并定义默认的过期时间10秒来避免死锁,像这样:

overridefunlock(){while(true){//尝试获取锁if(tryLock()){返回}尝试{Thread.sleep(10)}catch(e:InterruptedException){e.printStackTrace()}}}overridefuntryLock():Boolean{valvalue=getUniqueSign()//随机字符串vaflag=redisTemplate!!.opsForValue().setIfAbsent(name,value,10000,TimeUnit.MILLISECONDS)if(flag!=null&&flag){VALUE_lOCAL.set(value)INTO_NUM_LOCAL.set(if(INTO_NUM_LOCAL.get()!=null)INTO_NUM_LOCAL.get()+1else1)returntrue}returnfalse

缺少自动密钥更新的实现。

3.解决方案:1.思路:?

对于这种情况,可以考虑如何在作业未完成时自动更新key,当然也可以自定义实现,比如开启一个后台线程,定期刷新。
这些线程的锁。

Redisson也基于这个想法来实现自动更新的分布式密钥。
各种异常情况也考虑的比较充分,Redisson分布式锁方案的优化也考虑的比较全面。

2.简单的Redisson配置:@Configuration@EnableConfigurationProperties(RedissonProperties::class)classRedissonConfig{@BeanfunredissonClient(redissonProperties:RedissonProperties):RedissonClient{valconfig=Config()valsingleServerConfig=redissonProperties.singleServerConfig!!config.useSingleServer().setAddress(singleServerConfig.address).setDatabase(singleServerConfig.database).setUsername(singleServerConfig.username).setPassword(singleServerConfig.password).setConnectionPoolSize(singleServerConfig.connectionPoolSize).setConnectionMinimumIdleSize(singleServerConfig.connectionMinimumIdleSize).setConnectTimeout(singleServerConfig.connectTimeout).setIdleConnectionTimeout(singleServerConfig.idleConnectionTimeout).setRetryInterval(singleServerConfig.retryInterval).setRetryAttempts(singleServerConfig).retryAttempts).setTimeout(singleServerConfig.timeout)returnRedisson.create(config)}}@ConfigurationProperties(prefix="xxx.redisson")classRedissonProperties{varsingleServerConfig:SingleServerConfig?=null

Redis服务使用腾讯云的哨兵模式架构。
此架构向外部访问开放代理地址,因此您可以配置独立模式配置。
设置在这里。

如果自己搭建redis模式架构,需要根据文档配置相关必要参数

3、使用示例:...@AutowiredlateinitvarredissonClient:RedissonClient...funxxx(){...vallock=redissonClient.getLock("mylock")lock.lock()try{...}finally{lock.unlock()}...

使用方法JDK提供的密钥非常相似吗?是不是很简单呢?

像Redisson这样优秀的开源产品的出现,让我们可以投入更多的时间在业务开发上……

4、源码分析

我们看一下Redisson对常见分布式锁的实现主要分析RedissonLock锁操作

1.erridepublicvoidlock(){try{lock(-1,null,false);}catch(InterruptedExceptione){thrownewIllegalStateException();}}//租用时间,即过期时间,-1表示如果不设置则系统默认为30sprivatevoidlock(longleaseTime,TimeUnitunit,booleaninterruptically)throwsInterruptedException{//尝试获取锁,如果获取如果是,直接付款longthreadId=Thread.currentThread().getId();Longttl=tryAcquire(-1,leaseTime,unit,threadId);//lockacquiredif(ttl==null){return;}RFuturefuture=subscribe(threadId);if(可中断){commandExecutor.syncSubscriptionInterrupted(future);}else{commandExecutor.syncSubscription(future);}//如果无法获取锁就尝试循环,直到获取锁或者异常成功通常终止try{while(true){ttl=tryAcquire(-1,leaseTime,unit,threadId);//lockacquiredif(ttl==null){break;}...}}最终{取消订阅(未来,threadId);}1.1、tryAcquireprivateLongtryAcquire(longwaitTime,longleaseTime,时间Unitunit,longthreadId){returnget(tryAcquireAsync(waitTime,leaseTime,unit,threadId));}privateRFuturetryAcquireAsync(longwaitTime,longleaseTime,TimeUnitunit,longthreadId){RFuturettlRemainingFuture;//调用trueActivity动态获取密钥是if(leaseTime!=-1){ttlRemainingFuture=tryLockInnerAsync(waitTime,leaseTime,unit,threadId,RedisCommands.EVAL_LONG);}else{ttlRemainingFuture=tryLockInnerAsync(waitTime,internalLockLeaseTime,TimeUnit.MILLISECONDS,threadId,RedisCommands.EVAL_LONG);}ttlRemainingFuture.onComplete((ttlRemaining,e)->{if(e!=null){return;}//lockacquired//此处成功获取key,尝试更新锁if(ttlRemaining==null){if(leaseTime!=-1){internalLockLeaseTime=unit.toMillis(leaseTime);}else{scheduleExpirationRenewal(threadId);}}});returnttlRemainingFuture;}//实际执行操作操作通过脚本键luaRFuturetryLockInnerAsync(longwaitTime,longleaseTime,TimeUnitunit,longthreadId,RedisStrictCommandcommand){//如果锁不存在那就可以了,直接设置,设置过期时间//如果锁存在现在,有两种情况需要考虑//-同一个线程获取回发键,直接对应字段(即值getLockName(threadId))+1//-不同的线程竞争锁。
这次key失败,直接返回这个key对应的过期时间returnevalWriteAsync(getRawName(),LongCodec.INSTANCE,command,"if(redis.call('exists',KEYS[1])==0)then"+"redis.call('hincrby',KEYS[1],ARGV[2],1);"+"redis.call('pexpire',KEYS[1],ARGV[1]);"+"returnnil;"+"end;"+"if(redis.call('hexists',KEYS[1],ARGV[2])==1)then"+"redis.call('hincrby',KEYS[1],ARGV[2],1);"+"redis.call('pexpire',KEYS[1],ARGV[1]);"+"returnnil;"+"end;"+"returnredis.call('pttl',KEYS[1]);",董llections.singletonList(getRawName()),unit.toMillis(leaseTime),getLockName(threadId));1.2.eduleExpirationRenewal续订密钥

protectedvoidscheduleExpirationRenewal(longthreadId){ExpirationEntryentry=newExpirationEntry();ExpirationEntryoldEntry=EXPIATION_RENEWAL_MAP.putIfAbsent(getEntryName(),entry);if(oldEntry!=null){oldEntry.addThreadId(threadId);}else{entry.addThreadId(threadId);//RenewOperationReneExpiration();}}privatevoidrenewExpiration(){ExpirationEntryee=EXPIATION_RENEWAL_MAP.get(getEntryName());if(ee==null){return;}//Set设置延迟任务任务,执行该任务在InternalLockLeaseTime/3周期之后并定期更新锁Timeouttask=commandExecutor.getConnectionManager().newTimeout(newTimerTask(){@Overridepublicvoidrun(Timeouttimeout)throwsException{ExpirationEntryent=EXPIATION_RENEWAL_MAP.get(getEntryName());if(ent==null){return;}LongthreadId=ent.getFirstThreadId();if(threadId==null){return;}//实际执行extend命令操作RFuturefuture=renewExpirationAsync(threadId);future.onComplete((res,e)->{if(e!=null){log.error("Can'tupdatelock"+getRawName()+"expiration",e);EXPIATION_RENEWAL_MAP.remove(getEntryName());return;}//本次续订后,继续调度自己达到获取持续续订效果如果(res){//rescheduleitselfrenewExpiration();}});}},internalLockLeaseTime/3,TimeUnit.MILLISECONDS);ee.setTimeout(task);}//所谓续订,就是延长过期时间protectedRFuturerenewExpirationAsync(longthreadId){//如果当前key和线程存在,则延长过期时间并成功则返回1;否则返回0如果失败returnevalWriteAsync(getRawName(),LongCodec.INSTANCE,RedisCommands.EVAL_BOOLEAN,"if(redis.call('hexists',KEYS[1],ARGV[2])==1)then"+"redis.call('pexpire',KEYS[1],ARGV[1]);"+"return1;"+"end;"+"return0;",Collections.singletonList(getRawName()),internalLockLeaseTime,getLockName(threadId));2.解锁操作publicvoidunlock(){try{get(unlockAsync(Thread.currentThread().getId()));}catch(RedisExceptione){..}}publicRFutureunlockAsync(longthreadId){RPromiseresult=newRedissonPromise<>();//执行解锁操作RFuturefuture=unlockInnerAsync(threadId);//当解锁时稍后做什么操作成功Future.onComplete((opStatus,e)->{//取消续订任务cancelExpirationRenewal(threadId);...});returnresult;}protectedRFutureunlockInnerAsync(longthreadId){//如果当前线程对应的锁和记录不再存在,则直接返回void//不在字段中(即getLockName(threadId))对应的值减1//-如果减1后该值仍然大于0,则再次延长定时器expiretime//-如果减后value小于等于0,则直接删除key,发布订阅消息returnevalWriteAsync(getRawName(),LongCodec.INSTANCE,RedisCommands.EVAL_BOOLEAN,"if(redis.call('hexists'),KEYS[1],ARGV[3])==0)then"+"returnnil;"+"end;"+"localcounter=redis.call('hincrby',KEYS[1],ARGV[3],-1);"+"if(counter>0)then"+"redis.call('pexpire',KEYS[1],ARGV[2]);"+"return0;"+"else"+"redis.call('del',KEYS[1]);"+"redis.call('发布',KEYS[2],ARGV[1]);"+"return1;"+"end;"+"returnnil;",Arrays.asList(getRawName(),getChannelName()),LockPubSub.UNLOCK_MESSAGE,internalLockLeaseTime,getLockName(threadId));

上面是具体实现使用redisson客户端工具添加/解锁redis分发密钥,主要解决以下问题

1.死锁问题:设置过期时间

2.重入问题:重入+1,锁释放-1,当value=0表示完全释放锁

??3.续费问题:提早解锁的问题能否解决

4.释放:谁持有锁,就释放谁

总结:

本文由网上提问介绍,通过redis分布式锁的常见实现,最终选择了解决方案redisson及其解决方案;redisson分析了具体实现细节

相关参考:

Redisson官方文档-分布式锁和同步器

Redisson官方文档-配置方法

CSDN-如何使用Redis部署分布式锁?

原文:https://juejin.cn/post/7101679218408292382
热门文章
1
SQL多表连接查询全解析:JOIN语句应... sql多表关联查询在执行SQL多表连接查询时,可以使用JOIN语句将多个表连接在...

2
Java中字符串类型详解:String与... 变量有字符类型,为什么没有字符串类型??基本类型:charshort、int、l...

3
JavaSE与JavaEE:从基础到企业... javase&#160;和javaee的区别?JavaSE和JavaEE...

4
Java程序员面试必知:核心技术问答与技... java编程程序员技术面试常见面试?随着互联网的不断发展,Java开发已经成为很...

5
Java.exe与Javaw.exe:区... 程序中java和javaw有什么区别java和javaw的区别:两者都是Java...

6
深入解析:Java中的javax包及其与... JAVA导入时,什么是javax?awt是java1.0,swing是java2...

7
Java字符串处理与键盘输入、文件读取技... 编写一个Java应用程序,从键盘读取用户输入两个字符串,并重载3个strAdd函...

8
大专生转行自学Java,迷茫时如何找到方... 我是大专生因没有好好学所以现在后悔了我想从事软件编程我正在自学java不知道怎么...

9
Java中Scanner类导入位置及使用... 在java中这句语言“importjava.util.Scanner;”是什么意...

10
Java数组倒序输出:排序后逆序存储方法... Java数组倒序输出?1.反转数组的方法有很多种,比如先排序,然后倒序存储pub...