Go 中的 Apache Beam ParDo 过滤器

2024年 2月 5日 61.5k 0

go 中的 apache beam pardo 过滤器

问题内容

我是一名 python 开发人员,但应该使用 go 制作数据流管道。
与 python 或 java 相比,我找不到那么多使用 go 的 apache beam 示例。

我有以下代码,其中具有用户名和年龄的结构。任务是增加年龄,然后根据年龄进行过滤。我找到了增加年龄的方法,但卡在过滤部分。

package main

import (
"context"
"flag"
"fmt"

"github.com/apache/beam/sdks/v2/go/pkg/beam"
"github.com/apache/beam/sdks/v2/go/pkg/beam/log"
"github.com/apache/beam/sdks/v2/go/pkg/beam/x/beamx"
)

func init() {
beam.registerfunction(incrementage)
}

type user struct {
name string
age int
}

func printrow(ctx context.context, list user) {
fmt.println(list)
}

func incrementage(list user) user {
list.age++
return list
}

func main() {

flag.parse()
beam.init()

ctx := context.background()

p := beam.newpipeline()
s := p.root()

var userlist = []user{
{"bob", 40},
{"adam", 50},
{"john", 35},
{"ben", 8},
}
initial := beam.createlist(s, userlist)

pc := beam.pardo(s, incrementage, initial)

pc1 := beam.pardo(s, func(row user, emit func(user)) {
emit(row)
}, pc)

beam.pardo0(s, printrow, pc1)

if err := beamx.run(ctx, p); err != nil {
log.exitf(ctx, "failed to execute job: %v", err)
}

}

登录后复制

我尝试创建一个如下所示的函数,但这返回一个布尔值而不是用户对象。我知道我错过了一些简单但无法弄清楚的事情。

func filterage(list user) user {
return list.age > 40
}

登录后复制

在 python 中,我可以编写如下所示的函数。

beam.Filter(lambda line: line["Age"] >= 40))

登录后复制

正确答案

您需要在函数中添加一个发射器来发射用户:

func filterAge(list user, emit func(user)) {
if list.Age > 40 {
emit(list)
}
}

登录后复制

正如您当前代码中所写, 返回 list.age > 40
list.age > 40 首先评估为 true(布尔值),并且返回该布尔值。

以上就是Go 中的 Apache Beam ParDo 过滤器的详细内容,更多请关注每日运维网(www.mryunwei.com)其它相关文章!

相关文章

JavaScript2024新功能:Object.groupBy、正则表达式v标志
PHP trim 函数对多字节字符的使用和限制
新函数 json_validate() 、randomizer 类扩展…20 个PHP 8.3 新特性全面解析
使用HTMX为WordPress增效:如何在不使用复杂框架的情况下增强平台功能
为React 19做准备:WordPress 6.6用户指南
如何删除WordPress中的所有评论

发布评论