一个基于 Redis 的可重入分布式锁的实现

摘要

设计目标与关键约束

设计目标

  • 互斥性:同一时间只有一个持有者

  • 可重入:同一线程 / 请求可多次加锁

  • 安全释放:只能释放自己持有的锁

  • 自动过期:防止死锁

  • 续期能力(Watch Dog):业务时间不确定时依然安全

  • 高性能:单 Redis Key,Lua 保证原子性

技术选型

  • Spring Boot

  • Spring Data Redis(Lettuce)

  • Redis Lua Script

Redis 是唯一依赖组件:Redis
Spring Boot 作为运行框架:Spring Boot

锁的核心数据结构设计(关键)

  • Redis Key 结构(String)

1
2
# lock:前缀
lock:order:123
  • Value 结构(Hash)

1
2
3
{
"uuid:threadId" : 重入次数
}

同一线程重入 → count +1,不同线程 → 拒绝

  • 使用StringRedisTemplate,因其序列化器是StringRedisSerializer,可以保证 Lua 脚本能够正常执行。

基于 Redis + Lua + HINCRBY 的可重入分布式锁,HashValue 序列化器 必须是 StringRedisSerializer

Lua 脚本(原子性保障)

  • 加锁脚本(支持可重入)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
-- KEYS[1] 锁key
-- ARGV[1] ownerId (uuid:threadId)
-- ARGV[2] expireMillis

if (redis.call('exists', KEYS[1]) == 0) then
redis.call('hset', KEYS[1], ARGV[1], 1)
redis.call('pexpire', KEYS[1], ARGV[2])
return 1
end

if (redis.call('hexists', KEYS[1], ARGV[1]) == 1) then
redis.call('hincrby', KEYS[1], ARGV[1], 1)
redis.call('pexpire', KEYS[1], ARGV[2])
return 1
end

return 0
  • 解锁脚本(防误删)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
-- KEYS[1] 锁key
-- ARGV[1] ownerId

if (redis.call('hexists', KEYS[1], ARGV[1]) == 0) then
return 0
end

local count = redis.call('hincrby', KEYS[1], ARGV[1], -1)
if (count > 0) then
return 1
else
redis.call('hdel', KEYS[1], ARGV[1])
if (redis.call('hlen', KEYS[1]) == 0) then
redis.call('del', KEYS[1])
end
return 1
end

Java 实现(核心代码)

  • WatchDog,实现锁自动续期

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
package com.example.lock;

import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.data.redis.core.script.DefaultRedisScript;
import java.util.Collections;
import java.util.Map;
import java.util.concurrent.*;

public class RedisLockWatchDog {

/**
* WatchDog的Lua脚本
* KEYS[1] 锁key
* ARGV[1] ownerId (uuid:threadId)
* ARGV[2] expireMillis 过期时间
* <p>
* 说明:
* 拥有者是当前线程就续期
*/
private static final String WATCHDOG_SCRIPT = """
if (redis.call('hexists', KEYS[1], ARGV[1]) == 1) then
redis.call('pexpire', KEYS[1], ARGV[2])
return 1
end
return 0
""";

private final StringRedisTemplate redisTemplate;

/**
* 单线程足够(Redisson 也是)
*/
private final ScheduledExecutorService scheduler =
Executors.newSingleThreadScheduledExecutor(r -> {
Thread t = new Thread(r);
t.setName("redis-lock-watch-dog");
t.setDaemon(true);
return t;
});

/**
* 每个 lockKey 对应一个续期任务
*/
private final Map<String, ScheduledFuture<?>> renewTasks = new ConcurrentHashMap<>();

public RedisLockWatchDog(StringRedisTemplate redisTemplate) {
this.redisTemplate = redisTemplate;
}

/**
* 启动续期
*
* @param lockKey Redis 锁 key
* @param ownerId uuid:threadId
* @param leaseMillis 锁过期时间
*/
public void startRenew(String lockKey, String ownerId, long leaseMillis) {

// 防止重复启动
if (renewTasks.containsKey(lockKey)) {
return;
}

// 间隔多久续期一次
long period = leaseMillis / 3;

ScheduledFuture<?> future = scheduler.scheduleAtFixedRate(() -> {
try {
renew(lockKey, ownerId, leaseMillis);
} catch (Exception e) {
// 生产环境建议接日志
}
}, period, period, TimeUnit.MILLISECONDS);

renewTasks.put(lockKey, future);
}

/**
* 取消续期
*/
public void stopRenew(String lockKey) {
ScheduledFuture<?> future = renewTasks.remove(lockKey);
if (future != null) {
future.cancel(false);
}
}

/**
* 实际续期逻辑
*/
private void renew(String lockKey, String ownerId, long leaseMillis) {

Boolean result = redisTemplate.execute(
new DefaultRedisScript<>(WATCHDOG_SCRIPT, Boolean.class),
Collections.singletonList(lockKey),
ownerId,
String.valueOf(leaseMillis)
);

if (!result) {
// 锁已不属于当前线程,停止 Watch Dog
stopRenew(lockKey);
}
}

/**
* 应用关闭时释放资源(可选)
*/
public void shutdown() {
scheduler.shutdown();
}
}
  • 锁接口

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
package com.example.lock;

