Oracle 数据量 150T,PostgreSQL 能搞定吗?

2024年 5月 16日 96.8k 0

此版本的 parquet_s3_fdw 适用于 PostgreSQL 13、14、15 和 16。

只读模式下的 Apache Parquet 外部数据包装器,支持 PostgreSQL 访问 S3 存储。

用法

加载扩展

CREATE EXTENSION parquet_s3_fdw;

创建服务器

CREATE SERVER parquet_s3_srv FOREIGN DATA WRAPPER parquet_s3_fdw;

如果要使用 MinIO 而不是 AWS S3,请使用 use_minio 选项创建服务器。

CREATE SERVER parquet_s3_srv FOREIGN DATA WRAPPER parquet_s3_fdw OPTIONS (use_minio 'true');

创建用户映射

如果要访问 Amazon S3,则必须指定用户名和密码。

CREATE USER MAPPING FOR public SERVER parquet_s3_srv OPTIONS (user 's3user', password 's3password');

创建外表

现在你应该能够从 Parquet 文件创建外表了。目前parquet_s3_fdw支持下面这些数据列类型(还将会增加):

Arrow 类型

SQL 类型

INT8

INT2

INT16

INT2

INT32

INT4

INT64

INT8

FLOAT

FLOAT4

DOUBLE

FLOAT8

TIMESTAMP

TIMESTAMP

DATE32

DATE

STRING

TEXT

BINARY

BYTEA

LIST

ARRAY

MAP

JSONB

目前parquet_s3_fdw不支持结构体和嵌套列表。

支持以下选项:

• filename - 要读取的 Parquet 文件的路径列表,以空格分隔。您可以用s3://开始来指定 AWS S3 上的路径。不支持混合使用本地路径和 S3 路径;

• dirname - 具有要读取的 Parquet 文件的目录路径;

• sorted - 用来预排序 Parquet 文件的,空格分隔的列列表;当使用ORDER BY子句运行查询,或在其他情况下带有预排序的列集会有用(Group Aggregate,Merge Join)时,这将有助于 postgres 避免冗余的排序;

• files_in_order - 要求以filename指定或由files_func返回的文件,根据sorted选项进行排序,并且在范围上没有交叉;这允许在并行多文件扫描节点上使用Gather Merge节点(默认值为false);

• use_mmap - 是否使用内存映射操作,而不是文件读取操作(默认值为false);

• use_threads - 启用 Apache Arrow 的并行列解码/解压(默认值为false);

• files_func - 用户定义的函数,由 parquet_s3_fdw 在每次查询时用于检索 parquet 文件列表;函数必须接受一个JSONB参数,并返回 parquet 文件完整路径的文本数组;

• files_func_arg - 由 files_func 指定的函数的参数。

• max_open_files - 同时打开的 Parquet 文件的数量限制。

• region - 用于连接到的 AWS 区域的值(默认值为ap-northeast-1)。

• endpoint - 用于连接的地址和端口(默认值为127.0.0.1:9000)。

可以为单个和一组 Parquet 文件创建外部表。也可以指定一个用户定义的函数,该函数会返回一个文件路径列表。根据文件数量和表选项,parquet_s3_fdw可以使用以下的一种执行策略:

策略

描述

Single File

基本的单文件读取器

Multifile

按顺序逐个处理 Parquet 文件的读取器

Multifile Merge

该读取器会合并预排序的 Parquet 文件,以便生成的结果也是有序的;在指定了sorted选项,并且查询计划需要排序(例如包含了ORDER BY子句)时会使用

Caching Multifile Merge

Multifile Merge相同,但会限制同时打开的文件数;当指定的 Parquet 文件数超过max_open_files时会使用

GUC 变量:

• parquet_fdw.use_threads - 允许用户启用或禁用线程的全局开关(默认值为true);

• parquet_fdw.enable_multifile - 启用多文件读取器(默认值为true)。

• parquet_fdw.enable_multifile_merge - 启用多文件合并读取器(默认值为true)。

示例:

CREATE FOREIGN TABLE userdata (
    id           int,
    first_name   text,
    last_name    text
)
SERVER parquet_s3_srv
OPTIONS (
    filename 's3://bucket/dir/userdata1.parquet'
);

