写作不易,点赞收藏关注一键三连,以便下次再看,感谢支持~
前两篇文章咱们聊到了如何采用SQL数据库及Zookeeper实现相应的分布式锁。
初识分布式锁(一)
初识分布式锁(二):ZooKeeper分布式锁原理浅析及实战案例
今天咱们再来聊聊如何采用redis实现相应的分布式锁,以及这种实现与前两种方式实现的差异性。
Redis常见命令
在介绍分布式锁之前,我们先来了解一下redis的常用命令:
1、SET key value [EX seconds] [PX milliseconds] [NX|XX],将字符串值 value
关联到 key
。如果 key
已经持有其他值, SET就覆写旧值,无视类型。从 Redis 2.6.12 版本开始, SET命令的行为可以通过一系列参数来修改:
-
EX second
:设置键的过期时间为second
秒。SET key value EX second
效果等同于SETEX key second value
。 -
PX millisecond
:设置键的过期时间为millisecond
毫秒。SET key value PX millisecond
效果等同于PSETEX key millisecond value
。 -
NX
:只在键不存在时,才对键进行设置操作。SET key value NX
效果等同于SETNX key value
。 -
XX
:只在键已经存在时,才对键进行设置操作。
2、EXPIRE key seconds,为给定 key
设置生存时间,当 key
过期时(生存时间为 0
),它会被自动删除。
3、SETEX key seconds value,将值 value
关联到 key
,并将 key
的生存时间设为 seconds
(以秒为单位)。
这个命令类似于以下两个命令:
SET key value
EXPIRE key seconds # 设置生存时间
SETEX命令与SET + EXPIRE命令的区别主要在于,SETEX命令可以保持原子性,而SET+EXPIRE属于两条命令,难以保持其原子性。
4、DEL key [key ...],删除给定的一个或多个 key
。
5、SETNX key value,将 key
的值设为 value
,当且仅当 key
不存在。若给定的 key
已经存在,则 SETNX 不做任何动作。
分布式锁最关键的主要几个命令我都罗列在上面了~如果还有不清楚或者没有提及的命令,可以点开这个文章进行查找。

Lua脚本
紧接着还需要介绍一个redis里面比较不常见的内容,lua脚本。
一般我们需要操作redis的时候,都是需要进入到redis客户端,通过一个一个的命令进行编辑输入,从而完成相应的redis操作。
这样的方式操作起来相对方便,而且都是及时反馈,在命令数量较少、操作简单的时候十分友好。
但是如果当需要执行的命令很多、而且命令可能有前后依赖的时候,那么采用这样一个个命令输入的方式就显得十分不友好了。
为此,redis特意引入了lua脚本,用户可以向服务器发送 lua 脚本来执行自定义动作,获取脚本的响应数据。
而且另外一个特点是,Redis 服务器会单线程原子性执行 lua 脚本,保证 lua 脚本在处理的过程中不会被任意其它请求打断。这个也是lua脚本相比较于单条命令不断执行的优势之一。
分布式锁原理浅析
redis实现分布式锁,主要有两种方式:1、基于redis命令实现;2、基于lua脚本实现。
基于redis命令实现
实现的逻辑主要梳理如下:
- 当线程进入程序时候,采用SETNX命令往缓存中设置key值,如果设置成功,证明此时加锁成功。
- 当线程退出程序的时候,采用DEL命令将key值删除,从而实现解锁。
SETNX key value # 加锁
# 实现相应的业务代码逻辑
DEL key value # 解锁
但是这样明显存在一个问题,如果一个线程在加锁期间,因为某些特殊原因挂掉了,没有进行解锁,此时就会产生【死锁】,从而严重影响整个系统的性能。
因此在加锁后我们还需要采用EXPIRE命令,为相应的KEY值添加上过期时间从而避免死锁的产生。
SETNX key value # 加锁
EXPIRE key seconds # 设置过期时间
# 实现相应的业务代码逻辑
DEL key value # 解锁
问题是不是到此就解决了呢?显然并没有!
之前我们说过由于加锁及设置过期时间的代码是两个命令,而redis在执行两个命令的时候并不能保证原子性,因此又可能出现在执行SETNX命令的时候,出现宕机,这样还是出现了死锁!
因此,在redis,对set命令进行了拓展,我们可以将上述的代码替换成下述的代码。
SET key value EX seconds NX # 设置锁的超时时间,且当key存在时直接返回。
# 实现相应的业务代码逻辑
DEL key value # 解锁
尽管如此,锁重入仍是个难题,因为我们采用了NX参数,因此难以实现锁的重入;

