Iceberg flink source connector源码走读 一

2023年 9月 30日 10.2k 0

1. 引言

由于Iceberg本身具备了保存多个快照的能力(也叫时间旅行),所以能够保存多个数据版本,亦能够根据 时间旅行的特性为上层引擎提供增量读取的能力。

每个快照都代表某个时刻的数据集合,而两个快照之间可以通过差集计算求出发生变更的部分:

image.png

如上图,根据两个快照指向的manifests集合求差集,就能得到变更的数据集合了(包括删除,更新以及插入操作)。

Flink引擎本身是支持批读和流式读取的,其connector在源码的体现上:

  • 批读就是读取Iceberg的某个快照下的数据文件集合;
  • 而流式读取则是依赖其时间旅行特性,不断取出最新的snapshot,与上一次读取过的snapshot做差集计算,最后读取差集数据文件,实现流式增量读取操作。

到目前为止Iceberg的版本已经迭代到1.3.1 ,而本章内容的分析过程则基于较老的版本 0l.12.x , 发布于 2021年6月左右。但是由于其架构和源码整体没有出现特别大的改动,所以老版本代码不影响我们理解它的基础原理以及核心流程。

2. Flink source connector 批流API

虽说flink对外宣称自己是流批一体的框架,能够做到一套api两种方式运行。但是真正在代码开发的路径上实现了批流一体读取应该是在 Flip-27 特性之后,而该特性发布之前已经有不少的生态connector (kafka,mysql .. )已经接入了Flink, Flip-27特性发布之前的flink引擎为了具备批读的能力,提供了一套DataSet API , 其中DataSet Api核心接口为 InputFormat。

InputFormat 命名风格上借鉴了 Hadoop 的风格,在功能上也比较相近,具体有以下三点:

  • 描述输入的数据如何被划分为不同的 InputSplit(继承于 InputSplitSource)。
  • 描述如何从单个 InputSplit 读取记录,具体包括如何打开一个分配到的 InputSplit,如何从这个 InputSplit 读取一条记录,如何得知记录已经读完和如何关闭这个 InputSplit。
  • 描述如何获取输入数据的统计信息(比如文件的大小、记录的数目),以帮助更好地优化执行计划。

因此以前的FLink在代码开发的路径上流读和批读是割裂的,api以及接口的定义显得比较混乱:

image.png

关于更多Flip-27特性的背景以及优点,本文不做详述,读者可自行去查找相关资料。本段内容想表达的是Api的不统一,导致很多connector其实内部是有 批和流两套 api的。

Iceberg 的flink connector也是如此,而后面Iceberg的迭代升级,社区也基于Flip-27 为Iceberg开发了一套全新的真正的批流一体API,这个会在后面的文章 《Iceberg flink source connector源码走读 二》来分析。

本章内容分析旧版本的flink connector的源代码。

3. 入口代码

入口代码在 org.apache.iceberg.flink.source.FlinkSource的forRowData() 函数,返回一个 FlinkSource.Builder ,用于构建各种输入参数:

StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment();
TableLoader tableLoader = TableLoader.fromHadoopTable("hdfs://nn:8020/warehouse/path");
DataStream batch = FlinkSource.forRowData()
     .env(env)
     .tableLoader(tableLoader)
     // 流读 or 批读
     .streaming(false)
     .build();

// Print all records to stdout.
batch.print();

// Submit and execute this batch read job.
env.execute("Test Iceberg Batch Read");

FlinkSource.Builder 是参数构建入口,批流共有参数可以支持flilter(条件下推,目前只支持partition条件的下推),project,limit等常见读取优化:


...省略部分代码

// filter 下推,目前仅支持partition字段的下推。
public Builder filters(List filters) {
      contextBuilder.filters(filters);
      return this;
    }

   // 字段投影,仅读取指定的字段,project可以下推到读取具体的数据文件,
   // 结合常见的 orc、parquet 等列式存储文件,可以节省不必要的File io
    public Builder project(TableSchema schema) {
      this.projectedSchema = schema;
      return this;
    }

    // 读取N条
    public Builder limit(long newLimit) {
      contextBuilder.limit(newLimit);
      return this;
    }
    
    //指定一个split的大小,方便调优解决数据倾斜问题。
    public Builder splitSize(Long splitSize) {
      contextBuilder.splitSize(splitSize);
      return this;
    }
    
    // 为上层引擎基于代优化模型提供数据
    public Builder splitOpenFileCost(Long splitOpenFileCost) {
      contextBuilder.splitOpenFileCost(splitOpenFileCost);
      return this;
    }
    
    ... 省略部分代码