访问外表

SELECT * FROM userdata;

并行查询

parquet_s3_fdw还支持并行查询执行(注意不要与 Apache Arrow 的多线程解码功能混淆)。

导入

parquet_s3_fdw还支持 IMPORT FOREIGN SCHEMA 命令,来发现文件系统上指定目录中的 parquet 文件,并根据这些文件创建外部表。它可以像这样使用:

IMPORT FOREIGN SCHEMA "/path/to/directory"
FROM SERVER parquet_s3_srv
INTO public;

重要的是,这里的remote_schema是一个本地文件系统的目录路径,并且用双引号引起来。

将 parquet 文件导入到外部表的另一种方法是,使用import_parquet_s3或import_parquet_s3_explicit:

CREATE FUNCTION import_parquet_s3(
    tablename   text,
    schemaname  text,
    servername  text,
    userfunc    regproc,
    args        jsonb,
    options     jsonb)

CREATE FUNCTION import_parquet_s3_explicit(
    tablename   text,
    schemaname  text,
    servername  text,
    attnames    text[],
    atttypes    regtype[],
    userfunc    regproc,
    args        jsonb,
    options     jsonb)

import_parquet_s3和import_parquet_s3_explicit之间的唯一区别是,后者允许指定一组要导入的属性/列。attnames和atttypes分别是属性名称和属性类型的数组(参见下面的示例)。

userfunc是一个用户自定义函数。它必须接受一个jsonb参数,并返回一个要导入的 parquet 文件的文件系统路径的文本数组。args是用户指定的 jsonb 对象,以作为参数传递给userfunc。这种函数的简单实现和用法,可以如下面这样:

CREATE FUNCTION list_parquet_s3_files(args jsonb)
RETURNS text[] AS
$$
BEGIN
    RETURN array_agg(args->>'dir' || '/' || filename)
           FROM pg_ls_dir(args->>'dir') AS files(filename)
           WHERE filename ~~ '%.parquet';
END
$$
LANGUAGE plpgsql;

SELECT import_parquet_s3_explicit(
    'abc',
    'public',
    'parquet_srv',
    array['one', 'three', 'six'],
    array['int8', 'text', 'bool']::regtype[],
    'list_parquet_files',
    '{"dir": "/path/to/directory"}',
    '{"sorted": "one"}'
);

特性

• 支持在本地文件系统或 Amazon S3 上对 parquet 文件进行 SELECT 操作。

• 支持 INSERT、DELETE、UPDATE(外部修改)。

• 支持 MinIO 访问,以替代 Amazon S3。

• 允许控制外部服务器在事务完成后是否保持连接打开状态。这由 keep_connections 控制,默认为 on。

• 支持 parquet_s3_fdw 的 parquet_s3_fdw_get_connections() 函数,列出打开的外部服务器连接。

无结构模式

• 该功能将使用户能够使用无结构的能力:

 •Jsonb 键:parquet 列名称。

• Jsonb 值:parquet 列数据。

• 每个 parquet 文件没有特定的外部表结构(列定义)。

• 无结构的外表只有一个 jsonb 列,用于根据以下规则表示 parquet 文件中的数据:

使用无结构模式,会有几个好处:

• parquet 文件数据结构的灵活性:通过将所有列数据合并到一个 jsonb 列中,无结构的外表可以查询任何 parquet 文件,文件中的所有列都能映射到 postgres 类型。

• 没有预定义的外部表结构(列定义)。缺少结构意味着外部表会查询 parquet 文件中的所有列,包括用户还未使用的列。

无结构模式用法

• 无结构模式由schemaless选项来启用:

• schemaless选项是true:启用无结构模式。

• schemaless选项是false:禁用无结构模式(我们称之为non-schemaless模式)。

• 如果未配置schemaless选项,则默认值为 false。

• CREATE FOREIGN TABLE、IMPORT FOREIGN SCHEMA、import_parquet_s3()和import_parquet_s3_explicit()中均支持schemaless选项。

