前言
本文是作为上一个LevelDB之SSTable的补充,因为上文介绍了SSTable的组织结构,本文会在代码层面介绍具体的实现。
从MemTable写入到SSTable
前文的MemTable 已经介绍了对应的SkipList中的Key的编码,也介绍了MemTable写入SSTable的写入时机。接下来看SkipList 是如何序列化LevelDB的SSTable的。入口在db/db_impl.cc
的WriteLevel0Table
方法中,序列化为SSTable主要入口在
{
mutex_.Unlock();
s = BuildTable(dbname_, env_, options_, table_cache_, iter, &meta);
mutex_.Lock();
}
写入流程
BuildTable方法位于db/builder.cc
中,具体代码如下:
Status BuildTable(const std::string& dbname, Env* env, const Options& options,
TableCache* table_cache, Iterator* iter, FileMetaData* meta) {
Status s;
meta->file_size = 0;
iter->SeekToFirst(); // 传入的是当前的SkipList的迭代器,iter 返回的key是去掉了长度的key,也就是没有internal_key_size 的值
std::string fname = TableFileName(dbname, meta->number); //返回当前的table名字,主要就是在dbname的目录下创建一个number的文件
if (iter->Valid()) { //查看迭代器是否正常,主要是判断头节点是否为nullptr,即有没有值
WritableFile* file; //新建写入的文件
s = env->NewWritableFile(fname, &file);
if (!s.ok()) {
return s;
}
// 创建TableBuilder
TableBuilder* builder = new TableBuilder(options, file);
meta->smallest.DecodeFrom(iter->key()); // 记录最小值
Slice key;
for (; iter->Valid(); iter->Next()) {
key = iter->key();
builder->Add(key, iter->value()); //迭代将数据加入到Builder中
}
if (!key.empty()) {
meta->largest.DecodeFrom(key);//记录最大值
}
// Finish and check for builder errors
s = builder->Finish(); //将builder的元数据,包含了filter block ,index block,Footer等写入到文件中
if (s.ok()) {
meta->file_size = builder->FileSize();// 记录文件大小
assert(meta->file_size > 0);
}
delete builder; // 清楚数据
// Finish and check for file errors
if (s.ok()) {
s = file->Sync(); //这里主要是确定当前的文件正确写入到磁盘上了,避免出现写入MAINFEST文件的数据中有脏数据
}
if (s.ok()) {
s = file->Close();
}
delete file;
// 有删减
return s;
}
可以看到,入参是一个iter。这个是MemTable的迭代器,返回的是去掉了internal_key_size_length的真实的Key。进入到TableBuilder的Add方法中。
上面的步骤概括为下面几步:
Req
在TableBuilder创建后,会创建一个叫Req的struct。在整个TableBuilder中,这个struct就是公共的数据容器。
struct TableBuilder::Rep {
Rep(const Options& opt, WritableFile* f)
: options(opt),
index_block_options(opt),
file(f),
offset(0),
data_block(&options),
index_block(&index_block_options),
num_entries(0),
closed(false),
filter_block(opt.filter_policy == nullptr
? nullptr
: new FilterBlockBuilder(opt.filter_policy)),
pending_index_entry(false) {
index_block_options.block_restart_interval = 1;
}
Options options; // 传入的options,可以自定义的一些参数,如控制一个datarow的数据大小的block_size 等
Options index_block_options; // 使用在index_block中的参数,如多久进行一次前缀压缩的block_restart_interval
WritableFile* file; // 当前写入的文件
uint64_t offset; //当前的offset
Status status;
BlockBuilder data_block; // 存储在内存中的datablock
BlockBuilder index_block; //存储在内存中的index_block
std::string last_key; // 当前datablock中的最后一个key或者比当前data block都大的key
int64_t num_entries; // 包含了多少的数据
bool closed; // Either Finish() or Abandon() has been called.
FilterBlockBuilder* filter_block; //如果没有设置过滤器,则该参数为nullptr
// We do not emit the index entry for a block until we have seen the
// first key for the next data block. This allows us to use shorter
// keys in the index block. For example, consider a block boundary
// between the keys "the quick brown fox" and "the who". We can use
// "the r" as the key for the index block entry since it is >= all
// entries in the first block and pending_index_entry is true only if data_block is empty.
bool pending_index_entry; // true 分为两种情况,1 当前的data block 超过了上面的提到的block size,2 整个Table刷入磁盘的时候 false 则说明在写入data block
BlockHandle pending_handle; // Handle to add to index block // 主要用来记录data block的的offset 和size ,然后记录到index block中
std::string compressed_output;
};
上面的参数中,主要是使用last key记录每次写入的最大的key,然后就是index block 记录的handle 。后面会看下这些参数在文件中是如何使用的。
写入Data Block
迭代器中的额Add方法仍然是在TableBuilder 中实现的,在这个方法中,filter 和index 的block 都是先写在内存中,Data Block则是按照options.block_size
的值判断是否写入磁盘,这个值默认是4kb。
void TableBuilder::Add(const Slice& key, const Slice& value) {
Rep* r = rep_;
assert(!r->closed);
if (!ok()) return;
if (r->num_entries > 0) {
assert(r->options.comparator->Compare(key, Slice(r->last_key)) > 0);
}
if (r->pending_index_entry) { // index 的值是false
assert(r->data_block.empty());
r->options.comparator->FindShortestSeparator(&r->last_key, key);
std::string handle_encoding;
r->pending_handle.EncodeTo(&handle_encoding);
r->index_block.Add(r->last_key, Slice(handle_encoding));
r->pending_index_entry = false;
}
if (r->filter_block != nullptr) { // bloom filter
r->filter_block->AddKey(key);
}
r->last_key.assign(key.data(), key.size()); // 记录最后的key
r->num_entries++; // 记录值的多少
r->data_block.Add(key, value); // 写入data block
const size_t estimated_block_size = r->data_block.CurrentSizeEstimate();
if (estimated_block_size >= r->options.block_size) {
Flush();
}
}
上面的方法比较简单,index Block的key值是当前的已经记录的Block的最大的key,Value则是当前记录的4kb数据中的起始offset和size
pending_index_entry 的值开始为false,下面两种情况可能会转为true
- 当前的DataBlock的数据超过4KB,则需要将当前的datablock 写入到文件中,也就是上文的Flush方法,该方法会记录data block的offset 和size 到pending_handle中,等到下次写入key的时候将这个信息写入到index block中
- 上面写入index block 是在下次写入的时候做的,如果没有下次写入,则整个SSTable在结束的时候会再调用一次Flush,将当前内存中的DataBlock 收入文件,最后还会更新一次last key,并且刷入到index block中
如果filter不为nullptr,则会将每次key都写入到filter block中,等到自后写入文件。
Block Builder
可以看到,上面的index block 和data block,本质上都是个BlockBuilder,也就是说这个数据在写入的时候,最后都是按照一个Block的处理逻辑,为什么可以这么做?因为在LevelDB中,最后都是使用的Slice 的Key value,就是对外提供的接口是Key,Value的数据库,在内部很多地方抽象出来最后也是按照一个Key Value的方式存储。
BlockBuilder 位于table/block_builder.h
和table/block_builder.cc
中,主要的属性有:
const Options* options_; // 即从Table Builder中传入的参数
std::string buffer_; // Destination buffer
std::vector restarts_; // Restart points // 前缀压缩开始节点的offset
int counter_; // Number of entries emitted since restart //每一次restart中包含的数据大小,这个值主要是用来进行判断是否达到options中的block_restart_interval参数,默认是16,即16个key进行一次前缀压缩
bool finished_; // Has Finish() been called?
std::string last_key_; // 当前block的最大的key
为了节约空间而做了前缀压缩,然后记录下每次前缀压缩的起始位置,便于后面查询使用二分查找。
下面将两个方法一起看,即Add 方法和Finish 方法,其中finish方法是在即将写入文件的时候将restart写入block中,add则是加入每个data数据。
Slice BlockBuilder::Finish() {
// Append restart array
for (size_t i = 0; i comparator->Compare(key, last_key_piece) > 0);
size_t shared = 0;
if (counter_ block_restart_interval) {// 前缀压缩阈值,默认16
// See how much sharing to do with previous string
const size_t min_length = std::min(last_key_piece.size(), key.size()); // 当前值和前一个key的大小取最小。然后判断和前一个key相同的前缀。记录到shared中
while ((shared < min_length) && (last_key_piece[shared] == key[shared])) {
shared++;
}
} else {
// Restart compression
restarts_.push_back(buffer_.size());
counter_ = 0;
}
const size_t non_shared = key.size() - shared; // 不相同的后缀
// Add "" to buffer_
// 本身的注释说明了写入的格式,将长度先写入到数据中
PutVarint32(&buffer_, shared);
PutVarint32(&buffer_, non_shared);
PutVarint32(&buffer_, value.size());
// 将key不和前缀相同的数据写入到buffer
// Add string delta to buffer_ followed by value
buffer_.append(key.data() + shared, non_shared);
buffer_.append(value.data(), value.size());
// Update state 记录lastkey,这里记录的是真实的key,而不是压缩后的
last_key_.resize(shared);
last_key_.append(key.data() + shared, non_shared);
assert(Slice(last_key_) == key);
counter_++;
}
还是将上次的图拿出来用用,最后的block数据如下图:
在调用add 方法中的数据达到阈值4kb的时候,就会写入文件,调用的方法就是WriteBlock,
TableBuilder::WriteBlock(BlockBuilder* block, BlockHandle* handle)
这里的handle就是前面提到的Req中的Handle,传入的主要作用其实就是修改和记录Req中的Handle,便于后续在footer 中写入handle。在flush方法中,首先会调用finish方法,写入restart的数据,然后就是写入文件了:
void TableBuilder::WriteRawBlock(const Slice& block_contents,
CompressionType type, BlockHandle* handle) {
Rep* r = rep_;
handle->set_offset(r->offset);
handle->set_size(block_contents.size());
r->status = r->file->Append(block_contents);
if (r->status.ok()) {
char trailer[kBlockTrailerSize];
trailer[0] = type; // type 是压缩的类型
uint32_t crc = crc32c::Value(block_contents.data(), block_contents.size());
crc = crc32c::Extend(crc, trailer, 1); // Extend crc to cover block type
EncodeFixed32(trailer + 1, crc32c::Mask(crc));
r->status = r->file->Append(Slice(trailer, kBlockTrailerSize));
if (r->status.ok()) {
r->offset += block_contents.size() + kBlockTrailerSize;
}
}
}
也就是在上面的方法中,会将handle的值进行更新,这里的handle传入的值为filter_block_handle, metaindex_block_handle, index_block_handle。是在TableBuilder的FInish方法中的局部变量,用来写入Write metaindex block 的handle 和 footer的handle。作用就是用来标识filter block等的起始offset和大小。
可以看到上面的BlockBuilder 会对每一次写入文件都记录crc校验。还有压缩的type。
Data Block 总结
data block在写入的过程中记录到filter的数据,也记录了restart的数据,每间隔4kb会将index block的数据更新。这些目前都是在内存中,只有在Table Builder的finish方法中最后写入到磁盘上。
其他Block的写入
其他的Block的写入最后的数据落盘和上面的Data Block 差不多,即都是通过前缀压缩后写入到磁盘,并且追加crc和type。后文不在赘述Block落盘,这里感叹下这种kv设计的思路和对数据写入的统一设计看的舒服。
Finish 方法的实现如下:
Status TableBuilder::Finish() {
Rep* r = rep_;
Flush();
assert(!r->closed);
r->closed = true;
BlockHandle filter_block_handle, metaindex_block_handle, index_block_handle;
// Write filter block
if (ok() && r->filter_block != nullptr) {
WriteRawBlock(r->filter_block->Finish(), kNoCompression,
&filter_block_handle);
}
// Write metaindex block
if (ok()) {
BlockBuilder meta_index_block(&r->options);
if (r->filter_block != nullptr) {
// Add mapping from "filter.Name" to location of filter data
std::string key = "filter.";
key.append(r->options.filter_policy->Name());
std::string handle_encoding;
filter_block_handle.EncodeTo(&handle_encoding);
meta_index_block.Add(key, handle_encoding);
}
// TODO(postrelease): Add stats and other meta blocks
WriteBlock(&meta_index_block, &metaindex_block_handle);
}
// Write index block
if (ok()) {
if (r->pending_index_entry) {
r->options.comparator->FindShortSuccessor(&r->last_key);
std::string handle_encoding;
r->pending_handle.EncodeTo(&handle_encoding);
r->index_block.Add(r->last_key, Slice(handle_encoding));
r->pending_index_entry = false;
}
WriteBlock(&r->index_block, &index_block_handle);
}
// Write footer
if (ok()) {
Footer footer;
footer.set_metaindex_handle(metaindex_block_handle);
footer.set_index_handle(index_block_handle);
std::string footer_encoding;
footer.EncodeTo(&footer_encoding);
r->status = r->file->Append(footer_encoding);
if (r->status.ok()) {
r->offset += footer_encoding.size();
}
}
return r->status;
} &filter_block_handle);
}
// Write metaindex block
if (ok()) {
BlockBuilder meta_index_block(&r->options);
if (r->filter_block != nullptr) {
// Add mapping from "filter.Name" to location of filter data
std::string key = "filter.";
key.append(r->options.filter_policy->Name());
std::string handle_encoding;
filter_block_handle.EncodeTo(&handle_encoding);
meta_index_block.Add(key, handle_encoding);
}
// TODO(postrelease): Add stats and other meta blocks
WriteBlock(&meta_index_block, &metaindex_block_handle);
}
// Write index block
if (ok()) {
if (r->pending_index_entry) {
r->options.comparator->FindShortSuccessor(&r->last_key);
std::string handle_encoding;
r->pending_handle.EncodeTo(&handle_encoding);
r->index_block.Add(r->last_key, Slice(handle_encoding));
r->pending_index_entry = false;
}
WriteBlock(&r->index_block, &index_block_handle);
}
// Write footer
if (ok()) {
Footer footer;
footer.set_metaindex_handle(metaindex_block_handle);
footer.set_index_handle(index_block_handle);
std::string footer_encoding;
footer.EncodeTo(&footer_encoding);
r->status = r->file->Append(footer_encoding);
if (r->status.ok()) {
r->offset += footer_encoding.size();
}
}
return r->status;
}
首先会调用下前面提到的flush方法,将当前的内存中的datablock写入到文件中。
写入filter的数据,filter数据最后是按照默认2kb创建一个filter的方式将数据写入的。然后会在meta_index_block 中记录他的名称和handle的数据。后续可能会有更多的其他的Block放在这个meta_index_block的范围内。
这里在详细探讨下FIlterBlock的数据写入,首先看下Filter Block中的成员变量:
Builder的成员变量:
const FilterPolicy* policy_; //算法
std::string keys_; // Flattened key contents 所有的key全都append在这个里面
std::vector start_; // Starting index in keys_ of each key // 记录在keys中的每一个key的起始offset
std::string result_; // Filter data computed so far // 最后计算的结果
std::vector tmp_keys_; // policy_->CreateFilter() argument // 在计算过程中的中间变量,这个变量相当于是将keys_中的数据还原
std::vector filter_offsets_; // 每一个filter 初始的offset,注意的是filter 的长度是定长的,一般为2kb
Reader的成员变量
const FilterPolicy* policy_;
const char* data_; // Pointer to filter data (at block-start) // 当前filter对应的data Block中的数据起始offset
const char* offset_; // Pointer to beginning of offset array (at block-end) // 当前filter指向的data block的结束位置
size_t num_; // Number of entries in offset array //当前reader中的filter的数量
size_t base_lg_; // Encoding parameter (see kFilterBaseLg in .cc file) //2kb
每次写入的时候都会写入到Filter中
if (r->filter_block != nullptr) {
r->filter_block->AddKey(key);
}
void FilterBlockBuilder::AddKey(const Slice& key) {
Slice k = key;
start_.push_back(keys_.size()); // 记录k当前的size,也就是下个key开始的起始位置
keys_.append(k.data(), k.size());// 将key的值和大小写入到keys_中
}
每次DataBlock初始化或者调用Flush方法的时候都会调用一个Filter block的StartBlock方法:
// 初始化
TableBuilder::TableBuilder(const Options& options, WritableFile* file)
: rep_(new Rep(options, file)) {
if (rep_->filter_block != nullptr) {
rep_->filter_block->StartBlock(0);
}
}
// flush
if (r->filter_block != nullptr) {
r->filter_block->StartBlock(r->offset);
}
上面两个方法的区别在于,初始化的时候为0,flush的时候为当前Req的offset。因为当前Req的offset只有在WriteRawBlock中会变化,也就是追加当前写入的值的数据,所以此时的offset就是等于上一次写的offset。
void FilterBlockBuilder::StartBlock(uint64_t block_offset) {
uint64_t filter_index = (block_offset / kFilterBase);
assert(filter_index >= filter_offsets_.size());
while (filter_index > filter_offsets_.size()) {
GenerateFilter();
}
}
因为filter创建的维度是按照当前block的大小来的,所以传入一个Block的offset就可以找到对应的filter的位置。下面举例说明下:
block_offset
为0,此时方法直接返回,不会进入创建过程Fush
方法,说明写入了4kb的数据,此时block_offset是4kb调用的GenerateFilter方法为:
void FilterBlockBuilder::GenerateFilter() {
const size_t num_keys = start_.size();
if (num_keys == 0) {
// Fast path if there are no keys for this filter
filter_offsets_.push_back(result_.size());
return;
}
// Make list of keys from flattened key structure
start_.push_back(keys_.size()); // Simplify length computation
tmp_keys_.resize(num_keys);
for (size_t i = 0; i CreateFilter(&tmp_keys_[0], static_cast(num_keys), &result_);
tmp_keys_.clear();
keys_.clear();
start_.clear();
}
继续刚才的例子:
如果按照上面的例子就是,首先是2kb的数据,会将2kb的数据中包含的key从start中取出来生成,append到result中,并且在filter_offsets_中记录下起始位置。
最后在Finish中完成数据最后的拼接:
Slice FilterBlockBuilder::Finish() {
if (!start_.empty()) {
GenerateFilter();
}
// Append array of per-filter offsets
const uint32_t array_offset = result_.size();
for (size_t i = 0; i < filter_offsets_.size(); i++) {
PutFixed32(&result_, filter_offsets_[i]);
}
PutFixed32(&result_, array_offset);
result_.push_back(kFilterBaseLg); // Save encoding parameter in result
return Slice(result_);
}
finish最后其实就是将offet使用32位定长拼接到了result后面。所以filter中filter_offsets_ 到底记录的是什么呢?
记录的其实是当前的filter在result中的2kb的数据的filter,如果通过index block 查找到当前的key存在的datablock的offset,那么就可以根据offset算出filter在filter_offset 中的位置,然后找到filter的具体数据。查询的时候就可以直接获取这个位置的布隆过滤器的值,从而快速判断是否存在这个值了。
需要注意的是index block中会最后写一个lastkey,这个lastkey会将当前block中的最大的key加一,如果是hello+sequence|type,就会变成i+maxSequence|type.
最后会将metaindex_block_handle
和index_block_handle
的值写入到footer中。footer 是一个定长的48字节的值。之所以是48字节是因为,两个Handle中,每一个都包含了一个uint64_t,在Varint中说过,如果是64位最大需要10个字节来表示数据,所以就是2个10字节,外加上最后的magicnumber占用8字节,一共就是48字节。
后续操作
当数据写入成功后,会将当前的文件的信息写入到Meta数据中,还会判断当前的文件是否已经先于Meta文件写入磁盘。最后还会将当前的数据放入到cache中。本文暂时不考虑Cache部分的数据,仅记录下,也就是Level0 的数据都是写入到Cache中的。
从SSTable中查询
SSTable 分为多个层级,本文仅仅涉及到SSTable的读取,所以不详细介绍Version。总之一句话就是从0层到6层挨个查询。最后会在table/table.cc
的InternalGet方法中
Status Table::InternalGet(const ReadOptions& options, const Slice& k, void* arg,
void (*handle_result)(void*, const Slice&,
const Slice&)) {
Status s;
Iterator* iiter = rep_->index_block->NewIterator(rep_->options.comparator);
iiter->Seek(k);
if (iiter->Valid()) {
Slice handle_value = iiter->value();
FilterBlockReader* filter = rep_->filter;
BlockHandle handle;
if (filter != nullptr && handle.DecodeFrom(&handle_value).ok() &&
!filter->KeyMayMatch(handle.offset(), k)) {
// Not found
} else {
Iterator* block_iter = BlockReader(this, options, iiter->value());
block_iter->Seek(k);
if (block_iter->Valid()) {
(*handle_result)(arg, block_iter->key(), block_iter->value());
}
s = block_iter->status();
delete block_iter;
}
}
if (s.ok()) {
s = iiter->status();
}
delete iiter;
return s;
}
首先就是获取到index_block的Iterator。然后在文件中查询值:
void Seek(const Slice& target) override {
// Binary search in restart array to find the last restart point
// with a key < target
uint32_t left = 0;
uint32_t right = num_restarts_ - 1;
int current_key_compare = 0;
if (Valid()) {
// If we're already scanning, use the current position as a starting
// point. This is beneficial if the key we're seeking to is ahead of the
// current position.
current_key_compare = Compare(key_, target);
if (current_key_compare 0) {
right = restart_index_;
} else {
// We're seeking to the key we're already at.
return;
}
}
while (left < right) {
uint32_t mid = (left + right + 1) / 2;
uint32_t region_offset = GetRestartPoint(mid);
uint32_t shared, non_shared, value_length;
const char* key_ptr =
DecodeEntry(data_ + region_offset, data_ + restarts_, &shared,
&non_shared, &value_length);
if (key_ptr == nullptr || (shared != 0)) {
CorruptionError();
return;
}
Slice mid_key(key_ptr, non_shared);
if (Compare(mid_key, target) = "target". Therefore all blocks at or
// after "mid" are uninteresting.
right = mid - 1;
}
}
// We might be able to use our current position within the restart block.
// This is true if we determined the key we desire is in the current block
// and is after than the current key.
assert(current_key_compare == 0 || Valid());
bool skip_seek = left == restart_index_ && current_key_compare = target
while (true) {
if (!ParseNextKey()) {
return;
}
if (Compare(key_, target) >= 0) {
return;
}
}
}
写入的时候,我们是将lastkey写入到index block的,所以找的也就是当前key是不是属于这个block的,也就是找到最后一个大于这个key的block。这里的Iterator是在查询index block中的数据。
回到InternalGet 方法,在获取到存在index block 中的数据后,就去对应的data block中查找数据,最后两者都是走的block的Iterator。然后就是返回结果了。
总结
本文主要对写入SSTable 做了代码层级的说明,只能说是照着代码走了一遍。学习到了以下内容: