Skip to main content
 首页 » 数据库

redis中的分布式锁

2022年07月19日146webabcd

分布式锁的实现场景

   在平时的开发中,对于高并发的开发场景,我们不可避免要加锁进行处理,当然redis中也是不可避免的,下面是我总结出来的几种锁的场景

Redis分布式锁方案一

   使用Redis实现分布式锁最简单的方案是在获取锁之前先查询一下以该锁为key对应的value存不存在,如果存在,则说明该锁被其他客户端获取了,否则的话就尝试获取锁,获取锁的方法很简单,只要以该锁为key,设置一个随机的值就行了。比如,我们现在有个秒杀的场景,并发量可能是3000,但是我们商品的库存数量是一定的,为了防止超卖,我们就需要在减库存的时候加上锁,当第一个请求过来的时候,先判断锁时候存在,不存在就加锁,然后去处理秒杀的业务,并且在处理完成的时候,释放锁,如果判断锁存在,就轮训等待锁被释放。

缺点

1、如果我们处理业务的时候报错了,那么加上的锁就不能及时被释放了,这时候我们需要加上一个异常的捕获,在finally的时候释放锁。

2、同时我们也要主要set写入key,是会出现覆盖操作的,我们要要注意使用setnx(只要锁被加上,后面的写入操作不会覆盖前面的写入)

2、但是,有可能我们在处理业务的时候,整个服务器、宕机了,那么异常的捕获显然是不管用了,这时候我们的这种设计显然是存在缺陷的。

 

Redis分布式锁方案二

   上一节的方案缺点就是锁有时候没有办法释放,造成死锁。那么对于setnx我们应该怎样处理呢?

   考虑一种情况,如果进程获得锁后,断开了与 Redis 的连接(可能是进程挂掉,或者网络中断),如果没有有效的释放锁的机制,那么其他进程都会处于一直等待的状态,即出现“死锁”。

   上面在使用 SETNX 获得锁时,我们将键 lock.foo 的值设置为锁的有效时间,进程获得锁后,其他进程还会不断的检测锁是否已超时,如果超时,那么等待的进程也将有机会获得锁。

   然而,锁超时时,我们不能简单地使用 DEL 命令删除键 lock.foo 以释放锁。考虑以下情况,进程P1已经首先获得了锁 lock.foo,然后进程P1挂掉了。进程P2,P3正在不断地检测锁是否已释放或者已超时,执行流程如下:

 1、P2和P3进程读取键 lock.foo 的值,检测锁是否已超时(通过比较当前时间和键 lock.foo 的值来判断是否超时)

 2、P2和P3进程发现锁 lock.foo 已超时

 3、P2执行 DEL lock.foo命令

 4、P2执行 SETNX lock.foo命令,并返回1,即P2获得锁

 5、P3执行 DEL lock.foo命令将P2刚刚设置的键 lock.foo 删除(这步是由于P3刚才已检测到锁已超时)

 6、P3执行 SETNX lock.foo命令,并返回1,即P3获得锁

 7、P2和P3同时获得了锁

从上面的情况可以得知,在检测到锁超时后,进程不能直接简单地执行 DEL 删除键的操作以获得锁。

为了解决上述算法可能出现的多个进程同时获得锁的问题,我们再来看以下的算法。 

我们同样假设进程P1已经首先获得了锁 lock.foo,然后进程P1挂掉了。接下来的情况:

 1、进程P4执行 SETNX lock.foo 以尝试获取锁

 2、由于进程P1已获得了锁,所以P4执行 SETNX lock.foo 返回0,即获取锁失败

 3、P4执行 GET lock.foo 来检测锁是否已超时,如果没超时,则等待一段时间,再次检测

 4、如果P4检测到锁已超时,即当前的时间大于键 lock.foo 的值,P4会执行以下操作 GETSET lock.foo <current Unix timestamp + lock timeout + 1>

 5、由于 GETSET 操作在设置键的值的同时,还会返回键的旧值,通过比较键 lock.foo 的旧值是否小于当前时间,可以判断进程是否已获得锁

 6、假如另一个进程P5也检测到锁已超时,并在P4之前执行了 GETSET 操作,那么P4的 GETSET 操作返回的是一个大于当前时间的时间戳,这样P4就不会获得锁而继续等待。注意到,即使P4接下来将键 lock.foo 的值设置了比P5设置的更大的值也没影响。

 另外,值得注意的是,在进程释放锁,即执行 DEL lock.foo 操作前,需要先判断锁是否已超时。如果锁已超时,那么锁可能已由其他进程获得,这时直接执行 DEL lock.foo 操作会导致把其他进程已获得的锁释放掉。

缺点

