1. 引言
由于Iceberg本身具备了保存多个快照的能力(也叫时间旅行),所以能够保存多个数据版本,亦能够根据 时间旅行的特性为上层引擎提供增量读取的能力。
每个快照都代表某个时刻的数据集合,而两个快照之间可以通过差集计算求出发生变更的部分:
如上图,根据两个快照指向的manifests集合求差集,就能得到变更的数据集合了(包括删除,更新以及插入操作)。
Flink引擎本身是支持批读和流式读取的,其connector在源码的体现上:
- 批读就是读取Iceberg的某个快照下的数据文件集合;
- 而流式读取则是依赖其时间旅行特性,不断取出最新的snapshot,与上一次读取过的snapshot做差集计算,最后读取差集数据文件,实现流式增量读取操作。
到目前为止Iceberg的版本已经迭代到1.3.1 ,而本章内容的分析过程则基于较老的版本 0l.12.x , 发布于 2021年6月左右。但是由于其架构和源码整体没有出现特别大的改动,所以老版本代码不影响我们理解它的基础原理以及核心流程。
2. Flink source connector 批流API
虽说flink对外宣称自己是流批一体的框架,能够做到一套api两种方式运行。但是真正在代码开发的路径上实现了批流一体读取应该是在 Flip-27 特性之后,而该特性发布之前已经有不少的生态connector (kafka,mysql .. )已经接入了Flink, Flip-27特性发布之前的flink引擎为了具备批读的能力,提供了一套DataSet API , 其中DataSet Api核心接口为 InputFormat。
InputFormat 命名风格上借鉴了 Hadoop 的风格,在功能上也比较相近,具体有以下三点:
- 描述输入的数据如何被划分为不同的 InputSplit(继承于 InputSplitSource)。
- 描述如何从单个 InputSplit 读取记录,具体包括如何打开一个分配到的 InputSplit,如何从这个 InputSplit 读取一条记录,如何得知记录已经读完和如何关闭这个 InputSplit。
- 描述如何获取输入数据的统计信息(比如文件的大小、记录的数目),以帮助更好地优化执行计划。
因此以前的FLink在代码开发的路径上流读和批读是割裂的,api以及接口的定义显得比较混乱:
关于更多Flip-27特性的背景以及优点,本文不做详述,读者可自行去查找相关资料。本段内容想表达的是Api的不统一,导致很多connector其实内部是有 批和流两套 api的。
Iceberg 的flink connector也是如此,而后面Iceberg的迭代升级,社区也基于Flip-27 为Iceberg开发了一套全新的真正的批流一体API,这个会在后面的文章 《Iceberg flink source connector源码走读 二》来分析。
本章内容分析旧版本的flink connector的源代码。
3. 入口代码
入口代码在 org.apache.iceberg.flink.source.FlinkSource的forRowData() 函数,返回一个 FlinkSource.Builder ,用于构建各种输入参数:
StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment();
TableLoader tableLoader = TableLoader.fromHadoopTable("hdfs://nn:8020/warehouse/path");
DataStream batch = FlinkSource.forRowData()
.env(env)
.tableLoader(tableLoader)
// 流读 or 批读
.streaming(false)
.build();
// Print all records to stdout.
batch.print();
// Submit and execute this batch read job.
env.execute("Test Iceberg Batch Read");
FlinkSource.Builder 是参数构建入口,批流共有参数可以支持flilter(条件下推,目前只支持partition条件的下推),project,limit等常见读取优化:
...省略部分代码
// filter 下推,目前仅支持partition字段的下推。
public Builder filters(List filters) {
contextBuilder.filters(filters);
return this;
}
// 字段投影,仅读取指定的字段,project可以下推到读取具体的数据文件,
// 结合常见的 orc、parquet 等列式存储文件,可以节省不必要的File io
public Builder project(TableSchema schema) {
this.projectedSchema = schema;
return this;
}
// 读取N条
public Builder limit(long newLimit) {
contextBuilder.limit(newLimit);
return this;
}
//指定一个split的大小,方便调优解决数据倾斜问题。
public Builder splitSize(Long splitSize) {
contextBuilder.splitSize(splitSize);
return this;
}
// 为上层引擎基于代优化模型提供数据
public Builder splitOpenFileCost(Long splitOpenFileCost) {
contextBuilder.splitOpenFileCost(splitOpenFileCost);
return this;
}
... 省略部分代码
4. 批读
...省略
public Builder snapshotId(Long snapshotId) {
contextBuilder.useSnapshotId(snapshotId);
return this;
}
...省略部分代码
public DataStream build() {
Preconditions.checkNotNull(env, "StreamExecutionEnvironment should not be null");
// FlinkInputFormat 是批读connector的源头
FlinkInputFormat format = buildFormat();
ScanContext context = contextBuilder.build();
TypeInformation typeInfo = FlinkCompatibilityUtil.toTypeInfo(FlinkSchemaUtil.convert(context.project()));
if (!context.isStreaming()) {
int parallelism = inferParallelism(format, context);
if (env.getMaxParallelism() > 0) {
parallelism = Math.min(parallelism, env.getMaxParallelism());
}
// 通过批Api直接构建
return env.createInput(format, typeInfo).setParallelism(parallelism);
} else {
...... 此处为流读connector构建,省略 ......
}
}
从上面的代码可以看到,FlinkInputFormat是关键,它继承了Flink 的 RichInputFormat 接口,需要去实现怎么获取一个split,如何从split中读取一条记录(row)等关键操作。
继续跟踪代码:
public class FlinkInputFormat extends RichInputFormat {
...... 省略部分代码......
@Override
public FlinkInputSplit[] createInputSplits(int minNumSplits) throws IOException {
// Called in Job manager, so it is OK to load table from catalog.
tableLoader.open();
try (TableLoader loader = tableLoader) {
Table table = loader.loadTable();
// 批读模式下,context会包含一个snapshotid,FlinkSplitGenerator 查找指定snapshot的数据文件集合,并将其封装为splits[] 返回
// 流读模式下,ontext会包含startSnapshotId和endSnapshotId,FlinkSplitGenerator 则计算两个 snapshot之间的差集,并将其封装为splits[] 返回
return FlinkSplitGenerator.createInputSplits(table, context);
}
}
......
// 打开一个split,打开后 nextRecord 函数从该split读取records(rows)
@Override
public void open(FlinkInputSplit split) {
this.iterator = new DataIterator(rowDataReader, split.getTask(), io, encryption);
}
... 省略部分代码......
@Override
public RowData nextRecord(RowData reuse) {
currentReadCount++;
return iterator.next();
}
...... 省略部分代码......
}
而FlinkSplitGenerator.createInputSplits 函数则是通过Iceberg 底层提供的公共的 TableScan 接口来获取到List ,最后FlinkSplitGenerator自己将 CombinedScanTask 集合封装为 FlinkInputSplit ,参考代码:
class FlinkSplitGenerator {
static FlinkInputSplit[] createInputSplits(Table table, ScanContext context) {
// 调用Iceberg底层的TableScan接口,获取scan的task 列表
List tasks = tasks(table, context);
FlinkInputSplit[] splits = new FlinkInputSplit[tasks.size()];
boolean exposeLocality = context.exposeLocality();
Tasks.range(tasks.size())
.stopOnFailure()
.executeWith(exposeLocality ? ThreadPools.getWorkerPool() : null)
.run(index -> {
CombinedScanTask task = tasks.get(index);
String[] hostnames = null;
if (exposeLocality) {
hostnames = Util.blockLocations(table.io(), task);
}
splits[index] = new FlinkInputSplit(index, task, hostnames);
});
return splits;
}
private static List tasks(Table table, ScanContext context) {
TableScan scan = table
.newScan()
.caseSensitive(context.caseSensitive())
// 指定读取的字段列表
.project(context.project());
//批读模式,指定读取的snapshot
if (context.snapshotId() != null) {
scan = scan.useSnapshot(context.snapshotId());
}
... 省略部分代码....
/filter 条件下推在这里传递
if (context.filters() != null) {
for (Expression filter : context.filters()) {
scan = scan.filter(filter);
}
}
try (CloseableIterable tasksIterable = scan.planTasks()) {
return Lists.newArrayList(tasksIterable);
} catch (IOException e) {
throw new UncheckedIOException("Failed to close table scan: " + scan, e);
}
}
}
到这里,flink引擎的批读取代码基本已经走读完,因为Iceberg的TableScan是一个底层接口,由它统一为上层的计算引擎提供对Iceberg表做扫描(scan)的能力: 返回的 CombinedScanTask 表示一个扫描任务批次集合:
public interface CombinedScanTask extends ScanTaskGroup {
/**
* Return the {@link FileScanTask tasks} in this combined task.
* @return a Collection of FileScanTask instances.
*/
// FileScanTask 表示需要扫描的数据文件,以dataFile的(全部 or 部分)数据作为基本单位:一个DataFile + N个delete File
Collection files();
@Override
default Collection tasks() {
return files();
}
@Override
default CombinedScanTask asCombinedScanTask() {
return this;
}
}
FileScanTask 表示对一个文件数据或者 一个数据文件的部分数据 的扫描任务,它包含了一个 DataFile对象和N个DeleteFile。
- 为什么会存在 一个数据文件的部分数据 这种情况?
Iceberg 的数据文件是具备随机读(seek)能力的,当一个数据文件太大(例如1GB)时
就可以将文件切分成多个split,上层计算引擎将不同的split并行读取,
能解决读取速度并为解决数据倾斜提供了手段。
- 为什么会出现 N个删除文件?
由于不同的引擎可以对同一张表进行写,不能保证每个引擎都使用了copy on write的操作来更新数据,
因此Iceberg读取数据时遵循Merge On Read 机制保证数据一致性。
所以所谓的Merge on read,在源码中的体现则是一个scan task读取数据的过程:
读取一个 DataFile中的数据并经过 DeleteFiles 的过滤之后,才能得真正的数据集。 所有的DataFile 都和它对应的DelereFiles 经过Merge操作之后,就得到当前表的所有的更新和删除之后的数据了
参考源码 org.apache.iceberg.FileScanTask 和 ContentScanTask:
public interface FileScanTask extends ContentScanTask, SplittableScanTask {
/**
* A list of {@link DeleteFile delete files} to apply when reading the task's data file.
*
* @return a list of delete files to apply.
* N个删除文件 deleteFile (包括equality delete file 和 Position delete file)。
*/
List deletes();
......省略部分代码......
}
public interface ContentScanTask extends ScanTask {
// 数据文件dataFIle
F file();
// split 依据
long start();
long length();
//文件级别的条件下推
Expression residual();
... 省略部分代码 ...
}
5. 流读
流式读取主要的目的是将Iceberg增量读取的能力封装到flink connector之上。
5.1 基本流程:
流读基本流程如上图:
- StreamMonitorFunction负责定时 refresh table ,求出最新快照与上一个快照的之间的差集,并将其传递到下游算子。
- StreamReaderOperator 算子负责将ScanTask 对应的 DataFile和DeleteFiles 做merge操作之后读出文件的记录(records)。这个过程通过内部的一个FlinkInputFormat对象直接复用了批读的能力。
- 下游算子将records做进一步处理(map,flatmap,sink ......)
5.2 核心代码
5.2.1 StreamingMonitorFunction
StreamingMonitorFunction 继承了RichSourceFunction
, 并实现了 CheckpointedFunction
接口,将最近一次访问的Iceberg快照id保存到flink 的状态中实现精准一次的语义,任务宕机可以从状态中恢复而达到断点续传的效果。
StreamingMonitorFunction
的核心逻辑也很简单:
- 不断refresh table元数据,当发现table产生新的snapshot时,直接求出两个snapshot之间的差集数据(元数据),并封装为FlinkInputSplit返回。 求差集的过程主要依赖了Iceberg底层的Api: TableScan接口。
- 最后,将生成好的FlinkInputSplit[] 传递到下游算子:
sourceContext.collect()
public class StreamingMonitorFunction extends RichSourceFunction implements CheckpointedFunction {
... 省略部分代码......
//求两个Snapshot之间数据差集的核心接口
private final ScanContext scanContext;
private volatile boolean isRunning = true;
// The checkpoint thread is not the same thread that running the function for SourceStreamTask now. It's necessary to
// mark this as volatile.
private volatile long lastSnapshotId = INIT_LAST_SNAPSHOT_ID; // INIT_LAST_SNAPSHOT_ID = -1
private transient SourceContext sourceContext;
private transient Table table;
// 保存最近一次读取的snapshotId
private transient ListState lastSnapshotIdState;
@Override
public void initializeState(FunctionInitializationContext context) throws Exception {
// Load iceberg table from table loader.
tableLoader.open();
table = tableLoader.loadTable();
// Initialize the flink state for last snapshot id.
lastSnapshotIdState = context.getOperatorStateStore().getListState(
new ListStateDescriptor(
"snapshot-id-state",
LongSerializer.INSTANCE));
// Restore the last-snapshot-id from flink's state if possible.
// 断点续传,读取最近一次快照的数据开始恢复
if (context.isRestored()) {
LOG.info("Restoring state for the {}.", getClass().getSimpleName());
lastSnapshotId = lastSnapshotIdState.get().iterator().next();
} else if (scanContext.startSnapshotId() != null) {
Preconditions.checkNotNull(table.currentSnapshot(), "Don't have any available snapshot in table.");
// 非恢复类型的任务,直接查询table的最新snapshotId
long currentSnapshotId = table.currentSnapshot().snapshotId();
Preconditions.checkState(SnapshotUtil.isAncestorOf(table, currentSnapshotId, scanContext.startSnapshotId()),
"The option start-snapshot-id %s is not an ancestor of the current snapshot.", scanContext.startSnapshotId());
lastSnapshotId = scanContext.startSnapshotId();
}
}
@Override
public void snapshotState(FunctionSnapshotContext context) throws Exception {
lastSnapshotIdState.clear();
// 触发checkpoint时保存最新已经读取过的snapshot id
lastSnapshotIdState.add(lastSnapshotId);
}
@Override
public void run(SourceContext ctx) throws Exception {
this.sourceContext = ctx;
while (isRunning) {
monitorAndForwardSplits();
Thread.sleep(scanContext.monitorInterval().toMillis());
}
}
/**
* 刷新table的元数据,如果发现新的snapshot,则求出两个snapshot的差集数据(FlinkInputSplit)
* FlinkSplitGenerator 的逻辑已经在前面批读章节中介绍过,这里不再分析。
*/
private void monitorAndForwardSplits() {
// Refresh the table to get the latest committed snapshot.
table.refresh();
Snapshot snapshot = table.currentSnapshot();
if (snapshot != null && snapshot.snapshotId() != lastSnapshotId) {
long snapshotId = snapshot.snapshotId();
ScanContext newScanContext;
if (lastSnapshotId == INIT_LAST_SNAPSHOT_ID) {
newScanContext = scanContext.copyWithSnapshotId(snapshotId);
} else {
newScanContext = scanContext.copyWithAppendsBetween(lastSnapshotId, snapshotId);
}
// 关键在 newScanContext 生成了List
FlinkInputSplit[] splits = FlinkSplitGenerator.createInputSplits(table, newScanContext);
// only need to hold the checkpoint lock when emitting the splits and updating lastSnapshotId
synchronized (sourceContext.getCheckpointLock()) {
for (FlinkInputSplit split : splits) {
sourceContext.collect(split);
}
lastSnapshotId = snapshotId;
}
}
}
}
5.2.2 StreamReaderOperator
StreamReaderOperator 是流式读取链路中的第二个算子,它的输入是FlinkInputSplit,输出是RowData,所以它的主要逻辑是 从上游Source算子接收 数据文件的元数据(dataFile,deletefiles), 然后读取这些数据文件的records,与deleteFiles做merge操作,最后将 records输出到下游算子。
核心方法有 processElement, processSplits
public class StreamingReaderOperator extends AbstractStreamOperator
implements OneInputStreamOperator {
... 省略部分代码 ......
// 解析FlinkInputSplit为rows的线程池,和checkpoint保持同一个线程, TODO : 为什么需要这么做呢?
// It's the same thread that is running this operator and checkpoint actions. we use this executor to schedule only
// one split for future reading, so that a new checkpoint could be triggered without blocking long time for exhausting
// all scheduled splits.
private final MailboxExecutor executor;
// 真正解析文件数据,执行Merge操作都封装在FlinkInputFormat中去了。这里直接复用了批读的能力。
private FlinkInputFormat format;
// 通过sourceContext将解析到的rows发送到下游算子
private transient SourceFunction.SourceContext sourceContext;
// split存储到状态中,任务从checkpoint恢复时需要恢复未读取完的split
private transient ListState inputSplitsState;
private transient Queue splits;
@Override
public void processElement(StreamRecord element) {
splits.add(element.getValue());
enqueueProcessSplits();
}
private void enqueueProcessSplits() {
if (currentSplitState == SplitState.IDLE && !splits.isEmpty()) {
currentSplitState = SplitState.RUNNING;
executor.execute(this::processSplits, this.getClass().getSimpleName());
}
}
// 处理一个split,从orc,partquet等文件中读取records,并把数据推送到下游算子
private void processSplits() throws IOException {
FlinkInputSplit split = splits.poll();
if (split == null) {
currentSplitState = SplitState.IDLE;
return;
}
format.open(split);
try {
RowData nextElement = null;
while (!format.reachedEnd()) {
nextElement = format.nextRecord(nextElement);
sourceContext.collect(nextElement);
}
} finally {
currentSplitState = SplitState.IDLE;
format.close();
}
// Re-schedule to process the next split.
enqueueProcessSplits();
}
...... 省略其余代码 ......
}
到这里,基本已经分析完流式增量读取的主要核心代码和流程。和批读的差别在于,source operator的操作是一个while循环: refresh table的元数据获取最新的Snapshot,并求出与之前的Snapshot之间的数据差集,最复用了批读的数据解析能力,将FLinkInputSplit 解析为RowData推送到下游算子。
6. 架构存在的缺陷
本节内容主要分析流式增量读取的缺陷:
- source operator输出的是FlinkInputSplit,而FlinkInputSplit则在第二个 operator StreamingReaderOperator 才开始被解析成RowData。因此StreamingReaderOperator 的数据读取能力会成为flink job的一个瓶颈之一。我们知道在Flink作业中除了有数据流之外,还有触发算子执行快照的的关键数据
checkpoint barrier
, 而主流的Orc,parquet文件都是具备较高的压缩比的,数据字段重复率越高,其压缩率越好,即使split会进行切分,但是我们亦无法预估一个数据文件究竟容纳多少数据。笔者在实践中就遇到一个3.5M的paquet文件包含了 1300w条数据,StreamingReaderOperator的读取能力成为瓶颈,导致 checkpoint barrier无法向下游推进,而Flink job失败。如下图所示:
- Source operator状态中保存的数据是Iceberg table 的Snapshot id,因此精准一次的语义的基本单位是快照,粒度太大。任务恢复代价较高: 需要重新读取整个Snapshot全部数据。比较完美的应该是存储每个数据文件,并且可以记住已经读取到了第几行,下次恢复时就从上一次的offset读取,这样任务的恢复是比较轻量级的。
8. 总结
本文对Flink Iceberg read connector的源码做了一个解读,分析了该版本 connector的核心代码及流程。也总结了它的增量流式读取架构的缺陷: 任务稳定性差,并不适合应用到生产环境上。
虽然架构有缺陷,但是并不影响我们对其进行分析,理解它被应用到流式湖仓场景的基本原理。
当前Iceberg社区已经抛弃了该connector,并基于Flip-27 标准开发了全新的 source connector,完美解决了上文提及到的问题,笔者会在后面出一篇文章做源码分析。