miniob源码 架构概览

2024年 5月 7日 72.9k 0

miniob源码 架构概览

整体架构

如下图,简单描述了,observer启动后,建立监听、注册libevent事件,recv后触发各stags的handle_event、处理结果回调、threadpool运行机制等等几个方面对整体线程模型、reactor模型和各组件工作流进行分析。

miniob源码 架构概览-1

Reactor模型

  • miniob运行框架是通过libevent实现了对网络事件的监听,当链接建立后,读缓冲区事件触发回调recv函数的处理流程。
event_set(&client_context->read_event, client_context->fd, EV_READ | EV_PERSIST, recv, client_context);

miniob源码 架构概览-2

  • 在recv函数的最后将SessionEvent添加到SessionStage的事件队列中,同时将SessionStage添加到threapool的Stage队列中。
  SessionEvent *sev = new SessionEvent(client);
  session_stage_->add_event(sev);
  • thread_pool的stage队列(thread_pool.h)
std::deque<Stage *> run_queue_;  //< list of stages with work to do
  • stage_event队列(stage.h)
std::deque<StageEvent *> event_list_;  // event queue

线程模型

miniob采用多线程的架构,通过Threadpool类创建线程池,根据etc/observer.ini中的配置创建线程。

# threadpools' name, it will contain the threadpool's section
ThreadPools=SQLThreads,IOThreads,DefaultThreads

[SQLThreads]
# the thread number of this threadpool, 0 means cpu's cores.
# if miss the setting of count, it will use cpu's core number;
count=3
#count=0

[IOThreads]
# the thread number of this threadpool, 0 means cpu's cores.
# if miss the setting of count, it will use cpu's core number;
count=3
#count=0

[DefaultThreads]
# If Stage haven't set threadpool, it will use this threadpool
# This threadpool is used for backend operation, such as timer, sedastats and so on.
# the thread number of this threadpool, 0 means cpu's cores.
# if miss the setting of count, it will use cpu's core number;
count=3

ThreadPools 配置了3个线程池:SQLThreads,IOThreads,DefaultThreads,每个线程池可以配置count,即线程的个数。

  • 创建线程池和线程,初始化时通过new ThreadPools,在构造函数中调用add_threads,根据线程个数的配置创建线程。
  // attempt to start the requested number of threads
  for (i = 0; i < threads; i++) {
    int stat = pthread_create(&pthread, &pthread_attrs, Threadpool::run_thread, (void *)this);
    if (stat != 0) {
      LOG_WARN("Failed to create one thread\n");
      break;
    }
    char tmp[32] = {};
    sprintf(tmp,"%s%u",name_.c_str(), i);
    pthread_setname_np(pthread, tmp);
  }

注意:8,9,10三行是我后加的代码,目的是在top的时候显示线程名称。

  • 在TimerStage初始化的时候bool TimerStage::initialize()函数也创建了一个线程。
bool TimerStage::initialize()
{
  // The TimerStage does not send messages to any other stage.
  ASSERT(next_stage_list_.size() == 0, "Invalid NextStages list.");

  // Start the thread to maintain the timer
  const pthread_attr_t *thread_attrs = NULL;
  void *thread_args = (void *)this;
  int status = pthread_create(&timer_thread_id_, thread_attrs, &TimerStage::start_timer_thread, thread_args);
  if (status != 0)
    LOG_ERROR("failed to create timer thread: status=%d\n", status);
  pthread_setname_np(timer_thread_id_, "TimerStage");
  
  return (status == 0);
}

注意:低12行是我后加的代码,目的是在top的时候显示线程名称。

  • 启动observer后可以通过top看到各线程情况
top -p `ps -ef | grep observer | grep -v grep | awk '{print $2}'` -H

miniob源码 架构概览-3

注意:可以看到observer的线程情况为,主线observer + SQLThreads(3个)+ IOThreads(3个)+ DefaultThreads(3个)+ TimerStage 共11 个线程

  • 初始化stage时,安装配置etc/observer.ini为每个stage分别了不同的线程池
[SessionStage]
ThreadId=SQLThreads
NextStages=PlanCacheStage

[PlanCacheStage]
ThreadId=SQLThreads
#NextStages=OptimizeStage
NextStages=ParseStage

[ParseStage]
ThreadId=SQLThreads
NextStages=ResolveStage

[ResolveStage]
ThreadId=SQLThreads
NextStages=QueryCacheStage

[QueryCacheStage]
ThreadId=SQLThreads
NextStages=OptimizeStage

[OptimizeStage]
ThreadId=SQLThreads
NextStages=ExecuteStage

