OceanBase 存储层代码解读(二)微块存储格式

2024年 5月 7日 51.8k 0

OceanBase 存储层代码解读(二)微块存储格式-1

作者:公祺,一个专注于 OBKV 的程序员;海芊,一个致力于当网红的 OceanBase 文档工程师。个人频道:Amber loves OB

1. 微块和宏块的关系

OceanBase 数据库的存储引擎采用了基于 LSM-Tree 的架构,把基线数据和增量数据分别保存在磁盘(SSTable)和内存(MemTable)中,其中 SSTable 以宏块(Macro Block)为单位组织数据,每个宏块大小为 2MB。宏块内部又划分出很多个大小为 16K(压缩前的大小)微块(Micro Block),每个微块内则包含多个行(Row)。

OceanBase 数据库内部 IO 的最小单元就是微块,微块数据可以进行压缩,下图是微块在宏块中的存储格式:

OceanBase 存储层代码解读(二)微块存储格式-2

其中:

1.macro block header:宏块的元数据,包括:版本、宏块大小、微块的个数、列的类型、行的个数、压缩算法等信息。

2.micro block n:微块的具体数据,具体信息见下文微块的存储格式。

3.micro block index:微块的索引数据,用来索引每个微块,通常常驻内存。

每一个微块包含多个行数据,行数据中有多个列的数据,如下是当前微块的存储格式:

OceanBase 存储层代码解读(二)微块存储格式-3

其中:

1.record header:包含微块数据大小、头部大小、version、checksum 等。

2.micro block header:包含微块的元数据,包括:列数、行索引的偏移量、行数等。

3.Row n:包含行数据,包括:row header、列数据、列索引。

(1)row header:行的元数据,包括:列索引占用的字节数、删除标记等。

(2)col n:列的值,包括:类型、长度、列的值。

(3)column index:列的索引数据,用来索引每一列,记录了每列值的偏移和长度。

4.Row Index:行的索引数据,用来索引每一行,记录了每行的偏移和长度。

接下来,让我们根据 OceanBase 的开源代码,介绍微块的具体代码实现,后续的代码解读都是基于此版本:v3.1.0_CE_BP1

2. 微块中行数据的编码

行数据写入的主要逻辑在 ObMacroBlockWriter 模块,见如下的代码:

// src/storage/blocksstable/ob_macro_block_writer.cpp
// 该函数主要逻辑为:
//   1. 将行数据写入到微块中,并更新 bloomfilter 和 checksum
//   2. 在当前微块写满的情况下:构建当前的微块,并切换微块写入器
int ObMacroBlockWriter::append_row(const ObStoreRow& row, const int64_t split_size, const bool ignore_lob)
{
  int ret = OB_SUCCESS;
  const ObStoreRow* row_to_append = &row;
  // ... 省略参数检查的相关代码if (OB_SUCC(ret)) {// 向当前的微块中写入一行数据if (OB_FAIL(micro_writer_->append_row(*row_to_append))) { if (OB_BUF_NOT_ENOUGH == ret) {if (0 == micro_writer_->get_row_count()) {// 不支持超过 16KB 的行
          ret = OB_NOT_SUPPORTED;
          STORAGE_LOG(ERROR, "The single row is too large, ", K(ret), K(row));
        } else if (OB_FAIL(build_micro_block(false))) { // 当前微块写满后,需要构建该微块:编码、压缩等// for compatibilitySTORAGE_LOG(WARN, "Fail to build micro block, ", K(ret));
        } else if (OB_FAIL(micro_writer_->append_row(*row_to_append))) { // 继续写当前行数据STORAGE_LOG(ERROR, "Fail to append row to micro block, ", K(ret), K(row));
        } else if (data_store_desc_->need_calc_column_checksum_ 
                   && OB_FAIL(add_row_checksum(row_to_append->row_val_))) { // 计算 checksumSTORAGE_LOG(WARN, "fail to add column checksum", K(ret));
        }
        if (OB_SUCC(ret) && data_store_desc_->need_prebuild_bloomfilter_) {
          // 根据 rowkey 构建 bloomfilter
          const ObStoreRowkey rowkey(row_to_append->row_val_.cells_, data_store_desc_->bloomfilter_rowkey_prefix_);
          if (OB_FAIL(micro_rowkey_hashs_.push_back(static_cast<uint32_t>(rowkey.murmurhash(0))))) {
            STORAGE_LOG(WARN, "Fail to put rowkey hash to array ", K(ret), K(rowkey));
            micro_rowkey_hashs_.reuse();
            ret = OB_SUCCESS;
          }
        }
      } else {
        STORAGE_LOG(WARN, "Fail to append row to micro block, ", K(ret), K(row));
      }
    } else {
      if (data_store_desc_->need_prebuild_bloomfilter_) {
        // 根据 rowkey 构建 bloomfilter
        const ObStoreRowkey rowkey(row_to_append->row_val_.cells_, data_store_desc_->bloomfilter_rowkey_prefix_);
        if (OB_FAIL(micro_rowkey_hashs_.push_back(static_cast<uint32_t>(rowkey.murmurhash(0))))) {
          STORAGE_LOG(WARN, "Fail to put rowkey hash to array ", K(ret), K(rowkey));
          micro_rowkey_hashs_.reuse();
          ret = OB_SUCCESS;
        }
      }
      // 计算 checksumif (data_store_desc_->need_calc_column_checksum_ && OB_FAIL(add_row_checksum(row_to_append->row_val_))) {STORAGE_LOG(WARN, "fail to add column checksum", K(ret));
      } else if (micro_writer_->get_block_size() >= split_size) {
        // 如果当前微块已经满 16KB,则构建微块if (OB_FAIL(build_micro_block())) {STORAGE_LOG(WARN, "Fail to build micro block, ", K(ret));
        }
      }
    }
  }
  return ret;
}

行数据通过调用 ObMicroBlockWriter::append_row -> ObRowWriter::write 将数据写到微块中,从而进行行数据编码。编码分为两种,代码如下:

// src/storage/blocksstable/ob_row_writer.cpp
int ObRowWriter::write(const int64_t rowkey_column_count, const ObStoreRow& row, char* buf, 
                       const int64_t buf_size, int64_t& pos, int64_t& rowkey_start_pos, 
                       int64_t& rowkey_length, const bool only_row_key)
{
  int ret = OB_SUCCESS;
  if (row.is_sparse_row_) { // 使用稀疏行存储格式写入行数据if (OB_FAIL(write_sparse_row(
            rowkey_column_count, row, buf, buf_size, pos, rowkey_start_pos, rowkey_length, only_row_key))) {
      STORAGE_LOG(WARN, "write sparse row failed", K(ret), K(row), K(OB_P(buf)), K(buf_size));
    }
  } else if (OB_FAIL(write_flat_row(rowkey_column_count,
                 row,
                 buf,
                 buf_size,
                 pos,
                 rowkey_start_pos,
                 rowkey_length,
                 only_row_key))) { // 使用稠密行存储格式写入行数据
    STORAGE_LOG(WARN, "write flat row failed", K(ret), K(row), K(OB_P(buf)), K(buf_size));
  }
  return ret;
}

行数据的存储格式有两种:

1.Sparse Format:稀疏格式,行数据的每列都要存储数据,即使该列没有值也要存储 NOP;

2.Dense Format:稠密格式,行数据只存储有值的列,没有值的列不存储。

这两种格式差不多,只是对没有值的列的处理略有不同,一般常用稠密格式存储行数据,下面是相关的代码:

// src/storage/blocksstable/ob_row_writer.cpp
// 使用稠密行存储格式写入行数据
int ObRowWriter::write_flat_row(const int64_t rowkey_column_count, const ObStoreRow& row, char* buf,
    const int64_t buf_size, int64_t& pos, int64_t& rowkey_start_pos, int64_t& rowkey_length, const bool only_row_key)
{
  int ret = OB_SUCCESS;
  int64_t tmp_rowkey_start_pos = 0;
  int64_t tmp_rowkey_length = 0;
  if (!row.is_valid() || rowkey_column_count <= 0 || rowkey_column_count > row.row_val_.count_) {
    ret = OB_INVALID_ARGUMENT;
    STORAGE_LOG(ERROR, "invalid input argument.", K(buf), K(buf_size), K(row), K(rowkey_column_count), K(ret));
  } else if (OB_FAIL(init_common(buf, buf_size, pos))) {  // buf 参数初始化
    STORAGE_LOG(WARN, "row writer fail to init common.", K(ret), K(row), K(OB_P(buf)), K(buf_size), K(pos));
  } else if (OB_FAIL(init_store_row(row, rowkey_column_count))) { // 做一些参数检查
    STORAGE_LOG(WARN, "row writer fail to init store row.", K(ret), K(rowkey_column_count));
  } else if (OB_FAIL(append_row_header(row))) { // 填充 row header,不包括:列索引占用的字节数if (OB_BUF_NOT_ENOUGH != ret) {
      STORAGE_LOG(WARN, "row writer fail to append row header.", K(ret));
    }
  } else if (OB_FAIL(
                 append_store_row(rowkey_column_count, row, only_row_key, tmp_rowkey_start_pos, tmp_rowkey_length))) { // 填充行数据,具体代码见下面的代码:ObRowWriter::append_store_rowif (OB_BUF_NOT_ENOUGH != ret) {
      STORAGE_LOG(WARN, "row writer fail to append store row.", K(ret), K(rowkey_column_count));
    }
  } else if (OB_FAIL(append_column_index())) { // 填充列索引数据以及 row_header_.column_index_bytes_if (OB_BUF_NOT_ENOUGH != ret) {
      STORAGE_LOG(WARN, "row writer fail to append column index.", K(ret));
    }
  } else {
    pos = pos_;
    rowkey_start_pos = tmp_rowkey_start_pos;
    rowkey_length = tmp_rowkey_length;
  }
  return ret;
}

// 填充行数据,包括每一列的值
int ObRowWriter::append_store_row(const int64_t rowkey_column_count, const ObStoreRow& row, const bool only_row_key,
    int64_t& rowkey_start_pos, int64_t& rowkey_length)
{
  int ret = OB_SUCCESS;
  const int64_t end_index = only_row_key ? rowkey_column_count : row.row_val_.count_;
  rowkey_start_pos = pos_;
  if (!row.is_valid() || rowkey_column_count <= 0 || rowkey_column_count > row.row_val_.count_) {
    ret = OB_INVALID_ARGUMENT;
    STORAGE_LOG(WARN, "invalid row input argument.", K(row), K(rowkey_column_count), K(ret));
  } else {
    for (int64_t i = 0; OB_SUCC(ret) && i < end_index; ++i) {
      // 记录列索引,当前还没有确定列索引占用的字节数// 所以记录 3 种类型的列索引数组,在 append_column_index(...) 时,// 会最终确认列索引占用的字节数,以及使用哪个数组const int64_t column_index = pos_ - start_pos_;
      column_indexs_8_[i] = static_cast<int8_t>(column_index);
      column_indexs_16_[i] = static_cast<int16_t>(column_index);
      column_indexs_32_[i] = static_cast<int32_t>(column_index);
      // 填充列数据,包括:列类型、列值长度、列值if (OB_FAIL(append_column(row.row_val_.cells_[i]))) {if (OB_BUF_NOT_ENOUGH != ret) {// 行数据太长了
          STORAGE_LOG(WARN, "row writer fail to append column.", K(ret), K(i), K(row.row_val_.cells_[i]));
        }
        break;
      }
    }
  }
  if (OB_SUCC(ret)) {
    column_index_count_ += end_index;
    if (rowkey_column_count < end_index) {
      rowkey_length = column_indexs_32_[rowkey_column_count] + start_pos_ - rowkey_start_pos;
    } else {
      rowkey_length = pos_ - rowkey_start_pos;
    }
  }
  return ret;
}