基于lua脚本实现
相反,得益于lua脚本的执行时的原子性,lua脚本能较好的解决上述的种种问题。
用lua脚本实现的加锁代码大致流程如下所示:

lua脚本实现解锁的主要流程如下所示:

更详细的代码解析,在Redisson源码浅析中我们会分析到。
但需要注意的一点是,锁的过期时间设定是一门难题,设置时间长了,锁久久不释放影响性能;设置短了,业务代码还没执行完锁就释放了,没法限制其他线程的代码执行。比较巧妙的是,现有的框架里面已经有使用守护线程的方式(看门狗)来自动延长过期时间,从而简化使用的门槛。
代码实战
这次代码实战,我们采用Redission实现分布式锁,其实redission框架对分布式锁的封装相对完善,只需要很少的代码就可以实现对应分布式加锁及解锁。
首先,我们写一个配置类,用于加载我们对应的容器到spring中,这里需要注意的一点是,@Bean注解会默认使用方法名作为容器名字,要确保咱们的方法名与要加载的容器名字一致,当然也可以使用@Bean(value = "redissionClient")来显式的指定容器的名字。
@Configuration
public class RedisConfig {
//这里在application.yml中填写你对应的redis的ip:port
@Value("${redis.address}")
private String redisAddress;
@Bean
public RedissonClient redissonClient() {
Config config = new Config();
config.useSingleServer().setAddress(redisAddress);
return Redisson.create(config);
}
}
在将对应的容器注入到Spring的框架后,我们调用redission的关键方法getLock获取对应的锁。紧接着可以对这个锁调用相应的tryLock方法进行上锁,这里的上锁是个多态方法,主要区别如下所示:
// 不填写参数,即时获取锁,如果锁不可用则直接返回false。
boolean tryLock();
// 在给定时间内获取对应的锁(如果线程没有被中断)
boolean tryLock(long time, TimeUnit unit) throws InterruptedException;
这里我们采用简单的方法实现,直接采用tryLock()改造对应的代码内容,改造后的代码如下:
@Resource
RedissionClient redissionClient;
public Boolean deductProduct(ProductPO productPO){
//首先获取分布式的锁
RLock lock = redissonClient.getLock("deductProduct");
try{
LOGGER.info("分布式锁加锁!");
//尝试对redis的分布式锁进行加锁
boolean success = lock.tryLock(30, TimeUnit.SECONDS);
if (!success){
//加锁失败,直接返回
return false;
}
LOGGER.info("查找商品的数据为 :"+ JSON.toJSONString(productPO));
Example example = new Example(ProductPO.class);
Example.Criteria criteria = example.createCriteria();
criteria.andEqualTo("skuId", productPO.getSkuId());
List<ProductPO> productPOS = productMapper.selectByExample(example);
if (CollectionUtils.isEmpty(productPOS)){
throw new RuntimeException("当前商品不存在");
}
for (ProductPO selectProductPO: productPOS){
//对对应的sku进行数量扣减
Integer number = selectProductPO.getNumber();
LOGGER.info("当前商品的数量为:"+number);
if (number<=0){
//小于等于0时,不进行扣减
continue;
}
selectProductPO.setNumber(number-productPO.getNumber());
productMapper.updateByPrimaryKey(selectProductPO);
}
}finally {
//最后一定记得释放锁资源
LOGGER.info("分布式锁释放!");
lock.unlock();
}
return true;
}
随后运行咱们的代码就可以得到相应的结果啦:


源码浅析
加锁源码
对tryLock(),即加锁的代码进行分析。
boolean success = lock.tryLock();
深入到关键的源码层面,其主要代码如下:
private RFuture<Boolean> tryAcquireOnceAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId) {
RFuture<Boolean> acquiredFuture;
if (leaseTime != -1) {
/*关键代码*/
acquiredFuture = tryLockInnerAsync(waitTime, leaseTime, unit, threadId, RedisCommands.EVAL_NULL_BOOLEAN);
} else {
/*关键代码*/
acquiredFuture = tryLockInnerAsync(waitTime, internalLockLeaseTime,
TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_NULL_BOOLEAN);
}
CompletionStage<Boolean> f = acquiredFuture.thenApply(acquired -> {
//如果成功获取锁
if (acquired) {
if (leaseTime != -1) {
// 明确指定了租约时间,则更新类相应的租约时间即可
internalLockLeaseTime = unit.toMillis(leaseTime);
} else {
// 否则将当前的ThreadId保存到一个相应的ConcurrentMap中,
// 开启守护线程,定期刷新对应线程ID持有锁的过期时间。避免出现锁过期被释放的问题
scheduleExpirationRenewal(threadId);
}
}
return acquired;
});
return new CompletableFutureWrapper<>(f);
}