介绍
延迟队列是一种数据结构,用于处理需要在未来某个特定时间执行的任务。这些任务被添加到队列中,并且指定了一个执行时间,只有到达指定的时间点时才能从队列中取出并执行。
在实际应用中,延迟队列可以用于处理各种需要延迟处理的任务,例如发送邮件提醒、订单自动取消、对超时任务的处理等。由于任务的执行是在未来的某个时间点,因此这些任务不会立即执行,而是存储在队列中,直到它的预定执行时间才会被执行。
Simple
在 Go 语言中,我们可以使用 time 包提供的计时器功能,通过使用 Go 中的 slice 存储延迟处理的任务,实现一个简单的延迟队列的功能。
示例代码:
type Task struct {
ExecuteTime time.Time
Job func()
}
首先,我们定义一个结构体 Task,它包含一个可以执行任务的函数 Job,和一个执行时间 ExecuteTime,这是期望执行该函数的时间。
示例代码:
type DelayQueue struct {
TaskQueue []Task
}
接下来,我们定义一个 DelayQueue 结构体,它拥有一个 TaskQueue,这是一个 Task 类型的切片,用于保存待执行任务的列表。
示例代码:
// 添加任务
func (d *DelayQueue) AddTask(t Task) {
d.TaskQueue = append(d.TaskQueue, t)
}
// 移除任务
func (d *DelayQueue) RemoveTask() {
d.TaskQueue = d.TaskQueue[1:]
}
// 执行任务
func (d *DelayQueue) ExecuteTasks() {
for len(d.TaskQueue) > 0 {
// 获取队列最顶部的任务
currentTask := d.TaskQueue[0]
// 如果执行时间还没到,等待
if time.Now().Before(currentTask.ExecuteTime) {
time.Sleep(currentTask.ExecuteTime.Sub(time.Now()))
}
// 执行任务
currentTask.Job()
// 移除已执行的任务
d.RemoveTask()
}
}
DelayQueue 包含三个方法:
- 第一个方法是 AddTask(t Task)。此方法将提供的任务 t 添加到 TaskQueue 的末尾。
- 第二个方法是 RemoveTask()。此方法从 TaskQueue 中移除第一个任务。
- 第三个方法是 ExecuteTasks()。此方法将执行 TaskQueue 中的所有任务。如果队列顶部任务的执行时间还未到,该方法将等待。一旦时间到了,它将会执行 Job 并从 TaskQueue 中移除该任务。
示例代码:
func main() {
fmt.Println("Start DelayQueue")
queue := DelayQueue{}
firstTask := Task{
ExecuteTime: time.Now().Add(4 * time.Second),
Job: func() {
fmt.Println("Executed task 1 after delay")
},
}
queue.AddTask(firstTask)
secondTask := Task{
ExecuteTime: time.Now().Add(10 * time.Second),
Job: func() {
fmt.Println("Executed task 2 after delay")
},
}
queue.AddTask(secondTask)
queue.ExecuteTasks()
fmt.Println("Done!")
}
输出结果:
Start DelayQueue
Executed task 1 after delay
Executed task 2 after delay
Done!
在示例代码中,我们创建了一个延时队列,将任务添加到队列中,并在指定的延时后执行它们。
通过使用这些结构体和方法,我们可以在 Go 中实现简单的延迟执行任务的功能。
但是,当 Go 程序重启时,存储在 slice 中的延迟处理的任务将全部丢失。
Complex
在 Go 程序中,如果想在重启后保留数据,我们可以将数据持久化到 Redis,可以使用 go-redis/redis 库[1]与 Redis 交互。而对于延迟队列的需求,则可以使用 Redis 的 ZSET(有序集合)特性来实现。
示例代码:
// 定义一个全局的redisdb变量
var redisdb *redis.Client
// 初始化连接
func initClient() (err error) {
redisdb = redis.NewClient(&redis.Options{
Addr: "localhost:6379",
Password: "", // no password set
DB: 0, // use default DB
})
_, err = redisdb.Ping().Result()
if err != nil {
return err
}
return nil
}
全局变量 redisdb 是 redis.Client 类型的指针,用来保存到 Redis 客户端的引用。
initClient 函数初始化连接到 Redis 服务器,该服务器在本地主机的 6379 端口运行。它将一个新的 Redis 客户端分配给 redisdb 变量。如果连接成功,它就会 ping Redis 服务器以测试连接。
示例代码:
// 向队列中添加任务
func addTaskToQueue(task string, executeTime int64) {
err := redisdb.ZAdd("delay-queue", redis.Z{
Score: float64(executeTime),
Member: task,
}).Err()
if err != nil {
panic(err)
}
}
addTaskToQueue 函数将具有执行时间的任务添加到 Redis 等待排序的集合 "delay-queue"。执行时间是一个 UNIX 时间戳,作为排序集合中的项目的 score,允许 Redis 按照他们应该执行的时间来排序项目。
示例代码:
// 从队列中获取并处理任务
func getAndExecuteTasks() {
for {
// 使用 ZRANGEBYSCORE 命令获取分数(时间戳)