理解Iceberg元数据及其数据组织方式

2023年 9月 24日 46.5k 0

简介

本节内容主要讲解Iceberg的一些核心概念,帮助读者理解Iceberg是如何通过元数据来组织数据的。

Iceberg的文件组织方式是按层次存储,数据组织结构的层次很分明,分为元数据层和数据层。它的文件组织结构层次很清晰,总的来说可以分为 元数据层和数据文件层。

参考下图:

image.png

下面是一张基于hive catalog管理的Iceberg表在hdfs文件系统的目录树结构:

[root@node1 iceberg_meta_parse]# tree test
test
├── data
│   ├── 00000-0-249d0dc2-3419-4690-a02f-3dc415a25bed-00001.parquet
│   ├── 00000-0-249d0dc2-3419-4690-a02f-3dc415a25bed-00009.parquet
│   └── 00000-0-249d0dc2-3419-4690-a02f-3dc415a25bed-00010.parquet
└── metadata
    ├── 00000-23fe098f-44cf-4931-bdd6-73362dfdd96b.metadata.json
    ├── 00001-7fa86a5b-d817-4fc4-a763-6533329e65dc.metadata.json
    ├── 37089349-9506-4498-b8f9-b566284ee8d3-m0.avro
    └── snap-6462606415823167841-2-37089349-9506-4498-b8f9-b566284ee8d3.avro

其中,hive 表中 metadata_location 属性会指向 00001-7fa86a5b-d817-4fc4-a763-6533329e65dc.metadata.json 这个meta文件的地址,表示当前表最新的 metadata localtion 。

我们可以在hive终端通过 desc formatted test 来查询表当前最新的 metadata_location 属性

Iceberg的数据更新能力

在介绍Iceberg的核心元数据之前,先了解Iceberg的数据更新能力。

Iceberg一个比较重要的特性是支持行级数据的更新能力。在大数据领域中,由于数据量太大,对数据进行更新如果通过检索出包含目的数据的文件然后对该文件的数据进行变更是一个代价十分大的操作,同时,数据文件一般都是ORC,PARQUET等列式存储格式,一旦写入就不允许变更。因此,一般都不会直接更新数据文件。而copy on write和merge on read是大数据领域中两个应用比较广泛的数据更新手段。

这两种方式各有利弊。

维度 copy on write merge on read
概念 更新时先拷贝一份出来,然后修改,最后替换掉原来的文件 更新时将原来的记录标记为无效(标记删除),将新的记录写到一个新的文件。读取的时候将数据文件和标记删除文件进行合并之后得到正确结果。
优点 利于读取,读取时直接读取新的数据文件,读取速度快 利于写入,吞吐基本相当于系统IO能力;产生冲突时只需要重新生成meta 文件即可,不需要重写数据层文件。
缺点 更新写入时IO高;并发更新产生冲突导致修改的数据文件无效需要重新copy并写入,浪费IO资源 读取效率下降,需要额外的任务对 数据文件和标记删除文件 进行合并。
面向场景 读多写少 写多读少

在Iceberg中,并不绑定使用上面其中一种方式进行读写,它是开放式的,由上层的引擎自己决定。例如 flink-Iceberg-connector在写入时使用了merge on read的方式,在一个commit提交中可能会产生 DataFile,EqualDeleteFile,PositionDeleteFile 三种数据层文件。

而在spark 引擎则使用了copy on write的方式来实现数据更新。不过一张表可能会被多种引擎进行修改,因此读取的时候还是亦做merge on read的操作。

核心概念

本节内容针对上图提及的各个概念进行介绍,帮助读者理解涉及到的核心概念。

metadata

Iceberg最顶层元数据,包含表的schema,历史snapshot列表,分区字段等核心数据。下面是摘抄的一个metadata.json 内容:

