Redisson

基于 setnx 的分布式锁的问题

基于 setnx 的分布式锁在高并发场景下可能会出现问题。setnx 命令用于设置一个键的值,如果键不存在,则设置成功并返回 1;如果键已经存在,则设置失败并返回 0。这种机制在分布式环境中可能导致以下问题:

  1. 锁的重入性:如果一个线程在持有锁的情况下再次请求锁,可能会导致死锁。
  2. 锁的重试机制:如果一个线程在持有锁的情况下发生异常,可能会导致锁无法释放,从而导致其他线程无法获取锁。
  3. 锁的超时释放:如果一个线程在持有锁的情况下长时间未释放锁,可能会导致其他线程无法获取锁。
  4. 主从一致性:在主从复制的环境中,setnx 命令可能会导致数据不一致的问题。

解决方案

Redisson 是一个基于 Redis 的 Java 驻内存数据网格框架,提供了分布式锁、分布式集合、分布式队列等数据结构的实现。Redisson 通过 Redis 的原子操作来实现分布式锁,避免了 setnx 的问题。

示例

1
2
3
4
5
6
<!-- redisson -->
<dependency>
<groupId>org.redisson</groupId>
<artifactId>redisson</artifactId>
<version>3.16.0</version>
</dependency>
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
// config/RedissonConfig.java
import org.redisson.Redisson;
import org.redisson.api.RedissonClient;
import org.redisson.config.Config;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class RedissonConfig {

@Bean
public RedissonClient redissonClient(){
//1.创建配置
Config config = new Config();
config.useSingleServer().setAddress("redis://127.0.0.1:6379").setPassword("123456");
//2.根据Config创建出RedissonClient实例
return Redisson.create(config);
}
}

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
...
// @Resource
// private StringRedisTemplate stringRedisTemplate;

@Resource
private RedissonClient redissonClient;

@Override
public Result seckillVoucher(Long voucherId) {
// 1. 查询代金券库存
SeckillVoucher voucher = seckillVoucherService.getById(voucherId);

// 2. 判断秒杀是否开始
if (voucher.getBeginTime().isAfter(LocalDateTime.now())) {
// 秒杀未开始
return Result.fail("秒杀未开始");
}

// 3. 判断秒杀是否结束
if (voucher.getEndTime().isBefore(LocalDateTime.now())) {
// 秒杀已结束
return Result.fail("秒杀已结束");
}

// 4. 判断库存是否足够
if (voucher.getStock() < 1) {
// 库存不足
return Result.fail("库存不足");
}


Long userId = UserHolder.getUser().getId();

// 创建锁对象 锁定用户id (分布式锁)
// SimpleRedisLock lock = new SimpleRedisLock("order:" + userId, stringRedisTemplate);
// 替换为 Redisson
RLock lock = redissonClient.getLock("lock:order:" + userId);
// 获取锁
boolean isLock = lock.tryLock();
// 判断获取锁是否成功
if (!isLock) {
// 获取锁失败, 返回错误
return Result.fail("不允许重复下单");
}
try {
// 获取代理对象(事务)
IVoucherOrderService proxy = (IVoucherOrderService) AopContext.currentProxy();
return proxy.createVoucherOrder(voucherId);
} finally {
// 释放锁
lock.unlock();
}
}
...

可重入锁

alt text

使用 Hash 来存储锁的线程 ID 和重入次数。使用 HSET 命令来设置锁的值,使用 HINCRBY 命令来增加重入次数,使用 EXPIRE 命令来设置锁的过期时间。

KEY VALUE
lock:order field: thread1 value: 1
  • 获取锁的 Lua 脚本
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
local key = KEYS[1]; -- 锁的 key
local threadId = ARGV[1]; -- 线程唯一标识
local releaseTime = ARGV[2]; -- 锁的自动释放时间
-- 判断锁是否存在
if (redis.call('exists', key) == 0) then
-- 锁不存在, 获取锁
redis.call('hset', key, threadId, '1');
redis.call('expire', key, releaseTime);
return 1;
end
-- 锁存在, 判断是否是当前线程
if (redis.call('hexists', key, threadId) == 1) then
-- 是当前线程, 重入锁, 重入次数 +1
redis.call('hincrby', key, threadId, '1');
-- 设置过期时间
redis.call('expire', key, releaseTime);
end
-- 获取锁的不是当前线程, 获取锁失败
return 0
  • 释放锁的 Lua 脚本
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
local key = KEYS[1]; -- 锁的 key
local threadId = ARGV[1]; -- 线程唯一标识
local releaseTime = ARGV[2]; -- 锁的自动释放时间
-- 判断当前锁是否还是被自己持有
if (redis.call('HEXISTS', key, threadId) == 0) then
return nil; -- 如果已经不是自己, 则直接返回
end;
-- 是自己的锁, 则重入次数-1
local count = redis.call('HINCRBY', key, threadId, -1);
-- 判断是否重入次数已经为0
if (count > 0) then
-- 大于0则不能释放锁, 重置有效期然后返回
redis.call('EXPIRE', key, releaseTime);
return nil;
else -- 等于0说明可以释放锁, 直接删除
redis.call('DEL', key);
return nil;
end;

使用 Redisson 的可重入锁

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
class BackApplicationTests {
@Autowired
private RedissonClient redissonClient;

private RLock lock;

@BeforeEach
void setUp() {
lock = redissonClient.getLock("lock");
}

@Test
void method1() {
// 尝试获取锁
boolean isLock = lock.tryLock();
if (!isLock) {
log.error("获取锁失败...... 1");
return;
}
try {
log.info("获取锁成功...... 1");
method2();
log.info("执行业务逻辑...... 1");
} finally {
log.info("准备释放锁...... 1");
lock.unlock();
}
}

@Test
void method2() {
// 尝试获取锁
boolean isLock = lock.tryLock();
if (!isLock) {
log.error("获取锁失败...... 2");
return;
}
try {
log.info("获取锁成功...... 2");
log.info("执行业务逻辑...... 2");
} finally {
log.info("准备释放锁...... 2");
lock.unlock();
}
}
}

在 method1 和 method2 中都获取了同一个锁,Redisson 会自动处理重入锁的逻辑。第一次获取锁时,重入次数为 1,第二次获取锁时,重入次数加 1。释放锁时,Redisson 会判断重入次数是否为 0,如果不为 0,则不会删除锁。

分别在 boolean isLock = lock.tryLock(); 打断点调试,发现:

第一次获取锁时,重入次数为 1
alt text

第二次获取锁时,重入次数加 1
alt text

当重入次数为 0 时,释放锁