Fork me on GitHub
ookamiAntD's Blog

分布式锁的几种实现方式

Preface

在现代互联网, 通常都是伴随着分布式、高并发等, 在某些业务中例如下订单扣减库存, 如果不对库存资源做临界处理, 在并发量大的时候会出现库存不准确的情况. 在单个服务的情况下可以通过Java自带的一些锁对临界资源进行处理, 例如synchronizedReentrantlock, 甚至是通过无锁技术(比如RangeBuffer)都可以实现同一个JVM内的锁. But, 在能够弹性伸缩的分布式环境下, Java内置的锁显然不能够满足需求, 需要借助外部进程实现分布式锁.

几种实现方式

分布式环境下, 数据一致性问题一直是一个比较重要的话题, 而又不同于单进程的情况. 分布式与单机情况下最大的不同在于其不是多线程而是多进程. 多线程由于可以共享堆内存, 因此可以简单的采取内存作为标记存储位置. 而进程之间甚至可能都不在同一台物理机上, 因此需要将标记存储在一个所有进程都能看到的地方.

常见的是秒杀场景, 订单服务部署了多个实例. 如秒杀商品有4个, 第一个用户购买3个, 第二个用户购买2个, 理想状态下第一个用户能购买成功, 第二个用户提示购买失败, 反之亦可. 而实际可能出现的情况是, 两个用户都得到库存为4, 第一个用户买到了3个, 更新库存之前, 第二个用户下了2个商品的订单, 更新库存为2, 导致出错.

在上面的场景中, 商品的库存是共享变量, 面对高并发情形, 需要保证对资源的访问互斥. 在单机环境中, Java中其实提供了很多并发处理相关的API, 但是这些API在分布式场景中就无能为力了. 也就是说单纯的Java API并不能提供分布式锁的能力. 分布式系统中, 由于分布式系统的分布性, 即多线程和多进程并且分布在不同机器中, synchronizedlock这两种锁将失去原有锁的效果, 需要我们自己实现分布式锁.

常见的锁方案如下:

  • 基于数据库实现分布式锁(基本用来玩的)
  • 基于缓存, 实现分布式锁, 如Redis(业界常用方式)
  • 基于Zookeeper实现分布式锁(性能低)

下面我们简单介绍下这几种锁的实现.

基于数据库

虽然这种方式基本上不会被用于生产环境

基于数据库的锁实现也有两种方式, 一是基于数据库表, 另一种是基于数据库排他锁.

基于数据库表的增删

基于数据库表增删是最简单的方式, 首先创建一张锁的表主要包含下列字段: 方法名, 时间戳等字段.

具体使用的方法, 当需要锁住某个方法时, 往该表中插入一条相关的记录. 这边需要注意, 方法名是有唯一性约束的, 如果有多个请求同时提交到数据库的话, 数据库会保证只有一个操作可以成功, 那么我们就可以认为操作成功的那个线程获得了该方法的锁, 可以执行方法体内容.

执行完毕, 需要delete该记录.

当然, 这边只是简单介绍一下. 对于上述方案可以进行优化, 如应用主从数据库, 数据之间双向同步. 一旦挂掉快速切换到备库上;做一个定时任务, 每隔一定时间把数据库中的超时数据清理一遍;使用while循环, 直到insert成功再返回成功, 虽然并不推荐这样做;还可以记录当前获得锁的机器的主机信息和线程信息, 那么下次再获取锁的时候先查询数据库, 如果当前机器的主机信息和线程信息在数据库可以查到的话, 直接把锁分配给他就可以了, 实现可重入锁.

  • 可重入锁: 可以再次进入方法A, 就是说在释放锁前此线程可以再次进入方法A(方法A递归).
  • 不可重入锁(自旋锁): 不可以再次进入方法A, 也就是说获得锁进入方法A是此线程在释放锁钱唯一的一次进入方法A.

基于数据库排他锁

我们还可以通过数据库的排他锁来实现分布式锁. 基于MySql的InnoDB引擎, 可以使用以下方法来实现加锁操作:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
public void lock(){
connection.setAutoCommit(false)
int count = 0;
while(count < 4){
try{
select * from lock where lock_name=xxx for update;
if(结果不为空){
//代表获取到锁
return;
}
}catch(Exception e){

}
//为空或者抛异常的话都表示没有获取到锁
sleep(1000);
count++;
}
throw new LockException();
}