• 无结构外表需要至少一个 jsonb 列来表示数据:

CREATE FOREIGN TABLE example_schemaless (
  id int,
  v jsonb
) OPTIONS (filename '/path/to/parquet_file', schemaless 'true');
SELECT * FROM example_schemaless;
id |                                                                v
----+---------------------------------------------------------------------------------------------------------------------------------
    | {"one": 1, "six": "t", "two": [1, 2, 3], "five": "2018-01-01", "four": "2018-01-01 00:00:00", "seven": 0.5, "three": "foo"}
    | {"one": 2, "six": "f", "two": [null, 5, 6], "five": "2018-01-02", "four": "2018-01-02 00:00:00", "seven": null, "three": "bar"}
(2 rows)

• 如果有 1 个以上的 jsonb 列,则仅填充一列,所有其他列都使用 NULL 值处理。

• 如果没有 jsonb 列,则所有列都使用 NULL 值处理。

• 示例:

• 创建外部表:使用IMPORT FOREIGN SCHEMA,import_parquet_s3()和import_parquet_s3_explicit(),外部表将以固定的列定义进行创建,如下所示:

CREATE FOREIGN TABLE example (
  v jsonb
) OPTIONS (filename '/path/to/parquet_file', schemaless 'true');

• 查询数据:

-- non-schemaless mode
SELECT * FROM example;
 one |    two     | three |        four         |    five    | six | seven
-----+------------+-------+---------------------+------------+-----+-------
   1 | {1,2,3}    | foo   | 2018-01-01 00:00:00 | 2018-01-01 | t   |   0.5
   2 | {NULL,5,6} | bar   | 2018-01-02 00:00:00 | 2018-01-02 | f   |
(2 rows)
-- schemaless mode
SELECT * FROM example_schemaless;
                                                                  v
---------------------------------------------------------------------------------------------------------------------------------
 {"one": 1, "six": "t", "two": [1, 2, 3], "five": "2018-01-01", "four": "2018-01-01 00:00:00", "seven": 0.5, "three": "foo"}
 {"one": 2, "six": "f", "two": [null, 5, 6], "five": "2018-01-02", "four": "2018-01-02 00:00:00", "seven": null, "three": "bar"}
(2 rows)

• 在 jsonb 表达式中获取值:

• 使用 ->> jsonb 箭头操作符,返回文本类型。用户可以强制转换 jsonb 表达式的类型,以获得相应的数据表示。

• 例如,获取col值的表达式v->>'col',将是 parquet 文件中的列名col,我们称之为schemaless variable或slvar。

SELECT v->>'two', sqrt((v->>'one')::int) FROM example_schemaless;
  ?column?   |        sqrt
--------------+--------------------
[1, 2, 3]    |                  1
[null, 5, 6] | 1.4142135623730951
(2 rows)

• 某些功能与non-schemaless模式不同

• 在sorted选项中定义列名,与non-schemaless mode相同

• 在ORDER BY子句中使用slvar代替列名。

• 如果排序的 parquet 列不是文本列,请将此列显式地强制转换到映射类型。

• 例如:

CREATE FOREIGN TABLE example_sorted (v jsonb)
SERVER parquet_s3_srv
OPTIONS (filename '/path/to/example1.parquet /path/to/example2.parquet', sorted 'int64_col', schemaless 'true');
EXPLAIN (COSTS OFF) SELECT * FROM example_sorted ORDER BY (v->>'int64_col')::int8;
          QUERY PLAN
--------------------------------
Foreign Scan on example_sorted
  Reader: Multifile Merge
  Row groups:
    example1.parquet: 1, 2
    example2.parquet: 1
(5 rows)

• slvar::type {operator} const。例如:(v->>'int64_col')::int8 = 100

• const {operator} slvar ::type。例如:100 = (v->>'int64_col')::int8

• slvar::boolean is true/false。例如:(v->>'bool_col')::boolean is false

• !(slvar::boolean)。例如:!(v->>'bool_col')::boolean

• Jsonb exist 运算符:((v->>'col')::jsonb) ? element、(v->'col') ? element和v ? 'col'

• 转换函数必须映射 parquet 列类型,否则会跳过过滤器。

