工作队列(Work Queues)
在第一个教程中,我们编写了从命名队列发送和接收消息的程序。在本例中,我们将创建一个工作队列,用于在多个工作者之间分配耗时的任务
工作队列(又名:任务队列)背后的主要思想是避免立即执行资源密集型任务,并且必须等待它完成。相反,我们把任务安排在以后完成。我们将任务封装为消息并将其发送到队列。在后台运行的工作进程将弹出任务并最终执行作业。当您运行多个worker时,任务将在它们之间共享。
这个概念在web应用程序中特别有用,因为在短的HTTP请求窗口中不可能处理复杂的任务。
准备
在本教程的前一部分中,我们发送了一条包含“Hello World!”的消息。现在我们将发送代表复杂任务的字符串。我们没有现实世界的任务,比如要调整图片大小或渲染pdf文件,所以让我们假装我们很忙——利用时间。睡眠功能。我们将把字符串中点的数量作为它的复杂度;每个点将占“工作”的一秒钟。例如,Hello…只需要三秒钟。
我们会稍微修改一下发送。使用前面示例中的代码,允许从命令行发送任意消息。这个程序将把任务调度到我们的工作队列中,所以我们将它命名为new_task.go
:
package main
import (
"context"
"log"
"os"
"strings"
"time"
amqp "github.com/rabbitmq/amqp091-go"
)
func failOnError(err error, msg string) {
if err != nil {
log.Panicf("%s: %s", msg, err)
}
}
func bodyFrom(args []string) string {
var s string
if (len(args) < 2) || os.Args[1] == "" {
s = "hello"
} else {
s = strings.Join(args[1:], " ")
}
return s
}
func main() {
conn, err := amqp.Dial("amqp://guest:guest@localhost:5672")
failOnError(err, "Failed to connect to RabbitMQ")
defer conn.Close()
ch, err := conn.Channel()
failOnError(err, "Failed to open a channel")
defer ch.Close()
q, err := ch.QueueDeclare("hello", true, false, false, false, nil)
failOnError(err, "Failed to declare a queue")
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
body := bodyFrom(os.Args)
err = ch.PublishWithContext(ctx,
"", // exchange
q.Name, // routing key
false, // mandatory
false,
amqp.Publishing{
DeliveryMode: amqp.Persistent,
ContentType: "text/plain",
Body: []byte(body),
})
failOnError(err, "Failed to publish a message")
log.Printf(" [x] Sent %s", body)
}
消费者Go脚本还需要一些更改:它需要为消息体中的每个点进行一秒钟的工作。它将从队列中弹出消息并执行任务,因此我们将其命名为worker.go
:
package main
import (
"bytes"
"log"
"time"
amqp "github.com/rabbitmq/amqp091-go"
)
func failOnError(err error, msg string) {
if err != nil {
log.Panicf("%s: %s", msg, err)
}
}
func main() {
conn, err := amqp.Dial("amqp://guest:guest@localhost:5672")
failOnError(err, "Failed to connect to RabbitMQ")
defer conn.Close()
ch, err := conn.Channel()
failOnError(err, "Failed to open a channel")
defer ch.Close()
q, err := ch.QueueDeclare(
"hello_world", // name
true, // durable
false, // delete when unused
false, // exclusive
false, // no-wait
nil, // arguments
)
failOnError(err, "Failed to declare a queue")
msgs, err := ch.Consume(
q.Name, // queue
"", // consumer
true, // auto-ack
false, // exclusive
false, // no-local
false, // no-wait
nil, // args
)
failOnError(err, "Failed to register a consumer")
var forever chan struct{}
go func() {
for d := range msgs {
log.Printf("Received a message: %s", d.Body)
dotCount := bytes.Count(d.Body, []byte("."))
t := time.Duration(dotCount)
time.Sleep(t * time.Second)
log.Printf("Done")
}
}()
log.Printf(" [*] Waiting for messages. To exit press CTRL+C")
[*] Waiting for messages. To exit press CTRL+C
# shell 2
go run worker.go
# => [*] Waiting for messages. To exit press CTRL+C
在第三个终端中,我们将发布新的任务。一旦你启动了消费者,你就可以发布一些消息:
# shell 3
go run new_task.go First message.
go run new_task.go Second message..
go run new_task.go Third message...
go run new_task.go Fourth message....
go run new_task.go Fifth message.....
让我们看看我们的woker得到了什么:
# shell 1
go run worker.go
# => [*] Waiting for messages. To exit press CTRL+C
# => [x] Received 'First message.'
# => [x] Received 'Third message...'
# => [x] Received 'Fifth message.....'
# shell 2
go run worker.go
# => [*] Waiting for messages. To exit press CTRL+C
# => [x] Received 'Second message..'
# => [x] Received 'Fourth message....'
默认情况下,RabbitMQ会按顺序将每条消息发送给下一个消费者。平均而言,每个消费者都会收到相同数量的信息。这种分发消息的方式称为轮询。可以在有三个或更多的worker的情况下尝试一下。
消息确认(Message acknowledgment)
执行一个任务可能需要几秒钟的时间,您可能想知道,如果消费者启动一个长任务,并在完成之前终止,会发生什么。在我们当前的代码中,一旦RabbitMQ将消息传递给消费者,它会立即将其标记为删除。在这种情况下,如果终止一个worker,它正在处理的消息将丢失。发送给此特定worker但尚未处理的消息也会丢失。
但我们不想丢掉任何任务。如果一个工人死了,我们希望把任务交给另一个工人。
为了确保消息永远不会丢失,RabbitMQ支持消息确认。一个ack(知识)由消费者返回,告诉RabbitMQ一个特定的消息已经被接收和处理,并且RabbitMQ可以自由地删除它。
如果一个消费者死亡(它的通道被关闭,连接被关闭,或者TCP连接丢失)而没有发送ack, RabbitMQ将理解消息没有被完全处理,并将重新将其排队。如果同时有其他消费者在线,它将迅速将其重新交付给另一个消费者。这样就可以确保没有信息丢失,即使工作人员偶尔会死亡。
在消费者交付确认时强制执行超时(默认为30分钟)。这有助于检测从不确认交付的有bug的(卡住的)消费者。您可以按照交付确认超时中所述增加此超时。
在本教程中,我们将通过为“auto-ack”参数传递false
来使用手动消息确认,然后在完成任务后,使用d.Ack(false)
从worker发送适当的确认(此确认单个交付)。
我们已经学会了如何确保即使消费者死了,任务也不会丢失。但是如果RabbitMQ服务器停止,我们的任务仍然会丢失。
当RabbitMQ退出或崩溃时,它会忘记队列和消息,除非你告诉它不要这样做。要确保消息不会丢失,需要做两件事:我们需要将队列和消息都标记为持久。
首先,我们需要确保队列在RabbitMQ节点重启后仍然存在。为了做到这一点,我们需要将其声明为持久的:
q, err := ch.QueueDeclare(
"hello", // name
true, // durable
false, // delete when unused
false, // exclusive
false, // no-wait
nil, // arguments
)
failOnError(err, "Failed to declare a queue")
虽然这个命令本身是正确的,但它在我们当前的设置中不起作用。这是因为我们已经定义了一个名为hello的队列,它不是持久的。RabbitMQ不允许你用不同的参数重新定义一个已经存在的队列,任何试图这样做的程序都会返回一个错误。但是有一个快速的解决方案——让我们用不同的名字声明一个队列,例如task_queue:
q, err := ch.QueueDeclare(
"task_queue", // name
true, // durable
false, // delete when unused
false, // exclusive
false, // no-wait
nil, // arguments
)
failOnError(err, "Failed to declare a queue")
此durable
选项更改需要同时应用于生产者和消费者代码。
此时我们可以确定,即使RabbitMQ重启,task_queue
队列也不会丢失。现在我们需要使用amqp将消息标记为持久消息-通过在amqp.Publishing
中使用amqp.Persistent
选项
err = ch.PublishWithContext(ctx,
"", // exchange
q.Name, // routing key
false, // mandatory
false,
amqp.Publishing {
DeliveryMode: amqp.Persistent,
ContentType: "text/plain",
Body: []byte(body),
})
将消息标记为持久消息并不能完全保证消息不会丢失。虽然它告诉RabbitMQ将消息保存到磁盘,但是当RabbitMQ接受消息并且还没有保存它时,仍然有很短的时间窗口。而且,RabbitMQ并没有对每条消息进行
fsync(2)
——它可能只是保存到缓存中,而不是真正写入磁盘。持久性保证不是很强,但对于我们的简单任务队列来说已经足够了。如果你需要更有力的保证,那么你可以使用发行商确认。
公平调度
您可能已经注意到调度仍然没有完全按照我们希望的那样工作。例如,在两个worker的情况下,当所有奇数消息都很重,偶数消息很轻时,一个worker将一直很忙,而另一个几乎不做任何工作。好吧,RabbitMQ对此一无所知,仍然会均匀地分发消息。
这是因为RabbitMQ只在消息进入队列时调度消息。它不关注消费者未确认消息的数量。它只是盲目地将第n条消息分派给第n个消费者。
为了解决这个问题,我们可以将预取计数设置为1
。这告诉RabbitMQ一次不要给一个worker发送多个消息。或者,换句话说,在工作线程处理并确认前一条消息之前,不要向它发送新消息。相反,它将把它分派给下一个不忙的工人。
err = ch.Qos(
1, // prefetch count
0, // prefetch size
false, // global
)
failOnError(err, "Failed to set QoS")
Note about queue size
如果所有的工人都很忙,你的队列就会被填满。你需要密切关注这一点,可能会增加更多的员工,或者有其他的策略。
全部代码
new_task.go
package main
import (
"context"
"log"
"os"
"strings"
"time"
amqp "github.com/rabbitmq/amqp091-go"
)
func failOnError(err error, msg string) {
if err != nil {
log.Panicf("%s: %s", msg, err)
}
}
func main() {
conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
failOnError(err, "Failed to connect to RabbitMQ")
defer conn.Close()
ch, err := conn.Channel()
failOnError(err, "Failed to open a channel")
defer ch.Close()
q, err := ch.QueueDeclare(
"task_queue", // name
true, // durable
false, // delete when unused
false, // exclusive
false, // no-wait
nil, // arguments
)
failOnError(err, "Failed to declare a queue")
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
body := bodyFrom(os.Args)
err = ch.PublishWithContext(ctx,
"", // exchange
q.Name, // routing key
false, // mandatory
false,
amqp.Publishing{
DeliveryMode: amqp.Persistent,
ContentType: "text/plain",
Body: []byte(body),
})
failOnError(err, "Failed to publish a message")
log.Printf(" [x] Sent %s", body)
}
func bodyFrom(args []string) string {
var s string
if (len(args) < 2) || os.Args[1] == "" {
s = "hello"
} else {
s = strings.Join(args[1:], " ")
}
return s
}
worker.go:
:
package main
import (
"bytes"
"log"
"time"
amqp "github.com/rabbitmq/amqp091-go"
)
func failOnError(err error, msg string) {
if err != nil {
log.Panicf("%s: %s", msg, err)
}
}
func main() {
conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
failOnError(err, "Failed to connect to RabbitMQ")
defer conn.Close()
ch, err := conn.Channel()
failOnError(err, "Failed to open a channel")
defer ch.Close()
q, err := ch.QueueDeclare(
"task_queue", // name
true, // durable
false, // delete when unused
false, // exclusive
false, // no-wait
nil, // arguments
)
failOnError(err, "Failed to declare a queue")
err = ch.Qos(
1, // prefetch count
0, // prefetch size
false, // global
)
failOnError(err, "Failed to set QoS")
msgs, err := ch.Consume(
q.Name, // queue
"", // consumer
false, // auto-ack
false, // exclusive
false, // no-local
false, // no-wait
nil, // args
)
failOnError(err, "Failed to register a consumer")
var forever chan struct{}
go func() {
for d := range msgs {
log.Printf("Received a message: %s", d.Body)
dotCount := bytes.Count(d.Body, []byte("."))
t := time.Duration(dotCount)
time.Sleep(t * time.Second)
log.Printf("Done")
d.Ack(false)
}
}()
log.Printf(" [*] Waiting for messages. To exit press CTRL+C")