在查询语句后面增加for update, 数据库会在查询过程中给数据库表增加排他锁. 当某条记录被加上排他锁之后, 其他线程无法再在该行记录上增加排他锁. 其他没有获取到锁的就会阻塞在上述select语句上, 可能的结果有2种, 在超时之前获取到了锁, 在超时之前仍未获取到锁.

获得排它锁的线程即可获得分布式锁, 当获取到锁之后, 可以执行方法的业务逻辑, 执行完方法之后, 释放锁connection.commit().

存在的问题主要是性能不高和sql超时的异常.

基于数据库锁的优缺点

上面两种方式都是依赖数据库的一张表, 一种是通过表中的记录的存在情况确定当前是否有锁存在, 另外一种是通过数据库的排他锁来实现分布式锁.

  • 优点是直接借助数据库, 简单容易理解.
  • 缺点是操作数据库需要一定的开销, 性能问题需要考虑.

基于Zookeeper

基于Zookeeper临时有序节点可以实现的分布式锁. 每个客户端对某个方法加锁时, 在Zookeeper上的与该方法对应的指定节点的目录下, 生成一个唯一的瞬时有序节点. 判断是否获取锁的方式很简单, 只需要判断有序节点中序号最小的一个. 当释放锁的时候, 只需将这个瞬时节点删除即可. 同时, 其可以避免服务宕机导致的锁无法释放, 而产生的死锁问题.

提供的第三方库有curator, 具体使用读者可以自行去看一下. Curator提供的InterProcessMutex是分布式锁的实现. acquire方法获取锁, release方法释放锁. 另外, 锁释放、阻塞锁、可重入锁等问题都可以有有效解决. 讲下阻塞锁的实现, 客户端可以通过在ZK中创建顺序节点, 并且在节点上绑定监听器, 一旦节点有变化, Zookeeper会通知客户端, 客户端可以检查自己创建的节点是不是当前所有节点中序号最小的, 如果是就获取到锁, 便可以执行业务逻辑.

根据Zookeeper的这些特性, 我们来看看如何利用这些特性来实现分布式锁:

  • 创建一个锁目录lock
  • 线程A获取锁会在lock目录下, 创建临时顺序节点
  • 获取锁目录下所有的子节点, 然后获取比自己小的兄弟节点, 如果不存在, 则说明当前线程顺序号最小, 获得锁
  • 线程B创建临时节点并获取所有兄弟节点, 判断自己不是最小节点, 设置监听(watcher)比自己次小的节点
  • 线程A处理完, 删除自己的节点, 线程B监听到变更事件, 判断自己是最小的节点, 获得锁

最后, Zookeeper实现的分布式锁其实存在一个缺点, 那就是性能上可能并没有缓存服务那么高. 因为每次在创建锁和释放锁的过程中, 都要动态创建、销毁瞬时节点来实现锁功能. ZK中创建和删除节点只能通过Leader服务器来执行, 然后将数据同不到所有的Follower机器上. 并发问题, 可能存在网络抖动, 客户端和ZK集群的session连接断了, zk集群以为客户端挂了, 就会删除临时节点, 这时候其他客户端就可以获取到分布式锁了.

下面是简单例子:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
public class CuratorTest {
private static String address = "127.0.0.1:2181";


public static void main(String[] args) {
RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
CuratorFramework client = CuratorFrameworkFactory.newClient(address, retryPolicy);
client.start();
//创建分布式锁, 锁空间的根节点路径为/curator/lock
InterProcessMutex mutex = new InterProcessMutex(client, "/curator/lock");
ExecutorService fixedThreadPool = Executors.newFixedThreadPool(5);
CompletionService<Object> completionService = new ExecutorCompletionService<>(fixedThreadPool);
for (int i = 0; i < 5; i++) {
completionService.submit(() -> {
boolean flag = false;
try {
//尝试获取锁, 最多等待5秒
flag = mutex.acquire(5, TimeUnit.SECONDS);
Thread currentThread = Thread.currentThread();
if (flag) {
System.out.println("线程" + currentThread.getId() + "获取锁成功");
} else {
System.out.println("线程" + currentThread.getId() + "获取锁失败");
}
//模拟业务逻辑, 延时4秒
Thread.sleep(4000);
} catch (Exception e) {
e.printStackTrace();
} finally {
if (flag) {
try {
mutex.release();
} catch (Exception e) {
e.printStackTrace();
}
}
}
return null;
});
}
// 等待线程跑完
int count = 0;
while (count < 5) {
if (completionService.poll() != null) {
count++;
}
}
System.out.println("========= Complete!");
client.close();
fixedThreadPool.shutdown();
}
}

