RisingWave 1.10 发布!新增用户自定义聚合函数

2024年 7月 31日 73.7k 0

现在,您可以用 Python 和 JavaScript 创建嵌入式用户自定义聚合函数(User-defined Aggregate Function)。这些函数在 RisingWave 中定义,并使用嵌入式解释器执行。定义后,您可以像使用任何内置 SQL 聚合函数一样使用它们。创建 UDAF 需要使用 CREATE AGGREGATE
 命令,其一般语法如下:

CREATE AGGREGATE function_name ( argument_type [, ...] )
    RETURNS return_type
    LANGUAGE language_name
    AS $$ function_body $$;

其中,函数体 (function_body
) 内是一系列返回聚合值的函数,可以用 Python 或 JavaScript 定义。

对于 UDAF,您必须定义 create_state
函数,它会创建一个新状态 (State),用于维护聚合函数的持续计算,促成高效的计算结果。您还必须定义 accumulate
函数,它更新并返回当前状态值。此函数将状态和来自聚合函数定义的输入参数作为参数。

此外,您可以选择定义 finish
函数,该函数返回聚合函数的结果,定义时必须将状态作为输入参数。如果您未定义此函数,则函数将返回当前状态。您还可以选择 retract
函数,它会撤回当前状态的值,然后返回该值。

UDAF 赋予了 RisingWave 更复杂的计算能力,让您在处理数据时更灵活更自主。

更多细节,请查看:

  • CREATE AGGREGATE[1]「CREATE AGGREGATE
    命令」
  • Embedded Python UDFs[2]「嵌入式 Python UDF」
  • Use UDFs in JavaScript[3]「JavaScript UDF」

2从游标获取多个更新

在 v1.9 中,我们为订阅引入了子脚本和游标,允许您检索对表或物化视图所做的更新。以前,您只能使用 FETCH
命令逐行从游标中检索更新。现在,您可以指定从游标中检索多少行。以下 SQL 查询从游标 cur1
中检索四个最新的更新。

FETCH 4 FROM cur1;

此功能更方便您查看表和物化视图的最近更改。

此外,结果表的列名已更新为与源表或物化视图的列名匹配,此前的格式为 table_name.col_name

更多细节,请查看:

  • Fetch from cursor[4]「从游标获取更新」

3支持可溢出哈希 Join

为提高 RisingWave 在 Join 两个大表时的性能,我们现在支持可溢出哈希 Join。目前,RisingWave 正在使用的是哈希 Join,哈希表在内存中构建,它们能够很好地并行化和扩展,但需要大量内存去构建,当表很大时,可能会导致内存不足问题。可溢出哈希 Join 解决了这个问题,在 Join 查询期间内存使用量高时,RisingWave 可以利用磁盘空间。

4对 CDC Source 连接器的增强

本次版本中,我们继续改进了现有 CDC Source 连接器,为您提供更流畅的流处理体验。此版本包含两个新功能:自动映射 Schema 和元数据列。但请注意,这些新功能并不适用于所有 CDC 连接器,因此请继续阅读以了解更多详情。

自动映射 Schema

在创建 MySQL 或 PostgreSQL CDC 表时,RisingWave 现在会自动将上游表的 Schema 映射到 RisingWave 表。创建表时可以使用 *
 以从源表中导入所有列,而无需单独定义列。但是,如果在表创建过程中指定了其他列,则不能使用 *

让我们用一个简单例子说明这个过程。首先,我们用以下 SQL 查询连接到 MySQL 数据库。在从 MySQL 或 PostgreSQL 导入 CDC 数据时,您必须先创建一个 Source,用于连接到数据库,然后再从各个表中导入数据。

CREATE SOURCE mysql_source WITH (
  connector = 'mysql-cdc',
  hostname = '127.0.0.1',
  port = '3306',
  username = 'root',
  password = 'password',
  database.name = 'mydb',
  server.id = 5888
);

接下来,我们创建一个表,从 MySQL 数据库中上游表 tbl1
中导入所有列。mysql_tbl
的列将对应 tbl1
的列。

CREATE TABLE mysql_tbl (*)
FROM mysql_source TABLE 'mydb.tbl1';

此功能使在 RisingWave 中创建 CDC 表更加高效。

包含元数据列

在创建 MongoDB、MySQL 或 PostgreSQL CDC 表时,可以使用 INCLUDE
子句附加元数据列。如果需要将元数据列添加到已有的 CDC 表中,则需要在 RisingWave 中重新创建该表。

对于 MongoDB、MySQL 和 PostgreSQL CDC 表,您可以使用 INCLUDE
子句导入上游提交时间戳。对于历史数据,默认填充数据为 1970-01-01 00:00:00+00:00

  • 对于 MongoDB,您可以使用 INCLUDE
    子句导入 collection_name

  • 对于 MySQL 和 PostgreSQL,您可以导入 database_name
    schema_name
    table_name

