用Go语言&&Redis实现分布式锁,我还是第一次

2024年 4月 17日 76.5k 0

一 为什么需要分布式锁

共享资源访问控制: 当多个节点需要同时访问共享资源时,为了避免并发写入导致数据不一致,需要使用分布式锁确保同时只有一个节点可以写入或修改共享资源。

避免重复执行: 在分布式系统中,某些操作可能需要在整个系统中只执行一次,比如定时任务、数据初始化等。为了避免多个节点同时执行这些操作,需要使用分布式锁来确保只有一个节点可以执行。

任务协调: 在分布式任务队列中,多个节点竞争执行任务时,可能需要对任务进行加锁,以确保每个任务只被一个节点执行,避免重复执行或者操作冲突。

防止死锁: 在分布式系统中,由于网络延迟、节点故障等原因,可能会导致死锁情况的发生。分布式锁可以用来避免死锁的发生,通过设置合理的超时时间和重试机制,确保锁在一定时间内被释放。

分布式系统中共享同一个资源时,就需要分布式锁来确保变更资源的一致性。这就是为什么要用到分布式锁的原因咯。

二、分布式锁需要具备特性

1 互斥性(Mutual Exclusion): 在任何时刻,只能有一个客户端持有锁,其他客户端不能同时持有该锁。这是最基本的锁特性,确保在同一时间只有一个客户端能够访问共享资源。

2 安全性(Safety): 在锁被释放之前,任何其他客户端都不能获得该锁。即使是在网络分区、节点故障等异常情况下,也要确保锁的安全性,避免数据不一致或者操作冲突。

3 活性(Liveness): 锁应该能够在合理的时间内被获取,避免长时间的等待导致死锁或者无法响应其他客户端请求。活性也包括在锁被释放后,其他客户端能够尽快地获取到该锁。

4 容错性(Fault Tolerance): 分布式系统中可能会发生网络分区、节点故障等异常情况,分布式锁需要具备容错性,能够在这些异常情况下正确地工作。比如,锁的实现应该能够处理网络分区导致的消息丢失或者超时等情况。

5 性能(Performance): 锁的实现应该尽可能地减少锁竞争和通信开销,提高系统的性能。例如,可以使用高效的算法和数据结构来减少锁的持有时间和等待时间,或者采用缓存和批处理等技术来减少通信开销。

6 可扩展性(Scalability): 锁的实现应该能够随着系统规模的增长而扩展,确保在高并发和大规模的分布式环境下仍然能够保持良好的性能和可用性。

三、实现 Redis 锁应先掌握的知识点

set 命令

SET key value [EX seconds] [PX milliseconds] [NX|XX]
  • EX second :设置键的过期时间为 second 秒。SET key value EX second 效果等同于 SETEX key second value。
  • PX millisecond :设置键的过期时间为 millisecond 毫秒。SET key value PX millisecond ,效果等同于 PSETEX key millisecond value 。
  • NX :键不存在时,才对键进行设置操作。SET key value NX 等同于 SETNX key value 。
  • XX :键已经存在时,才对键进行设置操作。

Redis.lua 脚本

我们可以使用 redis lua 脚本,将一系列命令操作封装成 pipline,实现整体操作的原子性。

加锁的整个流程,详细原理说明看注释

-- Lua 脚本实现 Redis 分布式锁

-- 生成唯一标识
local requestId = ARGV[1]

-- 尝试获取锁
local lockKey = KEYS[1]
local lockValue = requestId
local lockExpireTime = tonumber(ARGV[2])

local result = redis.call('SET', lockKey, lockValue, 'NX', 'PX', lockExpireTime)

-- 判断获取锁的结果
if result == 'OK' then
    -- 获取锁成功,设置锁的过期时间
    return 'OK'
else
    -- 获取锁失败
    return 'FAIL'
end

1 生成唯一标识: 首先,在客户端生成一个唯一的标识,可以是 UUID、Snowflake 算法生成的分布式 ID 等。

2 尝试获取锁: 客户端将生成的唯一标识作为参数,调用 Redis 的 SET 命令尝试获取锁。可以使用 NX(如果键不存在则设置)和 PX(设置键的过期时间)选项,确保只有一个客户端能够成功获取到锁。

3 判断获取锁的结果: 如果获取锁成功,SET 命令会返回 OK,表示当前客户端成功获取了锁。如果获取锁失败,说明已经有其他客户端持有了锁,此时客户端需要进行等待或者返回失败。

