分布式设计之美(一):主流分布式锁实现方案

前言

目前很多大型网站及应用都是分布式部署的,分布式场景中的数据一致性问题一直是一个比较重要的话题。分布式系统的 CAP 理论告诉我们“任何一个分布式系统都无法同时满足一致性(Consistency)、可用性(Availability)和分区容错性(Partition tolerance),最多只能同时满足两项”。所以,很多系统在设计之初就要对这三者做出取舍。在互联网领域的绝大多数的场景中,都需要牺牲强一致性来换取系统的高可用性,系统往往只需要保证“最终一致性”,只要这个最终时间是在用户可以接受的范围内即可。在很多场景中,我们为了保证数据的最终一致性,需要很多的技术方案来支持,比如分布式事务、分布式锁等。

针对分布式锁的实现,目前比较常用的有以下几种方案:1、基于数据库实现分布式锁;2、基于缓存 Redis 实现分布式锁;3、基于 Zookeeper 实现分布式锁。下面我将谈谈它们各种的实现方案。

基于 Redis 实现分布式锁

基于单机版 Redis 分布式锁 SETNX

使用 SETNX(set if not exist)指令插入一个键值对,如果 Key 已经存在,那么会返回 False,否则插入成功并返回 True。SETNX 指令和数据库的唯一索引类似,保证了只存在一个 Key 的键值对,那么可以用一个 Key 的键值对是否存在来判断是否存于锁定状态。EXPIRE 指令可以为一个键值对设置一个过期时间,从而避免了数据库唯一索引实现方式中释放锁失败的问题。

命令 SET resource-name anystring NX EX max-lock-time 是一种在 Redis 中实现锁的简单方法。客户端执行以上的命令:如果服务器返回 OK,那么这个客户端获得锁;如果服务器返回 NIL,那么客户端获取锁失败,可以在稍后再重试。设置的过期时间到达之后,锁将自动释放。

可以通过以下修改,让这个锁实现更健壮:1、不使用固定的字符串作为键的值,而是设置一个不可猜测(non-guessable)的长随机字符串,作为口令串(token)。2、不使用 DEL 命令来释放锁,而是发送一个 Lua 脚本,这个脚本只在客户端传入的值和键的口令串相匹配时,才对键进行删除。这两个改动可以防止持有过期锁的客户端误删现有锁的情况出现。

1
2
3
4
5
6
7
8
9
// 锁的获取:
SET resource_name my_random_value NX PX 30000

// 锁的释放:
if redis.call("get",KEYS[1]) == ARGV[1] then
return redis.call("del",KEYS[1])
else
return 0
end

实现方式一

1、组件依赖:通过 Maven 引入 Jedis 开源组件,在 pom.xml 文件加入下面的代码:

1
2
3
4
5
<dependency>
<groupId>redis.clients</groupId>
<artifactId>jedis</artifactId>
<version>3.0.1</version>
</dependency>

2、代码实现:通过 Java 代码实现分布式锁

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
public class RedisDistributedLock {

private static final String LOCK_SUCCESS = "OK";
private static final String SET_IF_NOT_EXIST = "NX";
private static final String SET_WITH_EXPIRE_TIME = "PX";
private static final Long RELEASE_SUCCESS = 1L;
private static final String RELEASE_LOCK_SCRIPT = "if redis.call('get', KEYS[1]) == ARGV[1] then return redis.call('del', KEYS[1]) else return 0 end";

/**
* 尝试获取分布式锁
*
* @param jedis Redis 客户端
* @param lockKey 加锁键
* @param clientId 加锁客户端唯一标识(采用 UUID)
* @param expireTime 锁过期时间
* @return 是否获取成功
*/
public static Boolean acquireLock(Jedis jedis, String lockKey, String clientId, int expireTime) {
// String nxxx,NX|XX; String expx,EX|PX, EX = seconds, PX = milliseconds;
String result = jedis.set(lockKey, clientId, SET_IF_NOT_EXIST, SET_WITH_EXPIRE_TIME, expireTime);
// 如果 setNX 成功, 返回 "OK"; 如果 setNX 失败, 返回 null;
if (LOCK_SUCCESS.equals(result)) {
return true;
}
return false;
}

/**
* 释放 Redis 全局锁
* P.S.
* 1. 锁的释放必须使用 lua 脚本, 保证操作的原子性; 使用 lua 脚本使 get 与 del 方法执行成为原子性
* 2. 为了保证锁被锁的持有者释放, 使用 lua 脚本删除 redis 中匹配 value 的 key, 可以避免由于方法执行时间过长而 redis 锁自动过期失效的时候误删其他线程的锁
*
* @param jedis Redis 客户端
* @param lockKey 加锁键
* @param clientId 加锁客户端唯一标识(采用 UUID)
* @return Boolean
*/
public Boolean releaseLock(Jedis jedis, String lockKey, String clientId) {
// 释放锁的时候, 有可能因为持锁之后方法执行时间大于锁的有效期, 此时有可能已经被另外一个线程持有锁, 所以不能直接删除
Object result = jedis.eval(RELEASE_LOCK_SCRIPT, Collections.singletonList(lockKey), Collections.singletonList(clientId));
return RELEASE_SUCCESS.equals(result);
}
}