INCLUDE
子句的语法如下:

INCLUDE metadata_col [AS col_name];

metadata_col
可以是上述提到的任何元数据列。在表 Schema 定义之后,此子句可以在创建表时使用。

以下是一个示例,从 MySQL 表中导入元数据列 timestamp
database_name

CREATE TABLE tbl_meta (
 id int,
 name varchar,
 age int
 PRIMARY KEY (id)
) INCLUDE TIMESTAMP AS commit_ts
INCLUDE DATABASE_NAME AS db_name
FROM mysql_source TABLE 'mydb.tbl2';

更多细节,请查看:

  • Ingest data from MySQL CDC[5]「从 MySQL CDC 导入数据」
  • Ingest data from PostgreSQL CDC[6]「从 PostgreSQL CDC 导入数据」
  • Ingest data from MongoDB CDC[7]「从 MongoDB CDC 导入数据」

5增强现有 Sink 连接器

默认 Sink 解耦

对于 ClickHouse、Google Pub/Sub、Kafka、Kinesis、MQTT、NATS 和 Pulsar Sink 连接器,Sink 解耦将默认启用。之前,此功能只在 Sink 是 append-only 时才会启用,现在则不再有此限制。Sink 解耦会在 RisingWave 和下游系统之间插入一个缓冲队列,以确保 RisingWave 不受下游系统性能问题的影响。

如果您想禁用 Sink 解耦,请使用会话变量 sink_decouple

SET sink_decouple = false;

检查点解耦选项

对于 Delta Lake 和 StarRocks Sink 连接器,您可以使用 commit_checkpoint_interval
参数,将下游系统的 commit 与 RisingWave 的 commit 解耦。这意味着,RisingWave 将在达到指定的检查点间隔时提交数据,而不是在每个屏障处提交数据。

例如,如果 commit_checkpoint_interval
设置为 5,RisingWave 将间隔 5 个检查点提交一次数据。这可以减少生成的目标表版本,提升查询性能。

在创建 Delta Lake 或 StarRocks Sink 连接器时,commit_checkpoint_interval
参数应在 WITH
选项中指定。

CREATE SINK s1_sink FROM s1_source
WITH (
    connector = 'deltalake',
    type = 'append-only',
    location = 's3a://my-delta-lake-bucket/path/to/table',
    s3.endpoint = '',
    s3.access.key = '${ACCESS_KEY}',
    s3.secret.key = '${SECRET_KEY}',
    commit_checkpoint_interval = 5
)

更多细节,请查看:

  • Sink decoupling[8]「Sink 解耦」
  • Sink data from RisingWave to Delta Lake[9]「将数据从 RisingWave 导出到 Delta Lake」
  • Sink data from RisingWave to StarRocks[10]「将数据从 RisingWave 导出到 StarRocks」

6新增 Sink 连接器

RisingWave 一直在持续添加下游系统连接器,扩展其生态系统。我们现在支持将数据 Sink 到 DynamoDB 和 Microsoft SQL Server。如果您对特定连接器感兴趣,请参阅我们的集成页面。您可以投票以表示对特定连接器感兴趣,或在其可用时收到通知。

Amazon DynamoDB

Amazon DynamoDB 是一个 NoSQL 数据库,旨在处理高容量的结构化和半结构化数据。它提供一致的高性能和易扩展性。要将数据从 RisingWave Sink 到 DynamoDB 表,需使用 CREATE SINK
命令。Sink 到 DynamoDB 表时,您的 RisingWave 源表必须有一个由两列组成的复合主键。它们需要对应 DynamoDB 目标表中定义的分区键和排序键。

例如,如果您想 Sink 到名为 books_dynamo
的 DynamoDB 表,该表具有分区键 isbn
和排序键 edition
,则 RisingWave 表 Schema 应定义如下:

CREATE TABLE IF NOT EXISTS books_rw (
 isbn varchar,
 edition int,
 title varchar,
 author varchar,
 primary key (isbn, edition)
);

然后再创建 Sink 连接器,将数据从 books_rw
Sink 到 books_dynamo

CREATE SINK dynamo_sink
FROM movies
WITH (
  connector = 'dynamodb',
  table = 'books_dynamo',
  primary_key = 'isbn, edition',
  endpoint = '',
  region = 'region,
  access_key = 'access_key',
  secret_key = 'secret_key'
);

Microsoft SQL Server

Microsoft SQL Server 是一个强大的关系数据库管理系统,支持广泛的数据事务处理、商业智能等功能。它使用 T-SQL,并包括 SQL Server 集成服务、报告服务和分析服务等工具。RisingWave 支持将数据 Sink 到自托管的 SQL Server 和 Azure SQL。