{
  "format-version" : 2,
  "table-uuid" : "ea5c3d63-f340-486a-a39d-bf28974d41db",
  "location" : "hdfs://mycluster/warehouse/tablespace/managed/hive/test.db/test",
  "last-sequence-number" : 5,
  "last-updated-ms" : 1695543146001,
  "last-column-id" : 2,
  "current-schema-id" : 0,
  "schemas" : [ {
    "type" : "struct",
    "schema-id" : 0,
    "fields" : [ {
      "id" : 1,
      "name" : "id",
      "required" : false,
      "type" : "int"
    }, {
      "id" : 2,
      "name" : "t",
      "required" : false,
      "type" : "timestamp"
    } ]
  } ],
  "default-spec-id" : 0,
  "partition-specs" : [ {
    "spec-id" : 0,
    "fields" : [ ]
  } ],
  "last-partition-id" : 999,
  "default-sort-order-id" : 0,
  "sort-orders" : [ {
    "order-id" : 0,
    "fields" : [ ]
  } ],
  "properties" : {
    "engine.hive.enabled" : "true",
    "totalSize" : "0",
    "numRows" : "0",
    "rawDataSize" : "0",
    "COLUMN_STATS_ACCURATE" : "{"BASIC_STATS":"true","COLUMN_STATS":{"id":"true","t":"true"}}",
    "numFiles" : "0",
    "all_fields_partitioned" : "false",
    "bucketing_version" : "2",
    "write.metadata.delete-after-commit.enabled" : "true",
    "storage_handler" : "org.apache.iceberg.mr.hive.HiveIcebergStorageHandler"
  },
  "current-snapshot-id" : 4320673616953473655,
  "snapshots" : [ {
    "sequence-number" : 1,
    "snapshot-id" : 6462606415823167841,
    "timestamp-ms" : 1695541211478,
    "summary" : {
      "operation" : "append",
      "flink.job-id" : "e3202839bb4809dc5fcc78edec233248",
      "flink.max-committed-checkpoint-id" : "1",
      "added-data-files" : "1",
      "added-records" : "3",
      "added-files-size" : "699",
      "changed-partition-count" : "1",
      "total-records" : "3",
      "total-files-size" : "699",
      "total-data-files" : "1",
      "total-delete-files" : "0",
      "total-position-deletes" : "0",
      "total-equality-deletes" : "0"
    },
    "manifest-list" : "hdfs://mycluster/warehouse/tablespace/managed/hive/test.db/test/metadata/snap-6462606415823167841-2-37089349-9506-4498-b8f9-b566284ee8d3.avro",
    "schema-id" : 0
  }, 
  ......
  , {
    "sequence-number" : 5,
    "snapshot-id" : 4320673616953473655,
    "parent-snapshot-id" : 7601590506180033675,
    "timestamp-ms" : 1695543146001,
    "summary" : {
      "operation" : "append",
      "flink.job-id" : "e3202839bb4809dc5fcc78edec233248",
      "flink.max-committed-checkpoint-id" : "33",
      "added-data-files" : "1",
      "added-records" : "2",
      "added-files-size" : "686",
      "changed-partition-count" : "1",
      "total-records" : "5",
      "total-files-size" : "1385",
      "total-data-files" : "2",
      "total-delete-files" : "0",
      "total-position-deletes" : "0",
      "total-equality-deletes" : "0"
    },
    "manifest-list" : "hdfs://mycluster/warehouse/tablespace/managed/hive/test.db/test/metadata/snap-4320673616953473655-2-919ca41e-ff69-4f50-b0df-5d165f4b0fee.avro",
    "schema-id" : 0
  } ],
  "snapshot-log" : [ {
    "timestamp-ms" : 1695541211478,
    "snapshot-id" : 6462606415823167841
  },
  ...
  ,{
    "timestamp-ms" : 1695543025076,
    "metadata-file" : "hdfs://mycluster/warehouse/tablespace/managed/hive/test.db/test/metadata/00004-50ec1d44-1743-42d0-8778-fcd619c22d0d.metadata.json"
  } ]
}

Snapshot:

快照是Iceberg表在某一个时刻commit操作之后的结果体现,代表在该时刻一张表的全部数据集合的一个视图。每一个提交,都会产生一个metadata文件和一个快照,metadata file是一个json结构文件,里面包含了表的字段信息,历史快照列表等内容,上图最顶层的current metadata location也会指向当前提交的最新的一个metadata file。

随着时间的向前推移,不断有新的快照产生。client可以在在时间轴上选择指定的快照读取数据,这个就是 时间旅行 的特性。

如上图所示,在技术上来说,snapshot就是一系列 manifest的集合。

[root@node1 iceberg_meta_parse]# java -jar  avro-tools-1.11.0.jar  tojson  test/metadata/snap-4320673616953473655-2-919ca41e-ff69-4f50-b0df-5d165f4b0fee.avro | jq .

{
  "manifest_path": "hdfs://mycluster/warehouse/tablespace/managed/hive/test.db/test/metadata/919ca41e-ff69-4f50-b0df-5d165f4b0fee-m0.avro",
  "manifest_length": 6564,
  "partition_spec_id": 0,
  "content": 0,
  "sequence_number": 5,
  "min_sequence_number": 5,
  "added_snapshot_id": 4320673616953473500,
  "added_data_files_count": 1,
  "existing_data_files_count": 0,
  "deleted_data_files_count": 0,
  "added_rows_count": 2,
  "existing_rows_count": 0,
  "deleted_rows_count": 0,
  "partitions": {
    "array": []
  }
}
{
  "manifest_path": "hdfs://mycluster/warehouse/tablespace/managed/hive/test.db/test/metadata/37089349-9506-4498-b8f9-b566284ee8d3-m0.avro",
  "manifest_length": 6568,
  "partition_spec_id": 0,
  "content": 0,
  "sequence_number": 1,
  "min_sequence_number": 1,
  "added_snapshot_id": 6462606415823167000,
  "added_data_files_count": 1,
  "existing_data_files_count": 0,
  "deleted_data_files_count": 0,
  "added_rows_count": 3,
  "existing_rows_count": 0,
  "deleted_rows_count": 0,
  "partitions": {
    "array": []
  }
}