3、try-with-resources 实现:大家在写程序的时候是不是总忘记释放锁呢?就像以前对流操作时,忘记了关闭流。从 Java 7 开始,加入了 try-with-resources 的方式,它可以 自动的执行 close() 方法,释放资源,再也不用写 finally 块了。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
public class RedisDistributedLock implements Closeable {

private static final String LOCK_SUCCESS = "OK";
private static final String SET_IF_NOT_EXIST = "NX";
private static final String SET_WITH_EXPIRE_TIME = "PX";
private static final String RELEASE_LOCK_SCRIPT = "if redis.call('get', KEYS[1]) == ARGV[1] then return redis.call('del', KEYS[1]) else return 0 end";

private Jedis jedis;
private String lockKey;
private String clientId;
private int expireTime;

/**
* 构造全局分布式锁
*
* @param jedis Redis 客户端
* @param lockKey 加锁键
* @param clientId 加锁客户端唯一标识(采用 UUID)
* @param expireTime 锁过期时间
*/
public RedisDistributedLock(Jedis jedis, String lockKey, String clientId, int expireTime) {
this.jedis = jedis;
this.lockKey = lockKey;
this.clientId = clientId;
this.expireTime = expireTime;
}

/**
* 尝试获取分布式锁
*
* @return 是否获取成功
*/
public Boolean acquireLock() {
// String nxxx,NX|XX; String expx,EX|PX, EX = seconds, PX = milliseconds;
String result = jedis.set(lockKey, clientId, SET_IF_NOT_EXIST, SET_WITH_EXPIRE_TIME, expireTime);
// 如果 setNX 成功, 返回 "OK"; 如果 setNX 失败, 返回 null;
return LOCK_SUCCESS.equals(result);
}


/**
* 释放 Redis 全局锁
* P.S.
* 1. 锁的释放必须使用 lua 脚本, 保证操作的原子性; 使用 lua 脚本使 get 与 del 方法执行成为原子性
* 2. 为了保证锁被锁的持有者释放, 使用 lua 脚本删除 redis 中匹配 value 的 key, 可以避免由于方法执行时间过长而 redis 锁自动过期失效的时候误删其他线程的锁
*/
@Override
public void close() throws IOException {
// 释放锁的时候, 有可能因为持锁之后方法执行时间大于锁的有效期, 此时有可能已经被另外一个线程持有锁, 所以不能直接删除
jedis.eval(RELEASE_LOCK_SCRIPT, Collections.singletonList(lockKey), Collections.singletonList(clientId));
}
}

实现方式二

1、组件依赖:通过 Maven 引入 spring-boot-starter-data-redis 开源组件,在 pom.xml 文件加入下面的代码:

1
2
3
4
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>