以下是一个示例,我们创建了 Sink 连接器 sqlserver_sink
,将数据从物化视图 mv1
 Sink 到 SQL Server 表 sqlserver_tbl
。因为这是一个 Upsert Sink ,我们定义了主键 pk1
pk2

 CREATE SINK sqlserver_sink
 FROM mv1
 WITH (
  connector = 'sqlserver',
  type = 'upsert',
  sqlserver.host = 'sqlserver-server',
  sqlserver.port = 1433,
  sqlserver.user = 'user',
  sqlserver.password = 'password',
  sqlserver.database = 'mydb',
  sqlserver.table = 'sqlserver_tbl',
  primary_key = 'pk1, pk2',
);

OpenSearch

OpenSearch 是一个开源的搜索和分析引擎,旨在实时搜索、分析和可视化大量数据。它源自 ElasticSearch,适用于日志和事件数据分析、企业搜索、监控观测等各种应用。

要将数据从 RisingWave Sink 到 OpenSearch,您可以使用 CREATE SINK
命令。

CREATE SINK opensearch_sink
FROM table1
WITH (
    connector = 'opensearch',
    index = 'id1',
    primary_key = 'types_id',
    url = '',
    username = 'user',
    password = 'password'
);

更多细节,请查看:

  • Sink data from RisingWave to OpenSearch[11]「将数据从 RisingWave 导出到 OpenSearch」

7保留内存算法变更

现在,用于计算默认保留内存 (Reserved Memory) 的算法已更改。保留内存用于为 RisingWave 提供调整内存使用量的缓冲时间,以应对输入数据的额外涌入。之前,我们将计算节点总内存的 20% 用作保留内存。现在,保留内存的计算方式为:前 16GB 内存的 30% + 剩余内存的 20%。通过这种计算方法,保留内存可以根据您的设置进行扩展,更好地平衡系统性能和内存利用率。

如果此方法不适合您,您可以使用启动选项 --reserve-memory-bytes
或环境变量 RW_RESERVED_MEMORY_BYTES
指定保留内存,但需要注意,保留内存必须至少为 512MB。

更多细节,请查看:

  • Sink data from RisingWave to Amazon DynamoDB[12]「将数据从 RisingWave 导出到 DynamoDB」
  • Sink data from RisingWave to SQL Server[13]「将数据从 RisingWave 导出到 SQL Server」
  • Sink data from RisingWave to OpenSearch[14]「将数据从 RisingWave 导出到 OpenSearch」

8总结

以上只是 RisingWave 1.10 版本新增的部分功能,如果您想了解本次更新的完整列表,请查看更详细的发布说明[15]。

参考资料[1]

CREATE AGGREGATE
: https://docs.risingwave.com/docs/current/sql-create-aggregate/

[2]

Embedded Python UDFs: https://docs.risingwave.com/docs/current/udf-python-embedded/

[3]

Use UDFs in JavaScript: https://docs.risingwave.com/docs/current/udf-javascript/

[4]

Fetch from cursor: https://docs.risingwave.com/docs/current/subscription/#fetch-from-cursor

[5]

Ingest data from MySQL CDC: https://docs.risingwave.com/docs/current/ingest-from-mysql-cdc/

[6]

Ingest data from PostgreSQL CDC: https://docs.risingwave.com/docs/current/ingest-from-postgres-cdc/

[7]

Ingest data from MongoDB CDC: https://docs.risingwave.com/docs/current/ingest-from-mongodb-cdc/

[8]

Sink decoupling: https://docs.risingwave.com/docs/dev/data-delivery/#sink-decoupling

[9]

Sink data from RisingWave to Delta Lake: https://docs.risingwave.com/docs/current/sink-to-delta-lake/

[10]

Sink data from RisingWave to StarRocks: https://docs.risingwave.com/docs/current/sink-to-starrocks/

[11]

Sink data from RisingWave to OpenSearch: https://docs.risingwave.com/docs/current/sink-to-opensearch/

[12]

Sink data from RisingWave to Amazon DynamoDB: https://docs.risingwave.com/docs/current/sink-to-dynamodb/

[13]

Sink data from RisingWave to SQL Server: https://docs.risingwave.com/docs/current/sink-to-sqlserver/

[14]

Sink data from RisingWave to OpenSearch: https://docs.risingwave.com/docs/current/sink-to-opensearch/

[15]

发布说明: https://docs.risingwave.com/release-notes/

相关文章

Oracle如何使用授予和撤销权限的语法和示例
Awesome Project: 探索 MatrixOrigin 云原生分布式数据库
下载丨66页PDF,云和恩墨技术通讯(2024年7月刊)
社区版oceanbase安装
Oracle 导出CSV工具-sqluldr2
ETL数据集成丨快速将MySQL数据迁移至Doris数据库

发布评论