Redis中的Pipeline以及watch事务Go

2023年 10月 9日 27.6k 0

pipeline

通常来说,单次对redis服务器的访问包含请求和返回数据的过程,而其中就会有响应时延

假设有多个访问,就会有多个响应时延,这样看起来很繁琐

a79b1b90eb06488e8d30cdef3a43504e.png

为了提高性能,我们可以使用pipeline对命令进行缓存

Redis管道(Pipeline)是一种批量执行Redis命令的机制。pipeline允许单次访问redis服务器执行多个命令,然后再返回多个命令的结果,从而大大提高了Redis的性能,减少了往返响应所需要的时间。

e9d066d7903f4efe8e4b5a3cdb0360e6.png

Redis管道特性适用于需要高性能、高并发、批量操作的场景,可以有效地提升Redis的操作效率和系统的整体性能。但管道操作是无序的,返回结果的顺序可能与命令的发送顺序不一致,因此在使用管道时需要根据实际情况进行结果的解析和处理。

有些情况例如,需要根据查询得到的值及时进行后续操作时,不适合使用pipeline,这样反而会拖慢程序的执行,降低性能

Redis管道的原理基于TCP连接的建立和消息传输机制。通过在客户端本地缓存多个命令请求,然后一次性发送到Redis服务器,减少网络往返的开销,提高性能。同时,通过TCP连接的保持,可以在不关闭连接的情况下继续使用管道进行后续的操作。

pipeline原理和使用场景参考

使用go-redis实现pipeline

示例原地址

以下示例使用go-redis库演示了使用pipeline一次执行Incr加一和Expire设置键过期时间两个命令

pipe := rdb.Pipeline()

incr := pipe.Incr(ctx, "pipeline_counter")
pipe.Expire(ctx, "pipeline_counter", time.Hour)

cmds, err := pipe.Exec(ctx)
if err != nil {
    panic(err)
}

// 在执行pipe.Exec之后才能获取到结果
fmt.Println(incr.Val())

Exec函数将pipeline中的命令一次性发送到redis服务器

或者,你也可以使用Pipelined 方法,它会在函数退出时调用 Exec。

var incr *redis.IntCmd

cmds, err := rdb.Pipelined(ctx, func(pipe redis.Pipeliner) error {
    incr = pipe.Incr(ctx, "pipelined_counter")
    pipe.Expire(ctx, "pipelined_counter", time.Hour)
    return nil
})
if err != nil {
    panic(err)
}

// 在pipeline执行后获取到结果
fmt.Println(incr.Val())

除此之外,事务操作经常会用到pipeline实现一次访问redis服务器执行多个命令

事务

TxPipeline 或 TxPipelined 方法将 pipeline 命令使用 MULTI 和EXEC包裹起来实现事务操作。

// TxPipeline demo
pipe := rdb.TxPipeline()
incr := pipe.Incr(ctx, "tx_pipeline_counter")
pipe.Expire(ctx, "tx_pipeline_counter", time.Hour)
_, err := pipe.Exec(ctx)
fmt.Println(incr.Val(), err)

// TxPipelined demo
var incr2 *redis.IntCmd
_, err = rdb.TxPipelined(ctx, func(pipe redis.Pipeliner) error {
    incr2 = pipe.Incr(ctx, "tx_pipeline_counter")
    pipe.Expire(ctx, "tx_pipeline_counter", time.Hour)
    return nil
})
fmt.Println(incr2.Val(), err)

上面代码相当于在一个RTT下执行了下面的redis命令:

MULTI
INCR pipeline_counter
EXPIRE pipeline_counts 3600
EXEC

watch

我们通常搭配 WATCH命令来执行事务操作。从使用WATCH命令监视某个 key 开始,直到执行EXEC命令的这段时间里,如果有其他用户抢先对被监视的 key 进行了替换、更新、删除等操作,那么当用户尝试执行EXEC的时候,事务将失败并返回一个错误,用户可以根据这个错误选择重试事务或者放弃事务。

以下代码演示了如何使用go-redis库中的Watch函数来实现一个乐观锁。Watch函数会监听一个或多个键的变化,如果在执行事务期间,这些键被其他客户端修改了,那么事务就会失败。Watch函数接受一个上下文对象,一个回调函数和要监听的键作为参数。在回调函数中,我们可以使用TxPipelined函数来创建一个事务管道,并在其中执行我们想要的命令。

// 创建一个redis客户端对象
client := redis.NewClient(&redis.Options{
    Addr:     "localhost:6379",
    Password: "", // 如果没有密码,留空
    DB:       0,  // 使用默认数据库
})

// 创建一个上下文对象
ctx := context.Background()
// 定义一个要监听的键
key := "watch_key"

// 使用Watch函数监听键的变化,传入一个回调函数和要监听的键
err := client.Watch(ctx, func(tx *redis.Tx) error {
    // 在回调函数中,获取键的当前值
    val, err := tx.Get(ctx, key).Int()
    if err != nil && err != redis.Nil {
        return err // 如果出错,返回错误
    }
    // 在回调函数中,使用TxPipelined函数创建一个事务管道,传入一个回调函数
    _, err = tx.TxPipelined(ctx, func(pipe redis.Pipeliner) error {
        // 在回调函数中,对键进行修改,例如加一
        pipe.Incr(ctx, key)
        // 如果有其他命令,也可以在这里添加
        return nil // 返回nil表示成功
    })
    return err // 返回错误或nil
}, key) // 传入要监听的键

// 检查Watch函数的返回值,如果不为nil,表示事务失败
if err != nil {
    fmt.Println("transaction failed:", err)
} else {
    fmt.Println("transaction success")
}
  • 乐观锁是一种并发控制的技术,它假设数据在一般情况下不会发生冲突,所以在访问数据时不会加锁,而是在更新数据时才检查是否有冲突。如果没有冲突,就可以提交更新,如果有冲突,就要根据情况进行处理,比如重试或者放弃。

  • 乐观锁的优点是可以提高并发性能,避免长时间的等待和死锁。乐观锁的缺点是需要额外的机制来检测和解决冲突,比如版本号或者时间戳。乐观锁适合用在读多写少的场景。

  • 悲观锁是基于一种悲观的态度类来防止一切数据冲突,它是以一种预防的姿态在修改数据之前把数据锁住,然后再对数据进行读写,在它释放锁之前任何人都不能对其数据进行操作,直到前面一个人把锁释放后下一个人数据加锁才可对数据进行加锁,然后才可以对数据进行操作,一般数据库本身锁的机制都是基于悲观锁的机制实现的;

相关文章

JavaScript2024新功能:Object.groupBy、正则表达式v标志
PHP trim 函数对多字节字符的使用和限制
新函数 json_validate() 、randomizer 类扩展…20 个PHP 8.3 新特性全面解析
使用HTMX为WordPress增效:如何在不使用复杂框架的情况下增强平台功能
为React 19做准备:WordPress 6.6用户指南
如何删除WordPress中的所有评论

发布评论