一个基于 Redis 的可重入分布式锁的实现

摘要

设计目标与关键约束

设计目标

  • 互斥性:同一时间只有一个持有者

  • 可重入:同一线程 / 请求可多次加锁

  • 安全释放:只能释放自己持有的锁

  • 自动过期:防止死锁

  • 续期能力(Watch Dog):业务时间不确定时依然安全

  • 高性能:单 Redis Key,Lua 保证原子性

技术选型

  • Spring Boot

  • Spring Data Redis(Lettuce)

  • Redis Lua Script

Redis 是唯一依赖组件:Redis
Spring Boot 作为运行框架:Spring Boot

锁的核心数据结构设计(关键)

  • Redis Key 结构(String)

1
2
# lock:前缀
lock:order:123
  • Value 结构(Hash)

1
2
3
{
"uuid:threadId" : 重入次数
}

同一线程重入 → count +1,不同线程 → 拒绝

  • 使用StringRedisTemplate,因其序列化器是StringRedisSerializer,可以保证 Lua 脚本能够正常执行。

基于 Redis + Lua + HINCRBY 的可重入分布式锁,HashValue 序列化器 必须是 StringRedisSerializer

Lua 脚本(原子性保障)

  • 加锁脚本(支持可重入)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
-- KEYS[1] 锁key
-- ARGV[1] ownerId (uuid:threadId)
-- ARGV[2] expireMillis

if (redis.call('exists', KEYS[1]) == 0) then
redis.call('hset', KEYS[1], ARGV[1], 1)
redis.call('pexpire', KEYS[1], ARGV[2])
return 1
end

if (redis.call('hexists', KEYS[1], ARGV[1]) == 1) then
redis.call('hincrby', KEYS[1], ARGV[1], 1)
redis.call('pexpire', KEYS[1], ARGV[2])
return 1
end

return 0
  • 解锁脚本(防误删)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
-- KEYS[1] 锁key
-- ARGV[1] ownerId

if (redis.call('hexists', KEYS[1], ARGV[1]) == 0) then
return 0
end

local count = redis.call('hincrby', KEYS[1], ARGV[1], -1)
if (count > 0) then
return 1
else
redis.call('hdel', KEYS[1], ARGV[1])
if (redis.call('hlen', KEYS[1]) == 0) then
redis.call('del', KEYS[1])
end
return 1
end

Java 实现(核心代码)

  • WatchDog,实现锁自动续期

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
package com.example.lock;

import org.springframework.data.redis.core.StringRedisTemplate;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.*;

public class RedisLockWatchDog {

private final StringRedisTemplate redisTemplate;

/**
* 单线程足够(Redisson 也是)
*/
private final ScheduledExecutorService scheduler =
Executors.newSingleThreadScheduledExecutor(r -> {
Thread t = new Thread(r);
t.setName("redis-lock-watch-dog");
t.setDaemon(true);
return t;
});

/**
* 每个 lockKey 对应一个续期任务
*/
private final Map<String, ScheduledFuture<?>> renewTasks = new ConcurrentHashMap<>();

public RedisLockWatchDog(StringRedisTemplate redisTemplate) {
this.redisTemplate = redisTemplate;
}

/**
* 启动续期
*
* @param lockKey Redis 锁 key
* @param ownerId uuid:threadId
* @param leaseMillis 锁过期时间
*/
public void startRenew(String lockKey, String ownerId, long leaseMillis) {

// 防止重复启动
if (renewTasks.containsKey(lockKey)) {
return;
}

// 间隔多久续期一次
long period = leaseMillis / 3;

ScheduledFuture<?> future = scheduler.scheduleAtFixedRate(() -> {
try {
renew(lockKey, ownerId, leaseMillis);
} catch (Exception e) {
// 生产环境建议接日志
}
}, period, period, TimeUnit.MILLISECONDS);

renewTasks.put(lockKey, future);
}

/**
* 取消续期
*/
public void stopRenew(String lockKey) {
ScheduledFuture<?> future = renewTasks.remove(lockKey);
if (future != null) {
future.cancel(false);
}
}

/**
* 实际续期逻辑
*/
private void renew(String lockKey, String ownerId, long leaseMillis) {

// 判断锁是否还属于当前线程
Object count = redisTemplate.opsForHash().get(lockKey, ownerId);
if (Objects.isNull(count)) {
// 锁已丢失,停止续期
stopRenew(lockKey);
return;
}

// 续期
redisTemplate.expire(lockKey, leaseMillis, TimeUnit.MILLISECONDS);
}

/**
* 应用关闭时释放资源(可选)
*/
public void shutdown() {
scheduler.shutdown();
}
}
  • 锁接口

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
package com.example.lock;

import java.util.concurrent.TimeUnit;

public interface DistributedLock {
/**
* 尝试获取锁
*
* @param key 锁的key
* @param waitTime 尝试获取锁的最大等待时间
* @param leaseTime 锁的过期时间
* @param unit 时间单位
* @return true表示获取锁成功,false表示获取锁失败
*/
boolean tryLock(String key, long waitTime, long leaseTime, TimeUnit unit);

/**
* 获取锁
*
* @param key 锁的key
* @param leaseTime 锁的过期时间
* @param unit 时间单位
* @return true表示获取锁成功,false表示获取锁失败
*/
boolean lock(String key, long leaseTime, TimeUnit unit);

/**
* 释放锁
*
* @param key 锁的key
*/
void unlock(String key);
}
  • 锁实现类

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
package com.example.lock;

import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.data.redis.core.script.DefaultRedisScript;
import org.springframework.stereotype.Component;
import java.util.Collections;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.LockSupport;

@Component
public class RedisReentrantLock implements DistributedLock {

/**
* 锁的key前缀
*/
private static final String LOCK_PREFIX = "lock:";

/**
* 锁的Lua脚本
* KEYS[1] 锁key
* ARGV[1] ownerId (uuid:threadId)
* ARGV[2] expireMillis 过期时间
*
* 说明:
* 1.key 不存在时创建锁
* 2.key 存在时判断锁的拥有者是否为当前线程
*/
private static final String LOCK_SCRIPT = """
if (redis.call('exists', KEYS[1]) == 0) then
redis.call('hset', KEYS[1], ARGV[1], 1)
redis.call('pexpire', KEYS[1], ARGV[2])
return 1
end
if (redis.call('hexists', KEYS[1], ARGV[1]) == 1) then
redis.call('hincrby', KEYS[1], ARGV[1], 1)
redis.call('pexpire', KEYS[1], ARGV[2])
return 1
end
return 0
""";

/**
* 释放锁的Lua脚本
* KEYS[1] 锁key
* ARGV[1] ownerId (uuid:threadId)
*/
private static final String UNLOCK_SCRIPT = """
if (redis.call('hexists', KEYS[1], ARGV[1]) == 0) then
return 0
end
local count = redis.call('hincrby', KEYS[1], ARGV[1], -1)
if (count > 0) then
return 1
else
redis.call('hdel', KEYS[1], ARGV[1])
if (redis.call('hlen', KEYS[1]) == 0) then
redis.call('del', KEYS[1])
end
return 1
end
""";

private final StringRedisTemplate redisTemplate;
private final RedisLockWatchDog watchDog;
private final String uuid = UUID.randomUUID().toString();

public RedisReentrantLock(StringRedisTemplate redisTemplate) {
this.redisTemplate = redisTemplate;
watchDog = new RedisLockWatchDog(redisTemplate);
}

private String ownerId() {
return uuid + ":" + Thread.currentThread().getId();
}


public boolean tryLock(String key, long waitTime, long leaseTime, TimeUnit unit) {
long deadline = System.currentTimeMillis() + unit.toMillis(waitTime);
String lockKey = LOCK_PREFIX + key;

while (System.currentTimeMillis() < deadline) {
Boolean success = redisTemplate.execute(
new DefaultRedisScript<>(LOCK_SCRIPT, Boolean.class),
Collections.singletonList(lockKey),
ownerId(),
String.valueOf(unit.toMillis(leaseTime))
);
if (success) {
// 启动 Watch Dog(只有在 leaseTime 不确定时)
watchDog.startRenew(
lockKey,
ownerId(),
unit.toMillis(leaseTime)
);
return true;
} else {
// 让当前线程挂起(阻塞),最长不超过指定的纳秒数
// 在竞争失败后,让出 CPU 一小段时间,避免忙等,同时控制对 Redis 的重试频率。
// 这里50毫秒是经验值,可以根据实际需求调整
LockSupport.parkNanos(TimeUnit.MILLISECONDS.toNanos(50));
}
}
return false;
}

@Override
public boolean lock(String key, long leaseTime, TimeUnit unit) {
String lockKey = LOCK_PREFIX + key;
boolean success = redisTemplate.execute(
new DefaultRedisScript<>(LOCK_SCRIPT, Boolean.class),
Collections.singletonList(lockKey),
ownerId(),
String.valueOf(unit.toMillis(leaseTime))
);
if (success) {
// 启动 Watch Dog
watchDog.startRenew(
lockKey,
ownerId(),
unit.toMillis(leaseTime)
);
return true;
} else {
return false;
}
}

@Override
public void unlock(String key) {
String lockKey = LOCK_PREFIX + key;
// 先停续期
watchDog.stopRenew(lockKey);
// 释放锁
redisTemplate.execute(
new DefaultRedisScript<>(UNLOCK_SCRIPT, Long.class),
Collections.singletonList(lockKey),
ownerId()
);
}
}
  • 测试类

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
import com.example.lock.RedisReentrantLock;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import java.util.concurrent.TimeUnit;

@SpringBootTest
public class LockTests {
@Autowired
RedisReentrantLock redisReentrantLock;

String lockKey = "order:123";
@Test
void demo() {
if (redisReentrantLock.tryLock(lockKey, 5, 30, TimeUnit.SECONDS)) {
try {
// 业务逻辑
doBusiness();
} finally {
redisReentrantLock.unlock(lockKey);
}
}
}

private void doBusiness() {
try {
if (redisReentrantLock.lock(lockKey, 30, TimeUnit.SECONDS)) {
try {
System.out.println("开始执行业务逻辑");
// 模拟业务逻辑执行时间,这里设置200秒,就是为了测试锁的自动续期功能
TimeUnit.SECONDS.sleep(200);
System.out.println("结束执行业务逻辑");
} finally {
redisReentrantLock.unlock(lockKey);
}
}
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
}