RabbitMQ官方文档Work QueuesGo

2023年 7月 14日 15.5k 0

工作队列(Work Queues)

image.png

在第一个教程中,我们编写了从命名队列发送和接收消息的程序。在本例中,我们将创建一个工作队列,用于在多个工作者之间分配耗时的任务

工作队列(又名:任务队列)背后的主要思想是避免立即执行资源密集型任务,并且必须等待它完成。相反,我们把任务安排在以后完成。我们将任务封装为消息并将其发送到队列。在后台运行的工作进程将弹出任务并最终执行作业。当您运行多个worker时,任务将在它们之间共享。

这个概念在web应用程序中特别有用,因为在短的HTTP请求窗口中不可能处理复杂的任务。

准备

在本教程的前一部分中,我们发送了一条包含“Hello World!”的消息。现在我们将发送代表复杂任务的字符串。我们没有现实世界的任务,比如要调整图片大小或渲染pdf文件,所以让我们假装我们很忙——利用时间。睡眠功能。我们将把字符串中点的数量作为它的复杂度;每个点将占“工作”的一秒钟。例如,Hello…只需要三秒钟。

我们会稍微修改一下发送。使用前面示例中的代码,允许从命令行发送任意消息。这个程序将把任务调度到我们的工作队列中,所以我们将它命名为new_task.go:

package main

import (
	"context"
	"log"
	"os"
	"strings"
	"time"

	amqp "github.com/rabbitmq/amqp091-go"
)

func failOnError(err error, msg string) {
	if err != nil {
		log.Panicf("%s: %s", msg, err)
	}
}

func bodyFrom(args []string) string {
	var s string
	if (len(args) < 2) || os.Args[1] == "" {
		s = "hello"
	} else {
		s = strings.Join(args[1:], " ")
	}
	return s
}

func main() {
	conn, err := amqp.Dial("amqp://guest:guest@localhost:5672")
	failOnError(err, "Failed to connect to RabbitMQ")
	defer conn.Close()

	ch, err := conn.Channel()
	failOnError(err, "Failed to open a channel")
	defer ch.Close()
	q, err := ch.QueueDeclare("hello", true, false, false, false, nil)
	failOnError(err, "Failed to declare a queue")

	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
	defer cancel()

	body := bodyFrom(os.Args)
	err = ch.PublishWithContext(ctx,
		"",     // exchange
		q.Name, // routing key
		false,  // mandatory
		false,
		amqp.Publishing{
			DeliveryMode: amqp.Persistent,
			ContentType:  "text/plain",
			Body:         []byte(body),
		})
	failOnError(err, "Failed to publish a message")
	log.Printf(" [x] Sent %s", body)
}

消费者Go脚本还需要一些更改:它需要为消息体中的每个点进行一秒钟的工作。它将从队列中弹出消息并执行任务,因此我们将其命名为worker.go:

package main

import (
	"bytes"
	"log"
	"time"

	amqp "github.com/rabbitmq/amqp091-go"
)

func failOnError(err error, msg string) {
	if err != nil {
		log.Panicf("%s: %s", msg, err)
	}
}
func main() {
	conn, err := amqp.Dial("amqp://guest:guest@localhost:5672")
	failOnError(err, "Failed to connect to RabbitMQ")
	defer conn.Close()

	ch, err := conn.Channel()
	failOnError(err, "Failed to open a channel")
	defer ch.Close()

	q, err := ch.QueueDeclare(
		"hello_world", // name
		true,          // durable
		false,         // delete when unused
		false,         // exclusive
		false,         // no-wait
		nil,           // arguments
	)
	failOnError(err, "Failed to declare a queue")
	msgs, err := ch.Consume(
		q.Name, // queue
		"",     // consumer
		true,   // auto-ack
		false,  // exclusive
		false,  // no-local
		false,  // no-wait
		nil,    // args
	)
	failOnError(err, "Failed to register a consumer")

	var forever chan struct{}

	go func() {
		for d := range msgs {
			log.Printf("Received a message: %s", d.Body)
			dotCount := bytes.Count(d.Body, []byte("."))
			t := time.Duration(dotCount)
			time.Sleep(t * time.Second)
			log.Printf("Done")
		}
	}()

	log.Printf(" [*] Waiting for messages. To exit press CTRL+C")
	 [*] Waiting for messages. To exit press CTRL+C
# shell 2
go run worker.go
# => [*] Waiting for messages. To exit press CTRL+C

在第三个终端中,我们将发布新的任务。一旦你启动了消费者,你就可以发布一些消息:

# shell 3
go run new_task.go First message.
go run new_task.go Second message..
go run new_task.go Third message...
go run new_task.go Fourth message....
go run new_task.go Fifth message.....

让我们看看我们的woker得到了什么:

# shell 1
go run worker.go
# => [*] Waiting for messages. To exit press CTRL+C
# => [x] Received 'First message.'
# => [x] Received 'Third message...'
# => [x] Received 'Fifth message.....'
# shell 2
go run worker.go
# => [*] Waiting for messages. To exit press CTRL+C
# => [x] Received 'Second message..'
# => [x] Received 'Fourth message....'

默认情况下,RabbitMQ会按顺序将每条消息发送给下一个消费者。平均而言,每个消费者都会收到相同数量的信息。这种分发消息的方式称为轮询。可以在有三个或更多的worker的情况下尝试一下。

消息确认(Message acknowledgment)

执行一个任务可能需要几秒钟的时间,您可能想知道,如果消费者启动一个长任务,并在完成之前终止,会发生什么。在我们当前的代码中,一旦RabbitMQ将消息传递给消费者,它会立即将其标记为删除。在这种情况下,如果终止一个worker,它正在处理的消息将丢失。发送给此特定worker但尚未处理的消息也会丢失。

但我们不想丢掉任何任务。如果一个工人死了,我们希望把任务交给另一个工人。

为了确保消息永远不会丢失,RabbitMQ支持消息确认。一个ack(知识)由消费者返回,告诉RabbitMQ一个特定的消息已经被接收和处理,并且RabbitMQ可以自由地删除它。

如果一个消费者死亡(它的通道被关闭,连接被关闭,或者TCP连接丢失)而没有发送ack, RabbitMQ将理解消息没有被完全处理,并将重新将其排队。如果同时有其他消费者在线,它将迅速将其重新交付给另一个消费者。这样就可以确保没有信息丢失,即使工作人员偶尔会死亡。

在消费者交付确认时强制执行超时(默认为30分钟)。这有助于检测从不确认交付的有bug的(卡住的)消费者。您可以按照交付确认超时中所述增加此超时。

在本教程中,我们将通过为“auto-ack”参数传递false来使用手动消息确认,然后在完成任务后,使用d.Ack(false)从worker发送适当的确认(此确认单个交付)。

我们已经学会了如何确保即使消费者死了,任务也不会丢失。但是如果RabbitMQ服务器停止,我们的任务仍然会丢失。

当RabbitMQ退出或崩溃时,它会忘记队列和消息,除非你告诉它不要这样做。要确保消息不会丢失,需要做两件事:我们需要将队列和消息都标记为持久。

首先,我们需要确保队列在RabbitMQ节点重启后仍然存在。为了做到这一点,我们需要将其声明为持久的:

q, err := ch.QueueDeclare(
  "hello",      // name
  true,         // durable
  false,        // delete when unused
  false,        // exclusive
  false,        // no-wait
  nil,          // arguments
)
failOnError(err, "Failed to declare a queue")

虽然这个命令本身是正确的,但它在我们当前的设置中不起作用。这是因为我们已经定义了一个名为hello的队列,它不是持久的。RabbitMQ不允许你用不同的参数重新定义一个已经存在的队列,任何试图这样做的程序都会返回一个错误。但是有一个快速的解决方案——让我们用不同的名字声明一个队列,例如task_queue:

q, err := ch.QueueDeclare(
  "task_queue", // name
  true,         // durable
  false,        // delete when unused
  false,        // exclusive
  false,        // no-wait
  nil,          // arguments
)
failOnError(err, "Failed to declare a queue")

durable选项更改需要同时应用于生产者和消费者代码。
此时我们可以确定,即使RabbitMQ重启,task_queue队列也不会丢失。现在我们需要使用amqp将消息标记为持久消息-通过在amqp.Publishing中使用amqp.Persistent选项

err = ch.PublishWithContext(ctx,
  "",           // exchange
  q.Name,       // routing key
  false,        // mandatory
  false,
  amqp.Publishing {
    DeliveryMode: amqp.Persistent,
    ContentType:  "text/plain",
    Body:         []byte(body),
})

将消息标记为持久消息并不能完全保证消息不会丢失。虽然它告诉RabbitMQ将消息保存到磁盘,但是当RabbitMQ接受消息并且还没有保存它时,仍然有很短的时间窗口。而且,RabbitMQ并没有对每条消息进行fsync(2)——它可能只是保存到缓存中,而不是真正写入磁盘。持久性保证不是很强,但对于我们的简单任务队列来说已经足够了。如果你需要更有力的保证,那么你可以使用发行商确认。

公平调度

您可能已经注意到调度仍然没有完全按照我们希望的那样工作。例如,在两个worker的情况下,当所有奇数消息都很重,偶数消息很轻时,一个worker将一直很忙,而另一个几乎不做任何工作。好吧,RabbitMQ对此一无所知,仍然会均匀地分发消息。

这是因为RabbitMQ只在消息进入队列时调度消息。它不关注消费者未确认消息的数量。它只是盲目地将第n条消息分派给第n个消费者。

image.png

为了解决这个问题,我们可以将预取计数设置为1。这告诉RabbitMQ一次不要给一个worker发送多个消息。或者,换句话说,在工作线程处理并确认前一条消息之前,不要向它发送新消息。相反,它将把它分派给下一个不忙的工人。

err = ch.Qos(
  1,     // prefetch count
  0,     // prefetch size
  false, // global
)
failOnError(err, "Failed to set QoS")

Note about queue size

如果所有的工人都很忙,你的队列就会被填满。你需要密切关注这一点,可能会增加更多的员工,或者有其他的策略。

全部代码

new_task.go

package main

import (
        "context"
        "log"
        "os"
        "strings"
        "time"

        amqp "github.com/rabbitmq/amqp091-go"
)

func failOnError(err error, msg string) {
        if err != nil {
                log.Panicf("%s: %s", msg, err)
        }
}

func main() {
        conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
        failOnError(err, "Failed to connect to RabbitMQ")
        defer conn.Close()

        ch, err := conn.Channel()
        failOnError(err, "Failed to open a channel")
        defer ch.Close()

        q, err := ch.QueueDeclare(
                "task_queue", // name
                true,         // durable
                false,        // delete when unused
                false,        // exclusive
                false,        // no-wait
                nil,          // arguments
        )
        failOnError(err, "Failed to declare a queue")

        ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
        defer cancel()

        body := bodyFrom(os.Args)
        err = ch.PublishWithContext(ctx,
                "",           // exchange
                q.Name,       // routing key
                false,        // mandatory
                false,
                amqp.Publishing{
                        DeliveryMode: amqp.Persistent,
                        ContentType:  "text/plain",
                        Body:         []byte(body),
                })
        failOnError(err, "Failed to publish a message")
        log.Printf(" [x] Sent %s", body)
}

func bodyFrom(args []string) string {
        var s string
        if (len(args) < 2) || os.Args[1] == "" {
                s = "hello"
        } else {
                s = strings.Join(args[1:], " ")
        }
        return s
}

worker.go::

package main

import (
"bytes"
"log"
"time"

amqp "github.com/rabbitmq/amqp091-go"
)

func failOnError(err error, msg string) {
if err != nil {
log.Panicf("%s: %s", msg, err)
}
}

func main() {
conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
failOnError(err, "Failed to connect to RabbitMQ")
defer conn.Close()

ch, err := conn.Channel()
failOnError(err, "Failed to open a channel")
defer ch.Close()

q, err := ch.QueueDeclare(
"task_queue", // name
true, // durable
false, // delete when unused
false, // exclusive
false, // no-wait
nil, // arguments
)
failOnError(err, "Failed to declare a queue")

err = ch.Qos(
1, // prefetch count
0, // prefetch size
false, // global
)
failOnError(err, "Failed to set QoS")

msgs, err := ch.Consume(
q.Name, // queue
"", // consumer
false, // auto-ack
false, // exclusive
false, // no-local
false, // no-wait
nil, // args
)
failOnError(err, "Failed to register a consumer")

var forever chan struct{}

go func() {
for d := range msgs {
log.Printf("Received a message: %s", d.Body)
dotCount := bytes.Count(d.Body, []byte("."))
t := time.Duration(dotCount)
time.Sleep(t * time.Second)
log.Printf("Done")
d.Ack(false)
}
}()

log.Printf(" [*] Waiting for messages. To exit press CTRL+C")

相关文章

JavaScript2024新功能:Object.groupBy、正则表达式v标志
PHP trim 函数对多字节字符的使用和限制
新函数 json_validate() 、randomizer 类扩展…20 个PHP 8.3 新特性全面解析
使用HTMX为WordPress增效:如何在不使用复杂框架的情况下增强平台功能
为React 19做准备:WordPress 6.6用户指南
如何删除WordPress中的所有评论

发布评论