基于缓存

相对于基于数据库实现分布式锁的方案来说, 基于缓存来实现在性能方面会表现的更好一点, 存取速度快很多. 而且很多缓存是可以集群部署的, 可以解决单点问题. 基于缓存的锁有好几种, 如Memcached、Redis, 下面主要讲解基于Redis的分布式实现.

基于Redis的分布式锁实现

首先, 为了确保分布式锁可用, 我们至少要确保锁的实现同时满足以下四个条件:

  1. 互斥性. 在任意时刻, 只有一个客户端能持有锁.
  2. 不会发生死锁. 即使有一个客户端在持有锁的期间崩溃而没有主动解锁, 也能保证后续其他客户端能加锁.
  3. 具有容错性. 只要大部分的Redis节点正常运行, 客户端就可以加锁和解锁.
  4. 解铃还须系铃人. 加锁和解锁必须是同一个客户端, 客户端自己不能把别人加的锁给解了.

基于Spring Data Redis

下面是正确的实现姿势. (使用Spring Data Redis)

依赖

1
2
3
4
5
6
7
8
9
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>

<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-pool2</artifactId>
</dependency>

加锁姿势

1
2
3
4
5
6
7
8
9
@Autowired
private StringRedisTemplate stringRedisTemplate;

private Boolean setNxEx(String key, String value) {
return stringRedisTemplate.execute((RedisCallback<Boolean>) connection -> {
StringRedisConnection stringRedisConn = (StringRedisConnection) connection;
return stringRedisConn.set(key, value, Expiration.from(1L, TimeUnit.MINUTES), SET_IF_ABSENT);
});
}

执行上面的setNxEx()方法就只会导致两种结果:

  1. 当前没有锁(key不存在), 那么就进行加锁操作, 并对锁设置个有效期, 同时value表示加锁的客户端.
  2. 已有锁存在, 不做任何操作.

网上有许多教程在加锁的步骤都不是原子性的, 有些是先加锁, 成功后再设置过期时间;有些将过期时间设置为value, 获取锁失败会判断value是否小于当前时间, 是则删除在设置新的值. 这些方法由于不是原子性, 在极端情况(比如多线程, 或者代码执行到某一行就宕机了等等)必然会导致锁失效或死锁等情况…

在上面stringRedisConn.set(...)方法中, 确保了上锁与设置过期时间的原子性.

解锁姿势

配置类:

1
2
3
4
5
6
7
8
@Bean
public RedisScript<Boolean> releaseLockScript(DLockConfigProperty dLockConfigProperty) {
DefaultRedisScript<Boolean> redisScript = new DefaultRedisScript<>();
String scriptLocation = "scripts/release_lock.lua";
redisScript.setScriptSource(new ResourceScriptSource(new ClassPathResource(scriptLocation)));
redisScript.setResultType(Boolean.class);
return redisScript;
}

Lua脚本:

1
2
3
4
5
if redis.call('GET', KEYS[1]) == ARGV[1] then
return 1 == redis.call('DEL', KEYS[1])
else
return false
end

核心代码:

1
2
3
4
5
6
7
8
9
@Resource
private StringRedisTemplate stringRedisTemplate;

@Resource
private RedisScript<Boolean> script;

public void release(String key, String value) {
stringRedisTemplate.execute(script, singletonList(key), value)
}

除了配置, 解锁就一行代码搞定, 虽然简洁, 里面也是有很多学问滴. . .

为什么要用Lua脚本?确保原子性, 如何保证, 请看官网对eval命令的相关解释. 上面脚本表达的意思很简单, 对比传进来的value是否相等, 是则删除锁. value可使用UUID作为当前线程的标识符, 只有但前线程才能解锁.