import java.util.concurrent.TimeUnit;

public interface DistributedLock {
/**
* 尝试获取锁
*
* @param key 锁的key
* @param waitTime 尝试获取锁的最大等待时间
* @param leaseTime 锁的过期时间
* @param unit 时间单位
* @return true表示获取锁成功,false表示获取锁失败
*/
boolean tryLock(String key, long waitTime, long leaseTime, TimeUnit unit);

/**
* 尝试获取锁
* 不等待立即返回
* @param key 锁的key
* @param leaseTime 锁的过期时间
* @param unit 时间单位
* @return true表示获取锁成功,false表示获取锁失败
*/
boolean tryLock(String key, long leaseTime, TimeUnit unit);

/**
* 获取锁
* 只要获取到锁就返回,否则一直自旋获取锁
* @param key 锁的key
* @param leaseTime 锁的过期时间
* @param unit 时间单位
* @return true表示获取锁成功,false表示获取锁失败
*/
boolean lock(String key, long leaseTime, TimeUnit unit);

/**
* 释放锁
*
* @param key 锁的key
*/
void unlock(String key);
}
  • 锁实现类

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
package com.example.lock;

import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.data.redis.core.script.DefaultRedisScript;
import org.springframework.stereotype.Component;
import java.util.Collections;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.LockSupport;


@Component
public class RedisReentrantLock implements DistributedLock {

/**
* 锁的key前缀
*/
private static final String LOCK_PREFIX = "lock:";

/**
* 锁的Lua脚本
* KEYS[1] 锁key
* ARGV[1] ownerId (uuid:threadId)
* ARGV[2] expireMillis 过期时间
* <p>
* 说明:
* 1.key 不存在时创建锁
* 2.key 存在时判断锁的拥有者是否为当前线程
*/
private static final String LOCK_SCRIPT = """
if (redis.call('exists', KEYS[1]) == 0) then
redis.call('hset', KEYS[1], ARGV[1], 1)
redis.call('pexpire', KEYS[1], ARGV[2])
return 1
end
if (redis.call('hexists', KEYS[1], ARGV[1]) == 1) then
redis.call('hincrby', KEYS[1], ARGV[1], 1)
redis.call('pexpire', KEYS[1], ARGV[2])
return 1
end
return 0
""";

/**
* 释放锁的Lua脚本
* KEYS[1] 锁key
* ARGV[1] ownerId (uuid:threadId)
*/
private static final String UNLOCK_SCRIPT = """
if (redis.call('hexists', KEYS[1], ARGV[1]) == 0) then
return 0
end
local count = redis.call('hincrby', KEYS[1], ARGV[1], -1)
if (count > 0) then
return 1
else
redis.call('hdel', KEYS[1], ARGV[1])
if (redis.call('hlen', KEYS[1]) == 0) then
redis.call('del', KEYS[1])
end
return 1
end
""";

private final StringRedisTemplate redisTemplate;
private final RedisLockWatchDog watchDog;
private final String uuid = UUID.randomUUID().toString();

public RedisReentrantLock(StringRedisTemplate redisTemplate) {
this.redisTemplate = redisTemplate;
watchDog = new RedisLockWatchDog(redisTemplate);
}

/**
* 获取当前线程的 ownerId
*
* @return
*/
private String ownerId() {
return uuid + ":" + Thread.currentThread().getId();
}


@Override
public boolean tryLock(String key, long leaseTime, TimeUnit unit) {
String lockKey = LOCK_PREFIX + key;
long expireMillis = unit.toMillis(leaseTime);
Boolean success = redisTemplate.execute(
new DefaultRedisScript<>(LOCK_SCRIPT, Boolean.class),
Collections.singletonList(lockKey),
ownerId(),
String.valueOf(expireMillis)
);
if (Boolean.TRUE.equals(success)) {//防止NullPointerException
// 启动 Watch Dog(只有在 leaseTime 不确定时)
watchDog.startRenew(
lockKey,
ownerId(),
expireMillis
);
return true;
}
return false;
}

@Override
public boolean tryLock(String key, long waitTime, long leaseTime, TimeUnit unit) {
long deadline = System.currentTimeMillis() + unit.toMillis(waitTime);
// 使用带条件的循环,避免重复赋值
while (System.currentTimeMillis() < deadline) {
if (tryLock(key, leaseTime, unit)) {
return true;
} else {
// 让当前线程挂起(阻塞),最长不超过指定的纳秒数
// 在竞争失败后,让出 CPU 一小段时间,避免忙等,同时控制对 Redis 的重试频率。
// 这里50毫秒是经验值,可以根据实际需求调整
LockSupport.parkNanos(TimeUnit.MILLISECONDS.toNanos(50));
}
}
return false;
}

@Override
public boolean lock(String key, long leaseTime, TimeUnit unit) {

// 使用无限循环,语义更清晰
while (true) {
if (tryLock(key, leaseTime, unit)) {
return true;
} else {
// 让当前线程挂起(阻塞),最长不超过指定的纳秒数
// 在竞争失败后,让出 CPU 一小段时间,避免忙等,同时控制对 Redis 的重试频率。
// 这里50毫秒是经验值,可以根据实际需求调整
LockSupport.parkNanos(TimeUnit.MILLISECONDS.toNanos(50));
}
}
}

@Override
public void unlock(String key) {
String lockKey = LOCK_PREFIX + key;
// 先停续期
watchDog.stopRenew(lockKey);
// 释放锁
redisTemplate.execute(
new DefaultRedisScript<>(UNLOCK_SCRIPT, Long.class),
Collections.singletonList(lockKey),
ownerId()
);
}
}
  • 测试类

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
package com.example;