以上是单个行数据的编码,写入多行数据,直到微块写满时,就需要构建微块了。

3. 微块的整体构建

微块是有多个行组成的,微块写满后就需要对整个微块进行序列化、压缩、填充元数据等,微块的构建代码见 ObMacroBlockWriter::build_micro_block,如下:

// src/storage/blocksstable/ob_macro_block_writer.cpp
int ObMacroBlockWriter::build_micro_block(const bool force_split)
{
  int ret = OB_SUCCESS;
  char* block_buffer = NULL;
  int64_t block_size = 0;
  bool mark_deletion = false;
  ObMicroBlockDesc micro_block_desc;

  if (micro_writer_->get_row_count() <= 0) {
    ret = OB_INNER_STAT_ERROR;
    STORAGE_LOG(WARN, "micro_block_writer is empty", K(ret));
  } else if (OB_FAIL(micro_writer_->build_block(block_buffer, block_size))) { // 序列化
    STORAGE_LOG(WARN, "Fail to build block, ", K(ret));
  } else if (OB_FAIL(
                 compressor_.compress(block_buffer, block_size, micro_block_desc.buf_, micro_block_desc.buf_size_))) { // 使用压缩算法,对微块序列化好的 buffer 进行压缩
    STORAGE_LOG(WARN, "macro block writer fail to compress.", K(ret), K(OB_P(block_buffer)), K(block_size));
  } else if (MICRO_BLOCK_MERGE_VERIFY_LEVEL::NONE != micro_writer_->get_micro_block_merge_verify_level() &&
             OB_FAIL(check_micro_block( // 检查压缩是否正确
                 micro_block_desc.buf_, micro_block_desc.buf_size_, block_buffer, block_size, micro_writer_))) {
    STORAGE_LOG(WARN, "failed to check micro block", K(ret));
  } else if (OB_FAIL(can_mark_deletion(pre_micro_last_key_, 
                                       last_key_, mark_deletion))) { // 设置宏块元数据:微块 delete mark
    STORAGE_LOG(WARN, "fail to run can mark deletion", K(ret));
  } else if (OB_FAIL(save_pre_micro_last_key(last_key_))) { // 设置宏块的元数据:微块的 endkey
    STORAGE_LOG(WARN, "Fail to save pre micro last key, ", K(ret), K_(last_key));
  } else {
    micro_block_desc.last_rowkey_ = micro_writer_->get_last_rowkey();
    micro_block_desc.data_size_ = micro_writer_->get_data_size();
    micro_block_desc.row_count_ = micro_writer_->get_row_count();
    micro_block_desc.column_count_ = micro_writer_->get_column_count();
    micro_block_desc.row_count_delta_ = micro_writer_->get_row_count_delta();
    micro_block_desc.can_mark_deletion_ = mark_deletion;
    micro_block_desc.column_checksums_ =
        data_store_desc_->need_calc_column_checksum_ ? curr_micro_column_checksum_ : NULL;
    if (data_store_desc_->is_multi_version_minor_sstable()) {
      micro_block_desc.max_merged_trans_version_ = micro_writer_->get_max_merged_trans_version();
      micro_block_desc.contain_uncommitted_row_ = micro_writer_->is_contain_uncommitted_row();
    }
    // 将该微块写入宏块,包括://   1. 将该微块放入到宏块的微块数组中//   2. 更新宏块的 bloomfilterif (OB_FAIL(write_micro_block(micro_block_desc, force_split))) {
      STORAGE_LOG(WARN, "build_micro_block failed", K(micro_block_desc), K(force_split), K(ret));
    } else {
      // 重置微块的写入器,后续可以复用micro_writer_->reuse();if (data_store_desc_->need_prebuild_bloomfilter_ && micro_rowkey_hashs_.count() > 0) {
        micro_rowkey_hashs_.reuse();
      }
    }
  }
  return ret;
}

