问题内容
我是一名 python 开发人员,但应该使用 go 制作数据流管道。
与 python 或 java 相比,我找不到那么多使用 go 的 apache beam 示例。
我有以下代码,其中具有用户名和年龄的结构。任务是增加年龄,然后根据年龄进行过滤。我找到了增加年龄的方法,但卡在过滤部分。
package main
import (
"context"
"flag"
"fmt"
"github.com/apache/beam/sdks/v2/go/pkg/beam"
"github.com/apache/beam/sdks/v2/go/pkg/beam/log"
"github.com/apache/beam/sdks/v2/go/pkg/beam/x/beamx"
)
func init() {
beam.registerfunction(incrementage)
}
type user struct {
name string
age int
}
func printrow(ctx context.context, list user) {
fmt.println(list)
}
func incrementage(list user) user {
list.age++
return list
}
func main() {
flag.parse()
beam.init()
ctx := context.background()
p := beam.newpipeline()
s := p.root()
var userlist = []user{
{"bob", 40},
{"adam", 50},
{"john", 35},
{"ben", 8},
}
initial := beam.createlist(s, userlist)
pc := beam.pardo(s, incrementage, initial)
pc1 := beam.pardo(s, func(row user, emit func(user)) {
emit(row)
}, pc)
beam.pardo0(s, printrow, pc1)
if err := beamx.run(ctx, p); err != nil {
log.exitf(ctx, "failed to execute job: %v", err)
}
}
登录后复制
我尝试创建一个如下所示的函数,但这返回一个布尔值而不是用户对象。我知道我错过了一些简单但无法弄清楚的事情。
func filterage(list user) user {
return list.age > 40
}
登录后复制
在 python 中,我可以编写如下所示的函数。
beam.Filter(lambda line: line["Age"] >= 40))
登录后复制
正确答案
您需要在函数中添加一个发射器来发射用户:
func filterAge(list user, emit func(user)) {
if list.Age > 40 {
emit(list)
}
}
登录后复制
正如您当前代码中所写, 返回 list.age > 40
list.age > 40
首先评估为 true(布尔值),并且返回该布尔值。
以上就是Go 中的 Apache Beam ParDo 过滤器的详细内容,更多请关注每日运维网(www.mryunwei.com)其它相关文章!