LevelDB 之MemTable

2023年 9月 6日 56.2k 0

前言

在 LevelDB 中,写完 WAL 日志以后,就可以将数据写入到 MemTable 了。MemTable 是 LSM-Tree必不可缺的一个组件,主要作用如下:

  • 写入的时候作为随机写转换为顺序写的buffer也是对数据进行排序的处理器
  • 读取的时候作为热点数据(刚写入的数据)的cache,加快读取速度
  • SSTable 的数据来源,初始的SSTable都是一个MemTable的持久化
  • 在具体的使用中,MemTable 需要在内存中开辟堆空间,所以需要内存管理。客户端一般写入MemTable后就可以返回成功。

    本文将针对MemTable 在 LevelDB 中的实现做一个简单的介绍,一起将客户端的写入过程也做了个介绍,会涉及到LevelDB如何控制并发等。

    MemTable的实现在db/memtable.hdb/memtable.cc。内存管理的是实现是在util/arena.h

    数据结构

    MemTable 数据结构就是前面提到过的SkipList。LevelDB 是将key和value组合在一起成为一个SkipList中的Node,所以MemTable中还包含了一个比较器。这里对SkipList不再做具体的介绍,如果想深入了解,可以看下我前面写的SkipList原理和Java实现。

    db/memtable.h中的具体实现为:

    class MemTable {
     public:
      // MemTables are reference counted.  The initial reference count
      // is zero and the caller must call Ref() at least once.
      explicit MemTable(const InternalKeyComparator& comparator);
    ​
      MemTable(const MemTable&) = delete;
      MemTable& operator=(const MemTable&) = delete;
    ​
      // Increase reference count.
      void Ref() { ++refs_; }
    ​
      // Drop reference count.  Delete if no more references exist.
      void Unref() {
        --refs_;
        assert(refs_ >= 0);
        if (refs_ LastSequence(); // 从当前的version中获取到最后使用的sequence
      Writer* last_writer = &w; //本次写入的writer赋值给last_writer,注意为什么这里明明&w是队头却是lastwriter,是因为在后面的BuildBatchGroup方法中,会将本次批量写入的最后一个writer赋值给他
      if (status.ok() && updates != nullptr) {  // nullptr batch is for compactions
        WriteBatch* write_batch = BuildBatchGroup(&last_writer); // 将当前队列中所有的writer里面的数据合并为一次写入
        WriteBatchInternal::SetSequence(write_batch, last_sequence + 1);// 设置本次批量写入的sequence,sequence每次写入都是递增的,保证了写入的顺序,也能够进行读取的MVVC
        last_sequence += WriteBatchInternal::Count(write_batch); // 更新当前的lastSequence,write_batch 中包含了当前数据的大大小
        // Add to log and apply to memtable.  We can release the lock
        // during this phase since &w is currently responsible for logging
        // and protects against concurrent loggers and concurrent writes
        // into mem_.
        {
          mutex_.Unlock(); // 释放队列锁,此时可以继续写入writers队列了。但是由于当前的writer 没有从队头移除,所以此时仍然等待在 w.cv.Wait();中
          status = log_->AddRecord(WriteBatchInternal::Contents(write_batch)); // 写入Log文件
          bool sync_error = false; 
          if (status.ok() && options.sync) { // 写入成功,是否同步刷盘
            status = logfile_->Sync();
            if (!status.ok()) {
              sync_error = true;
            }
          }
          if (status.ok()) {
            status = WriteBatchInternal::InsertInto(write_batch, mem_); // 此处写入mem
          }
          mutex_.Lock(); // 再次获取到锁,暂停线程写入writers,注意的是,这个锁的释放是等到本次线程退出方法,调用MutexLock的析构函数达到释放锁的目的
          if (sync_error) {
            // The state of the log file is indeterminate: the log record we
            // just added may or may not show up when the DB is re-opened.
            // So we force the DB into a mode where all future writes fail.
            RecordBackgroundError(status);
          }
        }
        if (write_batch == tmp_batch_) tmp_batch_->Clear(); // 清理tmp_batch
        versions_->SetLastSequence(last_sequence);// 设置sequence 到version中
      }
    ​
      while (true) {
        Writer* ready = writers_.front();
        writers_.pop_front();
        if (ready != &w) {
          ready->status = status;
          ready->done = true;
          ready->cv.Signal();
        }
        if (ready == last_writer) break;
      } // 依次唤醒本次写入的后续writer,此时会从上面的while中继续调用,如果是头节点,而且已经被写入则直接返回,否则就继续执行上面的代码,该循环一直到本次写入的最后一个writer位置
    ​
      // Notify new head of write queue
      if (!writers_.empty()) { // 唤醒下一次调用
        writers_.front()->cv.Signal();
      }
    ​
      return status;
    }
    

    算得上逐行解释了上面的代码。接下来看下如何使用一个互斥锁实现兼顾多线程顺序写入和效率的。

    Add中的锁

    先来看最开始的while循环中的cv.Wait()方法,在port/port_stdcxx.h中,实现为:

    void Wait() {
      std::unique_lock lock(mu_->mu_, std::adopt_lock);//ustd::unique_lock能实现自动加锁与解锁功能,第一个参数是指的传入的参数进行上锁,如果有第二个参数即 std::adopt_lock:表示这个互斥量已经被lock()过了,无需在本次构造函数中加锁,否则会报错。
    ​
      cv_.wait(lock); // wait 方法会对互斥锁解锁,然后阻塞等待,一直到被notify唤醒。唤醒之后会再次获取锁,一直到获取锁成功后才继续往下执行。 
      lock.release(); // 检测当前锁是否没有释放,如果是则释放掉
    }
    

    也就是说每次写入在Wait过程中,会释放当前获取的锁,允许后面的writer 写入到writers中。

    下面举例说明:

    如果当前有t1,t2,t3,并发写入。假设初始阶段writers为空:

  • t1,t2,t3 写入的时候,首先假设t1 首先执行MutexLock l(&mutex ); 即获取到了mutex的锁,t1 被写入到writers中,而且是作为队头,所以不需要进入Wait状态,直接开始写,此时t2,t3 在等待获取锁。
  • t1 执行到了mutex_.Unlock();此时t1的writer 还在队头,所以t2,t3 被写入到writers中等待唤醒。
  • 此时writers中包含了t2,t3,但是t1只会写自己的数据,因为他是在合并后才释放锁让t2,t3进入的.t1先写入Log,然后写入MemTable,此时不存在并发写的情况,因为其他的并发都会被放到writers中。
  • 等到t1将自己的数据写入到Log 和MemTable后,t1 再次获取到锁,此时阻塞writers的中继续新增writer。
  • 因为t1 的没有进行操作合并,所以他不会唤醒其他的写,只会唤醒下一次的队头,但是此时并没有释放锁。只是唤醒了t2,t2在等待t1释放锁。
  • t1 本次写入返回,方法栈中的MutexLock 被释放,然后t2获取到锁,此时writers中的数据有t3t2
  • t2 的操作相比t1 多了一个合并数据和唤醒的动作。这里就不赘述了。
  • 上文中一个就涉及到1个锁,却能够有效的将数据的顺序和并发全部完成。使用一个队列,将多线程写入转换为单线程写入,保证了顺序,也有效地保证了效率。

    校验空间容量

    在写入之前,还需要看下当前的内存空间,和level0的文件数,实现就在db/db_impl.cc的MakeRoomForWrite方法中:

    Status DBImpl::MakeRoomForWrite(bool force) {
      mutex_.AssertHeld(); // 确认当前的线程获取到了锁
      assert(!writers_.empty()); // 有writer 操作
      bool allow_delay = !force; // 是否运行缓冲,默认是1ms
      Status s; // 返回的状态
      while (true) {
        if (!bg_error_.ok()) { // 这个bg_error是后台合并level0 的时候的一个操作
          // Yield previous error
          s = bg_error_;
          break;
          //1 如果允许等待(正常写入可以等待。force==updates==nullptr),并且当前的0层
          // 文件触发了需要等待的条件(0 层文件大于等于8)
        } else if (allow_delay && versions_->NumLevelFiles(0) >=
                                      config::kL0_SlowdownWritesTrigger) {
          // We are getting close to hitting a hard limit on the number of
          // L0 files.  Rather than delaying a single write by several
          // seconds when we hit the hard limit, start delaying each
          // individual write by 1ms to reduce latency variance.  Also,
          // this delay hands over some CPU to the compaction thread in
          // case it is sharing the same core as the writer.
          mutex_.Unlock();// 首先会释放锁,因为此时会等待操作进行完成,没必要不让后续的写入进入
          env_->SleepForMicroseconds(1000);// 等到1ms
          allow_delay = false;  // Do not delay a single write more than once,每次写入最多运行等待一次
          mutex_.Lock(); // 加锁,说明要开始干活了
          //2 如果当前的内存足够,而且level0 的文件数量没有超过最大,说明有足够的内存和文件,直接返回stats
            // write_buffer_size 大小为4MB,也就是说一个内存文件大小一般在大于4MB的时候就需要切换了
        } else if (!force &&
                   (mem_->ApproximateMemoryUsage() NumLevelFiles(0) >= config::kL0_StopWritesTrigger) {
          // There are too many level-0 files.
          Log(options_.info_log, "Too many L0 files; waiting...\n");
          background_work_finished_signal_.Wait();
        } else {
          //5 如果当前的文件数量小于8,内存资源不够,而且没有进行合并,则说明需要创建一个新的内存文件
          // Attempt to switch to a new memtable and trigger compaction of old
          assert(versions_->PrevLogNumber() == 0);
           // 文件的名称也就是num 也是version提供的
          uint64_t new_log_number = versions_->NewFileNumber();
          WritableFile* lfile = nullptr;
            // 创建可写文件,创建失败则说明岗前的num可以重复使用
          s = env_->NewWritableFile(LogFileName(dbname_, new_log_number), &lfile);
          if (!s.ok()) {
            // Avoid chewing through file number space in a tight loop.
            versions_->ReuseFileNumber(new_log_number);
            break;
          }
        // 释放当前的Log文件
          delete log_;
        // 关闭当前的Log文件
          s = logfile_->Close();
          if (!s.ok()) {
            // We may have lost some data written to the previous log file.
            // Switch to the new log file anyway, but record as a background
            // error so we do not attempt any more writes.
            //
            // We could perhaps attempt to save the memtable corresponding
            // to log file and suppress the error if that works, but that
            // would add more complexity in a critical code path.
            RecordBackgroundError(s);
          }
            // 释放内存
          delete logfile_;
        // 将上面创建的文件复制写Log,成为新的Log日志文件
          logfile_ = lfile;
            // 设置num
          logfile_number_ = new_log_number;
            // 将创建的文件赋值给Log中的writer
          log_ = new log::Writer(lfile);
          // 将当前mem_ 的指针复制给imm_,说明当前的mem已经准备刷到level0 了。
          imm_ = mem_;
           // 设置是has_imm_ 为true,这里的 memory_order_release 前面说过,就是不允许指令重排
          has_imm_.store(true, std::memory_order_release);
            // 创建新的MemTable,传入当前的比较器
          mem_ = new MemTable(internal_comparator_);
          // 给当前的mem 添加引用
          mem_->Ref();
          force = false;  // Do not force another compaction if have room
            //尝试调用后台合并
          MaybeScheduleCompaction();
        }
      }
      return s;
    }
    

    这里分配的空间核心还是内存是否超过4MB,以及当前level0 的数据是否超过配置的阈值。如果说调大上面的值肯定可以提高一定的吞吐量。但是后期合并的数据量也对应会增加,个人觉得如果key,value 都比较小,则4MB就足够了,但是如果每次都是超大的key和value,就可以考虑调大方法中的参数,避免频繁合并。

    合并操作

    合并操作个人觉得没有什么好了解的,里面有个小tips就是直接对writers的迭代iter++ 这样可以有效的避免其实只有一个还需要走下面的合并操作。

    开始写入MemTable

    在写入完Log 后就开始执行写入MemTable了。

     status = WriteBatchInternal::InsertInto(write_batch, mem_);
    Status WriteBatchInternal::InsertInto(const WriteBatch* b, MemTable* memtable) {
      MemTableInserter inserter;
      inserter.sequence_ = WriteBatchInternal::Sequence(b);
      inserter.mem_ = memtable;
      return b->Iterate(&inserter);
    }
    

    首先是创建了一个inserter的对象,赋值mem和sequence,然后调用WriteBatch的迭代器写入。这么封装有什么好处呢?为什么不在MemTable里做一个循环直接往里面写呢?个人觉得是为了解耦,MemTable插入的数据就是简单的Slice 对象,而不用去考虑里面的batch,通过迭代器解析然后插入,能够将WriteBatch的职责和MemTable的职责做一个很好的区分。

    在正式进入迭代方法之前,先来看下此时一个的WriteBatch 数据结构。这里就不贴全部的源码了,只是说下WriteBatch里面主要是一个Slice的key和Slice 的value。这些值最后都会被挡在一个string类型名为rep_参数中。

    每次写入都会将当前操作类型即ValueType放入到req中,写入之后的seq为:

    如果是kTypeValue则数据是 [seq预留8字节,全部为0][count][kTypeValue(1字节)][key_length][key_value][value_length][value_value]
    如果是kTypeDeletion 则数据为[seq预留8字节,全部为0][count][kTypeDeletion(1字节)][key_length][key_value]
    

    然后在BuildBatchGroup 中也只是对这个值进行一个追加,最后是一个大的WriteBatch,包含了n个写入,这个在WriteBatch 中是一个append,每次都会将需要append的数据截取12字节后面的数据,然后将count重新设置到被append的count中。执行完BuildBatchGroup 后,就会在前面的4个字节中写入sequence。

    结合Log来看,这里的key,value的值都是使用的Varint32位,这也是为什么需要在Log中写入Fragement的原因了。

    了解到了WriteBatch 的数据结构就不在去看WriteBatch::Iterate里面的源码里,其实就是根据数据结果解析为key和value,然后分为delete方法或者Put方法。但是需要注意的是,WriteBatch::Iterate的里面的Put方法最后走到了class MemTableInserter : public WriteBatch::Handler这个类里面的Put方法里。这个方法会对核心就是对当前批次写入的sequence进行解析和插入到MemTable中,说明虽然我们合并了数据的写入,但是在写入Mem中的时候sequence 还是按照写入顺序+1 的。

      void Put(const Slice& key, const Slice& value) override {
        mem_->Add(sequence_, kTypeValue, key, value);
        sequence_++;
      }
      void Delete(const Slice& key) override {
        mem_->Add(sequence_, kTypeDeletion, key, Slice());
        sequence_++;
      }
    

    LevelDB里面的迭代器目前已经了解了两个了,一个是MemTable中的Iterator,用来查询数据,还有一个就是当前的写入了,其实迭代器在LevelDB中使用比较多,后面全部过一遍再回头来看迭代器的使用。

    Add 方法

    看到这里,终于开始往MemTable 写数据了。具体实现比较简单,主要就是Key值的创建也就是SkipList中Node 的创建。具体代码实现在db/mem_table.cc

    void MemTable::Add(SequenceNumber s, ValueType type, const Slice& key,
                      const Slice& value) {
     // Format of an entry is concatenation of:
     // key_size     : varint32 of internal_key.size()
     // key bytes   : char[internal_key.size()]
     // tag         : uint64((sequence

    相关文章

    JavaScript2024新功能:Object.groupBy、正则表达式v标志
    PHP trim 函数对多字节字符的使用和限制
    新函数 json_validate() 、randomizer 类扩展…20 个PHP 8.3 新特性全面解析
    使用HTMX为WordPress增效:如何在不使用复杂框架的情况下增强平台功能
    为React 19做准备:WordPress 6.6用户指南
    如何删除WordPress中的所有评论

    发布评论