1、其实这种方案还存在缺陷,我们知道对于锁设置过期的时间,虽然可以解决锁的及时释放,但是我们知道我们需要处理的业务场景的时间的不可控,可能网络动荡,我们原来0.01秒的业务现在就需要3秒钟,所以简单的对锁设置过期的时间,还是存在缺陷的。

Redis分布式锁方案三

   那么对于方案二的场景的缺点我们应该怎么去处理呢?有一个简单的方法就是当给一个锁加上过期的时间的时候,我们另外开启一个线程,去监测业务处理的时间,如果锁的时间快到了,并且业务还没有执行完毕,就给锁的时间延长,实现自旋的加锁。

下面是解决方案(不过只处理到了方案二,自旋锁没完成)

// Lock 加锁  
func Lock(lockKey string) int64 { 
    redisConn := GetRedisHandle().RedisClientHandle 
 
    expire := int64(8e3) // 锁有效期 
    lt := 2000           // 获取锁时间 
    i := 0 
    sleep := 100 * time.Microsecond 
    for { 
        nowUnix := time.Now().UnixNano() / 1e6 // 取毫秒 
        nv := nowUnix + expire 
        s := redisConn.SetNX(lockKey, nv, 0).Val() 
        if s { 
            return nv 
        } 
 
        if lv := redisConn.Get(lockKey).Val(); lv != "" { 
            e := String2Int64(lv) 
            if e < nowUnix { 
                // 锁已超时 
 
                t := String2Int64(redisConn.GetSet(lockKey, nv).Val()) 
                if e == t { 
                    return nv 
                } 
            } 
        } 
 
        i += 100 
 
        if i >= lt { 
            // 不再一直等待 
 
            //return 0 
        } 
 
        time.Sleep(sleep) 
    } 
} 
 
// Unlock 解锁  
func Unlock(lk string, lv int64) bool { 
    redisConn := GetRedisHandle().RedisClientHandle 
 
    if ov := redisConn.Get(lk).Val(); ov != "" { 
        if ovi := String2Int64(ov); ovi == lv { // 只对本线程解锁 
            if d := redisConn.Del(lk); d.Err() == nil { 
                return true 
            } 
        } 
    } 
 
    return false 
}

上面三种方案还存在问题

上面的这类锁的最大缺点就是只作用在一个节点上,即使Redis通过sentinel保证高可用,如果这个master节点由于某些原因放生了主从切换,那么就会出现锁丢失的情况:

1、在Redis的master节点上拿到了锁;

2、但是这个加锁的key还没有同步到slave节点;

3、master故障,发生了故障转移,slave节点升级为master节点;

4、导致锁丢失。

 

正因为如此,Redis作者antirez基于分布式环境下提出了一种更高级的分布式锁的实现方式Redlock。

 

Redlock实现

    在Redis的分布式环境中,我们假设有N个Redis master。这些节点完全互相独立,不存在主从复制或者其他集群协调机制。我们确保将在N个实例上使用与在Redis单实例下相同方法获取和释放锁。现在我们假设有5个Redis master节点,同时我们需要在5台服务器上面运行这些Redis实例,这样保证他们不会同时都宕掉。

 

为了取到锁,客户端营该执行以下操作:

 

    1、获取当前Unix时间,以毫秒为单位。

    2、依次尝试从5个实例,使用相同的key和具有唯一性的value(例如UUID)获取锁。当向Redis请求获取锁时,客户端应该设置一个网络连接和响应超时时间,这个超时时间应该小于锁的失效时间。例如你的锁自动失效时间为10秒,则超时时间应该在5-50毫秒之间。这样可以避免服务器端Redis已经挂掉的情况下,客户端还在死死地等待响应结果。如果服务器端没有在规定时间内响应,客户端应该尽快尝试去另外一个Redis实例请求获取锁。

    3、客户端使用当前时间减去开始获取锁时间(步骤1记录的时间)就得到获取锁使用的时间。当且仅当从大多数(N/2+1,这里是3个节点)的Redis节点都取到锁,并且使用的时间小于锁失效时间时,锁才算获取成功。

    4、如果取到了锁,key的真正有效时间等于有效时间减去获取锁所使用的时间(步骤3计算的结果)。

    5、如果因为某些原因,获取锁失败(没有在至少N/2+1个Redis实例取到锁或者取锁时间已经超过了有效时间),客户端应该在所有的Redis实例上进行解锁(即便某些Redis实例根本就没有加锁成功,防止某些节点获取到锁但是客户端没有得到响应而导致接下来的一段时间不能被重新获取锁)。

参考:https://mp.weixin.qq.com/s/9F6dor1p_j-nmNInBwJfCA

参考:https://redis.io/topics/distlock

  


本文参考链接:https://www.cnblogs.com/ricklz/p/10810061.html