4 设置锁的过期时间: 在成功获取锁之后,客户端需要设置锁的过期时间,以防止因为客户端崩溃或者其他原因导致锁一直占用,造成死锁。

5 返回获取锁的结果: 根据 SET 命令的返回值,客户端判断是否成功获取到了锁,并将结果返回给调用方。

加锁流程图

图片图片

解锁流程

if redis.call("get", KEYS[1]) == ARGV[1] then

    return redis.call("del", KEYS[1])

else

    return 0

end

1 使用 KEYS[1] 获取传入的锁键名。

2 使用 ARGV[1] 获取传入的锁值(即加锁时设置的唯一标识)。

3 判断当前锁是否存在且锁值与传入的锁值相同,若是,则调用 DEL 命令删除该锁,并返回 1 表示解锁成功。

4 若锁不存在或锁值不匹配,则返回 0 表示解锁失败。

解锁的流程图

图片图片

源码解析

package redis

import (
    "math/rand"
    "strconv"
    "sync/atomic"
    "time"

    red "github.com/go-redis/redis"
    "github.com/tal-tech/go-zero/core/logx"
)

const (
    letters  = "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ"
    randomLen = 16

    // 默认超时时间,用来防止死锁
    tolerance       = 300 // milliseconds
    millisPerSecond = 800

    lockCommand = `if redis.call("GET", KEYS[1]) == ARGV[1] then
    redis.call("SET", KEYS[1], ARGV[1], "PX", ARGV[2])
    return "OK"
else
    return redis.call("SET", KEYS[1], ARGV[1], "NX", "PX", ARGV[2])
end`

    delCommand = `if redis.call("GET", KEYS[1]) == ARGV[1] then
    return redis.call("DEL", KEYS[1])
else
    return 0
end`

)

type redisLock struct {
    // redis客户端
    store *Redis
    // 超时时间
    seconds uint32
    // 锁key
    keys string
    // 锁value,防止锁被别人获取到
    value string
}

func init() {
    rand.Seed(time.Now().UnixNano())
}

// NewRedisLock returns a RedisLock.
func NewRedisLock(store *Redis, keys string) *RedisLock {
    return &RedisLock{
        store: store,
        keys:   keys,
        // 获取锁时,锁的值通过随机字符串生成
        // 实际上go-zero提供更加高效的随机字符串生成方式
        // 见core/stringx/random.go:Randn
        value:    randomStr(randomLen),
    }
}

// Acquire acquires the lock.
// 加锁
func (rl *RedisLock) Acquire() (bool, error) {
    // 获取过期时间
    seconds := atomic.LoadUint32(&rl.seconds)
    // 默认锁过期时间为500ms,防止死锁
    resp, err := rl.store.Eval(lockCommand, []string{rl.keys}, []string{
        rl.value, strconv.Itoa(int(seconds)*millisPerSecond + tolerance),
    })
    if err == red.Nil {
        return false, nil
    } else if err != nil {
        logx.Errorf("Error on lock for %s, %s", rl.key, err.Error())
        return false, err
    } else if resp == nil {
        return false, nil
    }

    reply, ok := resp.(string)
    if ok && reply == "OK" {
        return true, nil
    }

    logx.Errorf("Unknown reply lock for %s: %v", rl.keys, resp)
    return false, nil
}

// Release releases the lock.
// 释放锁
func (rl *RedisLock) Release() (bool, error) {
    resp, err := rl.store.Eval(delCommand, []string{rl.keys}, []string{rl.value})
    if err != nil {
        return false, err
    }

    reply, ok := resp.(int64)
    if !ok {
        return false, nil
    }

    return reply == 1, nil
}

func randomStr(n int) string {
    b := make([]byte, n)
    for i := range b {
        b[i] = letters[rand.Intn(len(letters))]
    }
    return string(b)
}

// SetExpire sets the expire.
// 需要注意的是需要在Acquire()之前调用
// 不然默认为300ms自动释放
func (rl *RedisLock) SetExpire(seconds int) {
    atomic.StoreUint32(&rl.seconds, uint32(seconds))
}

这个详细源码根据自己的业务需要,可以利用。

相关文章

JavaScript2024新功能:Object.groupBy、正则表达式v标志
PHP trim 函数对多字节字符的使用和限制
新函数 json_validate() 、randomizer 类扩展…20 个PHP 8.3 新特性全面解析
使用HTMX为WordPress增效:如何在不使用复杂框架的情况下增强平台功能
为React 19做准备:WordPress 6.6用户指南
如何删除WordPress中的所有评论

发布评论