LevelDB之SSTable读写
前言
本文是作为上一个LevelDB之SSTable的补充,因为上文介绍了SSTable的组织结构,本文会在代码层面介绍具体的实现。
从MemTable写入到SSTable
前文的MemTable 已经介绍了对应的SkipList中的Key的编码,也介绍了MemTable写入SSTable的写入时机。接下来看SkipList 是如何序列化LevelDB的SSTable的。入口在db/db_impl.cc
的WriteLevel0Table
方法中,序列化为SSTable主要入口在
{ mutex_.Unlock(); s = BuildTable(dbname_, env_, options_, table_cache_, iter, &meta); mutex_.Lock(); }
写入流程
BuildTable方法位于db/builder.cc
中,具体代码如下:
Status BuildTable(const std::string& dbname, Env* env, const Options& options, TableCache* table_cache, Iterator* iter, FileMetaData* meta) { Status s; meta->file_size = 0; iter->SeekToFirst(); // 传入的是当前的SkipList的迭代器,iter 返回的key是去掉了长度的key,也就是没有internal_key_size 的值 std::string fname = TableFileName(dbname, meta->number); //返回当前的table名字,主要就是在dbname的目录下创建一个number的文件 if (iter->Valid()) { //查看迭代器是否正常,主要是判断头节点是否为nullptr,即有没有值 WritableFile* file; //新建写入的文件 s = env->NewWritableFile(fname, &file); if (!s.ok()) { return s; } // 创建TableBuilder TableBuilder* builder = new TableBuilder(options, file); meta->smallest.DecodeFrom(iter->key()); // 记录最小值 Slice key; for (; iter->Valid(); iter->Next()) { key = iter->key(); builder->Add(key, iter->value()); //迭代将数据加入到Builder中 } if (!key.empty()) { meta->largest.DecodeFrom(key);//记录最大值 } // Finish and check for builder errors s = builder->Finish(); //将builder的元数据,包含了filter block ,index block,Footer等写入到文件中 if (s.ok()) { meta->file_size = builder->FileSize();// 记录文件大小 assert(meta->file_size > 0); } delete builder; // 清楚数据 // Finish and check for file errors if (s.ok()) { s = file->Sync(); //这里主要是确定当前的文件正确写入到磁盘上了,避免出现写入MAINFEST文件的数据中有脏数据 } if (s.ok()) { s = file->Close(); } delete file; // 有删减 return s; }
可以看到,入参是一个iter。这个是MemTable的迭代器,返回的是去掉了internal_key_size_length的真实的Key。进入到TableBuilder的Add方法中。
上面的步骤概括为下面几步:
Req
在TableBuilder创建后,会创建一个叫Req的struct。在整个TableBuilder中,这个struct就是公共的数据容器。
struct TableBuilder::Rep { Rep(const Options& opt, WritableFile* f) : options(opt), index_block_options(opt), file(f), offset(0), data_block(&options), index_block(&index_block_options), num_entries(0), closed(false), filter_block(opt.filter_policy == nullptr ? nullptr : new FilterBlockBuilder(opt.filter_policy)), pending_index_entry(false) { index_block_options.block_restart_interval = 1; } Options options; // 传入的options,可以自定义的一些参数,如控制一个datarow的数据大小的block_size 等 Options index_block_options; // 使用在index_block中的参数,如多久进行一次前缀压缩的block_restart_interval WritableFile* file; // 当前写入的文件 uint64_t offset; //当前的offset Status status; BlockBuilder data_block; // 存储在内存中的datablock BlockBuilder index_block; //存储在内存中的index_block std::string last_key; // 当前datablock中的最后一个key或者比当前data block都大的key int64_t num_entries; // 包含了多少的数据 bool closed; // Either Finish() or Abandon() has been called. FilterBlockBuilder* filter_block; //如果没有设置过滤器,则该参数为nullptr // We do not emit the index entry for a block until we have seen the // first key for the next data block. This allows us to use shorter // keys in the index block. For example, consider a block boundary // between the keys "the quick brown fox" and "the who". We can use // "the r" as the key for the index block entry since it is >= all // entries in the first block and pending_index_entry is true only if data_block is empty. bool pending_index_entry; // true 分为两种情况,1 当前的data block 超过了上面的提到的block size,2 整个Table刷入磁盘的时候 false 则说明在写入data block BlockHandle pending_handle; // Handle to add to index block // 主要用来记录data block的的offset 和size ,然后记录到index block中 std::string compressed_output; };
上面的参数中,主要是使用last key记录每次写入的最大的key,然后就是index block 记录的handle 。后面会看下这些参数在文件中是如何使用的。
写入Data Block
迭代器中的额Add方法仍然是在TableBuilder 中实现的,在这个方法中,filter 和index 的block 都是先写在内存中,Data Block则是按照options.block_size
的值判断是否写入磁盘,这个值默认是4kb。
void TableBuilder::Add(const Slice& key, const Slice& value) { Rep* r = rep_; assert(!r->closed); if (!ok()) return; if (r->num_entries > 0) { assert(r->options.comparator->Compare(key, Slice(r->last_key)) > 0); } if (r->pending_index_entry) { // index 的值是false assert(r->data_block.empty()); r->options.comparator->FindShortestSeparator(&r->last_key, key); std::string handle_encoding; r->pending_handle.EncodeTo(&handle_encoding); r->index_block.Add(r->last_key, Slice(handle_encoding)); r->pending_index_entry = false; } if (r->filter_block != nullptr) { // bloom filter r->filter_block->AddKey(key); } r->last_key.assign(key.data(), key.size()); // 记录最后的key r->num_entries++; // 记录值的多少 r->data_block.Add(key, value); // 写入data block const size_t estimated_block_size = r->data_block.CurrentSizeEstimate(); if (estimated_block_size >= r->options.block_size) { Flush(); } }
上面的方法比较简单,index Block的key值是当前的已经记录的Block的最大的key,Value则是当前记录的4kb数据中的起始offset和size
pending_index_entry 的值开始为false,下面两种情况可能会转为true
- 当前的DataBlock的数据超过4KB,则需要将当前的datablock 写入到文件中,也就是上文的Flush方法,该方法会记录data block的offset 和size 到pending_handle中,等到下次写入key的时候将这个信息写入到index block中
- 上面写入index block 是在下次写入的时候做的,如果没有下次写入,则整个SSTable在结束的时候会再调用一次Flush,将当前内存中的DataBlock 收入文件,最后还会更新一次last key,并且刷入到index block中
如果filter不为nullptr,则会将每次key都写入到filter block中,等到自后写入文件。
Block Builder
可以看到,上面的index block 和data block,本质上都是个BlockBuilder,也就是说这个数据在写入的时候,最后都是按照一个Block的处理逻辑,为什么可以这么做?因为在LevelDB中,最后都是使用的Slice 的Key value,就是对外提供的接口是Key,Value的数据库,在内部很多地方抽象出来最后也是按照一个Key Value的方式存储。
BlockBuilder 位于table/block_builder.h
和table/block_builder.cc
中,主要的属性有:
const Options* options_; // 即从Table Builder中传入的参数 std::string buffer_; // Destination buffer std::vector restarts_; // Restart points // 前缀压缩开始节点的offset int counter_; // Number of entries emitted since restart //每一次restart中包含的数据大小,这个值主要是用来进行判断是否达到options中的block_restart_interval参数,默认是16,即16个key进行一次前缀压缩 bool finished_; // Has Finish() been called? std::string last_key_; // 当前block的最大的key
为了节约空间而做了前缀压缩,然后记录下每次前缀压缩的起始位置,便于后面查询使用二分查找。
下面将两个方法一起看,即Add 方法和Finish 方法,其中finish方法是在即将写入文件的时候将restart写入block中,add则是加入每个data数据。
Slice BlockBuilder::Finish() { // Append restart array for (size_t i = 0; i comparator->Compare(key, last_key_piece) > 0); size_t shared = 0; if (counter_ block_restart_interval) {// 前缀压缩阈值,默认16 // See how much sharing to do with previous string const size_t min_length = std::min(last_key_piece.size(), key.size()); // 当前值和前一个key的大小取最小。然后判断和前一个key相同的前缀。记录到shared中 while ((shared < min_length) && (last_key_piece[shared] == key[shared])) { shared++; } } else { // Restart compression restarts_.push_back(buffer_.size()); counter_ = 0; } const size_t non_shared = key.size() - shared; // 不相同的后缀 // Add "" to buffer_ // 本身的注释说明了写入的格式,将长度先写入到数据中 PutVarint32(&buffer_, shared); PutVarint32(&buffer_, non_shared); PutVarint32(&buffer_, value.size()); // 将key不和前缀相同的数据写入到buffer // Add string delta to buffer_ followed by value buffer_.append(key.data() + shared, non_shared); buffer_.append(value.data(), value.size()); // Update state 记录lastkey,这里记录的是真实的key,而不是压缩后的 last_key_.resize(shared); last_key_.append(key.data() + shared, non_shared); assert(Slice(last_key_) == key); counter_++; }
还是将上次的图拿出来用用,最后的block数据如下图:
在调用add 方法中的数据达到阈值4kb的时候,就会写入文件,调用的方法就是WriteBlock,
TableBuilder::WriteBlock(BlockBuilder* block, BlockHandle* handle)
这里的handle就是前面提到的Req中的Handle,传入的主要作用其实就是修改和记录Req中的Handle,便于后续在footer 中写入handle。在flush方法中,首先会调用finish方法,写入restart的数据,然后就是写入文件了:
void TableBuilder::WriteRawBlock(const Slice& block_contents, CompressionType type, BlockHandle* handle) { Rep* r = rep_; handle->set_offset(r->offset); handle->set_size(block_contents.size()); r->status = r->file->Append(block_contents); if (r->status.ok()) { char trailer[kBlockTrailerSize]; trailer[0] = type; // type 是压缩的类型 uint32_t crc = crc32c::Value(block_contents.data(), block_contents.size()); crc = crc32c::Extend(crc, trailer, 1); // Extend crc to cover block type EncodeFixed32(trailer + 1, crc32c::Mask(crc)); r->status = r->file->Append(Slice(trailer, kBlockTrailerSize)); if (r->status.ok()) { r->offset += block_contents.size() + kBlockTrailerSize; } } }
也就是在上面的方法中,会将handle的值进行更新,这里的handle传入的值为filter_block_handle, metaindex_block_handle, index_block_handle。是在TableBuilder的FInish方法中的局部变量,用来写入Write metaindex block 的handle 和 footer的handle。作用就是用来标识filter block等的起始offset和大小。
可以看到上面的BlockBuilder 会对每一次写入文件都记录crc校验。还有压缩的type。
Data Block 总结
data block在写入的过程中记录到filter的数据,也记录了restart的数据,每间隔4kb会将index block的数据更新。这些目前都是在内存中,只有在Table Builder的finish方法中最后写入到磁盘上。
其他Block的写入
其他的Block的写入最后的数据落盘和上面的Data Block 差不多,即都是通过前缀压缩后写入到磁盘,并且追加crc和type。后文不在赘述Block落盘,这里感叹下这种kv设计的思路和对数据写入的统一设计看的舒服。
Finish 方法的实现如下:
Status TableBuilder::Finish() { Rep* r = rep_; Flush(); assert(!r->closed); r->closed = true; BlockHandle filter_block_handle, metaindex_block_handle, index_block_handle; // Write filter block if (ok() && r->filter_block != nullptr) { WriteRawBlock(r->filter_block->Finish(), kNoCompression, &filter_block_handle); } // Write metaindex block if (ok()) { BlockBuilder meta_index_block(&r->options); if (r->filter_block != nullptr) { // Add mapping from "filter.Name" to location of filter data std::string key = "filter."; key.append(r->options.filter_policy->Name()); std::string handle_encoding; filter_block_handle.EncodeTo(&handle_encoding); meta_index_block.Add(key, handle_encoding); } // TODO(postrelease): Add stats and other meta blocks WriteBlock(&meta_index_block, &metaindex_block_handle); } // Write index block if (ok()) { if (r->pending_index_entry) { r->options.comparator->FindShortSuccessor(&r->last_key); std::string handle_encoding; r->pending_handle.EncodeTo(&handle_encoding); r->index_block.Add(r->last_key, Slice(handle_encoding)); r->pending_index_entry = false; } WriteBlock(&r->index_block, &index_block_handle); } // Write footer if (ok()) { Footer footer; footer.set_metaindex_handle(metaindex_block_handle); footer.set_index_handle(index_block_handle); std::string footer_encoding; footer.EncodeTo(&footer_encoding); r->status = r->file->Append(footer_encoding); if (r->status.ok()) { r->offset += footer_encoding.size(); } } return r->status; } &filter_block_handle); } // Write metaindex block if (ok()) { BlockBuilder meta_index_block(&r->options); if (r->filter_block != nullptr) { // Add mapping from "filter.Name" to location of filter data std::string key = "filter."; key.append(r->options.filter_policy->Name()); std::string handle_encoding; filter_block_handle.EncodeTo(&handle_encoding); meta_index_block.Add(key, handle_encoding); } // TODO(postrelease): Add stats and other meta blocks WriteBlock(&meta_index_block, &metaindex_block_handle); } // Write index block if (ok()) { if (r->pending_index_entry) { r->options.comparator->FindShortSuccessor(&r->last_key); std::string handle_encoding; r->pending_handle.EncodeTo(&handle_encoding); r->index_block.Add(r->last_key, Slice(handle_encoding)); r->pending_index_entry = false; } WriteBlock(&r->index_block, &index_block_handle); } // Write footer if (ok()) { Footer footer; footer.set_metaindex_handle(metaindex_block_handle); footer.set_index_handle(index_block_handle); std::string footer_encoding; footer.EncodeTo(&footer_encoding); r->status = r->file->Append(footer_encoding); if (r->status.ok()) { r->offset += footer_encoding.size(); } } return r->status; }
首先会调用下前面提到的flush方法,将当前的内存中的datablock写入到文件中。
写入filter的数据,filter数据最后是按照默认2kb创建一个filter的方式将数据写入的。然后会在meta_index_block 中记录他的名称和handle的数据。后续可能会有更多的其他的Block放在这个meta_index_block的范围内。
这里在详细探讨下FIlterBlock的数据写入,首先看下Filter Block中的成员变量:
Builder的成员变量:
const FilterPolicy* policy_; //算法 std::string keys_; // Flattened key contents 所有的key全都append在这个里面 std::vector start_; // Starting index in keys_ of each key // 记录在keys中的每一个key的起始offset std::string result_; // Filter data computed so far // 最后计算的结果 std::vector tmp_keys_; // policy_->CreateFilter() argument // 在计算过程中的中间变量,这个变量相当于是将keys_中的数据还原 std::vector filter_offsets_; // 每一个filter 初始的offset,注意的是filter 的长度是定长的,一般为2kb
Reader的成员变量
const FilterPolicy* policy_; const char* data_; // Pointer to filter data (at block-start) // 当前filter对应的data Block中的数据起始offset const char* offset_; // Pointer to beginning of offset array (at block-end) // 当前filter指向的data block的结束位置 size_t num_; // Number of entries in offset array //当前reader中的filter的数量 size_t base_lg_; // Encoding parameter (see kFilterBaseLg in .cc file) //2kb
每次写入的时候都会写入到Filter中
if (r->filter_block != nullptr) { r->filter_block->AddKey(key); } void FilterBlockBuilder::AddKey(const Slice& key) { Slice k = key; start_.push_back(keys_.size()); // 记录k当前的size,也就是下个key开始的起始位置 keys_.append(k.data(), k.size());// 将key的值和大小写入到keys_中 }
每次DataBlock初始化或者调用Flush方法的时候都会调用一个Filter block的StartBlock方法:
// 初始化 TableBuilder::TableBuilder(const Options& options, WritableFile* file) : rep_(new Rep(options, file)) { if (rep_->filter_block != nullptr) { rep_->filter_block->StartBlock(0); } } // flush if (r->filter_block != nullptr) { r->filter_block->StartBlock(r->offset); }
上面两个方法的区别在于,初始化的时候为0,flush的时候为当前Req的offset。因为当前Req的offset只有在WriteRawBlock中会变化,也就是追加当前写入的值的数据,所以此时的offset就是等于上一次写的offset。
void FilterBlockBuilder::StartBlock(uint64_t block_offset) { uint64_t filter_index = (block_offset / kFilterBase); assert(filter_index >= filter_offsets_.size()); while (filter_index > filter_offsets_.size()) { GenerateFilter(); } }
因为filter创建的维度是按照当前block的大小来的,所以传入一个Block的offset就可以找到对应的filter的位置。下面举例说明下:
block_offset
为0,此时方法直接返回,不会进入创建过程Fush
方法,说明写入了4kb的数据,此时block_offset是4kb调用的GenerateFilter方法为:
void FilterBlockBuilder::GenerateFilter() { const size_t num_keys = start_.size(); if (num_keys == 0) { // Fast path if there are no keys for this filter filter_offsets_.push_back(result_.size()); return; } // Make list of keys from flattened key structure start_.push_back(keys_.size()); // Simplify length computation tmp_keys_.resize(num_keys); for (size_t i = 0; i CreateFilter(&tmp_keys_[0], static_cast(num_keys), &result_); tmp_keys_.clear(); keys_.clear(); start_.clear(); }
继续刚才的例子:
如果按照上面的例子就是,首先是2kb的数据,会将2kb的数据中包含的key从start中取出来生成,append到result中,并且在filter_offsets_中记录下起始位置。
最后在Finish中完成数据最后的拼接:
Slice FilterBlockBuilder::Finish() { if (!start_.empty()) { GenerateFilter(); } // Append array of per-filter offsets const uint32_t array_offset = result_.size(); for (size_t i = 0; i < filter_offsets_.size(); i++) { PutFixed32(&result_, filter_offsets_[i]); } PutFixed32(&result_, array_offset); result_.push_back(kFilterBaseLg); // Save encoding parameter in result return Slice(result_); }
finish最后其实就是将offet使用32位定长拼接到了result后面。所以filter中filter_offsets_ 到底记录的是什么呢?
记录的其实是当前的filter在result中的2kb的数据的filter,如果通过index block 查找到当前的key存在的datablock的offset,那么就可以根据offset算出filter在filter_offset 中的位置,然后找到filter的具体数据。查询的时候就可以直接获取这个位置的布隆过滤器的值,从而快速判断是否存在这个值了。
需要注意的是index block中会最后写一个lastkey,这个lastkey会将当前block中的最大的key加一,如果是hello+sequence|type,就会变成i+maxSequence|type.
最后会将metaindex_block_handle
和index_block_handle
的值写入到footer中。footer 是一个定长的48字节的值。之所以是48字节是因为,两个Handle中,每一个都包含了一个uint64_t,在Varint中说过,如果是64位最大需要10个字节来表示数据,所以就是2个10字节,外加上最后的magicnumber占用8字节,一共就是48字节。
后续操作
当数据写入成功后,会将当前的文件的信息写入到Meta数据中,还会判断当前的文件是否已经先于Meta文件写入磁盘。最后还会将当前的数据放入到cache中。本文暂时不考虑Cache部分的数据,仅记录下,也就是Level0 的数据都是写入到Cache中的。
从SSTable中查询
SSTable 分为多个层级,本文仅仅涉及到SSTable的读取,所以不详细介绍Version。总之一句话就是从0层到6层挨个查询。最后会在table/table.cc
的InternalGet方法中
Status Table::InternalGet(const ReadOptions& options, const Slice& k, void* arg, void (*handle_result)(void*, const Slice&, const Slice&)) { Status s; Iterator* iiter = rep_->index_block->NewIterator(rep_->options.comparator); iiter->Seek(k); if (iiter->Valid()) { Slice handle_value = iiter->value(); FilterBlockReader* filter = rep_->filter; BlockHandle handle; if (filter != nullptr && handle.DecodeFrom(&handle_value).ok() && !filter->KeyMayMatch(handle.offset(), k)) { // Not found } else { Iterator* block_iter = BlockReader(this, options, iiter->value()); block_iter->Seek(k); if (block_iter->Valid()) { (*handle_result)(arg, block_iter->key(), block_iter->value()); } s = block_iter->status(); delete block_iter; } } if (s.ok()) { s = iiter->status(); } delete iiter; return s; }
首先就是获取到index_block的Iterator。然后在文件中查询值:
void Seek(const Slice& target) override { // Binary search in restart array to find the last restart point // with a key < target uint32_t left = 0; uint32_t right = num_restarts_ - 1; int current_key_compare = 0; if (Valid()) { // If we're already scanning, use the current position as a starting // point. This is beneficial if the key we're seeking to is ahead of the // current position. current_key_compare = Compare(key_, target); if (current_key_compare 0) { right = restart_index_; } else { // We're seeking to the key we're already at. return; } } while (left < right) { uint32_t mid = (left + right + 1) / 2; uint32_t region_offset = GetRestartPoint(mid); uint32_t shared, non_shared, value_length; const char* key_ptr = DecodeEntry(data_ + region_offset, data_ + restarts_, &shared, &non_shared, &value_length); if (key_ptr == nullptr || (shared != 0)) { CorruptionError(); return; } Slice mid_key(key_ptr, non_shared); if (Compare(mid_key, target) = "target". Therefore all blocks at or // after "mid" are uninteresting. right = mid - 1; } } // We might be able to use our current position within the restart block. // This is true if we determined the key we desire is in the current block // and is after than the current key. assert(current_key_compare == 0 || Valid()); bool skip_seek = left == restart_index_ && current_key_compare = target while (true) { if (!ParseNextKey()) { return; } if (Compare(key_, target) >= 0) { return; } } }
写入的时候,我们是将lastkey写入到index block的,所以找的也就是当前key是不是属于这个block的,也就是找到最后一个大于这个key的block。这里的Iterator是在查询index block中的数据。
回到InternalGet 方法,在获取到存在index block 中的数据后,就去对应的data block中查找数据,最后两者都是走的block的Iterator。然后就是返回结果了。
总结
本文主要对写入SSTable 做了代码层级的说明,只能说是照着代码走了一遍。学习到了以下内容: