第4章 Flink 基础API 三:转换算子(Transformation)

2023年 8月 18日 60.1k 0

4.3、转换算子(Transformation)

4.3.1、转换算子概述

用户通过算子能将一个或多个 DataStream 转换成新的 DataStream,在应用程序中可以将多个数据转换算子合并成一个复杂的数据流拓扑。这部分内容将描述 Flink DataStream API 中基本的数据转换 API,数据转换后各种数据分区方式,以及算子的链接策略。下图是转换算子的逻辑关系图:

转换算子逻辑关系.png

流式数据上的操作根据对记录数量的操作可以分为以下几类:

  • 单条记录操作

第一类是对于单条记录的操作,比如筛除掉不符合要求的记录(Filter 操作),或者将每条记录都做一个转换(Map 操作)

  • 多条记录操作

第二类是对多条记录的操作。比如说统计一个小时内的订单总成交量,就需要将一个小时内的所有订单记录的成交量加到一起。为了支持这种类型的操作,就得通过 Window 将需要的记录关联到一起进行处理

  • 合并操作

第三类是对多个流进行操作并转换为单个流。例如,多个流可以通过 Union、Join 或 Connect 等操作合到一起。这些操作合并的逻辑不同,但是它们最终都会产生了一个新的统一的流,从而可以进行一些跨流的操作。

  • 拆分操作

最后, DataStream 还支持与合并对称的拆分操作,即把一个流按一定规则拆分为多个流(Split 操作),每个流是之前流的一个子集,这样我们就可以对不同的流作不同的处理。

操作概览

操作概览.png

根据功能可以分为一下几类,本节叙述也是根据这几类展开:

  • 基本转换算子

基本转换算子有 映射(map)、过滤(filter)、扁平映射(flatmap)等。

  • 聚合转换算子

聚合转换算子有 按键分区(keyBy)、简单聚合(sum、min、max、minBy、maxBy)、归约聚合(reduce)等。

  • 用户自定义函数

用户自定义函数(UDF)主要有函数类(Function Classes)和富函数类(Rich Function Classes)等

  • 物理区分算子

物理分区算子(Physical Partitioning)主要有随机分区(shuffle)、 轮询分区(Round-Robin)、重缩放分区(rescale)、广播(broadcast)、全局分区(global)、自定义分区(Custom)等

  • 分流

侧输出流(outputTag)

  • 基本合流操作

基本合流操作有联合(Union)、连接(Connect)

4.3.2、基本转换算子

准备数据

准备一个数据模型,方面后面使用,这个数据模型有下面几个要求:

  • 类是公有(public)的;
  • 有一个无参的构造方法;
  • 所有属性都是公有(public)的;
  • 所有属性的类型都是可以序列化的

数据模型具体如下

package org.mochi.bean;
​
import java.util.Objects;
​
public class Student {
    /**
     * 姓名
     */
    public String name;
​
    /**
     * 年龄
     */
    public int age;
​
    /**
     * 学号
     */
    public long number;
​
    public Student() {
    }
​
    public Student(String name, int age, long number) {
        this.name = name;
        this.age = age;
        this.number = number;
    }
​
    public String getName() {
        return name;
    }
​
    public void setName(String name) {
        this.name = name;
    }
​
    public int getAge() {
        return age;
    }
​
    public void setAge(int age) {
        this.age = age;
    }
​
    public long getNumber() {
        return number;
    }
​
    public void setNumber(long number) {
        this.number = number;
    }
​
    @Override
    public boolean equals(Object o) {
        if (this == o) return true;
        if (o == null || getClass() != o.getClass()) return false;
        Student student = (Student) o;
        return age == student.age && number == student.number && Objects.equals(name, student.name);
    }
​
    @Override
    public int hashCode() {
        return Objects.hash(name, age, number);
    }
​
    @Override
    public String toString() {
        return "Student{" +
                "name='" + name + ''' +
                ", age=" + age +
                ", number=" + number +
                '}';
    }
}
​

4.3.2.1、映射算子(map)

主要用于将数据流中的数据进行转换,形成新的数据流。简单来说,就是一个“一一映射”,消费一个元素就产出一个元素。 我们只需要基于DataStream调用map()方法就可以进行转换处理。方法需要传入的参数是 接口MapFunction的实现;返回值类型还是DataStream,不过泛型(流中的元素类型)可能改 变。

DataStream → DataStream
map(new MapFunction ) 
MapFunction: (x)-> y [1 条变 1 条]

转换算子_map.png

package org.mochi.transform;
​
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.mochi.bean.Student;
​
public class TransformMap {
    public static void main(String[] args) throws Exception {
​
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
​
        DataStreamSource stream = env.fromElements(
                new Student("tom", 18, 1L),
                new Student("jerry", 20, 2L)
        );
​
        // 传入匿名内部类,实现 MapFunction
        stream.map(new MapFunction() {
            @Override
            public String map(Student student){
​
                // 获取名称
                return student.name;
            }
        }).print();
​
        env.execute();
    }
}

输出结果

4> tom
5> jerry

上面代码中,MapFunction 实现类的泛型类型,与输入数据类型和输出数据的类型有关。 在实现 MapFunction 接口的时候,需要指定两个泛型,分别是输入事件和输出事件的类型, 还需要重写一个 map()方法,定义从一个输入事件转换为另一个输出事件的具体逻辑。

4.3.2.2、过滤算子(filter)

filter 转换操作,顾名思义是对数据流执行一个过滤,通过一个布尔条件表达式设置过滤 条件,对于每一个流内元素进行判断,若为 true 则元素正常输出,若为 false 则元素被过滤掉。 进行 filter 转换之后的新数据流的数据类型与原数据流是相同的。filter 转换需要传入的参 数需要实现 FilterFunction 接口,而 FilterFunction 内要实现 filter()方法,就相当于一个返回布尔类型的条件表达式。

DataStream → DataStream
filter(new FilterFunction)
FilterFunction : x -> true/false

转换算子_filter.png

package org.mochi.transform;
​
import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.mochi.bean.Student;
​
public class TransformFilter {
    public static void main(String[] args) throws Exception {
​
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
​
        DataStreamSource stream = env.fromElements(
                new Student("Tom", 18, 1L),
                new Student("Jerry", 20, 1L),
                new Student("Spike", 19, 1L)
        );
​
        stream.filter(new FilterFunction() {
            @Override
            public boolean filter(Student student) {
​
                // 获取年龄大于 18 的 学生
                return student.age > 18;
            }
​
         }).print();
​
        env.execute();
    }
​
}

输出结果

8> Student{name='Spike', age=19, number=1}
7> Student{name='Jerry', age=20, number=1}

4.3.2.3、扁平映射(flatMap)

flatMap 操作又称为扁平映射,主要是将数据流中的整体(一般是集合类型)拆分成一个 一个的个体使用。消费一个元素,可以产生 0 到多个元素。flatMap 可以认为是“扁平化” (flatten)和“映射”(map)两步操作的结合,也就是先按照某种规则对数据进行打散拆分, 再对拆分后的元素做转换处理。 同 map 一样,flatMap 也可以使用 Lambda 表达式或者 FlatMapFunction 接口实现类的方式来进行传参,返回值类型取决于所传参数的具体逻辑,可以与原数据流相同,也可以不同。

DataStream → DataStream
flatMap( new FlatMapFcuntion)
FlatMapFunction: x-> x1, x2,x3,x4 [1 条变多条,并展平]

转换算子_flatmap.png

package org.mochi.transform;
​
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;
import org.mochi.bean.Student;
​
import java.util.Objects;
​
public class TransformFlatmap {
    public static void main(String[] args) throws Exception {
​
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
​
        DataStreamSource stream = env.fromElements(
                new Student("Tom", 18, 1L),
                new Student("Tom", 19, 1L),
                new Student("Jerry", 20, 1L),
                new Student("Jerry", 20, 1L),
                new Student("Spike", 19, 1L)
        );
​
        stream.flatMap(new FlatMapFunction() {
​
            @Override
            public void flatMap(Student student, Collector collector) throws Exception {
​
                if (Objects.equals("Tom", student.name)){
                    collector.collect(String.valueOf(student));
                }
            }
        }).print();
​
        env.execute();
    }
}

4.3.3、聚合算子(Aggregation)

计算的结果不仅依赖当前数据,还跟之前的数据有关,相当于要把所有数据聚在一起进 行汇总合并——这就是所谓的“聚合”(Aggregation),类似于 MapReduce 中的 reduce 操作。

4.3.3.1、按键分区(keyBy)

对于 Flink 而言 DataStream 是没有直接进行聚合的 API 的。因为我们对海量数据做聚合肯定要进行分区并行处理,这样才能提高效率。所以在 Flink 中要做聚合需要先进行分区,这个操作就是通过 keyBy 来完成的。 keyBy 是聚合前必须要用到的一个算子,keyBy 通过指定键(key)可以将一条流从逻辑 上划分成不同的分区(partitions)。这里所说的分区其实就是并行处理的子任务。 基于不同的 key流中的数据将被分配到不同的分区中去,这样一来,所有具有相同的 key 的数据都将被发往同一个分区。
DataStream → KeyedStream

transform_keyby.png

在内部是通过计算 key 的哈希值(hash code)对分区数进行取模运算来实现的。所以这里 key 如果是 POJO 的话必须要重写 hashCode() 方法。 keyBy() 方法需要传入一个参数,这个参数指定了一个或一组 key。有很多不同的方法来 指定 key,比如对于 Tuple 数据类型可以指定字段的位置或者多个位置的组合,对于 POJO 类型可以指定字段的名称(String),另外还可以传入Lambda表达式或者实现一个键选择 器(KeySelector)用于说明从数据中提取 key 的逻辑。

需要注意的是,keyBy 得到的结果将不再是 DataStream,而是会将 DataStream 转换为 KeyedStream。KeyedStream 可以认为是“分区流”或者“键控流”,它是对 DataStream 按照 key 的一个逻辑分区,所以泛型有两个类型:除去当前流中的元素类型外,还需要指定 key 的类型。 KeyedStream 也继承自 DataStream,所以基于它的操作也都归属于 DataStream API。但它只是一个流的分区操作,并不是 一个转换算子。KeyedStream 是一个非常重要的数据结构,只有基于它才可以做后续的聚合操作(比如 sum,reduce)。

package org.mochi.transform;
​
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.mochi.bean.Student;
​
import java.util.Objects;
​
public class TransformKeyBy {
​
    public static void main(String[] args) throws Exception {
​
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(2);
​
        DataStreamSource stream = env.fromElements(
                new Student("Tom", 18, 1L),
                new Student("Tom", 19, 1L),
                new Student("Jerry", 20, 1L),
                new Student("Spike", 21, 1L)
        );
​
        stream.keyBy(new KeySelector() {
​
            @Override
            public String getKey(Student student) throws Exception{
                return student.getName();
            }
        }).print();
​
        // 使用lambda 表达式
        //stream.keyBy(e -> e.getName()).print();
​
        env.execute();
​
    }
}

输出结果

2> Student{name='Tom', age=18, number=1}
1> Student{name='Spike', age=21, number=1}
2> Student{name='Tom', age=19, number=1}
2> Student{name='Jerry', age=20, number=1}

输出结果的的 “2>" 和 "1>" 是线程号,就是我们设置的env.setParallelism(2),最大不超过电脑的 CPU 核数。

keyBy 分组与分区的关系:

keyBy 是对数据分组,保证相同 key 的数据在一个分组

分区:一个子任务就可以理解为一个分区,一个分区(子任务)可以存在多个分组(key)

4.3.3.2、简单聚合(sum/min/max/minBy/maxBy)

有了按键分区的数据流 KeyedStream,我们就可以基于它进行聚合操作了。Flink 为我们 内置实现了一些最基本、最简单的聚合 API,主要有以下几种:

  • sum()

在输入流上,对指定的字段做叠加求和的操作。

  • min()

在输入流上,对指定的字段求最小值。

  • max()

在输入流上,对指定的字段求最大值。

minBy()

与 min()类似,在输入流上针对指定字段求最小值。不同的是,min()只计算指定字段的最小值,其他字段会保留最初第一个数据的值;而 minBy()则会返回 包含字段最小值的整条数据。

  • maxBy()

与 max()类似,在输入流上针对指定字段求最大值。两者区别与 min()/minBy()完全一致。

简单聚合算子使用非常方便,语义也非常明确。这些聚合方法调用时,也需要传入参数; 但并不像基本转换算子那样需要实现自定义函数,只要说明聚合指定的字段就可以了。指定字段的方式有两种:指定位置,和指定名称。 对于元组类型的数据,可以使用这两种方式来指定字段。需要注意的是,元组中字段的名称,是以 f0、f1、f2、…来命名的。 如果数据流的类型是 POJO 类,那么就只能通过字段名称来指定,不能通过位置来指定 了。
package org.mochi.transform;
​
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.mochi.bean.Student;
​
public class transformAggregation {
    public static void main(String[] args) throws Exception {
​
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
​
        DataStreamSource stream = env.fromElements(
                new Student("Tom", 19, 1L),
                new Student("Tom", 20, 2L),
                new Student("Jerry", 20, 3L),
                new Student("Spike", 21,4L)
        );
​
        stream.keyBy(e -> e.getName()).max("age").print();
//        stream.keyBy(e -> e.getName()).maxBy("age").print();
​
​
        env.execute();
    }
}

输出结果

8> Student{name='Tom', age=19, number=1}
7> Student{name='Jerry', age=20, number=3}
4> Student{name='Spike', age=21, number=4}
8> Student{name='Tom', age=20, number=1}

如果使用 maxBy()函数,输出结果如下

8> Student{name='Tom', age=19, number=1}
4> Student{name='Spike', age=21, number=4}
7> Student{name='Jerry', age=20, number=3}
8> Student{name='Tom', age=20, number=2}

可以看出,使用max()时,number 字段输出保持不变,而使用maxBy()时,number 跟随当前数据变化。

简单聚合算子返回的是从KeyedStream又 转换成了常规的 DataStream。所以可以这样理解:keyBy 和聚合是成对出现的,先分区、后聚合,得到的依然是一个 DataStream。而且经过简单聚合之后的数据流,元素的数据类型保 持不变。

一个聚合算子,会为每一个 key 保存一个聚合的值,在 Flink 中我们把它叫作“状态” (state)。所以每当有一个新的数据输入,算子就会更新保存的聚合结果,并发送一个带有更 新后聚合值的事件到下游算子。对于无界流来说,这些状态是永远不会被清除的,所以我们 使用聚合算子,应该只用在含有有限个 key 的数据流上。

4.3.2.3、 归约聚合(reduce)

reduce 可以对已有的数据进行归约处理,把每一个新输入的数据和当前已经归约出来的 值,再做一个聚合计算。

转换算子_reduce.png

reduce操作也会将 KeyedStream转换为 DataStream。它不会改变流的元素数据类型,所以输出类型和输入类型是一样的。 调用 KeyedStream 的 reduce 方法时,需要传入一个参数,实现 ReduceFunction 接口。接口在源码中的定义如下:

 public interface ReduceFunction extends Function, Serializable {
​
    T reduce(T value1, T value2) throws Exception;
​
 } 

ReduceFunction 接口里需要实现 reduce()方法,这个方法接收两个输入事件,经过转换处 理之后输出一个相同类型的事件。在流处理的底层实现过程中,实际上是将中间“合并的结 果”作为任务的一个状态保存起来的;之后每来一个新的数据,就和之前的聚合状态进一步 做归约。 我们可以单独定义一个函数类实现 ReduceFunction 接口,也可以直接传入一个匿名类。 当然,同样也可以通过传入 Lambda 表达式实现类似的功能。

package org.mochi.transform;
​
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.mochi.bean.Student;
​
public class transformReduce {
​
    public static void main(String[] args) throws Exception {
​
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
​
        DataStreamSource stream = env.fromElements(
                new Student("Tom", 18, 1L),
                new Student("Tom", 19, 1L),
                new Student("Jerry", 20, 1L),
                new Student("Spike", 21, 1L)
        );
​
        stream.keyBy(Student::getName).reduce(new ReduceFunction() {
            @Override
            public Student reduce(Student s1, Student s2) throws Exception {
                System.out.println("s1" + s1);
                System.out.println("s2" + s2);
                return new Student(s1.name, s1.age, s1.number + s2.number);
            }
        }).print();
​
        env.execute();
    }
}

输出结果

4> Student{name='Spike', age=21, number=1}
8> Student{name='Tom', age=18, number=1}
7> Student{name='Jerry', age=20, number=1}
s1Student{name='Tom', age=18, number=1}
s2Student{name='Tom', age=19, number=1}
8> Student{name='Tom', age=18, number=2}

reduce同简单聚合算子一样,也要针对每一个key保存状态。因为状态不会清空,所以我们需要将 reduce 算子作用在一个有限 key 的流上。

相关文章

JavaScript2024新功能:Object.groupBy、正则表达式v标志
PHP trim 函数对多字节字符的使用和限制
新函数 json_validate() 、randomizer 类扩展…20 个PHP 8.3 新特性全面解析
使用HTMX为WordPress增效:如何在不使用复杂框架的情况下增强平台功能
为React 19做准备:WordPress 6.6用户指南
如何删除WordPress中的所有评论

发布评论