• 行组过滤器支持:在无结构模式下,parquet_s3_fdw 可以通过一些如下的WHERE条件,支持对行组进行过滤:

• 要使用 parquet 文件的预排序列,用户必须是:

• 支持对嵌套列表和映射表使用箭头运算符:这些类型将被视为嵌套的 jsonb 值,可以通过->操作符访问。例如:

SELECT * FROM example_schemaless;
                                  v
----------------------------------------------------------------------------
{"array_col": [19, 20], "jsonb_col": {"1": "foo", "2": "bar", "3": "baz"}}
{"array_col": [21, 22], "jsonb_col": {"4": "test1", "5": "test2"}}
(2 rows)

SELECT v->'array_col'->1, v->'jsonb_col'->'1' FROM example3;
?column? | ?column?
----------+----------
20       | "foo"
22       |
(2 rows)

• Postgres 计算(jsonb->>'col')::type的成本,比在non-schemaless模式下直接获取列要大得多,在一些复杂的查询中,schemaless模式的查询计划可能与non-schemaless模式不同。

• 对于其他功能,schemaless模式与non-schemaless模式工作相同。

可写的 FDW

用户可以对已设置键列的外表,执行 insert、update 和 delete 语句。

键列

• 在结构化模式下:可以通过使用 OPTIONS (key 'true') 创建 parquet_s3_fdw 外表对象,来设置键列:

CREATE FOREIGN TABLE userdata (
    id1          int OPTIONS(key 'true'),
    id2          int OPTIONS(key 'true'),
    first_name   text,
    last_name    text
) SERVER parquet_s3_srv
OPTIONS (
    filename 's3://bucket/dir/userdata1.parquet'
);

• 在无结构模式下,可以在创建 parquet_s3_fdw 外部表对象时,使用key_columns选项设置键列:

CREATE FOREIGN TABLE userdata (
    v JSONB
) SERVER parquet_s3_srv
OPTIONS (
    filename 's3://bucket/dir/userdata1.parquet',
    schemaless 'true',
    key_columns 'id1 id2'
);

• key_columns选项可用于 IMPORT FOREIGN SCHEMA 功能:

-- in schemaless mode
IMPORT FOREIGN SCHEMA 's3://data/' FROM SERVER parquet_s3_srv INTO tmp_schema
OPTIONS (sorted 'c1', schemaless 'true', key_columns 'id1 id2');
-- corresponding CREATE FOREIGN TABLE
CREATE FOREIGN TABLE tbl1 (
      v jsonb
) SERVER parquet_s3_srv
OPTIONS (filename 's3://data/tbl1.parquet', sorted 'c1', schemaless 'true', key_columns 'id1 id2');

-- in non-schemaless mode
IMPORT FOREIGN SCHEMA 's3://data/' FROM SERVER parquet_s3_srv INTO tmp_schema
OPTIONS (sorted 'c1', schemaless 'true', key_columns 'id1 id2');
-- corresponding CREATE FOREIGN TABLE
CREATE FOREIGN TABLE tbl1 (
      id1 INT OPTIONS (key 'true'),
      id2 INT OPTIONS (key 'true'),
      c1  TEXT,
      c2  FLOAT
) SERVER parquet_s3_srv
OPTIONS (filename 's3://data/tbl1.parquet', sorted 'c1');

insert_file_selector 选项

parquet_s3_fdw 用来在 INSERT 查询中检索目标 parquet 文件的用户定义函数签名:

CREATE FUNCTION insert_file_selector_func(one INT8, dirname text)
RETURNS TEXT AS
$$
    SELECT (dirname || '/example7.parquet')::TEXT;
$$
LANGUAGE SQL;

CREATE FOREIGN TABLE example_func (one INT8 OPTIONS (key 'true'), two TEXT)
SERVER parquet_s3_srv
OPTIONS (
    insert_file_selector 'insert_file_selector_func(one, dirname)',
    dirname '/tmp/data_local/data/test',
    sorted 'one');

• insert_file_selector 函数签名规格:

• dirname arg:dirname 选项的值。

