Table API 和 SQL 是最上层的API,在Flink中这两种API被集成在一起,SQL执行的对象也是Flink中的表(Table),所以我们一般会认为它们是一体的。Flink是批流统一的处理框架,无论是批处理(DataSet API)还是流处理(DataStream API),在上层应用中都可以直接使用Table API或者SQL来实现;这两种API对于一张表执行相同的查询操作,得到的结果是完全一样的。 SQL API 是基于 SQL 标准的 Apache Calcite 框架实现的,可通过纯 SQL 来开发和运行一个Flink 任务
。
Flink sql-client环境准备
基于yarn-session模式
# 1、启动Flink(前置条件是启动hadoop集群)
/home/hadoop/opt/flink-1.17.0/bin/yarn-session.sh -d
# 2、启动Flink的sql-client
/home/hadoop/opt/flink-1.17.0/bin/sql-client.sh embedded -s yarn-session
常用配置
# 1、设置结果显示模式:默认table,还可以设置为tableau、changelog
SET sql-client.execution.result-mode=tableau;
# 2、执行环境:默认streaming,也可以设置batch
SET execution.runtime-mode=streaming;
# 3、默认并行度
SET parallelism.default=1;
# 4、设置状态TTL
SET table.exec.state.ttl=1000;
# 5、通过sql文件初始化
vim conf/sql-client-init.sql
SET sql-client.execution.result-mode=tableau;
CREATE DATABASE pyflink_db;
# 6、启动时,指定sql初始化文件
/home/hadoop/opt/flink-1.17.0/bin/sql-client.sh embedded -s yarn-session -i conf/sql-client-init.sql
流处理中的表
动态表和持续查询
流处理面对的数据是连续不断的,这导致了流处理中的“表”跟我们熟悉的关系型数据库中的表完全不同
;而基于表执行的查询操作,也就有了新的含义。
当流中有新数据到来,初始的表中会插入一行;而基于这个表定义的SQL查询,就应该在之前的基础上更新结果。这样得到的表就会不断地动态变化,被称为“动态表”(Dynamic Tables)。 动态表是Flink在Table API和SQL中的核心概念,它为流数据处理提供了表和SQL支持。我们所熟悉的表一般用来做批处理,面向的是固定的数据集,可以认为是“静态表”;而动态表则完全不同,它里面的数据会随时间变化。
动态表可以像静态的批处理表一样进行查询操作。由于数据在不断变化,因此基于它定义的SQL查询也不可能执行一次就得到最终结果。这样一来,我们对动态表的查询也就永远不会停止,一直在随着新数据的到来而继续执行。这样的查询就被称作“持续查询”
(Continuous Query)。对动态表定义的查询操作,都是持续查询;而持续查询的结果也会是一个动态表。
由于每次数据到来都会触发查询操作,因此可以认为一次查询面对的数据集,就是当前输入动态表中收到的所有数据。这相当于是对输入动态表做了一个“快照”
(snapshot),当作有限数据集进行批处理;流式数据的到来会触发连续不断的快照查询,像动画一样连贯起来,就构成了“持续查询”。
持续查询的步骤如下:
- 流(stream)被转换为动态表(dynamic table)
- 对动态表进行持续查询(continuous query),生成新的动态表
- 生成的动态表被转换成流。
这样只要API将流和动态表的转换封装起来,我们就可以直接在数据流上执行SQL查询,用处理表的方式来做流处理了。
将流转换成动态表
如果把流看作一张表,那么流中每个数据的到来,都应该看作是对表的一次插入(Insert)操作,会在表的末尾添加一行数据。因为流是连续不断的,而且之前的输出结果无法改变、只能在后面追加;所以我们其实是通过一个只有插入操作(insert-only)的更新日志(changelog)流,来构建一个表。
例如,当用户点击事件到来时,就对应着动态表中的一次插入(Insert)操作,每条数据就是表中的一行;随着插入更多的点击事件,得到的动态表将不断增长。
SQL持续查询
如在代码中定义了一个SQL查询:
url_count_table = table_env.sql_query("select name, count(url) as cnt from event_table group by name")
当原始动态表不停地插入新的数据时,查询得到的url_count_table会持续地进行更改。由于count数量可能会叠加增长,因此这里的更改操作可以是简单的插入(Insert),也可以是对之前数据的更新(Update)。这种持续查询被称为更新查询(Update Query),更新查询得到的结果表如果想要转换成DataStream,必须调用table_env.to_changelog_stream()方法
。
如果执行一个简单的条件查询,结果表中就会像原始表EventTable一样,只有插入(Insert)操作了。
alice_visit_table = table_env.sql_query("SELECT url, user FROM event_table WHERE user = 'Cary'")
这样的持续查询,就被称为追加查询(Append Query),它定义的结果表的更新日志(changelog)流中只有INSERT操作。
将动态表转换为流
与关系型数据库中的表一样,动态表也可以通过插入(Insert)、更新(Update)和删除(Delete)操作,进行持续的更改
。
将动态表转换为流或将其写入外部系统时,就需要对这些更改操作进行编码,通过发送编码消息的方式告诉外部系统要执行的操作。在Flink中,Table API和SQL支持三种编码方式:
- 仅追加(Append-only)流:仅通过插入(Insert)更改来修改的动态表,可以直接转换为“仅追加”流。这个流中发出的数据,其实就是动态表中新增的每一行。
- 撤回(Retract)流:撤回流是包含两类消息的流,添加(add)消息和撤回(retract)消息。
具体的编码规则是:insert 插入操作编码为:add消息;delete 删除操作编码为:retract消息;而 update 更新操作则编码为:被更改行的retract消息,和更新后行(新行)的add消息
。这样,我们可以通过编码后的消息指明所有的增删改操作,一个动态表就可以转换为撤回流了。
- 更新插入(Upsert)流:更新插入流中只包含两种类型的消息:更新插入(upsert)消息和删除(delete)消息。
所谓的“upsert”
其实是“update”
和“insert”
的合成词
,所以 对于更新插入流来说,inster插入操作和update更新操作,统一被编码为upsert消息;而delete删除操作则被编码为delete消息
。
需要注意的是,在代码里将动态表转换为DataStream时,只支持仅追加(append-only)和撤回(retract)流,我们调用to_changelog_stream()得到的其实就是撤回流。而连接到外部系统时,则可以支持不同的编码方法,这取决于外部系统本身的特性。