网上的错误姿势一般都是执行完业务代码直接删除锁, 这样会导致删除了其他线程获的锁.

上面实现的分布式锁是不支持可重入的, 需要额外的编码, 业界当然早就开源了类似的框架, 比如下面介绍的Redisson.

基于Redisson

Redisson 是一个在Redis的基础上实现的Java驻内存数据网格(In-Memory Data Grid). 它不仅提供了一系列的分布式的Java常用对象, 还提供了许多分布式服务. 其中包括(BitSet, Set, Multimap, SortedSet, Map, List, Queue, BlockingQueue, Deque, BlockingDeque, Semaphore, Lock, AtomicLong, CountDownLatch, Publish / Subscribe, Bloom filter, Remote service, Spring cache, Executor service, Live Object service, Scheduler service) Redisson提供了使用Redis的最简单和最便捷的方法. Redisson的宗旨是促进使用者对Redis的关注分离(Separation of Concern), 从而让使用者能够将精力更集中地放在处理业务逻辑上.

Redisson提供的众多功能中有一项就是可重入锁(Reentrant Lock), 具体用法可参考 文档

依赖

1
2
3
4
5
6
7
8
9
10
11
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-transport-native-epoll</artifactId>
<classifier>linux-x86_64</classifier>
</dependency>

<dependency>
<groupId>org.redisson</groupId>
<artifactId>redisson</artifactId>
<version>3.7.5</version>
</dependency>