manifest

Manifest是数据文件的集合。一般来说,每一次提交都会产生一个新的Manifest,一个Manifest可以包含一到多个Data File,Equal Delete File,Position Delete File。同时Manifest还会记录每个数据文件的一些元数据,比如每个字段的最大值和最小值,这些元数据可以辅助计算引擎在执行查询时,过滤掉不必要的File IO操作。

我们可以直接通过 avro-tools.jar 这个工具包,将avro格式的manifest文件转换为json结构:

java -jar avro-toos.jar tojson xxx-m0.avro 

下面是一段manifest文件示例内容:

  {
  "status": 1,
  "snapshot_id": {
    "long": 6462606415823167000
  },
  "sequence_number": null,
  "data_file": {
    "content": 0,
    "file_path": "hdfs://mycluster/warehouse/tablespace/managed/hive/test.db/test/data/00000-0-249d0dc2-3419-4690-a02f-3dc415a25bed-00001.parquet",
    "file_format": "PARQUET",
    "partition": {},
    "record_count": 3,
    "file_size_in_bytes": 699,
    "column_sizes": {
      "array": [
        {
          "key": 1,
          "value": 56
        },
        {
          "key": 2,
          "value": 71
        }
      ]
    },
    "value_counts": {
      "array": [
        {
          "key": 1,
          "value": 3
        },
        {
          "key": 2,
          "value": 3
        }
      ]
    },
    "null_value_counts": {
      "array": [
        {
          "key": 1,
          "value": 0
        },
        {
          "key": 2,
          "value": 0
        }
      ]
    },
    "nan_value_counts": {
      "array": []
    },
    "lower_bounds": {
      "array": [
        {
          "key": 1,
          "value": "u0001u0000u0000u0000"
        },
        {
          "key": 2,
          "value": "u0000€Gí5ñu0005u0000"
        }
      ]
    },
    "upper_bounds": {
      "array": [
        {
          "key": 1,
          "value": "u0003u0000u0000u0000"
        },
        {
          "key": 2,
          "value": "€\u0016Í0u0004u0006u0000"
        }
      ]
    },
    "key_metadata": null,
    "split_offsets": {
      "array": [
        4
      ]
    },
    "equality_ids": null,
    "sort_order_id": {
      "int": 0
    }
  }
}

DataFile

DataFile 属于数据文件层,支持ORC,Parquet等主流存储格式。真正的数据被存储到DataFile中,DataFile一旦写入之后不允许变更。

Equal Delete File

Iceberg通过标记删除的方式来实现对数据的删除操,Equal Delete File存储了需要被删除的记录的全部字段,并在manifest中使用equal_delete_fields 来标记该文件的主键字段。假设某次提交需要删除 主键为id, id= =1 和 id=2 两条记录,则产生一个equal delete file :

| id(pk fields) | name | age | 
| - | - | - |
| 1 | zhangsan | 18 |
| 2 | lisi | 19 |

上面表格是一个equal delete file的内容,表示在某次提交中需要删除id为1、2 的两条记录。查询引擎在查询表时需要过滤掉id=1、2的记录;

Posisition Delete File

position delete file也是用来标记删除记录的,用于标记一个data 文件中多次插入相同记录时进行去重操作。里面包含两个字段: 目标data文件,被删除的记录在data文件中的row number。

| data file | row number |
| --- | --- |
| 00001.parquet | 3 |
| 00001.parquet | 7 |

上述position delete file表示读取 00001.parquet 这个数据文件时查询引擎需要删除第 3、7 两行数据。

Iceberg与上层计算引擎

Iceberg 的架构设计层次非常分明,它提供了一套通用的数据读写api,用于对接各种上层计算引擎:

image.png

如上图所示,上层的引擎都统一对接到Iceberg API,API这一层其实有两种。这种架构设计下,其实每个引擎的读写能力或者说读写速度基本都是一致的,因为最终它们都通过调用了Iceberg提供的公共层api。

在读取能力方面,笔者目前了解到的差别在于:

  • Flink引擎是流计算,它在Iceberg中基于行的模式进行读取的,即一行一行读取;而列式存储的数据文件就到flink引擎就需要有一次 列转行的操作。
  • spark,presto,trino计算引擎则是利用了向量化读取的能力,而列式存储的orc,parquet等文件非常适合做向量化读取;
  • 所以在Iceberg的批处理能力方面,spark,trino等传统的批处理引擎是要比flink更快;

    相关文章

    JavaScript2024新功能:Object.groupBy、正则表达式v标志
    PHP trim 函数对多字节字符的使用和限制
    新函数 json_validate() 、randomizer 类扩展…20 个PHP 8.3 新特性全面解析
    使用HTMX为WordPress增效:如何在不使用复杂框架的情况下增强平台功能
    为React 19做准备:WordPress 6.6用户指南
    如何删除WordPress中的所有评论

    发布评论