如何限制生产者和消费者读取消息?

2024年 2月 11日 55.8k 0

如何限制生产者和消费者读取消息?

php小编子墨在软件开发过程中,消息队列是一种常见的通信机制,用于实现生产者和消费者之间的异步通信。然而,有时候我们希望控制生产者和消费者对消息的读取,以便更好地管理系统资源和处理高峰时段的请求。本文将介绍一些限制生产者和消费者读取消息的方法,帮助开发者优化系统性能和提高应用的稳定性。

问题内容

我想用 go 获得应用程序生产者-消费者(通过信号关闭)。

生产者不断在队列中生成消息,限制为 10 条。
一些消费者阅读并处理该频道。
如果队列中的消息数为0,生产者再次生成10条消息。
当收到停止信号时,生产者停止生成新消息,消费者处理通道中的所有内容。

我找到了一段代码,但无法理解它是否正常工作,因为发现了奇怪的东西:

  • 为什么停止程序后,队列中的消息并没有全部处理完,好像丢失了部分数据。 (在屏幕截图中,发送了 15 条消息,但处理了 5 条消息)
  • 如何正确地将队列限制为10条消息,即必须写入10条消息,等待队列计数器变为0时处理,然后再写入10条?
  • 是否可以在停止信号后通知生产者,以便他不再向通道生成新消息? (在屏幕截图中,生产者成功写入队列 - 12,13,14,15)
  • 结果:

    代码示例:

    package main

    import (
    "context"
    "fmt"
    "math/rand"
    "os"
    "os/signal"
    "sync"
    "syscall"
    "time"
    )

    func main() {
    const nConsumers = 2

    in := make(chan int, 10)
    p := Producer{&in}
    c := Consumer{&in, make(chan int, nConsumers)}
    go p.Produce()
    ctx, cancelFunc := context.WithCancel(context.Background())
    go c.Consume(ctx)
    wg := &sync.WaitGroup{}
    wg.Add(nConsumers)
    for i := 1; i 登录后复制

    解决方法

    为什么停止程序后,队列中的消息并没有全部处理完,好像丢失了部分数据。

    这是因为当 ctx 完成后,(consumer).consume 停止从 in 通道读取,但 go p.produce() 创建的 goroutine 仍然写入 in 通道。

    下面的演示解决了这个问题并简化了源代码。

    注释:

  • producectx 完成后停止。并且它关闭了 in 通道。

  • 字段 jobs 已从 consumer 中删除,工作人员直接从 in 通道读取。

  • 以下要求被忽略,因为它很奇怪。常见的行为是,当作业产生时,如果 in 通道未满,则作业会立即发送到 in 通道;当它已满时,发送操作将阻塞,直到从 in 通道读取作业为止。

    如果队列中的消息数为0,生产者再次生成10条消息

  • package main

    import (
    "context"
    "fmt"
    "math/rand"
    "os/signal"
    "sync"
    "syscall"
    "time"
    )

    func main() {
    const nConsumers = 2

    ctx, stop := signal.NotifyContext(context.Background(), syscall.SIGINT, syscall.SIGTERM)
    defer stop()

    in := make(chan int, 10)
    p := Producer{in}
    c := Consumer{in}
    go p.Produce(ctx)

    var wg sync.WaitGroup
    wg.Add(nConsumers)
    for i := 1; i 登录后复制

    以上就是如何限制生产者和消费者读取消息?的详细内容,更多请关注每日运维网(www.mryunwei.com)其它相关文章!

    相关文章

    JavaScript2024新功能:Object.groupBy、正则表达式v标志
    PHP trim 函数对多字节字符的使用和限制
    新函数 json_validate() 、randomizer 类扩展…20 个PHP 8.3 新特性全面解析
    使用HTMX为WordPress增效:如何在不使用复杂框架的情况下增强平台功能
    为React 19做准备:WordPress 6.6用户指南
    如何删除WordPress中的所有评论

    发布评论