核心代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
@Data
@Slf4j
public class RedissonDLock implements DLock {

private final Long waitTime;
private final Long leaseTime;
private final TimeUnit timeUnit;
private final RedissonClient redisson;

public RedissonDLock(DLockConfigProperty property) {
// 设置一些基本属性
this.waitTime = property.getWaitTime();
this.leaseTime = property.getLeaseTime();
this.timeUnit = property.getTimeUnit();

Config config = new Config();
SingleServerConfig singleServerConfig = config.useSingleServer();
singleServerConfig.setAddress("redis://" + property.getHost() + ":" + property.getPort());
if (property.getPassword() != null && property.getPassword().trim().length() > 0) {
singleServerConfig.setPassword(property.getPassword());
}
try {
Class.forName("io.netty.channel.epoll.Epoll");
// 如果是Linux系统可采用Epoll算法, 需要引入 netty-transport-native-epoll
if (Epoll.isAvailable()) {
config.setTransportMode(TransportMode.EPOLL);
log.info("Starting with optional epoll library");
} else {
log.info("Starting without optional epoll library");
}
} catch (ClassNotFoundException e) {
e.printStackTrace();
}
redisson = Redisson.create(config);
}

@Override
public void tryLockAndAction(LockKeyGenerator lockKeyGenerator, AfterAcquireAction acquireAction) {
tryLockAndAction(lockKeyGenerator, acquireAction, waitTime, leaseTime, timeUnit);
}

@Override
public void tryLockAndAction(LockKeyGenerator lockKeyGenerator, AfterAcquireAction acquireAction, Long waitTime, Long leaseTime, TimeUnit timeUnit) {
tryLockAndAction(lockKeyGenerator, acquireAction, DEFAULT_FAIL_ACQUIRE_ACTION, waitTime, leaseTime, timeUnit);
}

@Override
public void tryLockAndAction(LockKeyGenerator lockKeyGenerator, AfterAcquireAction acquireAction, FailAcquireAction failAcquireAction, Long waitTime, Long leaseTime, TimeUnit timeUnit) {
try (LockHolder holder = new LockHolder(redisson.getLock(lockKeyGenerator.getLockKey()))) {
boolean acquire = holder.getLock().tryLock(waitTime, leaseTime, timeUnit);
if (acquire) {
acquireAction.doAction();
} else {
failAcquireAction.doOnFail();
}
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}

@Override
public <T> T tryLockAndExecuteCommand(LockKeyGenerator lockKeyGenerator, AfterAcquireCommand<T> command, FailAcquireAction failAcquireAction, Long waitTime, Long leaseTime, TimeUnit timeUnit) throws Throwable {
try (LockHolder holder = new LockHolder(redisson.getLock(lockKeyGenerator.getLockKey()))) {
boolean acquire = holder.getLock().tryLock(waitTime, leaseTime, timeUnit);
if (acquire) {
return command.executeCommand();
}
failAcquireAction.doOnFail();
}
return null;
}

@Data
@Accessors(chain = true)
@AllArgsConstructor
private static class LockHolder implements AutoCloseable {
private RLock lock;

@Override
public void close() {
lock.unlockAsync();
}
}
}
  • 一般服务器都是Linux系统, 引入io.netty.channel.epoll.Epoll采用Epoll方式有助于提升性能
  • 使用try-with-resource方式提高代码优雅性…

注解驱动

Lock注解:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
@Target(ElementType.METHOD)
@Retention(RetentionPolicy.RUNTIME)
@Documented
@Inherited
public @interface Lock {

String namespace() default "default";

String key();

Class<?> prefixClass();

String separator() default ":";

long waitTime() default 2L;

long leaseTime() default 5L;

TimeUnit timeUnit() default TimeUnit.SECONDS;
}

切面类:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
@Slf4j
@Component
@Aspect
@Order(1)
public class DLockAspect {
@Resource
private DLock dLock;

@Value("${spring.application.name}")
private String namespace;

@Around(value = "@annotation(lock)")
public Object doAround(ProceedingJoinPoint pjp, Lock lock) throws Throwable {
Method method = ((MethodSignature) pjp.getSignature()).getMethod();

Object[] args = pjp.getArgs();
String keySpEL = lock.key();
String resourceKey = parseSpel(method, args, keySpEL, String.class);

String finalKey = buildFinalKey(lock, resourceKey);
return dLock.tryLockAndExecuteCommand(() -> finalKey, () -> pjp.proceed(pjp.getArgs()), DEFAULT_FAIL_ACQUIRE_ACTION,
lock.waitTime(), lock.leaseTime(), lock.timeUnit());
}

private String buildFinalKey(Lock lock, String key) {
return namespace == null || namespace.length() == 0 ? lock.namespace() : namespace +
lock.separator() +
lock.prefixClass().getSimpleName() +
lock.separator() +
key;
}
}

使用了 SpEL 解析锁的Key:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
public final class SpelHelper {
private static final ExpressionParser PARSER = new SpelExpressionParser();
private static final LocalVariableTableParameterNameDiscoverer DISCOVERER = new LocalVariableTableParameterNameDiscoverer();

public static <T> T parseSpel(Method method, Object[] args, String spel, Class<T> clazz) {
String[] parameterNames = DISCOVERER.getParameterNames(method);
requireNonNull(parameterNames);
EvaluationContext context = buildSpelContext(parameterNames, args);
Expression expression = PARSER.parseExpression(spel);
return expression.getValue(context, clazz);
}

private static EvaluationContext buildSpelContext(String[] parameterNames, Object[] args) {
EvaluationContext context = new StandardEvaluationContext();
for (int len = 0; len < parameterNames.length; len++) {
context.setVariable(parameterNames[len], args[len]);
}
context.setVariable("args", args);
return context;
}
}

使用:

1
2
3
4
5
// @Lock(prefixClass = TestService.class, key = "#id")
@Lock(prefixClass = TestService.class, key = "#args[0]")
public void lockTest(Long id) {
doSomething();
}

如果锁被早被别的线程使用, 一般我们使用线程Sleep的方式等待锁释放, 但Redisson的底层采用了更优雅的等待策略, 通过发布订阅通知其他线程, 所以性能也会有所提高.

Finally

Redisson官方文档: https://github.com/redisson/redisson/wiki/%E7%9B%AE%E5%BD%95

示例代码: https://github.com/masteranthoneyd/starter/tree/master/dlock

---------------- The End ----------------
ookamiAntD wechat
欢迎您扫一扫上面的微信公众号,订阅我的博客!
谢谢大爷~

Author:ookamiAntD Yang
Link:http://yangbingdong.com/2018/distribution-lock/
Contact:yangbingdong1994@gmail.com
本文基于 知识共享署名-相同方式共享 4.0 国际许可协议发布
转载请注明出处,谢谢!

分享到: