3.1、Flink的运行架构
3.1.1、Flink 基本组件栈
一个计算框架只有上层有了具体的应用,并能很好的发挥计算框架本身的优势,那么这个计算框架才能吸引更多的资源,才会更快的进步。Flink 有着自己的完整的 Stack,Flink 每一层所包含的组件都提供了特定的抽象,用来服务于上层组件,分层的组件栈如下图所示:
- API & Libraries 层
Flink 持了 Java、Scala 和 Python 的 API。DataStream、DataSet、Table、SQL API,Table 是一种接口化的 SQL 支持,也就是 API 支持(DSL),而不是文本化的 SQL 解析和执行。作为分布式数据处理框架,Flink 同时提供了支撑流计算和批计算的接口,两者都提供给用户丰富的数据处理高级 API,例如 Map、FlatMap 操作等,也提供比较低级的 Process Function API,用户可以直接操作状态和时间等底层数据。在 Flink1.15 版本中批计算接口已经标记为 Legacy(已过时),后续版本建议使用 Flink 流计算接口。
扩展库
Flink 在流计算和批计算的接口的此基础上抽象出不同的应用类型的组件库,如基于流处理的 CEP (复杂事件处理库),SQL & TABLE 库和基于批处理的 FlinkML(机器学习库),Gelly(图处理库)等。
模式检测是事件流处理中的一个非常常见的用例。Flink 的 CEP 库提供了 API,使用户能够以例如正则表达式或状态机的方式指定事件模式。CEP 库与 Flink 的 DataStream API 集成,以便在 DataStream 上评估模式。CEP 库的应用包括网络入侵检测,业务流程监控和欺诈检测。
Gelly 是一个可扩展的图形处理和分析库。Gelly 是在 DataSet API 之上实现的,并与 DataSet API 集成。因此,它能够受益于其可扩展且健壮的操作符。Gelly 提供了 内置算法 ,如 label propagation、triangle enumeration 和 page rank 算法,也提供了一个简化自定义图算法实现的 Graph API 。
- Core(Runtime 核心层)
Runtime 核心层提供了支持 Flink 计算的全部核心实现,为上层 API 层提供基础服务,该层主要负责对上层不同接口提供基础服务,也是 Flink 分布式计算框架的核心实现层,支持分布式 Stream 作业的执行、JobGraph 到 ExecutionGraph 的映射转换、任务调度等。将DataSteam 和 DataSet 转成统一的可执行的 Task Operator,达到在流式引擎下同时处理批量计算和流式计算的目的。
- Deploy(物理部署层)
Flink 支持本地运行、能在独立集群或者在被 YARN 管理的集群上运行, 也能部署在云上,该层主要涉及 Flink 的部署模式,目前 Flink支持多种部署模式:本地、集群(Standalone、YARN)、云(GCE/EC2)、Kubenetes。Flink 能够通过该层能够支持不同平台的部署,用户可以根据需要选择使用对应的部署模式。目前在企业中使用最多的是基于 Yarn 进行部署,也就是 Flink On Yarn。
3.1.2、架构角色
Flink 集群采取 Master - Slave 架构
- Master 的角色为 JobManager,负责集群和作业管理;
- Slave 的角色是 TaskManager,负责执行计算任务;
- 客户端 Client 负责管理集群和提交任务,JobManager 和 TaskManager 是集群的进程。
Client(客户端)不是运行时和程序执行的一部分,而是用于准备数据流并将其发送到JobManager。之后,客户端可以断开连接(分离模式),或者保持连接以接收进度报告(附加模式)。
3.1.3、组成部份
以 Standalone 模式提交任务为例说明 Flink 运行时架构的组成部份:
1、JobManager(作业管理器)
JobManager 是一个 Flink 集群中任务管理和调度的核心,是控制应用执行的主进程。也就是说每个应用都应该被唯一的 JobManager 所控制执行。 JobManger 又包含 3 个不同的组件:
- JobMaster
JobMaster 是 JobManager 中最核心的组件,负责处理单独的作业(Job)。所以 JobMaster 和具体的 Job 是一一对应的,多个 Job 可以同时运行在一个 Flink 集群中, 每个 Job 都有一个自 己的 JobMaster。需要注意在早期版本的 Flink中,没有 JobMaster 的概念,而 JobManager的概 念范围较小,实际指的就是现在所说的 JobMaster。
在作业提交时,JobMaster 会先接收到要执行的应用。JobMaster 会把 JobGraph 转换成一 个物理层面的数据流图,这个图被叫作“执行图”(ExecutionGraph),它包含了所有可以并 发执行的任务。JobMaster 会向资源管理器(ResourceManager)发出请求,申请执行任务必要的资源。一旦它获取到了足够的资源,就会将执行图分发到真正运行它们的 TaskManager 上。 而在运行过程中 ,JobMaster 会负责所有需要中央协调的操作,比如说检查点(checkpoints)的协调。
- ResourceManager(资源管理器)
ResourceManager 主要负责资源的分配和管理,在 Flink 集群中只有一个。所谓“资源”, 主要是指 TaskManager的任务槽(task slots)。任务槽就是 Flink 集群中的资源调配单元,包含 了机器用来执行计算的一组 CPU 和内存资源。每一个任务(Task)都需要分配到一个 slot 上执行。 这里注意要把 Flink 内置的 ResourceManager 和其他资源管理平台(比如 YARN)的 ResourceManager 区分开。
- 分发器(Dispatcher)
Dispatcher 主要负责提供一个 REST 接口,用来提交应用,并且负责为每一个新提交的作业启动一个新的 JobMaster 组件 Dispatcher也会启动一个 Web UI,用来方便地展示和监控作 业执行的信息。Dispatcher 在架构中并不是必需的,在不同的部署模式下可能会被忽略掉。
2、TaskManager(任务管理器)
TaskManager 是 Flink 中的工作进程,数据流的具体计算就是它来做的。Flink 集群中必须至少有一个 TaskManager;每一个 TaskManager 都包含了一定数量的任务槽(task slots)。Slot 是资源调度的最小单位,slot 的数量限制了 TaskManager 能够并行处理的任务数量。 启动之后,TaskManager 会向资源管理器注册它的 slots,收到资源管理器的指令后,TaskManager 就会将一个或者多个槽位提供给 JobMaster调用,JobMaster就可以分配任务来执行了。在执行过程中,TaskManager 可以缓冲数据,还可以跟其他运行同一应用的 TaskManager 交换数据。
3.2、核心概念
任务执行流程
- 通过算子 API 所开发的代码,会被 Flink 任务提交客户端解析成jobGraph ;
- 然后,jobGraph 提交到集群 JobManager,转化成 ExecutionGraph(并行化后的执行图);
- 然后,ExecutionGraph 中的各个 task 会以多并行实例(subTask)部署到 taskmanager 上执行;
- subTask 运行的位置是 taskmanager 所提供的槽位(task slot)。
3.2.1、并行度(Parallelism)
Flink 程序本质上是分布式并行程序。在程序执行期间,一个流有一个或多个流分区(Stream Partition),每个算子(Operator)有一个或多个算子子任务(Operator Subtask)。每个子任务彼此独立,并在不同的线程中运行,或在不同的计算机或容器中运行。
算子子任务数就是其对应算子的并行度。在同一程序中,不同算子也可能具有不同的并行度。
一般情况下,一个流程序的并行度,可以认为就是其所有算子中最大的并行度。一个程序中, 不同的算子可能具有不同的并行度。 例如:如上图所示,当前数据流中有 source、map、window、sink四个算子,其中 sink算 子的并行度为 1,其他算子的并行度都为 2。所以这段流处理程序的并行度就是 2。
并行度的设置
在 Flink 中,可以用不同的方法来设置并行度,它们的有效范围和优先级别也是不同的。
1)代码中设置
我们在代码中,可以很简单地在算子后跟着调用 setParallelism() 方法,来设置当前算子的 并行度:
stream.map(word -> Tuple2.of(word, 1L)).setParallelism(2);
这种方式设置的并行度,只针对当前算子有效。 另外,我们也可以直接调用执行环境的 setParallelism() 方法,全局设定并行度:
env.setParallelism(2);
这样代码中所有算子,默认的并行度就都为 2 了。我们一般不会在程序中设置全局并行度,因为如果在程序中对全局并行度进行硬编码,会导致无法动态扩容。
这里要注意的是,由于 keyBy 不是算子,所以无法对 keyBy 设置并行度。
2)提交应用时设置
在使用flink run命令提交应用时,可以增加 -p 参数来指定当前应用程序执行的并行度,它的作用类似于执行环境的全局设置:
bin/flink run –p 2 –c com.mochi.wc.SocketStreamWordCount ./FlinkTutorial-1.0-SNAPSHOT.jar
如果我们直接在 Web UI 上提交作业,也可以在对应输入框中直接添加并行度。
3)配置文件中设置
我们还可以直接在集群的配置文件 flink-conf.yaml 中直接更改默认并行度:
parallelism.default: 2
这个设置对于整个集群上提交的所有作业有效,初始值为 1。无论在代码中设置、还是 提交时的 -p 参数,都不是必须的;所以在没有指定并行度的时候,就会采用配置文件中的集 群默认并行度。在开发环境中,没有配置文件,默认并行度就是当前机器的 CPU 核心数。
3.2.2、算子链(Operator Chain)
1)算子间的数据传输
一个数据流在算子之间传输数据的形式可以是一对一(one-to-one)的直通(forwarding) 模式,也可以是打乱的重分区(redistributing)模式,具体是哪一种形式,取决于算子的种类。
- 一对一(One-to-one,forwarding)
这种模式下,数据流维护着分区以及元素的顺序。比如图中的 source 和 map 算子,source 算子读取数据之后,可以直接发送给 map 算子做处理,它们之间不需要重新分区,也不需要调整数据的顺序。这就意味着 map 算子的子任务,看到的元素个数和顺序跟 source 算子的子任务产生的完全一样,保证着“一对一”的关系。map、filter、flatMap 等算子都是这种 one-toone 的对应关系。这种关系类似于 Spark 中的窄依赖。
- 重分区(Redistributing)
在这种模式下,数据流的分区会发生改变。比如图中的 map和后面的 keyBy/window 算子之间,以及 keyBy/window 算子和 Sink 算子之间,都是这样的关系。 每一个算子的子任务,会根据数据传输的策略,把数据发送到不同的下游目标任务。这 些传输方式都会引起重分区的过程,这一过程类似于 Spark 中的 shuffle。
2)合并算子链
在Flink中,并行度相同的一对一(one to one)算子操作,可以直接链接在一起形成一个 “大”的任务(task),这样原来的算子就成为了真正任务里的一部分,如上图所示。每个 task 会被一个线程执行。这样的技术被称为“算子链”(Operator Chain)。
上图中 Source 和 map 之间满足了算子链的要求,所以可以直接合并在一起,形成了一个 任务;因为并行度为 2,所以合并后的任务也有两个并行子任务。这样,这个数据流图所表 示的作业最终会有 5 个任务,由 5 个线程并行执行。 将算子链接成 task 是非常有效的优化:可以减少线程之间的切换和基于缓存区的数据交 换,在减少时延的同时提升吞吐量。 Flink 默认会按照算子链的原则进行链接合并,如果我们想要禁止合并或者自行定义,也 可以在代码中对算子做一些特定的设置:
// 禁用算子链
.map(word -> Tuple2.of(word, 1L)).disableChaining();
// 从当前算子开始新链
.map(word -> Tuple2.of(word, 1L)).startNewChain();
3.2.3、任务槽(Task Slots)
1)任务槽(Task Slots)
Flink中每一个 TaskManager 都是一个 JVM 进程,它可以启动多个独立的线程,来并行执 行多个子任务(subtask)。 很显然,TaskManager 的计算资源是有限的,并行的任务越多,每个线程的资源就会越 少。那一个 TaskManager 到底能并行处理多少个任务呢?为了控制并发量,我们需要在 TaskManager 上对每个任务运行所占用的资源做出明确的划分,这就是所谓的任务槽(task slots)。 每个任务槽(task slot)其实表示了 TaskManager 拥有计算资源的一个固定大小的子集。 这些资源就是用来独立执行一个子任务的。
假如一个TaskManager 有三个 slot,那么它会将管理的内存平均分成三份,每个 slot 独自占据一份。这样一来,我们在 slot 上执行一个子任务时,相当于划定了一块内存“专款专用”,就不需要跟来自其他作业的任务去竞争内存资源了。 所以现在我们只要2个TaskManager,就可以并行处理分配好的5个任务了
2)任务槽数量的设置
在Flink的 flink-conf.yaml 配置文件中,可以设置 TaskManager 的 slot 数量,默认是 1 个 slot。
taskmanager.numberOfTaskSlots: 8
需要注意的是,slot 目前仅仅用来隔离内存,不会涉及 CPU 的隔离。在具体应用时,可 以将 slot 数量配置为机器的 CPU 核心数,尽量避免不同任务之间对 CPU 的竞争。这也是开发 环境默认并行度设为机器 CPU 数量的原因。
3)任务对任务槽的共享
在同一个作业中,不同任务节点的并行子任务, 就可以放到同一个slot上执行。
默认情况下,Flink 是允许子任务共享 slot 的。如果我们保持 sink 任务并行度为 1 不变, 而作业提交时设置全局并行度为 6,那么前两个任务节点就会各自有 6个并行子任务,整个流 处理程序则有 13 个子任务。如上图所示,只要属于同一个作业,那么对于不同任务节点(算 子)的并行子任务,就可以放到同一个 slot 上执行。所以对于第一个任务节点 source → map, 它的 6个并行子任务必须分到不同的 slot上,而第二个任务节点 keyBy/window/apply 的并行子 任务却可以和第一个任务节点共享 slot。 当我们将资源密集型和非密集型的任务同时放到一个 slot 中,它们就可以自行分配对资 源占用的比例,从而保证最重的活平均分配给所有的 TaskManager。 slot 共享另一个好处就是允许我们保存完整的作业管道。这样一来,即使某个 TaskManager 出现故障宕机,其他节点也可以完全不受影响,作业的任务可以继续执行。 当然,Flink 默认是允许 slot 共享的,如果希望某个算子对应的任务完全独占一个slot,或者只有某一部分算子共享 slot,我们也可以通过设置“slot 共享组”手动指定:
.map(word -> Tuple2.of(word, 1L)).slotSharingGroup("1");
这样,只有属于同一个 slot共享组的子任务,才会开启 slot共享;不同组之间的任务是完 全隔离的,必须分配到不同的 slot 上。在这种场景下,总共需要的 slot 数量,就是各个 slot 共 享组最大并行度的总和。
3.2.4 任务槽和并行度的关系
任务槽和并行度都跟程序的并行执行有关,但两者是完全不同的概念。简单来说任务槽是静态的概 念,是指 TaskManager 具有的并发执行能力,可以通过参数 taskmanager.numberOfTaskSlots 进行配置;而并行度是动态概念,也就是 TaskManager 运行程序时实际使用的并发能力,可以通过参数 parallelism.default 进行配置。
举例说明:假设一共有 3 个 TaskManager,每一个 TaskManager 中的slot 数量设置为 3 个, 那么一共有 9 个 task slot,表示集群最多能并行执行 9 个同一算子的子任务。 而我们定义 word count 程序的处理操作是四个转换算子: source→ flatmap→ reduce→ sink 当所有算子并行度相同时,容易看出 source 和 flatmap 可以合并算子链,于是最终有三个任务节点。
- 并行度为 1 的 word count
其中 Flink-conf.ymltaskmanager.numberOfTaskSlots:3 (建议值:CPU 核心个数)
如果我们没有任何并行度设置,而配置文件中默认 parallelism.default=1,那么程序运行的默 认并行度为1,总共有3个任务。由于不同算子的任务可以共享任务槽,所以最终占用的slot 只有1个。9个slot只用了1个,有8个空闲。
- 并行度为 2 的 word count
其中 Flink-conf.ymltaskmanager.numberOfTaskSlots:3 (建议值:CPU 核心个数)
作业并行度设置为2,那么总共有6个任务,共享任务槽之后会占用2个slot。同样,就有7个slot空闲,计算资源没有充分利用。所以可以看到,设置合适的并行度才能提高效率。
设置任务并行度的方式有以下三种:
Flink-conf.yaml:parallelism.default: 2
Flink客户端: ./bin/flink run –p 2
执行环境: env.setParallelism(2)
- 并行度为 9 的 word count
其中 Flink-conf.ymltaskmanager.numberOfTaskSlots:3 (建议值:CPU 核心个数)
这样需要把所有的slot都利用起来效率才最高。考虑到slot共享,我们可以直接把并行度设置为9,这样 所有27个任务就会完全占用9个slot。这是当前集群资源下能执行的最大并行度,计算资源得到了充分的利用。
设置任务并行度的方式有以下三种:
Flink-conf.yaml:parallelism.default: 9
Flink客户端: ./bin/flink run –p 9
执行环境: env.setParallelism(9)
- 全局设并行度为 9 ,并单独设 Sink 并行度为 1
其中 Flink-conf.ymltaskmanager.numberOfTaskSlots:3 (建议值:CPU 核心个数)
另外再考虑对于某个算子单独设置并行度的场景。例如,如果我们考虑到输出可能是写入文件,那会希望不要并行写入多个文件,就需要设置 sink 算子的并行度为 1。这时其他的算子并行度依然为 9,所以总共会有19 个子任务。根据 slot 共享的原则,它们最终还是会占用全部的 9 个slot,而 sink 任务只在其中一个 slot 上执行。
算子的并行度可以在程序中分别设定,方式如下:
couts.writeAsCsv(outputpath,”n”,” ”).setParallelism(1);