前言
在 LevelDB 中,写完 WAL 日志以后,就可以将数据写入到 MemTable 了。MemTable 是 LSM-Tree必不可缺的一个组件,主要作用如下:
在具体的使用中,MemTable 需要在内存中开辟堆空间,所以需要内存管理。客户端一般写入MemTable后就可以返回成功。
本文将针对MemTable 在 LevelDB 中的实现做一个简单的介绍,一起将客户端的写入过程也做了个介绍,会涉及到LevelDB如何控制并发等。
MemTable的实现在db/memtable.h
和db/memtable.cc
。内存管理的是实现是在util/arena.h
数据结构
MemTable 数据结构就是前面提到过的SkipList。LevelDB 是将key和value组合在一起成为一个SkipList中的Node,所以MemTable中还包含了一个比较器。这里对SkipList不再做具体的介绍,如果想深入了解,可以看下我前面写的SkipList原理和Java实现。
在db/memtable.h
中的具体实现为:
class MemTable {
public:
// MemTables are reference counted. The initial reference count
// is zero and the caller must call Ref() at least once.
explicit MemTable(const InternalKeyComparator& comparator);
MemTable(const MemTable&) = delete;
MemTable& operator=(const MemTable&) = delete;
// Increase reference count.
void Ref() { ++refs_; }
// Drop reference count. Delete if no more references exist.
void Unref() {
--refs_;
assert(refs_ >= 0);
if (refs_ LastSequence(); // 从当前的version中获取到最后使用的sequence
Writer* last_writer = &w; //本次写入的writer赋值给last_writer,注意为什么这里明明&w是队头却是lastwriter,是因为在后面的BuildBatchGroup方法中,会将本次批量写入的最后一个writer赋值给他
if (status.ok() && updates != nullptr) { // nullptr batch is for compactions
WriteBatch* write_batch = BuildBatchGroup(&last_writer); // 将当前队列中所有的writer里面的数据合并为一次写入
WriteBatchInternal::SetSequence(write_batch, last_sequence + 1);// 设置本次批量写入的sequence,sequence每次写入都是递增的,保证了写入的顺序,也能够进行读取的MVVC
last_sequence += WriteBatchInternal::Count(write_batch); // 更新当前的lastSequence,write_batch 中包含了当前数据的大大小
// Add to log and apply to memtable. We can release the lock
// during this phase since &w is currently responsible for logging
// and protects against concurrent loggers and concurrent writes
// into mem_.
{
mutex_.Unlock(); // 释放队列锁,此时可以继续写入writers队列了。但是由于当前的writer 没有从队头移除,所以此时仍然等待在 w.cv.Wait();中
status = log_->AddRecord(WriteBatchInternal::Contents(write_batch)); // 写入Log文件
bool sync_error = false;
if (status.ok() && options.sync) { // 写入成功,是否同步刷盘
status = logfile_->Sync();
if (!status.ok()) {
sync_error = true;
}
}
if (status.ok()) {
status = WriteBatchInternal::InsertInto(write_batch, mem_); // 此处写入mem
}
mutex_.Lock(); // 再次获取到锁,暂停线程写入writers,注意的是,这个锁的释放是等到本次线程退出方法,调用MutexLock的析构函数达到释放锁的目的
if (sync_error) {
// The state of the log file is indeterminate: the log record we
// just added may or may not show up when the DB is re-opened.
// So we force the DB into a mode where all future writes fail.
RecordBackgroundError(status);
}
}
if (write_batch == tmp_batch_) tmp_batch_->Clear(); // 清理tmp_batch
versions_->SetLastSequence(last_sequence);// 设置sequence 到version中
}
while (true) {
Writer* ready = writers_.front();
writers_.pop_front();
if (ready != &w) {
ready->status = status;
ready->done = true;
ready->cv.Signal();
}
if (ready == last_writer) break;
} // 依次唤醒本次写入的后续writer,此时会从上面的while中继续调用,如果是头节点,而且已经被写入则直接返回,否则就继续执行上面的代码,该循环一直到本次写入的最后一个writer位置
// Notify new head of write queue
if (!writers_.empty()) { // 唤醒下一次调用
writers_.front()->cv.Signal();
}
return status;
}
算得上逐行解释了上面的代码。接下来看下如何使用一个互斥锁实现兼顾多线程顺序写入和效率的。
Add中的锁
先来看最开始的while循环中的cv.Wait()方法,在port/port_stdcxx.h
中,实现为:
void Wait() {
std::unique_lock lock(mu_->mu_, std::adopt_lock);//ustd::unique_lock能实现自动加锁与解锁功能,第一个参数是指的传入的参数进行上锁,如果有第二个参数即 std::adopt_lock:表示这个互斥量已经被lock()过了,无需在本次构造函数中加锁,否则会报错。
cv_.wait(lock); // wait 方法会对互斥锁解锁,然后阻塞等待,一直到被notify唤醒。唤醒之后会再次获取锁,一直到获取锁成功后才继续往下执行。
lock.release(); // 检测当前锁是否没有释放,如果是则释放掉
}
也就是说每次写入在Wait过程中,会释放当前获取的锁,允许后面的writer 写入到writers中。
下面举例说明:
如果当前有t1
,t2
,t3
,并发写入。假设初始阶段writers为空:
t1
,t2
,t3
写入的时候,首先假设t1
首先执行MutexLock l(&mutex ); 即获取到了mutex的锁,t1
被写入到writers中,而且是作为队头,所以不需要进入Wait状态,直接开始写,此时t2
,t3
在等待获取锁。t1
执行到了mutex_.Unlock();此时t1
的writer 还在队头,所以t2
,t3
被写入到writers中等待唤醒。t2
,t3
,但是t1
只会写自己的数据,因为他是在合并后才释放锁让t2
,t3
进入的.t1
先写入Log,然后写入MemTable,此时不存在并发写的情况,因为其他的并发都会被放到writers中。t1
将自己的数据写入到Log 和MemTable后,t1
再次获取到锁,此时阻塞writers的中继续新增writer。t1
的没有进行操作合并,所以他不会唤醒其他的写,只会唤醒下一次的队头,但是此时并没有释放锁。只是唤醒了t2
,t2
在等待t1
释放锁。t1
本次写入返回,方法栈中的MutexLock 被释放,然后t2
获取到锁,此时writers中的数据有t3
和t2
t2
的操作相比t1
多了一个合并数据和唤醒的动作。这里就不赘述了。上文中一个就涉及到1个锁,却能够有效的将数据的顺序和并发全部完成。使用一个队列,将多线程写入转换为单线程写入,保证了顺序,也有效地保证了效率。
校验空间容量
在写入之前,还需要看下当前的内存空间,和level0的文件数,实现就在db/db_impl.cc
的MakeRoomForWrite方法中:
Status DBImpl::MakeRoomForWrite(bool force) {
mutex_.AssertHeld(); // 确认当前的线程获取到了锁
assert(!writers_.empty()); // 有writer 操作
bool allow_delay = !force; // 是否运行缓冲,默认是1ms
Status s; // 返回的状态
while (true) {
if (!bg_error_.ok()) { // 这个bg_error是后台合并level0 的时候的一个操作
// Yield previous error
s = bg_error_;
break;
//1 如果允许等待(正常写入可以等待。force==updates==nullptr),并且当前的0层
// 文件触发了需要等待的条件(0 层文件大于等于8)
} else if (allow_delay && versions_->NumLevelFiles(0) >=
config::kL0_SlowdownWritesTrigger) {
// We are getting close to hitting a hard limit on the number of
// L0 files. Rather than delaying a single write by several
// seconds when we hit the hard limit, start delaying each
// individual write by 1ms to reduce latency variance. Also,
// this delay hands over some CPU to the compaction thread in
// case it is sharing the same core as the writer.
mutex_.Unlock();// 首先会释放锁,因为此时会等待操作进行完成,没必要不让后续的写入进入
env_->SleepForMicroseconds(1000);// 等到1ms
allow_delay = false; // Do not delay a single write more than once,每次写入最多运行等待一次
mutex_.Lock(); // 加锁,说明要开始干活了
//2 如果当前的内存足够,而且level0 的文件数量没有超过最大,说明有足够的内存和文件,直接返回stats
// write_buffer_size 大小为4MB,也就是说一个内存文件大小一般在大于4MB的时候就需要切换了
} else if (!force &&
(mem_->ApproximateMemoryUsage() NumLevelFiles(0) >= config::kL0_StopWritesTrigger) {
// There are too many level-0 files.
Log(options_.info_log, "Too many L0 files; waiting...\n");
background_work_finished_signal_.Wait();
} else {
//5 如果当前的文件数量小于8,内存资源不够,而且没有进行合并,则说明需要创建一个新的内存文件
// Attempt to switch to a new memtable and trigger compaction of old
assert(versions_->PrevLogNumber() == 0);
// 文件的名称也就是num 也是version提供的
uint64_t new_log_number = versions_->NewFileNumber();
WritableFile* lfile = nullptr;
// 创建可写文件,创建失败则说明岗前的num可以重复使用
s = env_->NewWritableFile(LogFileName(dbname_, new_log_number), &lfile);
if (!s.ok()) {
// Avoid chewing through file number space in a tight loop.
versions_->ReuseFileNumber(new_log_number);
break;
}
// 释放当前的Log文件
delete log_;
// 关闭当前的Log文件
s = logfile_->Close();
if (!s.ok()) {
// We may have lost some data written to the previous log file.
// Switch to the new log file anyway, but record as a background
// error so we do not attempt any more writes.
//
// We could perhaps attempt to save the memtable corresponding
// to log file and suppress the error if that works, but that
// would add more complexity in a critical code path.
RecordBackgroundError(s);
}
// 释放内存
delete logfile_;
// 将上面创建的文件复制写Log,成为新的Log日志文件
logfile_ = lfile;
// 设置num
logfile_number_ = new_log_number;
// 将创建的文件赋值给Log中的writer
log_ = new log::Writer(lfile);
// 将当前mem_ 的指针复制给imm_,说明当前的mem已经准备刷到level0 了。
imm_ = mem_;
// 设置是has_imm_ 为true,这里的 memory_order_release 前面说过,就是不允许指令重排
has_imm_.store(true, std::memory_order_release);
// 创建新的MemTable,传入当前的比较器
mem_ = new MemTable(internal_comparator_);
// 给当前的mem 添加引用
mem_->Ref();
force = false; // Do not force another compaction if have room
//尝试调用后台合并
MaybeScheduleCompaction();
}
}
return s;
}
这里分配的空间核心还是内存是否超过4MB,以及当前level0 的数据是否超过配置的阈值。如果说调大上面的值肯定可以提高一定的吞吐量。但是后期合并的数据量也对应会增加,个人觉得如果key,value 都比较小,则4MB就足够了,但是如果每次都是超大的key和value,就可以考虑调大方法中的参数,避免频繁合并。
合并操作
合并操作个人觉得没有什么好了解的,里面有个小tips就是直接对writers的迭代iter++ 这样可以有效的避免其实只有一个还需要走下面的合并操作。
开始写入MemTable
在写入完Log 后就开始执行写入MemTable了。
status = WriteBatchInternal::InsertInto(write_batch, mem_);
Status WriteBatchInternal::InsertInto(const WriteBatch* b, MemTable* memtable) {
MemTableInserter inserter;
inserter.sequence_ = WriteBatchInternal::Sequence(b);
inserter.mem_ = memtable;
return b->Iterate(&inserter);
}
首先是创建了一个inserter的对象,赋值mem和sequence,然后调用WriteBatch的迭代器写入。这么封装有什么好处呢?为什么不在MemTable里做一个循环直接往里面写呢?个人觉得是为了解耦,MemTable插入的数据就是简单的Slice 对象,而不用去考虑里面的batch,通过迭代器解析然后插入,能够将WriteBatch的职责和MemTable的职责做一个很好的区分。
在正式进入迭代方法之前,先来看下此时一个的WriteBatch 数据结构。这里就不贴全部的源码了,只是说下WriteBatch里面主要是一个Slice的key和Slice 的value。这些值最后都会被挡在一个string类型名为rep_参数中。
每次写入都会将当前操作类型即ValueType放入到req中,写入之后的seq为:
如果是kTypeValue则数据是 [seq预留8字节,全部为0][count][kTypeValue(1字节)][key_length][key_value][value_length][value_value]
如果是kTypeDeletion 则数据为[seq预留8字节,全部为0][count][kTypeDeletion(1字节)][key_length][key_value]
然后在BuildBatchGroup 中也只是对这个值进行一个追加,最后是一个大的WriteBatch,包含了n个写入,这个在WriteBatch 中是一个append,每次都会将需要append的数据截取12字节后面的数据,然后将count重新设置到被append的count中。执行完BuildBatchGroup 后,就会在前面的4个字节中写入sequence。
结合Log来看,这里的key,value的值都是使用的Varint32位,这也是为什么需要在Log中写入Fragement的原因了。
了解到了WriteBatch 的数据结构就不在去看WriteBatch::Iterate里面的源码里,其实就是根据数据结果解析为key和value,然后分为delete方法或者Put方法。但是需要注意的是,WriteBatch::Iterate的里面的Put方法最后走到了class MemTableInserter : public WriteBatch::Handler
这个类里面的Put方法里。这个方法会对核心就是对当前批次写入的sequence进行解析和插入到MemTable中,说明虽然我们合并了数据的写入,但是在写入Mem中的时候sequence 还是按照写入顺序+1 的。
void Put(const Slice& key, const Slice& value) override {
mem_->Add(sequence_, kTypeValue, key, value);
sequence_++;
}
void Delete(const Slice& key) override {
mem_->Add(sequence_, kTypeDeletion, key, Slice());
sequence_++;
}
LevelDB里面的迭代器目前已经了解了两个了,一个是MemTable中的Iterator,用来查询数据,还有一个就是当前的写入了,其实迭代器在LevelDB中使用比较多,后面全部过一遍再回头来看迭代器的使用。
Add 方法
看到这里,终于开始往MemTable 写数据了。具体实现比较简单,主要就是Key值的创建也就是SkipList中Node 的创建。具体代码实现在db/mem_table.cc
void MemTable::Add(SequenceNumber s, ValueType type, const Slice& key,
const Slice& value) {
// Format of an entry is concatenation of:
// key_size : varint32 of internal_key.size()
// key bytes : char[internal_key.size()]
// tag : uint64((sequence