分布式锁其实可以理解为:控制分布式系统有序的去对共享资源进行操作,通过互斥来保持一致性。
- 基于数据库实现分布式锁
- 基于缓存(Redis,memcached,tair)实现分布式锁
- 基于 Zookeeper 实现分布式锁
分布式锁应该是怎么样的?(这里以方法锁为例,资源锁同理)
- 可以保证在分布式部署的应用集群中,同一个方法在同一时间只能被一台机器上的一个线程执行。
- 这把锁要是一把可重入锁(避免死锁)
- 这把锁最好是一把阻塞锁(根据业务需求考虑要不要这条)
- 有高可用的获取锁和释放锁功能
- 获取锁和释放锁的性能要好
什么是分布式锁?
- 当在分布式模型下,数据只有一份(或有限制),此时需要利用锁的技术控制某一时刻修改数据的进程数。
- 与单机模式下的锁不仅需要保证进程可见,还需要考虑进程与锁之间的网络问题。(我觉得分布式情况下之所以问题变得复杂,主要就是需要考虑到网络的延时和不可靠。。。一个大坑)
- 分布式锁还是可以将标记存在内存,只是该内存不是某个进程分配的内存而是公共内存如 Redis、Memcache。至于利用数据库、文件等做锁与单机的实现是一样的,只要保证标记能互斥就行。
需要怎样的分布式锁?
- 可以保证在分布式部署的应用集群中,同一个方法在同一时间只能被一台机器-上的一个线程执行。
- 这把锁要是一把可重入锁(避免死锁)
- 这把锁最好是一把阻塞锁(根据业务需求考虑要不要这条)
- 这把锁最好是一把公平锁(根据业务需求考虑要不要这条)
- 有高可用的获取锁和释放锁功能
- 获取锁和释放锁的性能要好
数据库实现
基于乐观锁
基于唯一索引 (eg 表主键)
**思路:**利用主键唯一的特性,如果有多个请求同时提交到数据库的话,数据库会保证只有一个操作可以成功,那么我们就可以认为操作成功的那个线程获得了该方法的锁,当方法执行完毕之后,想要释放锁的话,删除这条数据库记录即可。
上面这种简单的实现有以下几个问题:
- 这把锁强依赖数据库的可用性,数据库是一个单点,一旦数据库挂掉,会导致业务系统不可用。
- 这把锁没有失效时间,一旦解锁操作失败,就会导致锁记录一直在数据库中,其他线程无法再获得到锁。
- 这把锁只能是非阻塞的,因为数据的 insert操作,一旦插入失败就会直接报错。没有获得锁的线程并不会进入排队队列,要想再次获得锁就要再次触发获得锁操作。
- 这把锁是非重入的,同一个线程在没有释放锁之前无法再次获得该锁。因为数据中数据已经存在了。
- 这把锁是非公平锁,所有等待锁的线程凭运气去争夺锁。
- 在 MySQL 数据库中采用主键冲突防重,在大并发情况下有可能会造成锁表现象。
当然,我们也可以有其他方式解决上面的问题。
- 数据库是单点?搞两个数据库,数据之前双向同步,一旦挂掉快速切换到备库上。
- 没有失效时间?只要做一个定时任务,每隔一定时间把数据库中的超时数据清理一遍。
- 非阻塞的?搞一个 while 循环,直到 insert 成功再返回成功。
- 非重入的?在数据库表中加个字段,记录当前获得锁的机器的主机信息和线程信息,那么下次再获取锁的时候先查询数据库,如果当前机器的主机信息和线程信息在数据库可以查到的话,直接把锁分配给他就可以了。
- 非公平的?再建一张中间表,将等待锁的线程全记录下来,并根据创建时间排序,只有最先创建的允许获取锁。
- 比较好的办法是在程序中生产主键进行防重。
将获取锁通过插入唯一索引来实现,释放锁则通过删除改唯一索引记录实现。
这种实现方式在实际业务中有变化的应用,一般实际业务会通过插入唯一索引获取锁,之后进行正常的业务处理,这个锁记录同时也是业务的一部分,因此不再执行释放锁的操作。比如在结算时,财务入账时会采用这种方式。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
|
public void lock(){
int result =0
try{
result = execute("insert into lock values(uniqueid)")
if(result >0){
//获取到锁
//开始业务处理
return;
}
}catch(Exception e){
}
// 未获取到锁
//业务决定是否需要等待以重新获取锁
}
public void unlock(){
execute("delete from lock_table where uniqueid=id")
}
|
基于表字段版本号
这个策略源于 mysql 的 mvcc 机制,使用这个策略其实本身没有什么问题,唯一的问题就是对数据表侵入较大,我们要为每个表设计一个版本号字段,然后写一条判断 sql 每次进行判断,增加了数据库操作的次数,在高并发的要求下,对数据库连接的开销也是无法忍受的。
悲观锁实现
使用MySQL的InnoDB的排他锁来实现加锁,通过释放链接的方式释放锁。
在查询语句后面增加for update
,数据库会在查询过程中给数据库表增加排他锁 (注意: InnoDB 引擎在加锁的时候,只有通过索引进行检索的时候才会使用行级锁,否则会使用表级锁。这里我们希望使用行级锁,就要给要执行的方法字段名添加索引,值得注意的是,这个索引一定要创建成唯一索引,否则会出现多个重载方法之间无法同时被访问的问题。重载方法的话建议把参数类型也加上。)。当某条记录被加上排他锁之后,其他线程无法再在该行记录上增加排他锁。
我们可以认为获得排他锁的线程即可获得分布式锁,当获取到锁之后,可以执行方法的业务逻辑,执行完方法之后,通过connection.commit()
操作来释放锁。
这种方法可以有效的解决上面提到的无法释放锁和阻塞锁的问题。
- 阻塞锁?
for update
语句会在执行成功后立即返回,在执行失败时一直处于阻塞状态,直到成功。
- 锁定之后服务宕机,无法释放?使用这种方式,服务宕机之后数据库会自己把锁释放掉。
但是还是无法直接解决数据库单点和可重入问题。
这里还可能存在另外一个问题,虽然我们对方法字段名使用了唯一索引,并且显示使用 for update 来使用行级锁。但是,MySQL 会对查询进行优化,即便在条件中使用了索引字段,但是否使用索引来检索数据是由 MySQL 通过判断不同执行计划的代价来决定的,如果 MySQL 认为全表扫效率更高,比如对一些很小的表,它就不会使用索引,这种情况下 InnoDB 将使用表锁,而不是行锁。如果发生这种情况就悲剧了。。。
还有一个问题,就是我们要使用排他锁来进行分布式锁的 lock,那么一个排他锁长时间不提交,就会占用数据库连接。一旦类似的连接变得多了,就可能把数据库连接池撑爆。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
|
public void lock(){
connection.setAutoCommit(false)
try{
select * from lock where lock_name=xxx for update;
if(结果不为空){
//代表获取到锁
return;
}
}catch(Exception e){
}
//为空或者抛异常的话都表示没有获取到锁
sleep(1000);
count++;
}
throw new LockException();
}
public void unlock(){
connection.commit();
}
|
数据库的实现方式,性能不高,在获取锁和释放锁时都可能因为数据库异常而出现死锁,为避免出现死锁需要增加表设计的复杂度,如设置锁的超时时间,并需要有job来保证锁超时之后能够正确释放,实现成本相对较高。
通过connection.commit()
操作来释放锁。
这种方法可以有效的解决上面提到的无法释放锁和阻塞锁的问题。
- 阻塞锁?
for update
语句会在执行成功后立即返回,在执行失败时一直处于阻塞状态,直到成功。
- 锁定之后服务宕机,无法释放?使用这种方式,服务宕机之后数据库会自己把锁释放掉。
但是还是无法直接解决数据库单点和可重入问题。
总结一下使用数据库来实现分布式锁的方式,这两种方式都是依赖数据库的一张表,一种是通过表中的记录的存在情况确定当前是否有锁存在,另外一种是通过数据库的排他锁来实现分布式锁。
分布式缓存实现
redis和memcached是目前应用最广泛的分布式缓存,其中一些命令可用于实现分布式锁。
-
memecached
add() —— 在新增缓存时,如果key已经存在则调用失败
cas() —— 类似数据库中的乐观锁,通过比较key对应value的变化来检测是否获取到锁
-
redis
setnx() —— 设置key对应的value,如果该key已经存在,则设置失败。expire() 缓存失效。
因为在我们的生产环境中主要使用redis,因此在这里只介绍redis的实现方式。常规使用方式:
这种方式存在无法失效,但是当一个客户端获取到锁之后挂掉了就无法即使释放锁,会导致死锁的情况。因此现在主流的方式是为lock_key 设置一个过期时间,在读取key的时实时判断缓存是否过期。
redis官方也推荐了 Redlock 的分布式锁实现方案,不过目前针对其中的算法还有争论,在线上也没有出现大规模使用,在这里不做过多讨论;
优点
性能出色,实现相对简单。
缺点
- redis是内存数据库,虽然redis自身有AOF和RDB的数据恢复机制,并自带复制功能,但在出现宕机的情况下,锁数据很难保证。
- 通过锁超时时间设置来保证锁的最后释放,这要求client在获取锁之后必须在超时时间内完成业务处理,否则超时之后会出现并发问题;且redis是分布式缓存,超时时间还需要考虑网络时间消耗。
- redis单机情况下,存在redis单点故障的问题。如果为了解决单点故障而使用redis的sentinel或者cluster方案,则更加复杂,引入的问题更多。
可以使用缓存来代替数据库来实现分布式锁,这个可以提供更好的性能,同时,很多缓存服务都是集群部署的,可以避免单点问题。并且很多缓存服务都提供了可以用来实现分布式锁的方法,比如Tair的put方法,redis的setnx方法等。并且,这些缓存服务也都提供了对数据的过期自动删除的支持,可以直接设置超时时间来控制锁的释放。
使用缓存实现分布式锁的优点
性能好,实现起来较为方便。
使用缓存实现分布式锁的缺点
通过超时时间来控制锁的失效时间并不是十分的靠谱。
基于ZooKeeper实现
ZooKeeper是一个分布式的,开放源码的分布式应用程序协调服务,是Google的Chubby一个开源的实现,是Hadoop和Hbase的重要组件。
它是一个为分布式应用提供一致性服务的软件,提供的功能包括:配置维护、域名服务、分布式同步、组服务等。
ZooKeeper的架构通过冗余服务实现高可用性。因此,如果第一次无应答,客户端就可以询问另一台ZooKeeper主机。ZooKeeper节点将它们的数据存储于一个分层的命名空间,非常类似于一个文件系统或一个前缀树结构。客户端可以在节点读写,从而以这种方式拥有一个共享的配置服务。更新是全序的。
zookeeper实现了类似paxos协议,是一个拥有多个节点分布式协调服务。对zookeeper写入请求会转发到leader,leader写入完成,并同步到其他节点,直到所有节点都写入完成,才返回客户端写入成功。
zookeeper一下特点使其非常适合用于实现分布式锁:
- 支持watcher机制,通过watch锁数据来实现锁,采用删除数据的方式来释放锁,删除数据时可以通知到其他client;
- 支持临时节点,如果客户端获取到锁之后出现异常死机,临时节点会被删除,从而释放锁,无需通过设置超时时间的方式来避免死锁。
zookeeper实现锁的方式是客户端一起竞争写某条数据,比如/path/lock,只有第一个客户端能写入成功,其他的客户端都会写入失败。写入成功的客户端就获得了锁,写入失败的客户端,注册watch事件,等待锁的释放,从而继续竞争该锁。
Zookeeper有一个监听机制,客户端注册监听它关心的目录节点,当目录节点发生变化(数据改变、被删除、子目录节点增加删除)等,Zookeeper会通知客户端。
大致思想即为:每个客户端对某个方法加锁时,在zookeeper上的与该方法对应的指定节点的目录下,生成一个唯一的瞬时有序节点。
判断是否获取锁的方式很简单,只需要判断有序节点中序号最小的一个。
当释放锁的时候,只需将这个瞬时节点删除即可。同时,其可以避免服务宕机导致的锁无法释放,而产生的死锁问题。
来看下Zookeeper能不能解决前面提到的问题。
- 锁无法释放?使用Zookeeper可以有效的解决锁无法释放的问题,因为在创建锁的时候,客户端会在ZK中创建一个临时节点,一旦客户端获取到锁之后突然挂掉(Session连接断开),那么这个临时节点就会自动删除掉。其他客户端就可以再次获得锁。
- 非阻塞锁?使用Zookeeper可以实现阻塞的锁,客户端可以通过在ZK中创建顺序节点,并且在节点上绑定监听器,一旦节点有变化,Zookeeper会通知客户端,客户端可以检查自己创建的节点是不是当前所有节点中序号最小的,如果是,那么自己就获取到锁,便可以执行业务逻辑了。
- 不可重入?使用Zookeeper也可以有效的解决不可重入的问题,客户端在创建节点的时候,把当前客户端的主机信息和线程信息直接写入到节点中,下次想要获取锁的时候和当前最小的节点中的数据比对一下就可以了。如果和自己的信息一样,那么自己直接获取到锁,如果不一样就再创建一个临时的顺序节点,参与排队。
- 单点问题?使用Zookeeper可以有效的解决单点问题,ZK是集群部署的,只要集群中有半数以上的机器存活,就可以对外提供服务。
可以直接使用zookeeper第三方库Curator客户端,这个客户端中封装了一个可重入的锁服务。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
|
public boolean tryLock(long timeout, TimeUnit unit) throws InterruptedException {
try {
return interProcessMutex.acquire(timeout, unit);
} catch (Exception e) {
e.printStackTrace();
}
return true;
}
public boolean unlock() {
try {
interProcessMutex.release();
} catch (Throwable e) {
log.error(e.getMessage(), e);
} finally {
executorService.schedule(new Cleaner(client, path), delayTimeForClean, TimeUnit.MILLISECONDS);
}
return true;
}
|
Curator提供的InterProcessMutex是分布式锁的实现。acquire方法用户获取锁,release方法用于释放锁。
使用ZK实现的分布式锁好像完全符合了本文开头我们对一个分布式锁的所有期望。但是,其实并不是,Zookeeper实现的分布式锁其实存在一个缺点,那就是性能上可能并没有缓存服务那么高。因为每次在创建锁和释放锁的过程中,都要动态创建、销毁瞬时节点来实现锁功能。ZK中创建和删除节点只能通过Leader服务器来执行,然后将数据同不到所有的Follower机器上。
使用Zookeeper实现分布式锁的优点
有效的解决单点问题,不可重入问题,非阻塞问题以及锁无法释放的问题。实现起来较为简单。
使用Zookeeper实现分布式锁的缺点
性能上不如使用缓存实现分布式锁。
需要对ZK的原理有所了解。
ZooKeeper 锁相关基础知识
- zk 一般由多个节点构成(单数),采用 zab 一致性协议。因此可以将 zk 看成一个单点结构,对其修改数据其内部自动将所有节点数据进行修改而后才提供查询服务。
- zk 的数据以目录树的形式,每个目录称为 znode, znode 中可存储数据(一般不超过 1M),还可以在其中增加子节点。
- 子节点有三种类型。序列化节点,每在该节点下增加一个节点自动给该节点的名称上自增。临时节点,一旦创建这个 znode 的客户端与服务器失去联系,这个 znode 也将自动删除。最后就是普通节点。
- Watch 机制,client 可以监控每个节点的变化,当产生变化会给 client 产生一个事件。
ZK 基本锁
- 原理:利用临时节点与 watch 机制。每个锁占用一个普通节点 /lock,当需要获取锁时在 /lock 目录下创建一个临时节点,创建成功则表示获取锁成功,失败则 watch/lock 节点,有删除操作后再去争锁。临时节点好处在于当进程挂掉后能自动上锁的节点自动删除即取消锁。
- 缺点:所有取锁失败的进程都监听父节点,很容易发生羊群效应,即当释放锁后所有等待进程一起来创建节点,并发量很大。
ZK 锁优化
- 原理:上锁改为创建临时有序节点,每个上锁的节点均能创建节点成功,只是其序号不同。只有序号最小的可以拥有锁,如果这个节点序号不是最小的则 watch 序号比本身小的前一个节点 (公平锁)。
步骤:
- 1.在 /lock 节点下创建一个有序临时节点 (EPHEMERAL_SEQUENTIAL)。
- 2.判断创建的节点序号是否最小,如果是最小则获取锁成功。不是则取锁失败,然后 watch 序号比本身小的前一个节点。
- 3.当取锁失败,设置 watch 后则等待 watch 事件到来后,再次判断是否序号最小。
- 4.取锁成功则执行代码,最后释放锁(删除该节点)。
基于ZooKeeper分布式锁的流程
- 在zookeeper指定节点(locks)下创建临时顺序节点node_n
- 获取locks下所有子节点children
- 对子节点按节点自增序号从小到大排序
- 判断本节点是不是第一个子节点,若是,则获取锁;若不是,则监听比该节点小的那个节点的删除事件
- 若监听事件生效,则回到第二步重新进行判断,直到获取到锁
优点:
有效的解决单点问题,不可重入问题,非阻塞问题以及锁无法释放的问题。实现起来较为简单。
缺点:
性能上可能并没有缓存服务那么高,因为每次在创建锁和释放锁的过程中,都要动态创建、销毁临时节点来实现锁功能。ZK 中创建和删除节点只能通过 Leader 服务器来执行,然后将数据同步到所有的 Follower 机器上。还需要对 ZK的原理有所了解。
具体实现
- 通过实现Watch接口,实现process(WatchedEvent event)方法来实施监控,使CountDownLatch来完成监控,在等待锁的时候使用CountDownLatch来计数,等到后进行countDown,停止等待,继续运行。
- 以下整体流程基本与上述描述流程一致,只是在监听的时候使用的是CountDownLatch来监听前一个节点。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
|
public class DistributedLock implements Lock, Watcher {
private ZooKeeper zk = null;
// 根节点
private String ROOT_LOCK = "/locks";
// 竞争的资源
private String lockName;
// 等待的前一个锁
private String WAIT_LOCK;
// 当前锁
private String CURRENT_LOCK;
// 计数器
private CountDownLatch countDownLatch;
private int sessionTimeout = 30000;
private List<Exception> exceptionList = new ArrayList<Exception>();
/**
* 配置分布式锁
* @param config 连接的url
* @param lockName 竞争资源
*/
public DistributedLock(String config, String lockName) {
this.lockName = lockName;
try {
// 连接zookeeper
zk = new ZooKeeper(config, sessionTimeout, this);
Stat stat = zk.exists(ROOT_LOCK, false);
if (stat == null) {
// 如果根节点不存在,则创建根节点
zk.create(ROOT_LOCK, new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
}
} catch (IOException e) {
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
} catch (KeeperException e) {
e.printStackTrace();
}
}
// 节点监视器
public void process(WatchedEvent event) {
if (this.countDownLatch != null) {
this.countDownLatch.countDown();
}
}
public void lock() {
if (exceptionList.size() > 0) {
throw new LockException(exceptionList.get(0));
}
try {
if (this.tryLock()) {
System.out.println(Thread.currentThread().getName() + " " + lockName + "获得了锁");
return;
} else {
// 等待锁
waitForLock(WAIT_LOCK, sessionTimeout);
}
} catch (InterruptedException e) {
e.printStackTrace();
} catch (KeeperException e) {
e.printStackTrace();
}
}
public boolean tryLock() {
try {
String splitStr = "_lock_";
if (lockName.contains(splitStr)) {
throw new LockException("锁名有误");
}
// 创建临时有序节点
CURRENT_LOCK = zk.create(ROOT_LOCK + "/" + lockName + splitStr, new byte[0],
ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
System.out.println(CURRENT_LOCK + " 已经创建");
// 取所有子节点
List<String> subNodes = zk.getChildren(ROOT_LOCK, false);
// 取出所有lockName的锁
List<String> lockObjects = new ArrayList<String>();
for (String node : subNodes) {
String _node = node.split(splitStr)[0];
if (_node.equals(lockName)) {
lockObjects.add(node);
}
}
Collections.sort(lockObjects);
System.out.println(Thread.currentThread().getName() + " 的锁是 " + CURRENT_LOCK);
// 若当前节点为最小节点,则获取锁成功
if (CURRENT_LOCK.equals(ROOT_LOCK + "/" + lockObjects.get(0))) {
return true;
}
// 若不是最小节点,则找到自己的前一个节点
String prevNode = CURRENT_LOCK.substring(CURRENT_LOCK.lastIndexOf("/") + 1);
WAIT_LOCK = lockObjects.get(Collections.binarySearch(lockObjects, prevNode) - 1);
} catch (InterruptedException e) {
e.printStackTrace();
} catch (KeeperException e) {
e.printStackTrace();
}
return false;
}
public boolean tryLock(long timeout, TimeUnit unit) {
try {
if (this.tryLock()) {
return true;
}
return waitForLock(WAIT_LOCK, timeout);
} catch (Exception e) {
e.printStackTrace();
}
return false;
}
// 等待锁
private boolean waitForLock(String prev, long waitTime) throws KeeperException, InterruptedException {
Stat stat = zk.exists(ROOT_LOCK + "/" + prev, true);
if (stat != null) {
System.out.println(Thread.currentThread().getName() + "等待锁 " + ROOT_LOCK + "/" + prev);
this.countDownLatch = new CountDownLatch(1);
// 计数等待,若等到前一个节点消失,则precess中进行countDown,停止等待,获取锁
this.countDownLatch.await(waitTime, TimeUnit.MILLISECONDS);
this.countDownLatch = null;
System.out.println(Thread.currentThread().getName() + " 等到了锁");
}
return true;
}
public void unlock() {
try {
System.out.println("释放锁 " + CURRENT_LOCK);
zk.delete(CURRENT_LOCK, -1);
CURRENT_LOCK = null;
zk.close();
} catch (InterruptedException e) {
e.printStackTrace();
} catch (KeeperException e) {
e.printStackTrace();
}
}
public Condition newCondition() {
return null;
}
public void lockInterruptibly() throws InterruptedException {
this.lock();
}
public class LockException extends RuntimeException {
private static final long serialVersionUID = 1L;
public LockException(String e){
super(e);
}
public LockException(Exception e){
super(e);
}
}
}
|
测试
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
|
public class Test {
static int n = 500;
public static void secskill() {
System.out.println(--n);
}
public static void main(String[] args) {
Runnable runnable = new Runnable() {
public void run() {
DistributedLock lock = null;
try {
lock = new DistributedLock("127.0.0.1:2181", "test1");
lock.lock();
secskill();
System.out.println(Thread.currentThread().getName() + "正在运行");
} finally {
if (lock != null) {
lock.unlock();
}
}
}
};
for (int i = 0; i < 10; i++) {
Thread t = new Thread(runnable);
t.start();
}
}
}
|
三种方案的比较
从理解的难易程度角度(从低到高)
数据库 > 缓存 > Zookeeper
从实现的复杂性角度(从低到高)
Zookeeper >= 缓存 > 数据库
从性能角度(从高到低)
缓存 > Zookeeper >= 数据库
从可靠性角度(从高到低)
Zookeeper > 缓存 > 数据库
etcd 实现分布式锁
etcd 是与zookeeper类似的高可用强一致性的服务发现仓库,使用key-value的存储方式。相对于zookeeper具有以下优点:
- 简单:使用Golang编写,部署更简单;使用HTTP 作为接口使用简单;使用Raft算法保证强一致性,便于理解。
- 数据持久化:默认数据一更新就进行持久化。
- 安全:支持SSL客户端安全认证。
Etcd Java 客户端 Jetcd 提供的 Lock 客户端
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
|
// 创建客户端,本例中Etcd服务端为单机模式
Client client = Client.builder().endpoints("http://localhost:2379").build();
// 创建Lock客户端
Lock lockClient = client.getLockClient();
// 创建Lease客户端,并创建一个有效期为30s的租约
Lease leaseClient = client.getLeaseClient();
long leaseId = leaseClient.grant(30).get().getID();
// 加、解锁操作
try
{
// 调用lock接口,加锁,并绑定租约
lockClient.lock(ByteSequence.fromString("lockName"), leaseId).get();
// 调用unlock接口,解锁
lockClient.unlock(ByteSequence.fromString(lockName)).get();
}
catch (InterruptedException | ExecutionException e1)
{
System.out.println("[error]: lock failed:" + e1);
}
|
第三方工具
- 基于
Redisson
组件,使用redlock算法实现
- 基于
Apache Curator
,利用Zookeeper的临时顺序节点模型实现
- 使用
Spring Integration
实现
Apache Curator

InterProcessMutex 分布式可重入排它锁:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
|
public void interProcessMutexLock(CuratorFramework zkClient) {
InterProcessMutex interProcessMutex = new InterProcessMutex(zkClient, basePath);
try {
//interPRocessMutex.acquire(10,TimeUnit.HOURS) 获取锁。并设置超时时间
if (interProcessMutex.acquire(10, TimeUnit.HOURS)) {
try {
buyPhone(1);
} catch (Exception ex) {
System.out.println("异常...");
ex.printStackTrace();
}
}
// 重入锁
if(interProcessMutex.acquire(10, TimeUnit.HOURS)){
try {
buyPhone(1);
} catch (Exception ex) {
System.out.println("异常....");
ex.printStackTrace();
} finally {
//释放锁
interProcessMutex.release();
interProcessMutex.release();
System.out.println("释放锁....");
}
}
} catch (Exception e) {
e.printStackTrace();
}finally {
//zkClient.close();
}
}
|
InterProcessReadWriteLock 分布式读写锁:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
|
public void interProcessReadWriteLock(CuratorFramework zkClient){
InterProcessReadWriteLock interProcessReadWriteLock = new InterProcessReadWriteLock(zkClient,basePath);
InterProcessMutex readLock = interProcessReadWriteLock.readLock();
InterProcessMutex writeLock = interProcessReadWriteLock.writeLock();
/**
* 一个负责读操作,一个负责写操作。
* 读操作在写锁没被使用时可同时由多个进程使用,而写锁使用时不允许其他进程读使用。
* 一个拥有写锁的线程可重入读锁,反之不行。
*/
try{
/*// 读锁
if(readLock.acquire(10, TimeUnit.HOURS)){
System.out.println("Thread Name is : " + Thread.currentThread().getName());
System.out.println("phone inventory : " + getPhoneInventory());
}
// 写锁重入会锁死
if(writeLock.acquire(10,TimeUnit.HOURS)){
System.out.println("Thread Name is :" + Thread.currentThread().getName());
buyPhone(1);
}*/
// 写锁
if(writeLock.acquire(10, TimeUnit.HOURS)){
System.out.println("写锁 Thread Name is : " + Thread.currentThread().getName());
buyPhone(1);
}
// 读锁重入
if(readLock.acquire(10,TimeUnit.HOURS)){
System.out.println("读锁 Thread Name is :" + Thread.currentThread().getName());
System.out.println("phone inventory : " + getPhoneInventory());
}
}catch (Exception e){
e.printStackTrace();
}finally{
try{
writeLock.release();
readLock.release();
}catch (Exception e){
e.printStackTrace();
}
}
}
|
Redisson
redisson 是 redis 官方的推荐分布式锁组件。GitHub 地址:https://github.com/redisson/redisson
失效时间设置多长时间为好?这个问题在 redisson 的做法是:每获得一个锁时,只设置一个很短的超时时间,同时起一个线程在每次快要到超时时间时去刷新锁的超时时间。在释放锁的同时结束这个线程。
基于 Redis 的分布式可重入锁
实现了 java.util.concurrent.locks.Lock
接口。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
|
RLock lock = redisson.getLock("myLock");
// traditional lock method
lock.lock();
// or acquire lock and automatically unlock it after 10 seconds
lock.lock(10, TimeUnit.SECONDS);
// or wait for lock aquisition up to 100 seconds
// and automatically unlock it after 10 seconds
boolean res = lock.tryLock(100, 10, TimeUnit.SECONDS);
if (res) {
try {
...
} finally {
lock.unlock();
}
}
|
核心加锁方法
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
|
<T> RFuture<T> tryLockInnerAsync(long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) {
//时间转化为毫秒值
internalLockLeaseTime = unit.toMillis(leaseTime);
return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, command,
"if (redis.call('exists', KEYS[1]) == 0) then " +
"redis.call('hset', KEYS[1], ARGV[2], 1); " +
"redis.call('pexpire', KEYS[1], ARGV[1]); " +
"return nil; " +
"end; " +
"if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
"redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
"redis.call('pexpire', KEYS[1], ARGV[1]); " +
"return nil; " +
"end; " +
"return redis.call('pttl', KEYS[1]);",
Collections.<Object>singletonList(getName()), internalLockLeaseTime, getLockName(threadId));
}
|
- exists检查redis中是否存在锁名称;如果不存在,则获取成功;同时把逻辑锁名称KEYS[1],线程级别的锁名称[ARGV[2],value=1,设置到redis。并设置逻辑锁名称的过期时间ARGV[2],返回;
- 如果检查到存在KEYS[1],[ARGV[2],则说明获取成功,此时会自增对应的value值,记录重入次数;并更新锁的过期时间
- 直接返回key的剩余过期时间
if (redis.call('exists', KEYS[1]) == 0) then
redis.call('publish', KEYS[2], ARGV[1]);
return 1;
end;
if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then
return nil;
end;
local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1);
if (counter > 0) then
redis.call('pexpire', KEYS[1], ARGV[2]);
return 0;
else redis.call('del', KEYS[1]);
redis.call('publish', KEYS[2], ARGV[1]);
return 1;
end;
return nil;
就是每次都对myLock数据结构中的那个加锁次数减1。如果发现加锁次数是0了,说明这个客户端已经不再持有锁了,此时就会用:“del myLock”命令,从redis里删除这个key。
基于 Redlock
Redlock 是 Redis 的作者 antirez 给出的集群模式的 Redis 分布式锁,它基于 N 个完全独立的 Redis 节点(通常情况下 N 可以设置成 5)。
算法的步骤如下:
- 1、客户端获取当前时间,以毫秒为单位。
- 2、客户端尝试获取 N 个节点的锁,(每个节点获取锁的方式和前面说的缓存锁一样),N 个节点以相同的 key 和 value 获取锁。客户端需要设置接口访问超时,接口超时时间需要远远小于锁超时时间,比如锁自动释放的时间是 10s,那么接口超时大概设置 5-50ms。这样可以在有 redis 节点宕机后,访问该节点时能尽快超时,而减小锁的正常使用。
- 3、客户端计算在获得锁的时候花费了多少时间,方法是用当前时间减去在步骤一获取的时间,只有客户端获得了超过 3 个节点的锁,而且获取锁的时间小于锁的超时时间,客户端才获得了分布式锁。
- 4、客户端获取的锁的时间为设置的锁超时时间减去步骤三计算出的获取锁花费时间。
- 5、如果客户端获取锁失败了,客户端会依次删除所有的锁。
使用 Redlock 算法,可以保证在挂掉最多 2 个节点的时候,分布式锁服务仍然能工作,这相比之前的数据库锁和缓存锁大大提高了可用性,由于 redis 的高效性能,分布式缓存锁性能并不比数据库锁差。
使用 redlock 算法实现
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
|
RLock lock1 = redisson1.getLock("lock1");
RLock lock2 = redisson2.getLock("lock2");
RLock lock3 = redisson3.getLock("lock3");
RLock redLock = anyRedisson.getRedLock(lock1, lock2, lock3);
// traditional lock method
redLock.lock();
// or acquire lock and automatically unlock it after 10 seconds
redLock.lock(10, TimeUnit.SECONDS);
// or wait for lock aquisition up to 100 seconds
// and automatically unlock it after 10 seconds
boolean res = redLock.tryLock(100, 10, TimeUnit.SECONDS);
if (res) {
try {
...
} finally {
redLock.unlock();
}
}
|
Spring Cloud Cluster 里面基于ETCD、hazelcast的分布式锁实现值得一看。
Spring Integration 提供的全局锁目前为如下存储提供了实现:
- Gemfire
- JDBC
- Redis
- Zookeeper
Spring Integration 实现分布式锁时需要关注的方法
方法名 |
描述 |
lock() |
Acquires the lock. 加锁,如果已经被其他线程锁住或者当前线程不能获取锁则阻塞 |
lockInterruptibly() |
Acquires the lock unless the current thread is interrupted. 加锁,除非当前线程被打断。 |
tryLock() |
Acquires the lock only if it is free at the time of invocation. 尝试加锁,如果已经有其他锁锁住,获取当前线程不能加锁,则返回false,加锁失败;加锁成功则返回true |
tryLock(long time, TimeUnit unit) |
Acquires the lock if it is free within the given waiting time and the current thread has not been interrupted. 尝试在指定时间内加锁,如果已经有其他锁锁住,获取当前线程不能加锁,则返回false,加锁失败;加锁成功则返回true |
unlock() |
Releases the lock. 解锁 |
Zookeeper Support
1
2
3
4
5
|
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-zookeeper</artifactId>
<version>5.3.1.RELEASE</version>
</dependency>
|
Redis Support
强烈建议阅读一下 org.springframework.integration.redis.util.RedisLockRegistry
的注释,它详细描述了该类的特性,例如可重入性以及锁在Redis中如何存储等。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
|
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-integration</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-redis</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>
|
- 在application.yml中添加redis的配置
1
2
3
4
|
spring:
redis:
host: 172.31.0.149
port: 7111
|
- 建立配置类,注入
RedisLockRegistry
1
2
3
4
5
6
7
8
9
|
@Configuration
public class RedisLockConfiguration {
@Bean
public RedisLockRegistry redisLockRegistry(RedisConnectionFactory redisConnectionFactory){
return new RedisLockRegistry(redisConnectionFactory, "redis-lock");
}
}
|
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
|
@RestController
@RequestMapping("lock")
@Log4j2
public class DistributedLockController {
@Autowired
private RedisLockRegistry redisLockRegistry;
@GetMapping("/redis")
public void test1() {
Lock lock = redisLockRegistry.obtain("redis");
try{
//尝试在指定时间内加锁,如果已经有其他锁锁住,获取当前线程不能加锁,则返回false,加锁失败;加锁成功则返回true
if(lock.tryLock(3, TimeUnit.SECONDS)){
log.info("lock is ready");
TimeUnit.SECONDS.sleep(5);
}
} catch (InterruptedException e) {
log.error("obtain lock error",e);
} finally {
lock.unlock();
}
}
}
|
- 测试
启动多个实例,分别访问
/lock/redis
端点,一个正常执行业务逻辑,另外一个实例访问出现如下错误