import com.example.lock.RedisReentrantLock;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

@SpringBootTest
public class LockTests {
@Autowired
RedisReentrantLock redisReentrantLock;

String lockKey = "order:123";

// 测试获取锁
@Test
void demo() {
if (redisReentrantLock.tryLock(lockKey, 5, 30, TimeUnit.SECONDS)) {
try {
// 业务逻辑
doBusiness();
} finally {
redisReentrantLock.unlock(lockKey);
}
}
}
// 测试可重入锁
private void doBusiness() {
try {
if (redisReentrantLock.lock(lockKey, 30, TimeUnit.SECONDS)) {
try {
System.out.println("开始执行业务逻辑");
// 模拟业务逻辑执行时间,这里设置200秒,就是为了测试锁的自动续期功能
TimeUnit.SECONDS.sleep(200);
System.out.println("结束执行业务逻辑");
} finally {
redisReentrantLock.unlock(lockKey);
}
}
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}

// 测试多线程同时获取锁
@Test
void demoMultiThread() throws InterruptedException {
int threadCount = 5;
CountDownLatch latch = new CountDownLatch(threadCount);
AtomicInteger successCount = new AtomicInteger(0);

for (int i = 0; i < threadCount; i++) {
final int threadId = i;
new Thread(() -> {
try {
// 每个线程都尝试获取同一个锁
if (redisReentrantLock.tryLock(lockKey, 10, 30, TimeUnit.SECONDS)) {
try {
successCount.incrementAndGet();
System.out.println("线程 " + threadId + " 获取锁成功,开始执行业务");

// 模拟业务执行时间
TimeUnit.SECONDS.sleep(3);

System.out.println("线程 " + threadId + " 业务执行完成");
} finally {
redisReentrantLock.unlock(lockKey);
System.out.println("线程 " + threadId + " 释放锁");
}
} else {
System.out.println("线程 " + threadId + " 获取锁失败");
}
} catch (Exception e) {
e.printStackTrace();
} finally {
latch.countDown();
}
}).start();
}

// 等待所有线程执行完成
latch.await();
System.out.println("成功获取锁的线程数: " + successCount.get());
}
}

SpringBoot 与 Redisson 集成之锁的使用方法

1
2
3
4
5
<dependency>
<groupId>org.redisson</groupId>
<artifactId>redisson-spring-boot-starter</artifactId>
<version>3.52.0</version>
</dependency>
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
@Autowired
private RedissonClient redissonClient;

……………………………………………………………………………………………………………………………………………………………………………………………………………………
```java
RLock lock = redissonClient.getLock("myLock");

// 传统锁定方式,阻塞等待获取锁
// lock.lock();

// 或者,获取锁并在10秒后自动解锁
// lock.lock(10, TimeUnit.SECONDS);

// 或等,尝试在 100 秒内获取锁,获得后在 10 秒后自动解锁
boolean res = lock.tryLock(100, 10, TimeUnit.SECONDS);
if (res) {
try {
System.out.println("获取锁成功");
System.out.println("开始执行业务逻辑");
TimeUnit.SECONDS.sleep(5);
} finally {
lock.unlock();
}
}