2、代码实现:通过 Java 代码实现分布式锁

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
@Component
@Slf4j
public class RedisDistributedLock {

@Autowired
private StringRedisTemplate stringRedisTemplate;

/**
* 该加锁方法仅针对单实例 Redis 可实现分布式加锁;
* P.S.
* 1. 单机版分布式锁 SETNX, 所谓 SETNX, 是「SET if Not eXists」的缩写;
* 2. SET resource_name my_random_value NX PX 30000 因此当多个客户端去争抢执行上锁或解锁代码时, 最终只会有一个客户端执行成功. 同时 set 命令还可以指定 key 的有效期, 这样即使当前客户端奔溃, 过一段时间锁也会被 redis 自动释放, 这就给了其它客户端获取锁的机会.
*
* @param lockKey 加锁键
* @param clientId 加锁客户端唯一标识(采用 UUID)
* @param timeout 锁过期时间
* @param timeUnit 锁过期单位
* @return Boolean
*/
public Boolean acquireLock(String lockKey, String clientId, Long timeout, TimeUnit timeUnit) {
return stringRedisTemplate.opsForValue().setIfAbsent(lockKey, clientId, timeout, timeUnit);
}

/**
* 释放 Redis 全局锁
* P.S.
* 1. 锁的释放必须使用 lua 脚本, 保证操作的原子性; 使用 lua 脚本使 get 与 del 方法执行成为原子性
*
* @param lockKey 加锁键
* @param clientId 加锁客户端唯一标识(采用 UUID)
* @return Boolean
*/
public Boolean releaseLock(String lockKey, String clientId) {
// 释放锁的时候, 有可能因为持锁之后方法执行时间大于锁的有效期, 此时有可能已经被另外一个线程持有锁, 所以不能直接删除
return (Boolean) stringRedisTemplate.execute((RedisCallback<Object>) connection -> {
// 为了保证锁被锁的持有者释放, 使用 lua 脚本删除 redis 中匹配 value 的 key, 可以避免由于方法执行时间过长而 redis 锁自动过期失效的时候误删其他线程的锁
String script = "if redis.call('get', KEYS[1]) == ARGV[1] then return redis.call('del', KEYS[1]) else return 0 end";
return connection.<Boolean>eval(script.getBytes(), ReturnType.BOOLEAN, 1, lockKey.getBytes(), clientId.getBytes());
});
}
}

注意

上述代码实现,仅对 redis 单实例架构有效,当面对 redis 集群时就无效了。但是一般情况下,我们的 redis 架构多数会做成 “主备” 模式,然后再通过 redis 哨兵实现主从切换,这种模式下我们的应用服务器直接面向主机,也可看成是单实例,因此上述代码实现也有效。但是当在主机宕机,从机被升级为主机的一瞬间的时候,如果恰好在这一刻,由于 redis 主从复制的异步性,导致从机中数据没有即时同步,那么上述代码依然会无效,导致同一资源有可能会产生两把锁,违背了分布式锁的原则。

基于 Redis 的 RedLock 算法

使用了多个 Redis 实例来实现分布式锁,这是为了保证在发生单点故障时仍然可用。Redis 的作者提出了 RedLock 的解决方案。方案非常的巧妙和简洁。RedLock 的核心思想就是,同时使用多个 Redis Master 来冗余,且这些节点都是完全的独立的,也不需要对这些节点之间的数据进行同步。

假设我们有 N 个 Redis 节点,N 应该是一个大于 2 的奇数。RedLock 的实现步骤:

  • 获取当前 Unix 时间,以毫秒为单位。
  • 使用上文提到的方法依次获取 N 个节点的 Redis 锁。
  • 如果获取到的锁的数量大于(N/2+1)个,且获取的时间小于锁的有效时间(lock validity time)就认为获取到了一个有效的锁。
  • 如果获取锁的数量小于(N/2+1),或者在锁的有效时间(lock validity time)内没有获取到足够的锁,就认为获取锁失败。这个时候需要向所有节点发送释放锁的消息。

实现方式一

Redisson 在基于 NIO 的 Netty 框架上,充分的利用了 Redis 键值数据库提供的一系列优势,在 Java 实用工具包中常用接口的基础上,为使用者提供了一系列具有分布式特性的常用工具类。使得原本作为协调单机多线程并发程序的工具包获得了协调分布式多机多线程并发系统的能力,大大降低了设计和研发大规模分布式系统的难度。同时结合各富特色的分布式服务,更进一步简化了分布式环境中程序相互之间的协作。

