ClickHouse技术分享

2023年 10月 9日 57.9k 0

大家感兴趣的问题

  • 风神里ClickHouse为什么不让select *?

  • ClickHouse为什么那么快?为什么QPS那么低?它适用哪些场景?

  • ClickHouse是如何完成JOIN的,为什么不建议用JOIN?

  • 简介

    ClickHouse是俄罗斯的 Yandex(搜索巨头)于 2016 年开源的列式存储数据库(DBMS),使用 C++ 语言编写,主要用于在线分析处理查询(OLAP),能够使用 SQL 查询实时生成分析数据报告。

    在字节的应用

    image.png

    性能

    ClickHouse像很多OLAP数据库一样,单表查询速度优于关联查询,而且ClickHouse的两者差距更为明显。

    image.png

    为什么关联查询这么慢——join的实现

    多表join时要满足小表在右的原则,右表关联时被加在到内存中与左表进行比较,ClickHouse中无论是left join、right join还是inner join永远都是拿着右表中的每一条记录到左表中查找该记录是否存在,所以右表必须是小表。

    另外join操作无法使用缓存,所以即使是两次相同的join语句,ClickHouse也会视为两条新SQL。

    所以为什么join要这么实现?

    ClickHouse最严重的缺陷

    子查询视为临时表,在处理多个join的关联查询时,ClickHouse会把查询拆成递归的子查询,每一次递归只处理一个Join关联,系统没有自动优化能力,Join reorder是优化器的重要课题,但是ClickHouse完全不提供这个能力,对内核不够了解的用户基本无法写出性能最佳的关联查询。

    OLAP场景的关键特征

    • 绝大多数是读请求。
    • 数据以相当大的批次(> 1000行)更新,而不是单行更新;或者根本没有更新。
    • 已添加到数据库的数据不能修改。
    • 对于读取,从数据库中提取相当多的行,但只提取列的一小部分。
    • 宽表,即每个表包含着大量的列。
    • 查询相对较少(通常每台服务器每秒查询数百次或更少)。
    • 对于简单查询,允许延迟大约50毫秒。
    • 列中的数据相对较小:数字和短字符串(例如,每个URL 60个字节)。
    • 处理单个查询时需要高吞吐量(每台服务器每秒可达数十亿行)。
    • 事务不是必须的。
    • 对数据一致性要求低。
    • 每个查询有一个大表。除了它以外,其他的都很小。
    • 查询结果明显小于源数据。换句话说,数据经过过滤或聚合,因此结果适合于单个服务器的内存中。

    很容易可以看出,OLAP场景与其他通常业务场景(例如,OLTP或K/V)有很大的不同, 因此想要使用OLTP或Key-Value数据库去高效的处理分析查询场景,并不是非常完美的适用方案。例如,使用OLAP数据库去处理分析请求通常要优于使用MongoDB或Redis去处理分析请求。

    列存储

    row-oriented.webp

    column-oriented.webp

    • 对于列的聚合、计数、求和等统计操作效率优于行式存储。
    • 由于某一列的数据类型都是相同的,针对数据存储更容易进行数据压缩,每一列更容易选择较优的数据压缩算法,大大提高了数据的压缩率。
    • 由于数据压缩率更高,在节省磁盘空间的同时对于Cache也更加友好。

    数据分区与线程级并行

    ClickHouse 将数据划分为多个 partition,每个 partition 再进一步划分为多个 index granularity(索引粒度),然后通过多个 CPU 核心分别处理其中的一部分来实现并行数据处理。

    分区后,面对涉及跨分区的查询统计,CH 会以分区为单位并行处理。

    在这种设计下,单条 Query 就能利用整机所有 CPU。极致的并行处理能力,极大的降低了查询延时。

    所以,ClickHouse 即使对于大量数据的查询也能够化整为零平行处理。

    但是有一个弊端就是对于单条查询使用多 cpu,就不利于同时并发多条查询。

    所以对于高 qps 的查询业务, ClickHouse 并不是强项 。

    向量化执行查询

    相比较逐行处理模式,向量化执行引擎采用批量处理模式,可以大幅减少函数调用开销,降低指令、数据的Cache Miss率,提升CPU利用效率。

    并且ClickHouse可利用SIMD指令进一步加速执行效率。这部分是ClickHouse优于大量同类OLAP产品的重要因素。

    具体例子如下图:

    动态代码生成

    在经典的数据库实现中,通常对表达式计算采用火山模型,也即将查询转换成一个个operator,比如HashJoin、Scan、IndexScan、Aggregation等。为了连接不同算子,operator之间采用统一的接口,比如open/next/close。在每个算子内部都实现了父类的这些虚函数,在分析场景中单条SQL要处理数据通常高达数亿行,虚函数的调用开销不再可以忽略不计。另外,在每个算子内部都要考虑多种变量,比如列类型、列的size、列的个数等,存在着大量的if-else分支判断导致CPU分支预测失效。

    ClickHouse实现了Expression级别的runtime codegen,动态地根据当前SQL直接生成代码,然后编译执行。如下图例子所示,对于Expression直接生成代码,不仅消除了大量的虚函数调用(即图中多个function pointer的调用),而且由于在运行时表达式的参数类型、个数等都是已知的,也消除了不必要的if-else分支判断。

    MergeTree

    MergeTree是类似LSM Tree的结构,MergeTree系列的引擎被设计用于插入极大量的数据到一张表当中。

    通过类 LSM tree 的结构,ClickHouse 在数据导入时全部是顺序 append 写,写入后数据段不可更改,在后台 compaction 时也是多个段 merge sort (归并排序)后顺序写回磁盘。

    顺序写的特性,充分利用了磁盘的吞吐能力,即便在 HDD (机械硬盘/传统硬盘)上也有着优异的写入性能。

    官方公开 benchmark 测试显示能够达到 50MB-200MB/s 的写入吞吐能力,按照每行 100Byte 估算,大约相当于 50W-200W 条/s 的写入速度。

    MergeTree引擎——索引

    排序键索引

    MergeTree的主键使用PRIMARY KEY定义,待主键定义之后,MergeTree会依据index_granularity间隔(默认8192行),为数据表生成一级索引并保存至primary.idx文件内,索引数据按照PRIMARY KEY排序。相比使用PRIMARY KEY定义,更为常见的简化形式是通过ORDER BY指代主键。在此种情形下,PRIMARY KEY与ORDER BY定义相同,所以索引(primary.idx)和数据(.bin)会按照完全相同的规则排序。

    稀疏索引

    仅需使用少量的索引标记就能够记录大量数据的区间位置信息,且数据量越大优势越为明显。以默认的索引粒度(8192)为例,MergeTree只需要12208行索引标记就能为1亿行数据记录提供索引。由于稀疏索引占用空间小,所以primary.idx内的索引数据常驻内存,取用速度自然极快。

    索引粒度

    数据以index_granularity的粒度(默认8192)被标记成多个小的区间,其中每个区间最多8192行数据。MergeTree使用MarkRange表示一个具体的区间,并通过start和end表示其具体的范围。index_granularity的命名虽然取了索引二字,但它不单只作用于一级索引(.idx),同时也会影响数据标记(.mrk)和数据文件(.bin)。因为仅有一级索引自身是无法完成查询工作的,它需要借助数据标记才能定位数据,所以一级索引和数据标记的间隔粒度相同(同为index_granularity行),彼此对齐。而数据文件也会依照index_granularity的间隔粒度生成压缩数据块。

    此外,还可以通过index_granularity_bytes来控制索引粒度,如果开启了这项配置,实际上索引粒度的的行数的在[1, index_granularity]范围内,主要取决于行的大小。如果单行的大小超过了index_granularity_bytes设置的值,那么索引粒度的大小则为该行。

    在19.11版本之前, 只有index_granularity配置能够用于限制索引粒度的大小。当从具有很大的行(几十上百MB)的表中查询数据时候,index_granularity_bytes配置能够提升ClickHouse的性能。如果表里有很大的行,可以开启这项配置来提升select查询的性能。

    索引数据的生成规则

    由于是稀疏索引,所以MergeTree需要间隔index_granularity行数据才会生成一条索引记录,其索引值会依据声明的主键字段获取。图6-8所示是对照测试表hits_v1中的真实数据具象化后的效果。hits_v1使用年月分区(PARTITION BYtoYYYYMM(EventDate)),所以2014年3月份的数据最终会被划分到同一个分区目录内。如果使用CounterID作为主键(ORDER BY CounterID),则每间隔8192行数据就会取一次CounterID的值作为索引值,索引数据最终会被写入primary.idx文件进行保存。

    例如第0(81920)行CounterID取值57,第8192(81921)行CounterID取值1635,而第16384(8192*2)行CounterID取值3266,最终索引数据将会是5716353266。

    如果使用多个主键,例如ORDER BY (CounterID, EventDate),则每间隔8192行可以同时取CounterID与EventDate两列的值作为索引值,具体如图所示。

    跳数索引

    minmax

    • minmax索引 记录了一段数据内的最小值和最大值
    • 索引的作用类似分区目录的 minmax 索引,能够快速跳过无用的数据区间
    • 常数级别加速
    -- 示例
    
    INDEX a id TYPE minmax GRANULARITY 5
    
    
    
    -- minmax 索引会记录这段数据区间内 id 字段的最小值与最大值,极值计算设计 5 个 index_granularity 区间的数据
    

    set

    • set 索引直接记录了声明字段或表达式的取值(唯一值,不重复)
    • 完整形式为:set(max_rows),其中 max_rows 是一个阈值,表示在一个index_granularity 内,索引最多记录的数据行数
    • 如果 max_rows = 0 ,表示无限制
    • 要求数据具有局部性特征
    -- 示例
    
    INDEX b length(id) * 8 TYPE set(1000) GRANULARITY 5,
    
    
    
    -- set 索引会记录 id 长度 * 8 的值,每个 index_granularity 最多记录 1000 条
    

    ngrambf_v1 & tokenbf_v1 & bloom_filter

    布隆过滤器类型的跳数索引对于长字符串场景有一定加速空间

    • ngrambf_v1索引记录的是数据短语的布隆过滤器

    • ngrambf_v1索引只支持 String 和FixedString 类型

    • ngrambf_v1索引只能提升in、notIn、like、equals和 notEquals 的查询性能

    • 完整形式为:ngrambf_v1(n, size_of_bloom_filter_in_bytes, number_of_hash_functions, random_seed)

      • n:token 长度,根据 n 长度,将数据切割为 token 短语
      • size_of_bloom_filter_in_bytes:布隆过滤器的大小
      • number_of_hash_functions:布隆过滤器中使用 hash 函数的个数
      • random_seed:hash 函数的随机种子
    • tokenbf_v1索引也是一种布隆过滤器索引,可以看做是ngrambf_v1索引的变种。
    • tokenbf_v1索引除了短语 token 处理的方法与ngrambf_v1索引不同,其他方面处理都是一样的。
    • tokenbf_v1索引会自动按照非字符的,数字的字符分割 token

    优化了equals、notEquals、in、not in、has的性能

    • bloom_filter([false_positive])布隆过滤器本尊
    • 可选参数false_positive用来指定从布隆过滤器收到错误响应的几率。取值范围是 (0,1),默认值:0.025

    索引-函数 生效表

    WHERE 子句中的条件可以包含对某列数据进行运算的函数表达式,如果列是索引的一部分,ClickHouse会在执行函数时尝试使用索引。不同的函数对索引的支持是不同的。

    set索引会对所有函数生效,其他索引对函数的生效情况见下表

    函数 (操作符) / 索引 primary key minmax ngrambf_v1 tokenbf_v1 bloom_filter
    equals (=, ==)
    notEquals(!=, )
    like
    notLike
    startsWith
    endsWith
    multiSearchAny
    in
    notIn
    less ()
    lessOrEquals (=)
    empty
    notEmpty
    hasToken

    压缩

    .bin 数据文件是由多个数据压缩块组成的,每个数据压缩块的头信息又是由CompressionMethod_CompressedSize_UncompressedSize组成的。

    • 压缩算法(1 个 UInt8 整型表示,1 字节)
    • 压缩后的数据大小(1 个 UInt32 整型表示,4 字节)
    • 压缩前数据大小(1 个 UInt32 整型表示,4 字节)

    .bin 数据文件是由多个数据压缩块组成的,每个数据压缩块的头信息又是由 CompressionMethod_CompressedSize_UncompressedSize组成的。

    每个压缩数据块的体积,按照其压缩前的数据字节大小,被严格控制在 64KB~1MB 之间,分别由 min_compress_block_size (默认 65536) 与min_compress_block_size (默认:1048576)参数指定。

    数据压缩块的大小和间隔(index_granularity)内的数据实际大小有关。

    MergeTree 在写入数据过程中,会依照 index_granularity 粒度,按批次取数据并进行处理。设下一批未压缩数据大小为 x 字节,写入数据遵循如下规则:

  • x < 64KB:当单批次数据小于 64KB 时,继续获取下一批数据,直到累积到 64KB后,生成下一个数据压缩块。
  • 64KB < x < 1MB:当单批次数据大于64KB 且小于 1M 时,直接生成下一个数据压缩块。
  • x > 1MB:当单批次数据大于 1MB 时,将数据按照 1MB 截取,生成数据压缩块,其余的数据按照前面的规则执行,此时可能生成多个压缩数据块。
  • 压缩块是读取数据的最小单元。

    集群相关

    ClickHouse & ZooKeeper

    ClickHouse依赖ZooKeeper解决的问题

    • 分布式DDL执行。ClickHouse中DDL执行默认不是分布式化的,用户需要在DDL语句中加上on Cluster XXX的声明才能触发这个功能。
    • ReplicatedMergeTree表主备节点之间的状态同步。

    ClickHouse对库、表的管理都是在存储节点级别独立的,集群中各节点之间的库、表元数据信息没有一致性约束

    理由是:由 ClickHouse 的架构特色决定的

    • 彻底Share Nothing,各节点之间完全没有相互依赖
    • 节点完全对等,集群中的节点角色统一,ClickHouse没有传统MPP数据库中的前端节点、Worker节点、元数据节点等概念
    • ClickHouse的这种架构特色决定它可以敏捷化、小规模部署,集群可以任意进行分裂、合并。前提要求是感知数据在集群节点上的分布

    分布式表

    分布式表本身不存储数据,但可以在多个服务器上进行分布式查询。

    可以理解为一个视图。读是自动并行的。

    本地表是在节点创建的真正存数据的表,此外我们还需要创建分布式表。

    -- 创建本地表
    
    create table st_order_mt on cluster gmall_cluster
    
    
    
    (
    
        id UInt32,
    
        sku_id String,
    
        total_amount Decimal(16, 2), 
    
        create_time Datetime
    
    ) engine = ReplicatedMergeTree('/clickhouse/tables/{cluster}-{shard}/st_order_mt', '{replica}');
    
    
    
    -- 创建分布式表
    
    create table st_order_mt_all2 on cluster gmall_cluster
    
    
    
    (
    
    
    
         id UInt32, 
    
         sku_id String,
    
         total_amount Decimal(16,2), 
    
         create_time Datetime 
    
    
    
     ) engine = Distributed(gmall_cluster,default, st_order_mt,hiveHash(sku_id));
    
    
    
    -- Distributed(集群名称,库名,本地表名,分片键) 
    
    
    
    -- 分片键必须是整型数字,所以用 hiveHash 函数转换,也可以 rand() 
    

    其中集群标识符{cluster}、分片标识符{shard}和副本标识符{replica}来自复制表宏配置,即config.xml中一节的内容,配合ON CLUSTER语法一同使用,可以避免建表时在每个实例上反复修改这些值。

    为了最大化性能与稳定性,分片和副本几乎总是一同使用

    ClickHouse的副本机制之所以叫“复制表”,是因为它工作在表级别,而不是集群级别(如HDFS)。也就是说,用户在创建表时可以通过指定引擎选择该表是否高可用,每张表的分片与副本都是互相独立的。

    ReplicatedMergeTree引擎族在ZK中存储大量数据,包括且不限于表结构信息、元数据、操作日志、副本状态、数据块校验值、数据part merge过程中的选主信息等等。可见,ZK在复制表机制下扮演了元数据存储、日志框架、分布式协调服务三重角色,任务很重,所以需要额外保证ZK集群的可用性以及资源(尤其是硬盘资源)。

    写入

    以3分片、2副本为例(共6个节点)

    写入到分布式表中时,数据是先写到一个分布式表的实例中并缓存起来,再逐渐分发到各个分片上去。

    直接写分布式表的缺点

    • 由于需要先缓存,实际上是双写了数据(写入放大),浪费资源;
    • 数据写入默认是异步的,短时间内可能造成不一致;
    • 目标表中会产生较多的小parts,使merge(即compaction)过程压力增大。

    改进——写本地表,应用层路由

    相对而言,直接写本地表是同步操作,更快,parts的大小也比较合适,但是就要求应用层额外实现sharding和路由逻辑。而应用层路由并不是什么难事,所以如果条件允许,在生产环境中总是推荐写本地表、读分布式表。

    查询

    副本的选择策略

    由user.xml中的参数load_balancing控制

    • random 默认值,选择errors_count最小的replica,如果多个replica的errors_count相同,则随机选一个
    • nearest_hostname 选择errors_count最小的replica,如果多个replica的errors_count相同,则采用逐一比较的方法,取与client的hostname不同字节最少的一个replica
    • in_order 选择errors_count最小的replica,如果多个replica的errors_count相同,则根据metrika.xml定义的replica顺序选择
    • first_or_random 选择errors_count最小的replica,如果多个replica的errors_count相同,则根据metrika.xml定义的replica顺序选择第一个,如果第一个不可用,则随机选择一个

    JOIN的实现和应用

    ClickHouse JOIN查询语法如下

    SELECT 
    
    FROM 
    
    [GLOBAL] [INNER|LEFT|RIGHT|FULL|CROSS] [OUTER|SEMI|ANTI|ANY|ASOF] JOIN 
    
    (ON )|(USING ) ...
    

    单机JOIN的实现

    ClickHouse 单机JOIN操作默认采用HASH JOIN算法,可选MERGE JOIN算法。其中,MERGE JOIN算法数据会溢出到磁盘,性能相比前者较差。

    ClickHouse的HASH JOIN算法实现比较简单:

    • 从right_table 读取该表全量数据,在内存中构建HASH MAP
    • 从left_table 分批读取数据,根据JOIN KEY到HASH MAP中进行查找,如果命中,则该数据作为JOIN的输出

    从这个实现中可以看出,如果right_table的数据量超过单机可用内存空间的限制,则JOIN操作无法完成。通常,两表JOIN时,将较小表作为right_table。

    分布式JOIN的实现

    ClickHouse 是去中心化架构,非常容易水平扩展集群。当以集群模式提供服务时候,分布式JOIN查询就无法避免。这里的分布式JOIN通常指,JOIN查询中涉及到的left_table 与 right_table 是分布式表。

    通常,分布式JOIN实现机制无非如下几种:

    • Broadcast JOIN
    • Shuffle Join
    • Colocate JOIN

    ClickHouse集群并未实现完整意义上的Shuffle JOIN,实现了类Broadcast JOIN,通过事先完成数据重分布,能够实现Colocate JOIN。

    普通JOIN实现

    无GLOBAL关键字的JOIN的实现:

    • initiator 将SQL S中左表分布式表替换为对应的本地表,形成S'
    • initiator 将a.中的S'分发到集群每个节点
    • 集群节点执行S',并将结果汇总到initiator 节点
    • initiator 节点将结果返回给客户端
    SELECT a_.i, a_.s, b_.t FROM a_all as a_ JOIN b_all AS b_ ON a_.i = b_.i
    

    的执行过程如下

    如果右表为分布式表,则集群中每个节点会去执行分布式查询。这里就会存在一个非常严重的读放大现象。假设集群有N个节点,右表查询会在集群中执行N*N次。

    可以看出,ClickHouse 普通分布式JOIN查询是一个简单版的Shuffle JOIN的实现,或者说是一个不完整的实现。不完整的地方在于,并未按JOIN KEY去Shuffle数据,而是每个节点全量拉去右表数据。这里实际上是存在着优化空间的。

    在生产环境中,查询放大对查询性能的影响是不可忽略的。

    GLOBAL JOIN实现

    GLOBAL JOIN实现如下:

    • 若右表为子查询,则initiator完成子查询计算
    • initiator 将右表数据发送给集群其他节点
    • 集群节点将左表本地表与右表数据进行JOIN计算
    • 集群其他节点将结果发回给initiator节点
    • initiator 将结果汇总,发给客户端

    GLOBAL JOIN 可以看作一个不完整的Broadcast JOIN实现。如果JOIN的右表数据量较大,就会占用大量网络带宽,导致查询性能降低。

    SELECT a_.i, a_.s, b_.t FROM a_all as a_ GLOBAL JOIN b_all AS b_ ON a_.i = b_.i
    

    的执行过程如下

    可以看出,GLOBAL JOIN 将右表的查询在initiator节点上完成后,通过网络发送到其他节点,避免其他节点重复计算,从而避免查询放大。

    分布式JOIN最佳实践

  • 尽量减少JOIN右表的数据量

  • ClickHouse根据JOIN的右表数据,构建HASH MAP,并将SQL中所需的列全部读入内存中。如果右表数据量过大,节点内存无法容纳后,无法完成计算。

    在实际中,我们通常将较小的表作为右表,并尽可能增加过滤条件,降低进入JOIN计算的数据量。

  • 利用GLOBAL JOIN 避免查询放大带来性能损失

  • 如果右表或者子查询的数据量可控,可以使用GLOBAL JOIN来避免读放大。需要注意的是,GLOBAL JOIN 会触发数据在节点之间传播,占用部分网络流量。如果数据量较大,同样会带来性能损失。

  • 数据预分布(应用层保证)实现Colocate JOIN

  • 当JOIN涉及的表数据量都非常大时,读放大,或网络广播都带来巨大性能损失时,我们就需要采取另外一种方式来完成JOIN计算了。

    根据“相同JOIN KEY必定相同分片”原理,我们将涉及JOIN计算的表,按JOIN KEY在集群维度作分片。将分布式JOIN转为为节点的本地JOIN,极大减少了查询放大问题。

    SELECT a_.i, a_.s, b_.t FROM a_all as a_ JOIN b_local AS b_ ON a_.i = b_.i
    

    的执行过程如下

    由于数据以及预分区了,相同的JOIN KEY对应的数据一定在一起,不会跨节点存在,所以无需对右表做分布式查询,也能获得正确结果。

    参考资料

    什么是ClickHouse? | ClickHouse文档

    ClickHouse 架构概述 | ClickHouse文档

    为什么ClickHouse这么快? - 墨天轮

    ClickHouse复制表、分布式表机制与使用方法

    Improve Query Performance with Clickhouse Data Skipping Index - Instana

    云数据库ClickHouse二级索引-最佳实践-阿里云开发者社区

    揭秘“风神”等数据产品背后的分析引擎:ClickHouse(2019)

    大数据东风下,Clickhouse这坨* 是怎么上天的

    云数据库ClickHouse资源隔离-弹性资源队列-阿里云开发者社区

    ClickHouse使用姿势系列之分布式JOIN

    相关文章

    JavaScript2024新功能:Object.groupBy、正则表达式v标志
    PHP trim 函数对多字节字符的使用和限制
    新函数 json_validate() 、randomizer 类扩展…20 个PHP 8.3 新特性全面解析
    使用HTMX为WordPress增效:如何在不使用复杂框架的情况下增强平台功能
    为React 19做准备:WordPress 6.6用户指南
    如何删除WordPress中的所有评论

    发布评论