• column args:按名称从插入槽位中获取。

• 语法:[function name]([arg name] , [arg name] ...)

• 默认返回类型为TEXT(parquet 文件的完整路径)

• [arg name]:必须是外部表的列名或dirname

• args 值:

排序列:

parquet_s3_fdw 支持在修改功能中保持排序列的排序状态。

Parquet 文件结构:

基本上,parquet 文件结构是根据一组列名和相应的类型定义的,但在 parquet_s3_fdw 的扫描中,它假定所有具有相同名称的列都具有相同的类型。因此,在修改功能中,也会使用该假设。

从 postgres 类型到 arrow 类型的映射:

  • • 基础类型映射:

SQL 类型

Arrow 类型

BOOL

BOOL

INT2

INT16

INT4

INT32

INT8

INT64

FLOAT4

FLOAT

FLOAT8

DOUBLE

TIMESTAMP/TIMESTAMPTZ

TIMESTAMP

DATE

DATE32

TEXT

STRING

BYTEA

BINARY

• arrow::TIMESTAMP 的默认时间精度为 UTC 时区的微秒级。

• LIST 是由它的元素类型创建的,对于元素只支持基础类型。

• MAP 由其 jsonb 元素的类型来创建的:

jsonb 类型

Arrow 类型

text

STRING

numeric

FLOAT8

boolean

BOOL

null

STRING

其他类型

STRING

• 在无结构模式下:

• 在结构化模式下,基础的 jsonb 类型的映射与 MAP 相同。

• 对于无结构模式下的第一个嵌套的 jsonb:

jsonb 类型

Arrow 类型

array

LIST

object

MAP

• 在结构化模式下,LIST 和 MAP 的元素类型与 MAP 类型相同。

INSERT

-- non-schemaless mode
CREATE FOREIGN TABLE example_insert (
    c1 INT2 OPTIONS (key 'true'),
    c2 TEXT,
    c3 BOOLEAN
) SERVER parquet_s3_srv OPTIONS (filename 's3://data/example_insert.parquet');

INSERT INTO example_insert VALUES (1, 'text1', true), (2, DEFAULT, false), ((select 3), (select i from (values('values are fun!')) as foo (i)), true);
INSERT 0 3

SELECT * FROM example_insert;
 c1 |       c2        | c3
----+-----------------+----
  1 | text1           | t
  2 |                 | f
  3 | values are fun! | t
(3 rows)

-- schemaless mode
CREATE FOREIGN TABLE example_insert_schemaless (
    v JSONB
) SERVER parquet_s3_srv OPTIONS (filename 's3://data/example_insert.parquet', schemaless 'true', key_column 'c1');

INSERT INTO example_insert_schemaless VALUES ('{"c1": 1, "c2": "text1", "c3": true}'), ('{"c1": 2, "c2": null, "c3": false}'), ('{"c1": 3, "c2": "values are fun!", "c3": true}');

SELECT * FROM example_insert_schemaless;
                       v
-----------------------------------------------
 {"c1": 1, "c2": "text1", "c3": "t"}
 {"c1": 2, "c2": null, "c3": "f"}
 {"c1": 3, "c2": "values are fun!", "c3": "t"}
(3 rows)

• 选择要插入的文件:

• 目标文件是第一个其结构与插入记录匹配(插入记录的所有列都存在于目标文件中)的文件。

• 如果没有符合其结构的文件与插入记录的列匹配,并且已指定dirname选项。创建新文件,文件名格式为:[foreign_table_name]_[date_time].parquet

• 否则,会引发错误消息。

• 如果目标文件不存在,则创建与目标文件同名的新文件。

• 如果目标文件存在,但其结构与插入记录的列不匹配,则会引发错误消息。

• 如果存在选项insert_file_selector,目标文件就是该函数的结果。

• 如果选项insert_file_selector不存在:

• 新文件的结构:

• 从现有文件列表中获取。

• 如果在任何文件中都不存在列:根据预定义的映射类型创建基础文件。

• 在结构化模式下,新文件将所有列都存在于外部表中。

• 在无结构模式下,新文件将在 jsonb 值中带上所有列。