后面,再看一下微块的压缩的代码实现。

4. 微块数据的压缩

微块的压缩的管理类为“ObCompressorPool”,支持多种压缩算法:

1.NONE_COMPRESSOR:不压缩;

2.LZ4_COMPRESSOR:lz4_1.0

3.LZ4_191_COMPRESSOR:lz4_1.9.1

4.SNAPPY_COMPRESSOR:snappy_1.0

5.ZLIB_COMPRESSOR:zlib_1.0

6.ZSTD_COMPRESSOR:zstd_1.0

7.ZSTD_1_3_8_COMPRESSOR:zstd_1.3.8

8.STREAM_LZ4_COMPRESSOR:stream_lz4_1.0

9.STREAM_ZSTD_COMPRESSOR:stream_zstd_1.0

10.STREAM_ZSTD_1_3_8_COMPRESSOR:stream_zstd_1.3.8

目前微块的压缩没有使用流式压缩(流式压缩主要用在 RPC 中),微块使用的压缩算法的实现主要是继承 ObCompressor 接口类:

// deps/oblib/src/lib/compress/ob_compressor.h
class ObCompressor {
public:
  static const char* none_compressor_name;

public:
  ObCompressor()
  {}
  virtual ~ObCompressor()
  {}
  
  // 压缩接口virtual int compress(const char* src_buffer, const int64_t src_data_size, char* dst_buffer,
      const int64_t dst_buffer_size, int64_t& dst_data_size) = 0;
  
  // 解压接口virtual int decompress(const char* src_buffer, const int64_t src_data_size, char* dst_buffer,
      const int64_t dst_buffer_size, int64_t& dst_data_size) = 0;
  
  // 获取 max(压缩后的数据大小 - 原始数据大小)// 主要用来确定 dst_buffer_size: src_data_size + max_overflow_size// 用来预分配压缩后的内存 buffervirtual int get_max_overflow_size(const int64_t src_data_size, int64_t& max_overflow_size) const = 0;

  // 获取压缩算法名称接口virtual const char* get_compressor_name() const = 0;
};

具体的算法实现可以参见具体的代码实现类:ObNoneCompressor、ObLZ4Compressor、ObLZ4Compressor191、ObSnappyCompressor、ObZlibCompressor、ObZstdCompressor、ObZstdCompressor_1_3_8 等。

微块的压缩逻辑比较简单,包装在 ObMicroBlockCompressor 中:

// src/storage/blocksstable/ob_macro_block.h
class ObMicroBlockCompressor {
public:
  ObMicroBlockCompressor();
  virtual ~ObMicroBlockCompressor();
  void reset();
  
  // 根据微块大小、压缩算法,初始化压缩器int init(const int64_t micro_block_size, const char* comp_name);
  
  // 压缩接口,通过调用 ObCompressor 的实现类的 compress 接口int compress(const char* in, const int64_t in_size, const char*& out, int64_t& out_size);
  
  // 解压接口,通过调用 ObCompressor 的实现类的 decompress 接口int decompress(const char* in, const int64_t in_size, const int64_t uncomp_size, const char*& out, int64_t& out_size);

private:
  bool is_none_;
  int64_t micro_block_size_;          // 微块大小
  common::ObCompressor* compressor_;  // 指向压缩算法的实现类
  ObSelfBufferWriter comp_buf_;       // 压缩 buffer
  ObSelfBufferWriter decomp_buf_;     // 解压 buffer// ObSelfBufferWriter 是可以自动扩展的内存 buffer 类
};

一般 LZ4、ZSTD 是比较常用的压缩算法,前者压缩速度快,后者压缩率高。

微块使用何种压缩算法,是和建表的 schema 有关系的,比如可以通过以下建表 SQL 指定使用 ZSTD 的压缩:

CREATE TABLE `t` (
  `id` int(11) NOT NULL,
  `k` int(11) DEFAULT NULL,
  PRIMARY KEY (`id`)
) DEFAULT CHARSET = utf8mb4 ROW_FORMAT = DYNAMIC COMPRESSION = 'zstd_1.0' REPLICA_NUM = 1 BLOCK_SIZE = 16384 USE_BLOOM_FILTER = FALSE TABLET_SIZE = 134217728 PCTFREE = 0

5. 微块存储的一些思考

基于列存的编码算法

OceanBase 数据库的三层存储结构:SSTable、MacroBlock、MicroBlock。其中微块是读 IO 的最小单元,直接影响着业务请求的延时,微块的压缩对降低 IO 延时都有比较好的改善效果。编码方面,基于行存储的编码效果并不明显,可以考虑基于列式存储的编码,比如:字典、前缀等。

Parquet 列式存储编码方式支持的就比较多,可以参考 Parquet Encodings,ORC 也基本实现了这些编码算法 ORC v2,可见列式存储编码算法的丰富性。

OceanBase 数据库中微块如果采用合适的列式编码算法,可以非常有效地降低 IO 延时以及存储空间的成本,并且列式存储对 AP 场景也有很好的性能表现。

计算量的优化方向

众所周知,数据库不仅是 CPU 密集性应用,也是 IO 密集性应用,但是随着业界 CPU 性能提升放缓,Nvme SSD 性能大幅提升的情况下,CPU 慢慢成为数据库的瓶颈,OceanBase 数据库也不例外,加上在微块编码、压缩等方面的 CPU 开销,对 CPU 性能要求会越来越高。

针对目前的 CPU 瓶颈,业界普遍采用硬件加速的方式来降低机器 CPU 的使用率:

1.FPGA:将计算量卸载到硬件中,降低机器 CPU 的使用率;

2.SIMD:单指令流、多数据流,例如 Intel 的 MMXSSE,以及 AMD 的 3D Now! 指令集;

3.其他。

6. 微块存储格式的 demo

可以通过下面的真实的一个微块 demo,更好的理解微块的存储格式:

OceanBase 存储层代码解读(二)微块存储格式-4

通过 record header 可以知道微块的大小,是否启用压缩(data_length、data_zlength 相等表示没有压缩),以及微块数据的 checksum。

在 micro block header 中,主要记录了行数、列数,以及的行索引偏移量,行索引记录在微块的最后。

Row 0 表示第一行数据,ObRowHeader 主要记录每列索引占用的字节数(column_index_bytes),列索引的偏移量是通过行索引得出第一行的偏移和长度往前推8字节(column_count * column_index_bytes)得出来的。

每一列中都一个列元数据和列值,列元数据中记录了列值的类型、长度,列值中才真正记录了列的数据。

总结

以上通过走读代码的方式,梳理了微块的存储格式,后面会继续走读 OceanBase 数据库的存储层的代码:

1.宏块的存储格式;

2.合并的主要逻辑;

3.宏块的 GC 实现;

4.其他。

最后的最后:如果您有任何疑问,可以通过以下方式与我们进行交流:

钉钉群:33254054

OceanBase 存储层代码解读(二)微块存储格式-5

微信群:扫码添加小助手,将拉你进群哟~

OceanBase 存储层代码解读(二)微块存储格式-6

相关文章

Oracle如何使用授予和撤销权限的语法和示例
Awesome Project: 探索 MatrixOrigin 云原生分布式数据库
下载丨66页PDF,云和恩墨技术通讯(2024年7月刊)
社区版oceanbase安装
Oracle 导出CSV工具-sqluldr2
ETL数据集成丨快速将MySQL数据迁移至Doris数据库

发布评论