基于 setnx 的分布式锁的问题
基于 setnx
的分布式锁在高并发场景下可能会出现问题。setnx
命令用于设置一个键的值,如果键不存在,则设置成功并返回 1;如果键已经存在,则设置失败并返回 0。这种机制在分布式环境中可能导致以下问题:
- 锁的重入性:如果一个线程在持有锁的情况下再次请求锁,可能会导致死锁。
- 锁的重试机制:如果一个线程在持有锁的情况下发生异常,可能会导致锁无法释放,从而导致其他线程无法获取锁。
- 锁的超时释放:如果一个线程在持有锁的情况下长时间未释放锁,可能会导致其他线程无法获取锁。
- 主从一致性:在主从复制的环境中,
setnx
命令可能会导致数据不一致的问题。
解决方案
Redisson 是一个基于 Redis 的 Java 驻内存数据网格框架,提供了分布式锁、分布式集合、分布式队列等数据结构的实现。Redisson 通过 Redis 的原子操作来实现分布式锁,避免了 setnx
的问题。
示例
1 2 3 4 5 6
| <dependency> <groupId>org.redisson</groupId> <artifactId>redisson</artifactId> <version>3.16.0</version> </dependency>
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20
| import org.redisson.Redisson; import org.redisson.api.RedissonClient; import org.redisson.config.Config; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration;
@Configuration public class RedissonConfig {
@Bean public RedissonClient redissonClient(){ Config config = new Config(); config.useSingleServer().setAddress("redis://127.0.0.1:6379").setPassword("123456"); return Redisson.create(config); } }
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54
| ...
@Resource private RedissonClient redissonClient;
@Override public Result seckillVoucher(Long voucherId) { SeckillVoucher voucher = seckillVoucherService.getById(voucherId);
if (voucher.getBeginTime().isAfter(LocalDateTime.now())) { return Result.fail("秒杀未开始"); }
if (voucher.getEndTime().isBefore(LocalDateTime.now())) { return Result.fail("秒杀已结束"); }
if (voucher.getStock() < 1) { return Result.fail("库存不足"); }
Long userId = UserHolder.getUser().getId();
RLock lock = redissonClient.getLock("lock:order:" + userId); boolean isLock = lock.tryLock(); if (!isLock) { return Result.fail("不允许重复下单"); } try { IVoucherOrderService proxy = (IVoucherOrderService) AopContext.currentProxy(); return proxy.createVoucherOrder(voucherId); } finally { lock.unlock(); } } ...
|
可重入锁

使用 Hash 来存储锁的线程 ID 和重入次数。使用 HSET
命令来设置锁的值,使用 HINCRBY
命令来增加重入次数,使用 EXPIRE
命令来设置锁的过期时间。
KEY |
VALUE |
lock:order |
field: thread1 value: 1 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19
| local key = KEYS[1]; local threadId = ARGV[1]; local releaseTime = ARGV[2];
if (redis.call('exists', key) == 0) then redis.call('hset', key, threadId, '1'); redis.call('expire', key, releaseTime); return 1; end
if (redis.call('hexists', key, threadId) == 1) then redis.call('hincrby', key, threadId, '1'); redis.call('expire', key, releaseTime); end
return 0
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18
| local key = KEYS[1]; local threadId = ARGV[1]; local releaseTime = ARGV[2];
if (redis.call('HEXISTS', key, threadId) == 0) then return nil; end;
local count = redis.call('HINCRBY', key, threadId, -1);
if (count > 0) then redis.call('EXPIRE', key, releaseTime); return nil; else redis.call('DEL', key); return nil; end;
|
使用 Redisson 的可重入锁
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46
| class BackApplicationTests { @Autowired private RedissonClient redissonClient;
private RLock lock;
@BeforeEach void setUp() { lock = redissonClient.getLock("lock"); }
@Test void method1() { boolean isLock = lock.tryLock(); if (!isLock) { log.error("获取锁失败...... 1"); return; } try { log.info("获取锁成功...... 1"); method2(); log.info("执行业务逻辑...... 1"); } finally { log.info("准备释放锁...... 1"); lock.unlock(); } }
@Test void method2() { boolean isLock = lock.tryLock(); if (!isLock) { log.error("获取锁失败...... 2"); return; } try { log.info("获取锁成功...... 2"); log.info("执行业务逻辑...... 2"); } finally { log.info("准备释放锁...... 2"); lock.unlock(); } } }
|
在 method1 和 method2 中都获取了同一个锁,Redisson 会自动处理重入锁的逻辑。第一次获取锁时,重入次数为 1,第二次获取锁时,重入次数加 1。释放锁时,Redisson 会判断重入次数是否为 0,如果不为 0,则不会删除锁。
分别在 boolean isLock = lock.tryLock();
打断点调试,发现:
第一次获取锁时,重入次数为 1

第二次获取锁时,重入次数加 1

当重入次数为 0 时,释放锁
…