基于 Redis 的 Redisson 红锁 RedissonRedLock 对象实现了 Redlock 介绍的加锁算法。该对象也可以用来将多个 RLock 对象关联为一个红锁,每个 RLock 对象实例可以来自于不同的 Redisson 实例。

1、组件依赖:通过 Maven 引入 redisson 开源组件,在 pom.xml 文件加入下面的代码:

1
2
3
4
5
<dependency>
<groupId>org.redisson</groupId>
<artifactId>redisson</artifactId>
<version>3.9.1</version>
</dependency>

2、代码实现:通过 Java 代码实现分布式锁

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
public static void main() {
Config config1 = new Config();
config1.useSingleServer().setAddress("redis://xxxx1:xxx1")
.setPassword("xxxx1")
.setDatabase(0);
RedissonClient redissonClient1 = Redisson.create(config1);

Config config2 = new Config();
config2.useSingleServer()
.setAddress("redis://xxxx2:xxx2")
.setPassword("xxxx2")
.setDatabase(0);
RedissonClient redissonClient2 = Redisson.create(config2);

Config config3 = new Config();
config3.useSingleServer().
setAddress("redis://xxxx3:xxx3")
.setPassword("xxxx3")
.setDatabase(0);
RedissonClient redissonClient3 = Redisson.create(config3);

String lockName = "redlock-test";
RLock lock1 = redissonClient1.getLock(lockName);
RLock lock2 = redissonClient2.getLock(lockName);
RLock lock3 = redissonClient3.getLock(lockName);

RedissonRedLock redLock = new RedissonRedLock(lock1, lock2, lock3);
boolean isLock;
try {
// 为加锁等待 0.5 秒时间,并在加锁成功 30 秒钟后自动解开
isLock = redLock.tryLock(500, 30000, TimeUnit.MILLISECONDS);
System.out.println("isLock =" + isLock);
if (isLock) {
// lock success, do something;
Thread.sleep(30000);
}
} catch (Exception e) {

} finally {
// 无论如何, 最后都要解锁
redLock.unlock();
System.out.println("unlock success");
}
}

注意

失败时重试:当客户端无法获取到锁时,应该随机延时后进行重试,防止多个客户端在同一时间抢夺同一资源的锁(会导致脑裂,最终都不能获取到锁)。客户端获得超过半数节点的锁花费的时间越短,那么脑裂的概率就越低。所以,理想的情况下,客户端最好能够同时(并发)向所有 redis 发出 set 命令。当客户端从多数节点获取锁失败时,应该尽快释放已经成功获取的锁,这样其他客户端不需要等待锁过期后再获取。(如果存在网络分区,客户端已经无法和 redis 进行通信,那么此时只能等待锁过期后自动释放)。

PUBSUB:订阅者模式,当释放锁的时候,其他客户端能够知道锁已经被释放的消息,并让队列中的第一个消费者获取锁。使用 PUB/SUB 消息机制的优点:减少申请锁时的等待时间、安全、 锁带有超时时间、锁的标识唯一,防止死锁 锁设计为可重入,避免死锁。

基于 Zookeeper 实现分布式锁

Zookeeper 节点性质

  • 有序节点:假如当前有一个父节点为 / lock,我们可以在这个父节点下面创建子节点;zookeeper 提供了一个可选的有序特性,例如我们可以创建子节点 “/lock/node-” 并且指明有序,那么 zookeeper 在生成子节点时会根据当前的子节点数量自动添加整数序号,也就是说如果是第一个创建的子节点,那么生成的子节点为 / lock/node-0000000000,下一个节点则为 / lock/node-0000000001,依次类推。
  • 临时节点:客户端可以建立一个临时节点,在会话结束或者会话超时后,zookeeper 会自动删除该节点。
  • 事件监听:在读取数据时,我们可以同时对节点设置事件监听,当节点数据或结构变化时,zookeeper 会通知客户端。当前 zookeeper 有如下四种事件:1)节点创建;2)节点删除;3)节点数据修改;4)子节点变更。

基于创建临时 Znode

某个节点尝试创建临时 znode,此时创建成功了就获取了这个锁;这个时候别的客户端来创建锁会失败,只能注册个监听器监听这个锁。释放锁就是删除这个 znode,一旦释放掉就会通知客户端,然后有一个等待着的客户端就可以再次重新加锁。

这种方案的正确性和可靠性是 ZooKeeper 机制保证的,实现简单。缺点是会产生 “惊群” 效应,假如许多客户端在等待一把锁,当锁释放时候所有客户端都被唤醒,仅仅有一个客户端得到锁。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
public class ZooKeeperSession {

private static CountDownLatch connectedSemaphore = new CountDownLatch(1);

private ZooKeeper zookeeper;
private CountDownLatch latch;

public ZooKeeperSession() {
try {
this.zookeeper = new ZooKeeper("192.168.31.187:2181,192.168.31.19:2181,192.168.31.227:2181", 50000, new ZooKeeperWatcher());
try {
connectedSemaphore.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("ZooKeeper session established......");
} catch (Exception e) {
e.printStackTrace();
}
}

/**
* 获取分布式锁
*
* @param productId
*/
public Boolean acquireDistributedLock(Long productId) {
String path = "/product-lock-" + productId;
try {
zookeeper.create(path, "".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
return true;
} catch (Exception e) {
while (true) {
try {
// 相当于是给 node 注册一个监听器,去看看这个监听器是否存在
Stat stat = zk.exists(path, true);

if (stat != null) {
this.latch = new CountDownLatch(1);
this.latch.await(waitTime, TimeUnit.MILLISECONDS);
this.latch = null;
}
zookeeper.create(path, "".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
return true;
} catch (Exception ee) {
continue;
}
}

}
return true;
}

/**
* 释放掉一个分布式锁
*
* @param productId
*/
public void releaseDistributedLock(Long productId) {
String path = "/product-lock-" + productId;
try {
zookeeper.delete(path, -1);
System.out.println("release the lock for product[id=" + productId + "]......");
} catch (Exception e) {
e.printStackTrace();
}
}

/**
* 建立 zk session 的 watcher
*/
private class ZooKeeperWatcher implements Watcher {

public void process(WatchedEvent event) {
System.out.println("Receive watched event:" + event.getState());

if (KeeperState.SyncConnected == event.getState()) {
connectedSemaphore.countDown();
}

if (this.latch != null) {
this.latch.countDown();
}
}

}

/**
* 封装单例的静态内部类
*/
private static class Singleton {

private static ZooKeeperSession instance;

static {
instance = new ZooKeeperSession();
}

public static ZooKeeperSession getInstance() {
return instance;
}
}

/**
* 获取单例
*
* @return
*/
public static ZooKeeperSession getInstance() {
return Singleton.getInstance();
}

/**
* 初始化单例的便捷方法
*/
public static void init() {
getInstance();
}
}

基于创建临时顺序节点

对于加锁操作,可以让所有客户端都去 / lock 目录下创建临时顺序节点,如果客户端发现自身创建节点序列号是 / lock / 目录下最小的节点,则获得锁。否则,监视比自己创建节点的序列号小的节点(比自己创建的节点小的最大节点),进入等待。对于解锁操作,只需要将自身创建的节点删除即可,然后唤醒自己的后一个节点。

特点:利用临时顺序节点来实现分布式锁机制其实就是一种按照创建顺序排队的实现。这种方案效率高,避免了 “惊群” 效应,多个客户端共同等待锁,当锁释放时只有一个客户端会被唤醒。

实现步骤

  • 客户端连接 zookeeper,并在 / lock 下创建临时的且有序的子节点,第一个客户端对应的子节点为 / lock/lock-0000000000,第二个为 / lock/lock-0000000001,以此类推。
  • 客户端获取 / lock 下的子节点列表,判断自己创建的子节点是否为当前子节点列表中序号最小的子节点,如果是则认为获得锁,否则监听刚好在自己之前一位的子节点删除消息,获得子节点变更通知后重复此步骤直至获得锁;
  • 执行业务代码;
  • 完成业务流程后,删除对应的子节点释放锁。

15542776154355

实现方式一

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
public class ZooKeeperDistributedLock implements Watcher {

private ZooKeeper zk;
private String locksRoot = "/locks";
private String productId;
private String waitNode;
private String lockNode;
private CountDownLatch latch;
private CountDownLatch connectedLatch = new CountDownLatch(1);
private int sessionTimeout = 30000;

public ZooKeeperDistributedLock(String productId) {
this.productId = productId;
try {
String address = "192.168.31.187:2181,192.168.31.19:2181,192.168.31.227:2181";
zk = new ZooKeeper(address, sessionTimeout, this);
connectedLatch.await();
} catch (IOException | InterruptedException e) {
e.printStackTrace();
}
}

public void process(WatchedEvent event) {
if (event.getState() == KeeperState.SyncConnected) {
connectedLatch.countDown();
return;
}

if (this.latch != null) {
this.latch.countDown();
}
}

public void acquireDistributedLock() {
try {
if (this.tryLock()) {
return;
} else {
waitForLock(waitNode, sessionTimeout);
}
} catch (KeeperException | InterruptedException e) {
throw new RuntimeException(e);
}
}

public boolean tryLock() {
try {
// 传入进去的 locksRoot + "/" + productId
// 假设 productId 代表了一个商品 id, 比如说 1
// locksRoot = locks
// /locks/10000000000,/locks/10000000001,/locks/10000000002
lockNode = zk.create(locksRoot + "/" + productId, new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);

// 看看刚创建的节点是不是最小的节点
// locks:10000000000,10000000001,10000000002
List<String> locks = zk.getChildren(locksRoot, false);
Collections.sort(locks);

if (lockNode.equals(locksRoot + "/" + locks.get(0))) {
// 如果是最小的节点, 则表示取得锁
return true;
}

// 如果不是最小的节点, 找到比自己小 1 的节点
int previousLockIndex = -1;
for (int i = 0; i < locks.size(); i++) {
if (lockNode.equals(locksRoot + "/" + locks.get(i))) {
previousLockIndex = i - 1;
break;
}
}

this.waitNode = locks.get(previousLockIndex);
} catch (KeeperException | InterruptedException e) {
e.printStackTrace();
}
return false;
}

private boolean waitForLock(String waitNode, long waitTime) throws InterruptedException, KeeperException {
Stat stat = zk.exists(locksRoot + "/" + waitNode, true);
if (stat != null) {
this.latch = new CountDownLatch(1);
this.latch.await(waitTime, TimeUnit.MILLISECONDS);
this.latch = null;
}
return true;
}

public void unlock() {
try {
// 删除 / locks/10000000001 节点
System.out.println("unlock" + lockNode);
zk.delete(lockNode, -1);
lockNode = null;
zk.close();
} catch (InterruptedException | KeeperException e) {
e.printStackTrace();
}
}
}

实现方式二

虽然 Zookeeper 原生客户端暴露的 API 已经非常简洁了,但是实现一个分布式锁还是比较麻烦的。我们可以直接使用 curator 这个开源项目提供的 zookeeper 分布式锁实现。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
public class DistributedLock {

private static Logger log = LoggerFactory.getLogger(DistributedLock.class);
private InterProcessMutex interProcessMutex; // 可重入排它锁
private String lockName; // 竞争资源标志
private String root = "/distributed/lock/";// 根节点
private static CuratorFramework curatorFramework;
private static String ZK_URL = "zookeeper1.tq.master.cn:2181,zookeeper3.tq.master.cn:2181,zookeeper2.tq.master.cn:2181,zookeeper4.tq.master.cn:2181,zookeeper5.tq.master.cn:2181";

static {
curatorFramework = CuratorFrameworkFactory.newClient(ZK_URL, new ExponentialBackoffRetry(1000, 3));
curatorFramework.start();
}

/**
* 实例化
*
* @param lockName
*/
public DistributedLock(String lockName) {
try {
this.lockName = lockName;
interProcessMutex = new InterProcessMutex(curatorFramework, root + lockName);
} catch (Exception e) {
log.error("initial InterProcessMutex exception=" + e);
}
}

/**
* 获取锁
*/
public void acquireLock() {
int flag = 0;
try {
// 重试 2 次,每次最大等待 2s,也就是最大等待 4s
while (!interProcessMutex.acquire(2, TimeUnit.SECONDS)) {
flag++;
if (flag > 1) { // 重试两次
break;
}
}
} catch (Exception e) {
log.error("distributed lock acquire exception=" + e);
}
if (flag > 1) {
log.info("Thread:" + Thread.currentThread().getId() + "acquire distributed lock busy");
} else {
log.info("Thread:" + Thread.currentThread().getId() + "acquire distributed lock success");
}
}

/**
* 释放锁
*/
public void releaseLock() {
try {
if (interProcessMutex != null && interProcessMutex.isAcquiredInThisProcess()) {
interProcessMutex.release();
curatorFramework.delete().inBackground().forPath(root + lockName);
log.info("Thread:" + Thread.currentThread().getId() + "release distributed lock success");
}
} catch (Exception e) {
log.info("Thread:" + Thread.currentThread().getId() + "release distributed lock exception=" + e);
}
}
}

基于 MySQL 实现分布式锁

基于数据库表

要实现分布式锁,最简单的方式可能就是直接创建一张锁表,然后通过操作该表中的数据来实现了。当我们要锁住某个方法或资源的时候,我们就在该表中增加一条记录,想要释放锁的时候就删除这条记录。获得锁时向表中插入一条记录,释放锁时删除这条记录。唯一索引可以保证该记录只被插入一次,那么就可以用这个记录是否存在来判断是否存于锁定状态。

创建这样一张数据库表:

1
2
3
4
5
6
7
8
CREATE TABLE `method_lock` (
`id` INT (11) NOT NULL AUTO_INCREMENT COMMENT '主键',
`method_name` VARCHAR (64) NOT NULL DEFAULT ''COMMENT'锁定的方法名',
`desc` VARCHAR (1024) NOT NULL DEFAULT '备注信息',
`update_time` TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '保存数据时间, 自动生成',
PRIMARY KEY (`id`),
UNIQUE KEY `uidx_method_name` (`method_name `) USING BTREE
) ENGINE = INNODB DEFAULT CHARSET = utf8 COMMENT = '锁定中的方法';

a. 当我们要锁住某个方法时,执行以下 SQL:

1
INSERT INTO method_lock (method_name, DESC) VALUES (‘method_name’, ‘desc’);

因为我们对 method_name 做了唯一性约束,这里如果有多个请求同时提交到数据库的话,数据库会保证只有一个操作可以成功,那么我们可以认为操作成功的那个线程获得了该方法的锁,可以执行具体内容。

b. 当方法执行完毕之后,想要释放锁的话,需要执行以下 sql:

1
DELETE FROM method_lock WHERE method_name = 'method_name';

上面这种简单的实现有以下几个问题:
1、这把锁依赖数据库的可用性,数据库是一个单点,一旦数据库挂掉,会导致业务系统不可用。
2、这把锁没有失效时间,一旦解决操作失败,就会导致记录一直在数据库中,其他线程无法在获得锁。
3、这把锁只能是非阻塞的,因为数据的 insert 操作,一旦插入失败就会直接报错。没有获得锁的线程并不会进入排队队列,要想再次获得锁就要再次触发获得锁的操作。
4、这把锁是非重入的,同一个线程在没有释放锁之前无法再次获得该锁。因为数据库表中数据已经存在了。

基于数据库表做乐观锁

大多数是基于数据版本(version)的记录机制实现的。何谓数据版本号?即为数据增加一个版本标识,在基于数据库表的版本解决方案中,一般是通过为数据库表添加一个 “version” 字段来实现读取出数据时,将此版本号一同读出,之后更新时,对此版本号加 1。在更新过程中,会对版本号进行比较,如果是一致的,没有发生改变,则会成功执行本次操作;如果版本号不一致,则会更新失败。

假设我们有一张资源表,状态(1:未分配;2:已分配)、资源创建时间、资源更新时间、资源数据版本号。

15536934001599

那么如果使用乐观锁如何解决问题呢?

a. 先执行 select 操作查询当前数据的数据版本号,比如当前数据版本号是 26:

1
SELECT id, resource, state, version FROM t_resource WHERE state = 1 AND id = 5780;

b. 执行更新操作:

1
UPDATE t_resoure SET state = 2, version = 27, update_time = now() WHERE resource = xxxxxx AND state = 1 AND version = 26;

c. 如果上述 update 语句真正更新影响到了一行数据,那就说明占位成功。如果没有更新影响到一行数据,则说明这个资源已经被别人占位了。

基于数据库表做乐观锁的一些缺点:
1、这种操作方式,使原本一次的 update 操作,必须变为 2 次操作:select 版本号一次;update 一次。增加了数据库操作的次数。
2、如果业务场景中的一次业务流程中,多个资源都需要用保证数据一致性,那么如果全部使用基于数据库资源表的乐观锁,就要让每个资源都有一张资源表,这个在实际使用场景中肯定是无法满足的。而且这些都基于数据库操作,在高并发的要求下,对数据库连接的开销一定是无法忍受的。
3、乐观锁机制往往基于系统中的数据存储逻辑,因此可能会造成脏数据被更新到数据库中。在系统设计阶段,我们应该充分考虑到这些情况出现的可能性,并进行相应调整,如将乐观锁策略在数据库存储过程中实现,对外只开放基于此存储过程的数据更新途径,而不是将数据库表直接对外公开。

基于数据库表做悲观锁

利用 for update 加显式的行锁,这样就能利用这个行级的排他锁来实现分布式锁了,同时 unlock 的时候只要释放 commit 这个事务,就能达到释放锁的目的。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
/**
* 超时获取锁
*
* @param lockID
* @param timeOuts
* @return boolean
* @throws InterruptedException
*/
public boolean acquireByUpdate(String lockID, long timeOuts) throws InterruptedException, SQLException {
String sql = "SELECT id from test_lock where id = ? for UPDATE";
long futureTime = System.currentTimeMillis() + timeOuts;
long ranmain = timeOuts;
long timerange = 500;
connection.setAutoCommit(false);
while (true) {
CountDownLatch latch = new CountDownLatch(1);
try {
PreparedStatement statement = connection.prepareStatement(sql);
statement.setString(1, lockID);
statement.setInt(2, 1);
statement.setLong(1, System.currentTimeMillis());
boolean ifsucess = statement.execute();// 如果成功,那么就是获取到了锁
if (ifsucess)
return true;
} catch (SQLException e) {
e.printStackTrace();
}
latch.await(timerange, TimeUnit.MILLISECONDS);
ranmain = futureTime - System.currentTimeMillis();
if (ranmain <= 0)
break;
if (ranmain < timerange) {
timerange = ranmain;
}
continue;
}
return false;

}

/**
* 释放锁
*
* @param lockID
* @throws SQLException
*/
public void unlockforUpdtate(String lockID) throws SQLException {
connection.commit();
}

优点:实现简单
缺点:连接池爆满和事务超时的问题单点的问题,单点问题,行锁升级为表锁的问题,并发量大的时候请求量太大、没有线程唤醒机制。
适用场景:并发量略高于上面使用乐观锁的情况下,可以采用这种方法。

总结:不论如何,使用 Mysql 来实现分布式锁都不推荐,其性能,可靠性,以及实现上跟其它两种方式对比均没啥优势,因此,学习 Mysql 实现分布式锁可以仅仅作为一种了解和思想升华。


参考博文

[1]. Redlock:Redis 分布式锁最牛逼的实现
[2]. 一般实现分布式锁都有哪些方式?使用 redis 如何设计分布式锁?使用 zk 来设计分布式锁可以吗?这两种分布式锁的实现方式哪种效率比较高?


分布式设计之美系列


谢谢你长得那么好看,还打赏我!😘
0%