• 列信息:

UPDATE/DELETE

-- non-schemaless mode
CREATE FOREIGN TABLE example (
    c1 INT2 OPTIONS (key 'true'),
    c2 TEXT,
    c3 BOOLEAN
) SERVER parquet_s3_srv OPTIONS (filename 's3://data/example.parquet');

SELECT * FROM example;
 c1 |       c2        | c3
----+-----------------+----
  1 | text1           | t
  2 |                 | f
  3 | values are fun! | t
(3 rows)

UPDATE example SET c3 = false WHERE c2 = 'text1';
UPDATE 1

SELECT * FROM example;
 c1 |       c2        | c3
----+-----------------+----
  1 | text1           | f
  2 |                 | f
  3 | values are fun! | t
(3 rows)

DELETE FROM example WHERE c1 = 2;
DELETE 1

SELECT * FROM example;
 c1 |       c2        | c3
----+-----------------+----
  1 | text1           | f
  3 | values are fun! | t
(2 rows)

-- schemaless mode
CREATE FOREIGN TABLE example_schemaless (
    v JSONB
) SERVER parquet_s3_srv OPTIONS (filename 's3://data/example.parquet', schemaless 'true', key_columns 'c1');

SELECT * FROM example_schemaless;
                       v
-----------------------------------------------
 {"c1": 1, "c2": "text1", "c3": "t"}
 {"c1": 2, "c2": null, "c3": "f"}
 {"c1": 3, "c2": "values are fun!", "c3": "t"}
(3 rows)

UPDATE example_schemaless SET v='{"c3":false}' WHERE v->>'c2' = 'text1';
UPDATE 1

SELECT * FROM example_schemaless;
                       v
-----------------------------------------------
 {"c1": 1, "c2": "text1", "c3": "f"}
 {"c1": 2, "c2": null, "c3": "f"}
 {"c1": 3, "c2": "values are fun!", "c3": "t"}
(3 rows)

DELETE FROM example_schemaless WHERE (v->>'c1')::int = 2;
DELETE 1

SELECT * FROM example_schemaless;
                       v
-----------------------------------------------
 {"c1": 1, "c2": "text1", "c3": "f"}
 {"c1": 3, "c2": "values are fun!", "c3": "t"}
(2 rows)

限制

• 不支持事务。

• 无法同时在文件系统和 Amazon S3 上使用 parquet 文件创建单个外部表。

• import_parquet_s3_explicit()函数的第 4 和第 5 个参数,在schemaless模式下没有意义。

WARNING: parquet_s3_fdw: attnames and atttypes are expected to be NULL. They are meaningless for schemaless table.
HINT: Schemaless table imported always contain "v" column with "jsonb" type.

• 这些参数应该定义为NULL值。

• 如果这些参数不是 NULL 值,则会出现下面的WARNING:

• schemaless模式不支持通过CREATE TABLE parent_tbl (v jsonb) PARTITION BY RANGE((v->>'a')::int)创建分区表。

• 在修改功能中:

• 对于大文件,性能不太好。

• 当完全相同的文件同时修改时,结果会出现不一致。

• parquet_s3_fdw修改 parquet 文件的方法是,从目标 parquet 文件创建可修改的缓存数据,并覆盖旧文件:

• 不支持 WITH CHECK OPTION、ON CONFLICT 和 RETURNING。

• sorted列仅支持这些类型:int2、int4、int8、date、timestamp、float4、float8。

• key列仅支持这些类型:int2、int4、int8、date、timestamp、float4、float8和text。

• key列的值必须是唯一的,parquet_s3_fdw不支持检查键列的唯一值,用户必须做好检查。

• key列仅用于 UPDATE/UPDATE。

相关文章

JavaScript2024新功能:Object.groupBy、正则表达式v标志
PHP trim 函数对多字节字符的使用和限制
新函数 json_validate() 、randomizer 类扩展…20 个PHP 8.3 新特性全面解析
使用HTMX为WordPress增效:如何在不使用复杂框架的情况下增强平台功能
为React 19做准备:WordPress 6.6用户指南
如何删除WordPress中的所有评论

发布评论