基于 Flink 的 OceanBase AP 实时分析 demo

2024年 7月 1日 79.8k 0

先看效果

基于 Flink 的 OceanBase AP 实时分析 demo-1

  • 支持 PC 端下单,也支持多人通过手机扫码在线下单,可交互性更强。
  • 订单数据写入 OB TP 数据库,并通过 Flink CDC 实时同步到 OB AP 数据库,并从 AP 库中查询最新数据。
  • 不管是 count(*) 计数还是 where + group by + order by 多条件查询,亿级别的数据查询耗时基本都在 10ms ~ 100ms,充分体现了 OB AP 的查询性能。(注: 该耗时包括了后端到数据库之间的网络延时,因此数据库内部的 SQL 耗时还要更低)
  • 只建了主键索引。

准备数据库

  • TP 数据库: OB 4.3.0 行存 + 开启 Binlog 服务
  • AP 数据库: OB 4.3.0 列存

使用 OBCloud 阿里云版本

  • 为了降低搭建成本,方便后续和 Flink 以及应用进行集成,直接使用 OBCloud 阿里云版本,数据库配置如下:
    • OB 版本 4.3.0.1,目前该版本在 OBCloud 需要开通白名单才能购买,具体可以联系官方技术服务同学进行开通。
    • 我这里购买一个按需付费的集群实例,3 节点,节点规格为 14C70G,价格 ¥32/小时。如果只用于测试,可以申请 🎉 OB Cloud 的 30 天免费试用,最低规格的 1C4G 就可以满足需求。

基于 Flink 的 OceanBase AP 实时分析 demo-2

  • 在集群实例下创建了 oltp 和 olap 两个租户,用作 TP 和 AP 库,配置如下:

基于 Flink 的 OceanBase AP 实时分析 demo-3

  • 在两个租户下分别创建数据库、访问账号,并配置 IP 白名单和公网访问地址,然后就能得到数据库连接串。具体过程不赘述,可以参考我之前的文章 使用阿里云的 OceanBase 云服务。

TP 库开通 Binlog 服务

基于 Flink 的 OceanBase AP 实时分析 demo-1

行存表和列存表

  • 连接 TP 库,创建 tp_car_orders 表(行存):
create table tp_car_orders(
    order_id bigint primary key NOT NULL AUTO_INCREMENT,
    -- order_time 默认值由数据库自动生成
    order_time timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP, 
    customer_name varchar(25) COLLATE utf8mb4_bin NOT NULL, 
    sale_nation varchar(25) COLLATE utf8mb4_bin NOT NULL,
    sale_region varchar(25) COLLATE utf8mb4_bin NOT NULL,                   
    car_color varchar(25) COLLATE utf8mb4_bin NOT NULL, 
    car_price decimal(15,2) NOT NULL
);
  • 连接 AP 库,创建 ap_car_orders 表(列存)。
      • 由于两张表需要通过 Flink 进行同步,因此两张表的结构几乎完全一样。
      • 唯二的区别: ① 一个是行存,一个是列存 ② order_time 的默认值不同。
create table ap_car_orders(
    order_id bigint primary key NOT NULL AUTO_INCREMENT,
    -- order_time 从 tp_car_orders 表同步过来
    order_time timestamp NOT NULL, 
    customer_name varchar(25) COLLATE utf8mb4_bin NOT NULL, 
    sale_nation varchar(25) COLLATE utf8mb4_bin NOT NULL,
    sale_region varchar(25) COLLATE utf8mb4_bin NOT NULL,                   
    car_color varchar(25) COLLATE utf8mb4_bin NOT NULL, 
    car_price decimal(15,2) NOT NULL
) WITH COLUMN GROUP (each column);

准备 Flink

开通阿里云 Flink

  • 本地安装并使用 Flink 进行同步可以参考 OB 官方文档,我这里为了简单,直接使用 阿里云 Flink 托管版本。包年包月比较贵,可以使用按量付费版本,按照指引开通服务即可。

基于 Flink 的 OceanBase AP 实时分析 demo-2

Flink 网络配置

  • 进入 Flink 工作空间,使用前建议使用网络探测功能验证数据库是否可正常连接。

基于 Flink 的 OceanBase AP 实时分析 demo-6

  • 如果 Flink 和数据库在同一个 VPC: 可以通过数据库私网地址进行连接。
  • 如果 Flink 和数据库不在同一个 VPC:
    • 配置 跨 VPC 访问
    • 通过数据库公网地址进行连接,但阿里云 Flink 默认不支持访问公网,需要配置 NAT 网关实现 VPC 网络与公网网络互通,详见 文档。
  • 保证 Flink 和数据库可以互通之后,网络探测的结果应该如下:

基于 Flink 的 OceanBase AP 实时分析 demo-3

Flink 同步配置

  • 进入「配置管理」,修改作业默认配置,将「系统检查点间隔」和「两次系统检查点间最短间隔」两个参数均改为 1 秒,以保证同步的实时性。

基于 Flink 的 OceanBase AP 实时分析 demo-8

Flink SQL 开发

  • 进入「SQL 开发-新建-新建空白的留作业草稿」

基于 Flink 的 OceanBase AP 实时分析 demo-4

  • 输入以下 SQL 内容:
-- 创建 TP CDC 表,表结构和源表 tp_car_orders 一致
CREATE TEMPORARY TABLE tp_car_orders_cdc (
    order_id bigint primary key NOT ENFORCED,
    order_time timestamp NOT NULL, 
    customer_name varchar(25) NOT NULL, 
    sale_nation varchar(25) NOT NULL,
    sale_region varchar(25) NOT NULL,                   
    car_color varchar(25) NOT NULL, 
    car_price decimal(15,2) NOT NULL
) WITH (
    -- OB Binlog 服务兼容 MySQL BinLog 服务,因此可以使用 mysql-cdc 作为连接器
    'connector' = 'mysql-cdc',
    'hostname' = <HOST>,
    'port' = <PORT>,
    'username' = <USER_NAME>,
    'password' = <PASSWORD>,
    'database-name' = 'oltp',
    'table-name' = 'tp_car_orders'
);

-- 创建 AP CDC 表,表结构和目标表 ap_car_orders 一致
CREATE TEMPORARY TABLE ap_car_orders_cdc (
    order_id bigint primary key NOT ENFORCED,
    order_time timestamp NOT NULL, 
    customer_name varchar(25) NOT NULL, 
    sale_nation varchar(25) NOT NULL,
    sale_region varchar(25) NOT NULL,                   
    car_color varchar(25) NOT NULL, 
    car_price decimal(15,2) NOT NULL
) WITH (
    -- 使用 jdbc 连接器
    'connector' = 'jdbc',
    'url' = 'jdbc:mysql://<HOST>:<PORT>/olap',
    'username' = <USER_NAME>,
    'password' = <PASSWORD>,
    'table-name' = 'ap_car_orders',
    -- 不缓存记录,直接 flush 数据
    'sink.buffer-flush.max-rows' = '0',
    -- flush 数据的时间间隔设为 0,直接 flush 数据
    'sink.buffer-flush.interval' = '0'
);

-- 将 TP CDC 表同步到 AP CDC 表
INSERT INTO ap_car_orders_cdc SELECT * FROM tp_car_orders_cdc;
  • 📢 注意: 需要调整 ap_car_orders_cdc 以下两个参数,同样的为了保证同步的实时性,我这里均设为 '0',即不做缓存和间隔,直接 flush 数据。这两个参数的用法详见 文档。
    • sink.buffer-flush.max-rows
    • sink.buffer-flush.interval
  • SQL 语法检查和网络连通性校验:

基于 Flink 的 OceanBase AP 实时分析 demo-5

  • 启动 SQL 调试,并往 tp_car_orders 表里插入一条数据:
INSERT INTO `tp_car_orders` (`car_price`,`car_color`,`sale_region`,`sale_nation`,`customer_name`) VALUES (299900,'blue','Washington','America','Lucy');
  • 预期能够捕获到 tp_car_orders 的数据变更,但变更还不会写入 ap_car_orders,需要实际部署到作业才能写入。

基于 Flink 的 OceanBase AP 实时分析 demo-2

Flink SQL 部署

  • 部署 SQL 作业:

基于 Flink 的 OceanBase AP 实时分析 demo-3

  • 在「作业运维」即可查看对应作业:

基于 Flink 的 OceanBase AP 实时分析 demo-13

  • 部署并启动成功后,重新往 tp_car_orders 表里插入一条数据:
INSERT INTO `tp_car_orders` (`car_price`,`car_color`,`sale_region`,`sale_nation`,`customer_name`) VALUES (299900,'blue','Washington','America','Lucy');
  • 可以看到在 ap_car_orders 表中数据已经同步:
mysql> select * from ap_car_orders;
+----------+---------------------+---------------+-------------+-------------+-----------+-----------+
| order_id | order_time          | customer_name | sale_nation | sale_region | car_color | car_price |
+----------+---------------------+---------------+-------------+-------------+-----------+-----------+
|        1 | 2024-05-07 17:10:37 | Lucy          | America     | Washington  | blue      | 299900.00 |
+----------+---------------------+---------------+-------------+-------------+-----------+-----------+
1 row in set (0.02 sec)

应用配置

  • 拉取应用代码: https://github.com/dengfuping/oceanbase-playground
  • pnpm i 安装项目依赖。
  • 新建 .env 文件并配置 TP、AP 库的数据库连接串:
OLTP_DATABASE_URL=""
OLAP_DATABASE_URL=""
  • pnpm run dev 启动应用。

导入数据

  • clone 代码仓库,通过 npm run seed可 批量导入脚本,默认导入 1.5 亿条数据,可修改脚本逻辑按需调整:

基于 Flink 的 OceanBase AP 实时分析 demo-4

  • 导入数据后需要对 AP 库发起一次合并,才能保证最优的查询性能。
mysql> ALTER SYSTEM MAJOR FREEZE;

AP SQL 调优(可选)

  • 针对 SQL 建对应索引,详见 索引简介。
  • 设置执行并发度,详见 设置并行执行并行度。
  • 改为分区表 (按时间分区),详见 分区概述。

运行效果

基于 Flink 的 OceanBase AP 实时分析 demo-1

相关文章

Oracle如何使用授予和撤销权限的语法和示例
Awesome Project: 探索 MatrixOrigin 云原生分布式数据库
下载丨66页PDF,云和恩墨技术通讯(2024年7月刊)
社区版oceanbase安装
Oracle 导出CSV工具-sqluldr2
ETL数据集成丨快速将MySQL数据迁移至Doris数据库

发布评论