此版本的 parquet_s3_fdw 适用于 PostgreSQL 13、14、15 和 16。
只读模式下的 Apache Parquet 外部数据包装器,支持 PostgreSQL 访问 S3 存储。
用法
加载扩展
CREATE EXTENSION parquet_s3_fdw;
创建服务器
CREATE SERVER parquet_s3_srv FOREIGN DATA WRAPPER parquet_s3_fdw;
如果要使用 MinIO 而不是 AWS S3,请使用 use_minio 选项创建服务器。
CREATE SERVER parquet_s3_srv FOREIGN DATA WRAPPER parquet_s3_fdw OPTIONS (use_minio 'true');
创建用户映射
如果要访问 Amazon S3,则必须指定用户名和密码。
CREATE USER MAPPING FOR public SERVER parquet_s3_srv OPTIONS (user 's3user', password 's3password');
创建外表
现在你应该能够从 Parquet 文件创建外表了。目前parquet_s3_fdw支持下面这些数据列类型(还将会增加):
Arrow 类型 |
SQL 类型 |
INT8 |
INT2 |
INT16 |
INT2 |
INT32 |
INT4 |
INT64 |
INT8 |
FLOAT |
FLOAT4 |
DOUBLE |
FLOAT8 |
TIMESTAMP |
TIMESTAMP |
DATE32 |
DATE |
STRING |
TEXT |
BINARY |
BYTEA |
LIST |
ARRAY |
MAP |
JSONB |
目前parquet_s3_fdw不支持结构体和嵌套列表。
支持以下选项:
• filename - 要读取的 Parquet 文件的路径列表,以空格分隔。您可以用s3://开始来指定 AWS S3 上的路径。不支持混合使用本地路径和 S3 路径;
• dirname - 具有要读取的 Parquet 文件的目录路径;
• sorted - 用来预排序 Parquet 文件的,空格分隔的列列表;当使用ORDER BY子句运行查询,或在其他情况下带有预排序的列集会有用(Group Aggregate,Merge Join)时,这将有助于 postgres 避免冗余的排序;
• files_in_order - 要求以filename指定或由files_func返回的文件,根据sorted选项进行排序,并且在范围上没有交叉;这允许在并行多文件扫描节点上使用Gather Merge节点(默认值为false);
• use_mmap - 是否使用内存映射操作,而不是文件读取操作(默认值为false);
• use_threads - 启用 Apache Arrow 的并行列解码/解压(默认值为false);
• files_func - 用户定义的函数,由 parquet_s3_fdw 在每次查询时用于检索 parquet 文件列表;函数必须接受一个JSONB参数,并返回 parquet 文件完整路径的文本数组;
• files_func_arg - 由 files_func 指定的函数的参数。
• max_open_files - 同时打开的 Parquet 文件的数量限制。
• region - 用于连接到的 AWS 区域的值(默认值为ap-northeast-1)。
• endpoint - 用于连接的地址和端口(默认值为127.0.0.1:9000)。
可以为单个和一组 Parquet 文件创建外部表。也可以指定一个用户定义的函数,该函数会返回一个文件路径列表。根据文件数量和表选项,parquet_s3_fdw可以使用以下的一种执行策略:
策略 |
描述 |
Single File |
基本的单文件读取器 |
Multifile |
按顺序逐个处理 Parquet 文件的读取器 |
Multifile Merge |
该读取器会合并预排序的 Parquet 文件,以便生成的结果也是有序的;在指定了 |
Caching Multifile Merge |
与 |
GUC 变量:
• parquet_fdw.use_threads - 允许用户启用或禁用线程的全局开关(默认值为true);
• parquet_fdw.enable_multifile - 启用多文件读取器(默认值为true)。
• parquet_fdw.enable_multifile_merge - 启用多文件合并读取器(默认值为true)。
示例:
CREATE FOREIGN TABLE userdata (
id int,
first_name text,
last_name text
)
SERVER parquet_s3_srv
OPTIONS (
filename 's3://bucket/dir/userdata1.parquet'
);
访问外表
SELECT * FROM userdata;
并行查询
parquet_s3_fdw还支持并行查询执行(注意不要与 Apache Arrow 的多线程解码功能混淆)。
导入
parquet_s3_fdw还支持 IMPORT FOREIGN SCHEMA 命令,来发现文件系统上指定目录中的 parquet 文件,并根据这些文件创建外部表。它可以像这样使用:
IMPORT FOREIGN SCHEMA "/path/to/directory"
FROM SERVER parquet_s3_srv
INTO public;
重要的是,这里的remote_schema是一个本地文件系统的目录路径,并且用双引号引起来。
将 parquet 文件导入到外部表的另一种方法是,使用import_parquet_s3或import_parquet_s3_explicit:
CREATE FUNCTION import_parquet_s3(
tablename text,
schemaname text,
servername text,
userfunc regproc,
args jsonb,
options jsonb)
CREATE FUNCTION import_parquet_s3_explicit(
tablename text,
schemaname text,
servername text,
attnames text[],
atttypes regtype[],
userfunc regproc,
args jsonb,
options jsonb)
import_parquet_s3和import_parquet_s3_explicit之间的唯一区别是,后者允许指定一组要导入的属性/列。attnames和atttypes分别是属性名称和属性类型的数组(参见下面的示例)。
userfunc是一个用户自定义函数。它必须接受一个jsonb参数,并返回一个要导入的 parquet 文件的文件系统路径的文本数组。args是用户指定的 jsonb 对象,以作为参数传递给userfunc。这种函数的简单实现和用法,可以如下面这样:
CREATE FUNCTION list_parquet_s3_files(args jsonb)
RETURNS text[] AS
$$
BEGIN
RETURN array_agg(args->>'dir' || '/' || filename)
FROM pg_ls_dir(args->>'dir') AS files(filename)
WHERE filename ~~ '%.parquet';
END
$$
LANGUAGE plpgsql;
SELECT import_parquet_s3_explicit(
'abc',
'public',
'parquet_srv',
array['one', 'three', 'six'],
array['int8', 'text', 'bool']::regtype[],
'list_parquet_files',
'{"dir": "/path/to/directory"}',
'{"sorted": "one"}'
);
特性
• 支持在本地文件系统或 Amazon S3 上对 parquet 文件进行 SELECT 操作。
• 支持 INSERT、DELETE、UPDATE(外部修改)。
• 支持 MinIO 访问,以替代 Amazon S3。
• 允许控制外部服务器在事务完成后是否保持连接打开状态。这由 keep_connections 控制,默认为 on。
• 支持 parquet_s3_fdw 的 parquet_s3_fdw_get_connections() 函数,列出打开的外部服务器连接。
无结构模式
• 该功能将使用户能够使用无结构的能力:
•Jsonb 键:parquet 列名称。
• Jsonb 值:parquet 列数据。
• 每个 parquet 文件没有特定的外部表结构(列定义)。
• 无结构的外表只有一个 jsonb 列,用于根据以下规则表示 parquet 文件中的数据:
使用无结构模式,会有几个好处:
• parquet 文件数据结构的灵活性:通过将所有列数据合并到一个 jsonb 列中,无结构的外表可以查询任何 parquet 文件,文件中的所有列都能映射到 postgres 类型。
• 没有预定义的外部表结构(列定义)。缺少结构意味着外部表会查询 parquet 文件中的所有列,包括用户还未使用的列。
无结构模式用法
• 无结构模式由schemaless选项来启用:
• schemaless选项是true:启用无结构模式。
• schemaless选项是false:禁用无结构模式(我们称之为non-schemaless模式)。
• 如果未配置schemaless选项,则默认值为 false。
• CREATE FOREIGN TABLE、IMPORT FOREIGN SCHEMA、import_parquet_s3()和import_parquet_s3_explicit()中均支持schemaless选项。
• 无结构外表需要至少一个 jsonb 列来表示数据:
CREATE FOREIGN TABLE example_schemaless (
id int,
v jsonb
) OPTIONS (filename '/path/to/parquet_file', schemaless 'true');
SELECT * FROM example_schemaless;
id | v
----+---------------------------------------------------------------------------------------------------------------------------------
| {"one": 1, "six": "t", "two": [1, 2, 3], "five": "2018-01-01", "four": "2018-01-01 00:00:00", "seven": 0.5, "three": "foo"}
| {"one": 2, "six": "f", "two": [null, 5, 6], "five": "2018-01-02", "four": "2018-01-02 00:00:00", "seven": null, "three": "bar"}
(2 rows)
• 如果有 1 个以上的 jsonb 列,则仅填充一列,所有其他列都使用 NULL 值处理。
• 如果没有 jsonb 列,则所有列都使用 NULL 值处理。
• 示例:
• 创建外部表:使用IMPORT FOREIGN SCHEMA,import_parquet_s3()和import_parquet_s3_explicit(),外部表将以固定的列定义进行创建,如下所示:
CREATE FOREIGN TABLE example (
v jsonb
) OPTIONS (filename '/path/to/parquet_file', schemaless 'true');
• 查询数据:
-- non-schemaless mode
SELECT * FROM example;
one | two | three | four | five | six | seven
-----+------------+-------+---------------------+------------+-----+-------
1 | {1,2,3} | foo | 2018-01-01 00:00:00 | 2018-01-01 | t | 0.5
2 | {NULL,5,6} | bar | 2018-01-02 00:00:00 | 2018-01-02 | f |
(2 rows)
-- schemaless mode
SELECT * FROM example_schemaless;
v
---------------------------------------------------------------------------------------------------------------------------------
{"one": 1, "six": "t", "two": [1, 2, 3], "five": "2018-01-01", "four": "2018-01-01 00:00:00", "seven": 0.5, "three": "foo"}
{"one": 2, "six": "f", "two": [null, 5, 6], "five": "2018-01-02", "four": "2018-01-02 00:00:00", "seven": null, "three": "bar"}
(2 rows)
• 在 jsonb 表达式中获取值:
• 使用 ->> jsonb 箭头操作符,返回文本类型。用户可以强制转换 jsonb 表达式的类型,以获得相应的数据表示。
• 例如,获取col值的表达式v->>'col',将是 parquet 文件中的列名col,我们称之为schemaless variable或slvar。
SELECT v->>'two', sqrt((v->>'one')::int) FROM example_schemaless;
?column? | sqrt
--------------+--------------------
[1, 2, 3] | 1
[null, 5, 6] | 1.4142135623730951
(2 rows)
• 某些功能与non-schemaless模式不同
• 在sorted选项中定义列名,与non-schemaless mode相同
• 在ORDER BY子句中使用slvar代替列名。
• 如果排序的 parquet 列不是文本列,请将此列显式地强制转换到映射类型。
• 例如:
CREATE FOREIGN TABLE example_sorted (v jsonb)
SERVER parquet_s3_srv
OPTIONS (filename '/path/to/example1.parquet /path/to/example2.parquet', sorted 'int64_col', schemaless 'true');
EXPLAIN (COSTS OFF) SELECT * FROM example_sorted ORDER BY (v->>'int64_col')::int8;
QUERY PLAN
--------------------------------
Foreign Scan on example_sorted
Reader: Multifile Merge
Row groups:
example1.parquet: 1, 2
example2.parquet: 1
(5 rows)
• slvar::type {operator} const。例如:(v->>'int64_col')::int8 = 100
• const {operator} slvar ::type。例如:100 = (v->>'int64_col')::int8
• slvar::boolean is true/false。例如:(v->>'bool_col')::boolean is false
• !(slvar::boolean)。例如:!(v->>'bool_col')::boolean
• Jsonb exist 运算符:((v->>'col')::jsonb) ? element、(v->'col') ? element和v ? 'col'
• 转换函数必须映射 parquet 列类型,否则会跳过过滤器。
• 行组过滤器支持:在无结构模式下,parquet_s3_fdw 可以通过一些如下的WHERE条件,支持对行组进行过滤:
• 要使用 parquet 文件的预排序列,用户必须是:
• 支持对嵌套列表和映射表使用箭头运算符:这些类型将被视为嵌套的 jsonb 值,可以通过->操作符访问。例如:
SELECT * FROM example_schemaless;
v
----------------------------------------------------------------------------
{"array_col": [19, 20], "jsonb_col": {"1": "foo", "2": "bar", "3": "baz"}}
{"array_col": [21, 22], "jsonb_col": {"4": "test1", "5": "test2"}}
(2 rows)
SELECT v->'array_col'->1, v->'jsonb_col'->'1' FROM example3;
?column? | ?column?
----------+----------
20 | "foo"
22 |
(2 rows)
• Postgres 计算(jsonb->>'col')::type的成本,比在non-schemaless模式下直接获取列要大得多,在一些复杂的查询中,schemaless模式的查询计划可能与non-schemaless模式不同。
• 对于其他功能,schemaless模式与non-schemaless模式工作相同。
可写的 FDW
用户可以对已设置键列的外表,执行 insert、update 和 delete 语句。
键列
• 在结构化模式下:可以通过使用 OPTIONS (key 'true') 创建 parquet_s3_fdw 外表对象,来设置键列:
CREATE FOREIGN TABLE userdata (
id1 int OPTIONS(key 'true'),
id2 int OPTIONS(key 'true'),
first_name text,
last_name text
) SERVER parquet_s3_srv
OPTIONS (
filename 's3://bucket/dir/userdata1.parquet'
);
• 在无结构模式下,可以在创建 parquet_s3_fdw 外部表对象时,使用key_columns选项设置键列:
CREATE FOREIGN TABLE userdata (
v JSONB
) SERVER parquet_s3_srv
OPTIONS (
filename 's3://bucket/dir/userdata1.parquet',
schemaless 'true',
key_columns 'id1 id2'
);
• key_columns选项可用于 IMPORT FOREIGN SCHEMA 功能:
-- in schemaless mode
IMPORT FOREIGN SCHEMA 's3://data/' FROM SERVER parquet_s3_srv INTO tmp_schema
OPTIONS (sorted 'c1', schemaless 'true', key_columns 'id1 id2');
-- corresponding CREATE FOREIGN TABLE
CREATE FOREIGN TABLE tbl1 (
v jsonb
) SERVER parquet_s3_srv
OPTIONS (filename 's3://data/tbl1.parquet', sorted 'c1', schemaless 'true', key_columns 'id1 id2');
-- in non-schemaless mode
IMPORT FOREIGN SCHEMA 's3://data/' FROM SERVER parquet_s3_srv INTO tmp_schema
OPTIONS (sorted 'c1', schemaless 'true', key_columns 'id1 id2');
-- corresponding CREATE FOREIGN TABLE
CREATE FOREIGN TABLE tbl1 (
id1 INT OPTIONS (key 'true'),
id2 INT OPTIONS (key 'true'),
c1 TEXT,
c2 FLOAT
) SERVER parquet_s3_srv
OPTIONS (filename 's3://data/tbl1.parquet', sorted 'c1');
insert_file_selector 选项
parquet_s3_fdw 用来在 INSERT 查询中检索目标 parquet 文件的用户定义函数签名:
CREATE FUNCTION insert_file_selector_func(one INT8, dirname text)
RETURNS TEXT AS
$$
SELECT (dirname || '/example7.parquet')::TEXT;
$$
LANGUAGE SQL;
CREATE FOREIGN TABLE example_func (one INT8 OPTIONS (key 'true'), two TEXT)
SERVER parquet_s3_srv
OPTIONS (
insert_file_selector 'insert_file_selector_func(one, dirname)',
dirname '/tmp/data_local/data/test',
sorted 'one');
• insert_file_selector 函数签名规格:
• dirname arg:dirname 选项的值。
• column args:按名称从插入槽位中获取。
• 语法:[function name]([arg name] , [arg name] ...)
• 默认返回类型为TEXT(parquet 文件的完整路径)
• [arg name]:必须是外部表的列名或dirname
• args 值:
排序列:
parquet_s3_fdw 支持在修改功能中保持排序列的排序状态。
Parquet 文件结构:
基本上,parquet 文件结构是根据一组列名和相应的类型定义的,但在 parquet_s3_fdw 的扫描中,它假定所有具有相同名称的列都具有相同的类型。因此,在修改功能中,也会使用该假设。
从 postgres 类型到 arrow 类型的映射:
- • 基础类型映射:
SQL 类型 |
Arrow 类型 |
BOOL |
BOOL |
INT2 |
INT16 |
INT4 |
INT32 |
INT8 |
INT64 |
FLOAT4 |
FLOAT |
FLOAT8 |
DOUBLE |
TIMESTAMP/TIMESTAMPTZ |
TIMESTAMP |
DATE |
DATE32 |
TEXT |
STRING |
BYTEA |
BINARY |
• arrow::TIMESTAMP 的默认时间精度为 UTC 时区的微秒级。
• LIST 是由它的元素类型创建的,对于元素只支持基础类型。
• MAP 由其 jsonb 元素的类型来创建的:
jsonb 类型 |
Arrow 类型 |
text |
STRING |
numeric |
FLOAT8 |
boolean |
BOOL |
null |
STRING |
其他类型 |
STRING |
• 在无结构模式下:
• 在结构化模式下,基础的 jsonb 类型的映射与 MAP 相同。
• 对于无结构模式下的第一个嵌套的 jsonb:
jsonb 类型 |
Arrow 类型 |
array |
LIST |
object |
MAP |
• 在结构化模式下,LIST 和 MAP 的元素类型与 MAP 类型相同。
INSERT
-- non-schemaless mode
CREATE FOREIGN TABLE example_insert (
c1 INT2 OPTIONS (key 'true'),
c2 TEXT,
c3 BOOLEAN
) SERVER parquet_s3_srv OPTIONS (filename 's3://data/example_insert.parquet');
INSERT INTO example_insert VALUES (1, 'text1', true), (2, DEFAULT, false), ((select 3), (select i from (values('values are fun!')) as foo (i)), true);
INSERT 0 3
SELECT * FROM example_insert;
c1 | c2 | c3
----+-----------------+----
1 | text1 | t
2 | | f
3 | values are fun! | t
(3 rows)
-- schemaless mode
CREATE FOREIGN TABLE example_insert_schemaless (
v JSONB
) SERVER parquet_s3_srv OPTIONS (filename 's3://data/example_insert.parquet', schemaless 'true', key_column 'c1');
INSERT INTO example_insert_schemaless VALUES ('{"c1": 1, "c2": "text1", "c3": true}'), ('{"c1": 2, "c2": null, "c3": false}'), ('{"c1": 3, "c2": "values are fun!", "c3": true}');
SELECT * FROM example_insert_schemaless;
v
-----------------------------------------------
{"c1": 1, "c2": "text1", "c3": "t"}
{"c1": 2, "c2": null, "c3": "f"}
{"c1": 3, "c2": "values are fun!", "c3": "t"}
(3 rows)
• 选择要插入的文件:
• 目标文件是第一个其结构与插入记录匹配(插入记录的所有列都存在于目标文件中)的文件。
• 如果没有符合其结构的文件与插入记录的列匹配,并且已指定dirname选项。创建新文件,文件名格式为:[foreign_table_name]_[date_time].parquet
• 否则,会引发错误消息。
• 如果目标文件不存在,则创建与目标文件同名的新文件。
• 如果目标文件存在,但其结构与插入记录的列不匹配,则会引发错误消息。
• 如果存在选项insert_file_selector,目标文件就是该函数的结果。
• 如果选项insert_file_selector不存在:
• 新文件的结构:
• 从现有文件列表中获取。
• 如果在任何文件中都不存在列:根据预定义的映射类型创建基础文件。
• 在结构化模式下,新文件将所有列都存在于外部表中。
• 在无结构模式下,新文件将在 jsonb 值中带上所有列。
• 列信息:
UPDATE/DELETE
-- non-schemaless mode
CREATE FOREIGN TABLE example (
c1 INT2 OPTIONS (key 'true'),
c2 TEXT,
c3 BOOLEAN
) SERVER parquet_s3_srv OPTIONS (filename 's3://data/example.parquet');
SELECT * FROM example;
c1 | c2 | c3
----+-----------------+----
1 | text1 | t
2 | | f
3 | values are fun! | t
(3 rows)
UPDATE example SET c3 = false WHERE c2 = 'text1';
UPDATE 1
SELECT * FROM example;
c1 | c2 | c3
----+-----------------+----
1 | text1 | f
2 | | f
3 | values are fun! | t
(3 rows)
DELETE FROM example WHERE c1 = 2;
DELETE 1
SELECT * FROM example;
c1 | c2 | c3
----+-----------------+----
1 | text1 | f
3 | values are fun! | t
(2 rows)
-- schemaless mode
CREATE FOREIGN TABLE example_schemaless (
v JSONB
) SERVER parquet_s3_srv OPTIONS (filename 's3://data/example.parquet', schemaless 'true', key_columns 'c1');
SELECT * FROM example_schemaless;
v
-----------------------------------------------
{"c1": 1, "c2": "text1", "c3": "t"}
{"c1": 2, "c2": null, "c3": "f"}
{"c1": 3, "c2": "values are fun!", "c3": "t"}
(3 rows)
UPDATE example_schemaless SET v='{"c3":false}' WHERE v->>'c2' = 'text1';
UPDATE 1
SELECT * FROM example_schemaless;
v
-----------------------------------------------
{"c1": 1, "c2": "text1", "c3": "f"}
{"c1": 2, "c2": null, "c3": "f"}
{"c1": 3, "c2": "values are fun!", "c3": "t"}
(3 rows)
DELETE FROM example_schemaless WHERE (v->>'c1')::int = 2;
DELETE 1
SELECT * FROM example_schemaless;
v
-----------------------------------------------
{"c1": 1, "c2": "text1", "c3": "f"}
{"c1": 3, "c2": "values are fun!", "c3": "t"}
(2 rows)
限制
• 不支持事务。
• 无法同时在文件系统和 Amazon S3 上使用 parquet 文件创建单个外部表。
• import_parquet_s3_explicit()函数的第 4 和第 5 个参数,在schemaless模式下没有意义。
WARNING: parquet_s3_fdw: attnames and atttypes are expected to be NULL. They are meaningless for schemaless table.
HINT: Schemaless table imported always contain "v" column with "jsonb" type.
• 这些参数应该定义为NULL值。
• 如果这些参数不是 NULL 值,则会出现下面的WARNING:
• schemaless模式不支持通过CREATE TABLE parent_tbl (v jsonb) PARTITION BY RANGE((v->>'a')::int)创建分区表。
• 在修改功能中:
• 对于大文件,性能不太好。
• 当完全相同的文件同时修改时,结果会出现不一致。
• parquet_s3_fdw修改 parquet 文件的方法是,从目标 parquet 文件创建可修改的缓存数据,并覆盖旧文件:
• 不支持 WITH CHECK OPTION、ON CONFLICT 和 RETURNING。
• sorted列仅支持这些类型:int2、int4、int8、date、timestamp、float4、float8。
• key列仅支持这些类型:int2、int4、int8、date、timestamp、float4、float8和text。
• key列的值必须是唯一的,parquet_s3_fdw不支持检查键列的唯一值,用户必须做好检查。
• key列仅用于 UPDATE/UPDATE。