4. 批读

  • 传参和构建参数,参考 org.apache.iceberg.flink.source.FlinkSource.Builder, 除了上面介绍的批流公共参数外,还支持读取指定的snapshot:
  • ...省略
    
     public Builder snapshotId(Long snapshotId) {
          contextBuilder.useSnapshotId(snapshotId);
          return this;
      }
        
    ...省略部分代码
    
  • 参数都传递好之后由build() 函数来触发构建source connector:
  • public DataStream build() {
          Preconditions.checkNotNull(env, "StreamExecutionEnvironment should not be null");
          // FlinkInputFormat 是批读connector的源头
          FlinkInputFormat format = buildFormat();
    
          ScanContext context = contextBuilder.build();
          TypeInformation typeInfo = FlinkCompatibilityUtil.toTypeInfo(FlinkSchemaUtil.convert(context.project()));
    
          if (!context.isStreaming()) {
            int parallelism = inferParallelism(format, context);
            if (env.getMaxParallelism() > 0) {
              parallelism = Math.min(parallelism, env.getMaxParallelism());
            }
            
            // 通过批Api直接构建
            return env.createInput(format, typeInfo).setParallelism(parallelism);
          } else {
           ...... 此处为流读connector构建,省略 ......
          }
        }
    

    从上面的代码可以看到,FlinkInputFormat是关键,它继承了Flink 的 RichInputFormat 接口,需要去实现怎么获取一个split,如何从split中读取一条记录(row)等关键操作。
    继续跟踪代码:

    public class FlinkInputFormat extends RichInputFormat {
    
      ...... 省略部分代码......
      
      @Override
      public FlinkInputSplit[] createInputSplits(int minNumSplits) throws IOException {
        // Called in Job manager, so it is OK to load table from catalog.
        tableLoader.open();
        try (TableLoader loader = tableLoader) {
          Table table = loader.loadTable();
          // 批读模式下,context会包含一个snapshotid,FlinkSplitGenerator 查找指定snapshot的数据文件集合,并将其封装为splits[] 返回
          // 流读模式下,ontext会包含startSnapshotId和endSnapshotId,FlinkSplitGenerator 则计算两个 snapshot之间的差集,并将其封装为splits[] 返回
          return FlinkSplitGenerator.createInputSplits(table, context);
        }
      }
    
     ......
      
      // 打开一个split,打开后 nextRecord 函数从该split读取records(rows)
      @Override
      public void open(FlinkInputSplit split) {
        this.iterator = new DataIterator(rowDataReader, split.getTask(), io, encryption);
      }
    
      ... 省略部分代码......
    
      @Override
      public RowData nextRecord(RowData reuse) {
        currentReadCount++;
        return iterator.next();
        }
        
        ...... 省略部分代码......
    }
    
    

    而FlinkSplitGenerator.createInputSplits 函数则是通过Iceberg 底层提供的公共的 TableScan 接口来获取到List ,最后FlinkSplitGenerator自己将 CombinedScanTask 集合封装为 FlinkInputSplit ,参考代码:

    class FlinkSplitGenerator {
     static FlinkInputSplit[] createInputSplits(Table table, ScanContext context) {
        // 调用Iceberg底层的TableScan接口,获取scan的task 列表
        List tasks = tasks(table, context);
        FlinkInputSplit[] splits = new FlinkInputSplit[tasks.size()];
        boolean exposeLocality = context.exposeLocality();
    
        Tasks.range(tasks.size())
            .stopOnFailure()
            .executeWith(exposeLocality ? ThreadPools.getWorkerPool() : null)
            .run(index -> {
              CombinedScanTask task = tasks.get(index);
              String[] hostnames = null;
              if (exposeLocality) {
                hostnames = Util.blockLocations(table.io(), task);
              }
              splits[index] = new FlinkInputSplit(index, task, hostnames);
            });
        return splits;
      }
      
      private static List tasks(Table table, ScanContext context) {
        TableScan scan = table
            .newScan()
            .caseSensitive(context.caseSensitive())
            // 指定读取的字段列表
            .project(context.project());
        //批读模式,指定读取的snapshot
        if (context.snapshotId() != null) {
          scan = scan.useSnapshot(context.snapshotId());
        }
    
        ... 省略部分代码....
        
        /filter 条件下推在这里传递
        if (context.filters() != null) {
          for (Expression filter : context.filters()) {
            scan = scan.filter(filter);
          }
        }
    
        try (CloseableIterable tasksIterable = scan.planTasks()) {
          return Lists.newArrayList(tasksIterable);
        } catch (IOException e) {
          throw new UncheckedIOException("Failed to close table scan: " + scan, e);
        }
      }
    }
    

    到这里,flink引擎的批读取代码基本已经走读完,因为Iceberg的TableScan是一个底层接口,由它统一为上层的计算引擎提供对Iceberg表做扫描(scan)的能力: 返回的 CombinedScanTask 表示一个扫描任务批次集合:

    public interface CombinedScanTask extends ScanTaskGroup {
      /**
       * Return the {@link FileScanTask tasks} in this combined task.
       * @return a Collection of FileScanTask instances.
       */
      // FileScanTask 表示需要扫描的数据文件,以dataFile的(全部 or 部分)数据作为基本单位:一个DataFile + N个delete File 
      Collection files();
    
      @Override
      default Collection tasks() {
        return files();
      }
    
      @Override
      default CombinedScanTask asCombinedScanTask() {
        return this;
      }
    }
    

    FileScanTask 表示对一个文件数据或者 一个数据文件的部分数据 的扫描任务,它包含了一个 DataFile对象和N个DeleteFile。

    • 为什么会存在 一个数据文件的部分数据 这种情况?
    Iceberg 的数据文件是具备随机读(seek)能力的,当一个数据文件太大(例如1GB)时
    就可以将文件切分成多个split,上层计算引擎将不同的split并行读取,
    能解决读取速度并为解决数据倾斜提供了手段。
    
    • 为什么会出现 N个删除文件?
    由于不同的引擎可以对同一张表进行写,不能保证每个引擎都使用了copy on write的操作来更新数据,
    因此Iceberg读取数据时遵循Merge On Read 机制保证数据一致性。
    

    所以所谓的Merge on read,在源码中的体现则是一个scan task读取数据的过程:

    读取一个 DataFile中的数据并经过 DeleteFiles 的过滤之后,才能得真正的数据集。 所有的DataFile 都和它对应的DelereFiles 经过Merge操作之后,就得到当前表的所有的更新和删除之后的数据了

    参考源码 org.apache.iceberg.FileScanTask 和 ContentScanTask:

    public interface FileScanTask extends ContentScanTask, SplittableScanTask {
      /**
       * A list of {@link DeleteFile delete files} to apply when reading the task's data file.
       *
       * @return a list of delete files to apply.
       * N个删除文件 deleteFile (包括equality delete file 和 Position delete file)。
       */
      List deletes();
      
      ......省略部分代码......
    }
    
    public interface ContentScanTask extends ScanTask {
      // 数据文件dataFIle 
      F file();
      // split 依据
      long start();  
      long length();
    
      //文件级别的条件下推
      Expression residual();
      
      ... 省略部分代码 ...
     }
    

    5. 流读

    流式读取主要的目的是将Iceberg增量读取的能力封装到flink connector之上。

    5.1 基本流程:

    image.png

    流读基本流程如上图:

    • StreamMonitorFunction负责定时 refresh table ,求出最新快照与上一个快照的之间的差集,并将其传递到下游算子。
    • StreamReaderOperator 算子负责将ScanTask 对应的 DataFile和DeleteFiles 做merge操作之后读出文件的记录(records)。这个过程通过内部的一个FlinkInputFormat对象直接复用了批读的能力。
    • 下游算子将records做进一步处理(map,flatmap,sink ......)

    5.2 核心代码

    5.2.1 StreamingMonitorFunction

    StreamingMonitorFunction 继承了RichSourceFunction, 并实现了 CheckpointedFunction 接口,将最近一次访问的Iceberg快照id保存到flink 的状态中实现精准一次的语义,任务宕机可以从状态中恢复而达到断点续传的效果。

    StreamingMonitorFunction 的核心逻辑也很简单:

    • 不断refresh table元数据,当发现table产生新的snapshot时,直接求出两个snapshot之间的差集数据(元数据),并封装为FlinkInputSplit返回。 求差集的过程主要依赖了Iceberg底层的Api: TableScan接口。
    • 最后,将生成好的FlinkInputSplit[] 传递到下游算子: sourceContext.collect()
    public class StreamingMonitorFunction extends RichSourceFunction implements CheckpointedFunction {
     ... 省略部分代码......
     
     //求两个Snapshot之间数据差集的核心接口
     private final ScanContext scanContext;
    
     private volatile boolean isRunning = true;
    
     // The checkpoint thread is not the same thread that running the function for SourceStreamTask now. It's necessary to
     // mark this as volatile.
     private volatile long lastSnapshotId = INIT_LAST_SNAPSHOT_ID;  // INIT_LAST_SNAPSHOT_ID = -1 
    
     private transient SourceContext sourceContext;
     
     private transient Table table;
     // 保存最近一次读取的snapshotId
     private transient ListState lastSnapshotIdState; 
    
    
     @Override
     public void initializeState(FunctionInitializationContext context) throws Exception {
       // Load iceberg table from table loader.
       tableLoader.open();
       table = tableLoader.loadTable();
    
       // Initialize the flink state for last snapshot id.
       lastSnapshotIdState = context.getOperatorStateStore().getListState(
           new ListStateDescriptor(
               "snapshot-id-state",
               LongSerializer.INSTANCE));
    
       // Restore the last-snapshot-id from flink's state if possible.
       // 断点续传,读取最近一次快照的数据开始恢复
       if (context.isRestored()) {
         LOG.info("Restoring state for the {}.", getClass().getSimpleName());
         lastSnapshotId = lastSnapshotIdState.get().iterator().next();
       } else if (scanContext.startSnapshotId() != null) {
         Preconditions.checkNotNull(table.currentSnapshot(), "Don't have any available snapshot in table.");
         // 非恢复类型的任务,直接查询table的最新snapshotId
         long currentSnapshotId = table.currentSnapshot().snapshotId();
         Preconditions.checkState(SnapshotUtil.isAncestorOf(table, currentSnapshotId, scanContext.startSnapshotId()),
             "The option start-snapshot-id %s is not an ancestor of the current snapshot.", scanContext.startSnapshotId());
    
         lastSnapshotId = scanContext.startSnapshotId();
       }
     }
    
     @Override
     public void snapshotState(FunctionSnapshotContext context) throws Exception {
       lastSnapshotIdState.clear();
       // 触发checkpoint时保存最新已经读取过的snapshot id
       lastSnapshotIdState.add(lastSnapshotId);
     }
    
     @Override
     public void run(SourceContext ctx) throws Exception {
       this.sourceContext = ctx;
       while (isRunning) {
         monitorAndForwardSplits();
         Thread.sleep(scanContext.monitorInterval().toMillis());
       }
     }
    
     /**
     * 刷新table的元数据,如果发现新的snapshot,则求出两个snapshot的差集数据(FlinkInputSplit)
     * FlinkSplitGenerator 的逻辑已经在前面批读章节中介绍过,这里不再分析。
     */
     private void monitorAndForwardSplits() {
       // Refresh the table to get the latest committed snapshot.
       table.refresh();
    
       Snapshot snapshot = table.currentSnapshot();
       if (snapshot != null && snapshot.snapshotId() != lastSnapshotId) {
         long snapshotId = snapshot.snapshotId();
    
         ScanContext newScanContext;
         if (lastSnapshotId == INIT_LAST_SNAPSHOT_ID) {
           newScanContext = scanContext.copyWithSnapshotId(snapshotId);
         } else {
           newScanContext = scanContext.copyWithAppendsBetween(lastSnapshotId, snapshotId);
         }
         
         // 关键在 newScanContext 生成了List
         FlinkInputSplit[] splits = FlinkSplitGenerator.createInputSplits(table, newScanContext);
    
         // only need to hold the checkpoint lock when emitting the splits and updating lastSnapshotId
         synchronized (sourceContext.getCheckpointLock()) {
           for (FlinkInputSplit split : splits) {
             sourceContext.collect(split);
           }
    
           lastSnapshotId = snapshotId;
         }
       }
     }
    }
    

    5.2.2 StreamReaderOperator

    StreamReaderOperator 是流式读取链路中的第二个算子,它的输入是FlinkInputSplit,输出是RowData,所以它的主要逻辑是 从上游Source算子接收 数据文件的元数据(dataFile,deletefiles), 然后读取这些数据文件的records,与deleteFiles做merge操作,最后将 records输出到下游算子。

    核心方法有 processElement, processSplits

    public class StreamingReaderOperator extends AbstractStreamOperator
       implements OneInputStreamOperator {
    
     ... 省略部分代码 ......
    
     // 解析FlinkInputSplit为rows的线程池,和checkpoint保持同一个线程, TODO : 为什么需要这么做呢?
     // It's the same thread that is running this operator and checkpoint actions. we use this executor to schedule only
     // one split for future reading, so that a new checkpoint could be triggered without blocking long time for exhausting
     // all scheduled splits.
     private final MailboxExecutor executor;
     
     // 真正解析文件数据,执行Merge操作都封装在FlinkInputFormat中去了。这里直接复用了批读的能力。
     private FlinkInputFormat format;
    
     // 通过sourceContext将解析到的rows发送到下游算子
     private transient SourceFunction.SourceContext sourceContext;
    
     // split存储到状态中,任务从checkpoint恢复时需要恢复未读取完的split
     private transient ListState inputSplitsState;
     private transient Queue splits;
     
     
     
     @Override
     public void processElement(StreamRecord element) {
       splits.add(element.getValue());
       enqueueProcessSplits();
     }
    
     private void enqueueProcessSplits() {
       if (currentSplitState == SplitState.IDLE && !splits.isEmpty()) {
         currentSplitState = SplitState.RUNNING;
         executor.execute(this::processSplits, this.getClass().getSimpleName());
       }
     }
    
     // 处理一个split,从orc,partquet等文件中读取records,并把数据推送到下游算子
     private void processSplits() throws IOException {
       FlinkInputSplit split = splits.poll();
       if (split == null) {
         currentSplitState = SplitState.IDLE;
         return;
       }
    
       format.open(split);
       try {
         RowData nextElement = null;
         while (!format.reachedEnd()) {
           nextElement = format.nextRecord(nextElement);
           sourceContext.collect(nextElement);
         }
       } finally {
         currentSplitState = SplitState.IDLE;
         format.close();
       }
    
       // Re-schedule to process the next split.
       enqueueProcessSplits();
     }
     ...... 省略其余代码 ......
     
    }
    
    

    到这里,基本已经分析完流式增量读取的主要核心代码和流程。和批读的差别在于,source operator的操作是一个while循环: refresh table的元数据获取最新的Snapshot,并求出与之前的Snapshot之间的数据差集,最复用了批读的数据解析能力,将FLinkInputSplit 解析为RowData推送到下游算子。

    6. 架构存在的缺陷

    本节内容主要分析流式增量读取的缺陷:

    • source operator输出的是FlinkInputSplit,而FlinkInputSplit则在第二个 operator StreamingReaderOperator 才开始被解析成RowData。因此StreamingReaderOperator 的数据读取能力会成为flink job的一个瓶颈之一。我们知道在Flink作业中除了有数据流之外,还有触发算子执行快照的的关键数据 checkpoint barrier, 而主流的Orc,parquet文件都是具备较高的压缩比的,数据字段重复率越高,其压缩率越好,即使split会进行切分,但是我们亦无法预估一个数据文件究竟容纳多少数据。笔者在实践中就遇到一个3.5M的paquet文件包含了 1300w条数据,StreamingReaderOperator的读取能力成为瓶颈,导致 checkpoint barrier无法向下游推进,而Flink job失败。如下图所示:

    image.png

    • Source operator状态中保存的数据是Iceberg table 的Snapshot id,因此精准一次的语义的基本单位是快照,粒度太大。任务恢复代价较高: 需要重新读取整个Snapshot全部数据。比较完美的应该是存储每个数据文件,并且可以记住已经读取到了第几行,下次恢复时就从上一次的offset读取,这样任务的恢复是比较轻量级的。

    8. 总结

    本文对Flink Iceberg read connector的源码做了一个解读,分析了该版本 connector的核心代码及流程。也总结了它的增量流式读取架构的缺陷: 任务稳定性差,并不适合应用到生产环境上。

    虽然架构有缺陷,但是并不影响我们对其进行分析,理解它被应用到流式湖仓场景的基本原理。

    当前Iceberg社区已经抛弃了该connector,并基于Flip-27 标准开发了全新的 source connector,完美解决了上文提及到的问题,笔者会在后面出一篇文章做源码分析。

    相关文章

    如何更改WordPress常量FS_METHOD
    如何为区块编辑器、Elementor等构建WordPress文章模板
    如何彻底地删除WordPress主题及相关内容
    如何使用WordPress搭建一个内网
    WordPress插件开发须知:如何开发插件并发布到WordPress插件目录
    AI和ChatGPT如何增强WordPress功能

    发布评论