[ExecuteStage]
ThreadId=SQLThreads
NextStages=DefaultStorageStage,MemStorageStage

[DefaultStorageStage]
ThreadId=IOThreads
BaseDir=./miniob
SystemDb=sys

[MemStorageStage]
ThreadId=IOThreads

[MetricsStage]
NextStages=TimerStage
  • bug修复,在分配线程池的时候代码有逻辑错误

miniob源码 架构概览-4

注意:低315行,317行为调整好的代码,建issue:stage线程分配问题 #75 ,PR:stage线程分配问题 #75 #76

Stage模型

可以把stage模型看做一个链,每个stage都是链上的节点,节点的入口是handle_event,每个handle_event都会调用下一个stage的handle_event。

miniob源码 架构概览-5

NextStages在配置文件中给出,但是实际上代码已经写的比较固化,比如支持配置多个NextStages,但成员变量中已经固定了一个或者两个

handle_event的参数是SQLStageEvent,SQLStageEvent中包括Stmt、Query、sql_等重要的数据结构

private:
  SessionEvent *session_event_ = nullptr;
  std::string sql_;
  Query *query_ = nullptr;
  Stmt *stmt_ = nullptr;

其中包含SessionEvent的原因是SessionEvent 中带有的回调函数可以向客户端返回应答。

private:
  ConnectionContext *client_;

  std::string response_;
  • SessionStage::callback_event回调函数,向客户端返回执行结果
void SessionStage::callback_event(StageEvent *event, CallbackContext *context)
{
  LOG_TRACE("Enter\n");

  SessionEvent *sev = dynamic_cast<SessionEvent *>(event);
  if (nullptr == sev) {
    LOG_ERROR("Cannot cat event to sessionEvent");
    return;
  }

  const char *response = sev->get_response();
  int len = sev->get_response_len();
  if (len <= 0 || response == nullptr) {
    response = "No data\n";
    len = strlen(response) + 1;
  }
  Server::send(sev->get_client(), response, len);
  if ('\0' != response[len - 1]) {
    // 这里强制性的给发送一个消息终结符,如果需要发送多条消息,需要调整
    char end = 0;
    Server::send(sev->get_client(), &end, 1);
  }

  // sev->done();
  LOG_TRACE("Exit\n");
  return;
}

注意:低17行向客户端发送处理结果response。

  • SessionEvent是在ParseStage阶段被调用的
void ParseStage::callback_event(StageEvent *event, CallbackContext *context)
{
  LOG_TRACE("Enter\n");
  SQLStageEvent *sql_event = static_cast<SQLStageEvent *>(event);
  sql_event->session_event()->done_immediate();
  sql_event->done_immediate();
  LOG_TRACE("Exit\n");
  return;
}

注意:第5行为回调SessionEvent,直接将结果返回客户端。

  • 词法解析、语法解析是在ParseStage的handle_event(request)进行的
RC ParseStage::handle_request(StageEvent *event)
{
  SQLStageEvent *sql_event = static_cast<SQLStageEvent *>(event);
  const std::string &sql = sql_event->sql();

  Query *query_result = query_create();
  if (nullptr == query_result) {
    LOG_ERROR("Failed to create query.");
    return RC::INTERNAL;
  }

  RC ret = parse(sql.c_str(), query_result);
  if (ret != RC::SUCCESS) {
    // set error information to event
    sql_event->session_event()->set_response("Failed to parse sql\n");
    query_destroy(query_result);
    return RC::INTERNAL;
  }

注意:第12行电源parse进行语法、词法解析,生成语法树Query *query_result。

总结

整体架构是基于Reactor事件驱动的异步消息处理模型,使用线程池,通过配置文件编排stage链,完成SQL处理流水线。当链接建立后,libevent的epool监听读缓冲区,收到可读event后,触发recv函数事件,recv将收到的数据和session信息打包成SessionEvent,并以SessionStage形式添加到thread_pool的stage队列中,同时触发SessionStage的handle_event,启动流水线链,然后按照流水线配置进行逐步处理,最后通过SessionStage的回调函数callback_event将处理结果发送至客户端。

整体架构应该比较清晰了,后续将逐步对各组件进行详细分析。

相关文章

Oracle如何使用授予和撤销权限的语法和示例
Awesome Project: 探索 MatrixOrigin 云原生分布式数据库
下载丨66页PDF,云和恩墨技术通讯(2024年7月刊)
社区版oceanbase安装
Oracle 导出CSV工具-sqluldr2
ETL数据集成丨快速将MySQL数据迁移至Doris数据库

发布评论