如何新增系统变量?

2024年 5月 7日 80.3k 0

本专题的前几篇文章已经介绍过配置项和系统变量的基本用法,也对配置项的源码进行了解析,一些同学可能还想知道系统变量是如何实现的,以及想跟配置项一样自定义一个新的系统变量。

本文将通过探究“如何新增系统变量”这一问题,结合系统变量的源码实现,讲解系统变量的定义、初始化、内部访问和同步机制。

如何新增系统变量?

一个系统变量可以同时具有 global 级别和 session 级别的数据,在代码中存在多个副本。这与配置项(参数)不同,一个配置项要么是集群级别,要么是租户级别。

系统变量的定义

要增加一个新的变量,首先在 src/share/system_variable/ob_system_variable_init.json 文件中按照下面的格式编写一段代码。

  "ob_query_timeout": {
    "id": 10005,
    "name": "ob_query_timeout",
    "default_value": "10000000",
    "base_value": "10000000",
    "data_type": "int",
    "info": "Query timeout in microsecond(us)",
    "flags": "GLOBAL | SESSION | NEED_SERIALIZE",
    "on_check_and_convert_func": "ObSysVarOnCheckFuncs::check_and_convert_timeout_too_large",
    "publish_version": "",
    "info_cn": "",
    "background_cn": "",
    "ref_url": ""
  },

其中每一项的含义如下:

2参数 说明 举例
对象名(括号外) 数据结构的名字,与括号内的变量名一致 ob_query_timeout
id 变量的序号,按顺序手动分配 0, 1, 95, 10001, 10037
name 变量名,与括号外的对象名一致 ob_query_timeout
default_value 默认值 0, 1000, disabled, binary
base_value 基准值,初始与默认值相同,用于不同session间的变量同步 0, 1000, disabled, binary
data_type 数据类型 int, varchar, enum, bool
info 描述信息,描述变量的主要功能 "Query timeout in microsecond(us)"
flags 标签,包括级别、是否序列化、是否可见等 GLOBAL | SESSION | NEED_SERIALIZE | ORACLE_ONLY | INVISIBLE
on_check_and_convert_func 合法性检查和格式转换函数(可选) ObSysVarOnCheckFuncs::check_and_convert_timeout_too_large
min_val 最小值(可选) 1
max_val 最大值(可选) 10000
enum_names 枚举类型的取值列表(可选) "enum_names": ["OFF", "AUTO", "FORCE"],
publish_version 发布版本号 "227", "420"(即2.2.7,4.2.0)
info_cn 中文描述 "查询超时时间(微秒)"
background_cn 添加原因/背景 "避免死锁、一直重试等问题导致的sql阻塞的情况"
ref_url 变量申请链接 "xxx"

然后执行 ./gen_ob_sys_variables.py 脚本,新增的变量会出现在 ob_system_variable_init.cpp 文件中。

[&] (){
      ObSysVars[100].default_value_ = "10000000" ;
      ObSysVars[100].info_ = "Query timeout in microsecond(us)" ;
      ObSysVars[100].name_ = "ob_query_timeout" ;
      ObSysVars[100].data_type_ = ObIntType ;
      ObSysVars[100].flags_ = ObSysVarFlag::GLOBAL_SCOPE | ObSysVarFlag::SESSION_SCOPE | ObSysVarFlag::NEED_SERIALIZE ;
      ObSysVars[100].on_check_and_convert_func_ = "ObSysVarOnCheckFuncs::check_and_convert_timeout_too_large" ;
      ObSysVars[100].id_ = SYS_VAR_OB_QUERY_TIMEOUT ;
      cur_max_var_id = MAX(cur_max_var_id, static_cast<int64_t>(SYS_VAR_OB_QUERY_TIMEOUT)) ;
      ObSysVarsIdToArrayIdx[SYS_VAR_OB_QUERY_TIMEOUT] = 100 ;
      ObSysVars[100].base_value_ = "10000000" ;
    ObSysVars[100].alias_ = "OB_SV_QUERY_TIMEOUT" ;
    }();

这些变量的初始值都会存储在以下数组中。其中 ObSysVars 数组保存了从 json 文件生成的最原始的变量信息;ObSysVarDefaultValues 数组存储了系统变量的默认值;ObSysVarBaseValues 数组存储了系统变量的基准值。每新增一个系统变量,这些数组中就会多一个元素。

namespace oceanbase
{
namespace share
{
// 变量的初始信息
static ObSysVarFromJson ObSysVars[ObSysVarFactory::ALL_SYS_VARS_COUNT];
// 变量的默认值
static ObObj ObSysVarDefaultValues[ObSysVarFactory::ALL_SYS_VARS_COUNT];
// 变量的基准值
static ObObj ObSysVarBaseValues[ObSysVarFactory::ALL_SYS_VARS_COUNT];

系统变量缓存的定义

在某些场景为满足功能或性能的需求,需要额外为系统变量增加一份缓存(按需定义,不是必须的)。系统变量的缓存通过 DEF_SYS_VAR_CACHE_FUNCS 和 DEF_SYS_VAR_CACHE_FUNCS_STR 宏定义。

class ObBasicSessionInfo
{
private:
#define DEF_SYS_VAR_CACHE_FUNCS(SYS_VAR_TYPE, SYS_VAR_NAME)                           \
  void set_##SYS_VAR_NAME(SYS_VAR_TYPE value)                                         \
  {                                                                                   \
    inc_data_.SYS_VAR_NAME##_ = (value);                                              \
    inc_##SYS_VAR_NAME##_ = true;                                                     \
  }                                                                                   \
  void set_base_##SYS_VAR_NAME(SYS_VAR_TYPE value)                                    \
  {                                                                                   \
    base_data_.SYS_VAR_NAME##_ = (value);                                             \
  }                                                                                   \
  const SYS_VAR_TYPE &get_##SYS_VAR_NAME() const                                      \
  {                                                                                   \
    return get_##SYS_VAR_NAME(inc_##SYS_VAR_NAME##_);                                 \
  }                                                                                   \
  const SYS_VAR_TYPE &get_##SYS_VAR_NAME(bool is_inc) const                           \
  {                                                                                   \
    return is_inc ? inc_data_.SYS_VAR_NAME##_ : base_data_.SYS_VAR_NAME##_;           \
  }

  class SysVarsCache
  {
  public:
    DEF_SYS_VAR_CACHE_FUNCS(int64_t, ob_max_read_stale_time);
    DEF_SYS_VAR_CACHE_FUNCS(bool, tx_read_only);
    DEF_SYS_VAR_CACHE_FUNCS_STR(nls_date_format);
  }

  private:
    static SysVarsCacheData base_data_;
    SysVarsCacheData inc_data_;
    union {
      uint64_t inc_flags_;
      struct {
        bool inc_auto_increment_increment_:1;
        bool inc_sql_throttle_current_priority_:1;
        ......

其中 base_data_ 表示基准数据,它是全局唯一的,与“global变量”对应;inc_data_ 表示最新数据,每个session各有一份,与“session变量”对应。

以 ob_max_read_stale_time 变量为例,上面的宏展开后生成以下代码。

  void set_ob_max_read_stale_time(int64_t value)
  {
    inc_data_.ob_max_read_stale_time_ = (value);
    inc_ob_max_read_stale_time_ = true;
  }
  void set_base_ob_max_read_stale_time(int64_t value)
  {
    base_data_.ob_max_read_stale_time_ = (value);
  }
  const int64_t &get_ob_max_read_stale_time() const
  {
    return get_ob_max_read_stale_time(inc_ob_max_read_stale_time_);
  }
  const int64_t &get_ob_max_read_stale_time(bool is_inc) const
  {
    return is_inc ? inc_data_.ob_max_read_stale_time_ : base_data_.ob_max_read_stale_time_;
  }

其中 set_ob_max_read_stale_time() 函数会修改 inc_ob_max_read_stale_time_ 为 true 并更新 inc_data_;然后调用 get_ob_max_read_stale_time() 函数会返回修改过的数据 inc_data_.ob_max_read_stale_time_,未修改则会返回基准数据 base_data_.ob_max_read_stale_time_。

变量初始化

定义上面的一系列数据结构后,接下来在 OBServer 启动时系统变量会进行初始化。

变量基础数据

ObSysVars 数组保存了最原始的变量信息,在静态对象的构造函数中进行初始化。

namespace oceanbase
{
namespace share
{
// 变量的初始信息
static ObSysVarFromJson ObSysVars[ObSysVarFactory::ALL_SYS_VARS_COUNT];
// 变量的默认值
static ObObj ObSysVarDefaultValues[ObSysVarFactory::ALL_SYS_VARS_COUNT];
// 变量的基准值
static ObObj ObSysVarBaseValues[ObSysVarFactory::ALL_SYS_VARS_COUNT];

static struct VarsInit{
  VarsInit(){
    // 保存当前系统变量的最大的id
    int64_t cur_max_var_id = 0;
    // ObSysVarsIdToArrayIdx数组默认初始值为-1,-1表示无效索引
    memset(ObSysVarsIdToArrayIdx, -1, sizeof(ObSysVarsIdToArrayIdx));
    [&] (){
      ObSysVars[0].default_value_ = "1" ;
      ObSysVars[0].info_ = "" ;
      ObSysVars[0].name_ = "auto_increment_increment" ;
      ObSysVars[0].data_type_ = ObUInt64Type ;
      ObSysVars[0].min_val_ = "1" ;
      ObSysVars[0].max_val_ = "65535" ;
      ObSysVars[0].flags_ = ObSysVarFlag::GLOBAL_SCOPE | ObSysVarFlag::SESSION_SCOPE | ObSysVarFlag::NEED_SERIALIZE ;
      ObSysVars[0].id_ = SYS_VAR_AUTO_INCREMENT_INCREMENT ;
      cur_max_var_id = MAX(cur_max_var_id, static_cast<int64_t>(SYS_VAR_AUTO_INCREMENT_INCREMENT)) ;
      ObSysVarsIdToArrayIdx[SYS_VAR_AUTO_INCREMENT_INCREMENT] = 0 ;
      ObSysVars[0].base_value_ = "1" ;
    ObSysVars[0].alias_ = "OB_SV_AUTO_INCREMENT_INCREMENT" ;
    }();

ObSysVarDefaultValues 数组保存了系统变量的默认值,ObSysVarBaseValues 数组保存了系统变量的基准值。在 OBServer 启动时,会初始化这两个数组:

  • ObServer::init()
  • ObPreProcessSysVars::init_sys_var()
  • ObSysVariables::init_default_values()

该函数遍历 ObSysVars 数组,将每个变量的 default_value_ 存储到 ObSysVarDefaultValues 数组中,将每个变量的 base_value_ 存储到 ObSysVarBaseValues 数组中。

int ObSysVariables::init_default_values()
{
  int ret = OB_SUCCESS;
  int64_t sys_var_count = get_amount();
  for (int64_t i = 0; OB_SUCC(ret) && i < sys_var_count; ++i) {
    const ObString &sys_var_val_str = ObSysVariables::get_value(i);
    const ObString &base_sys_var_val_str = ObSysVariables::get_base_str_value(i);
      ......
        ObSysVarDefaultValues[i] = out_obj;
        ObSysVarBaseValues[i] = base_out_obj;

global 变量:sysvar_array_

ObSysVariableSchema 是管理系统变量的 schema 对象,其中的 sysvar_array_ 数组存储了 global 变量的值,因此系统变量也具有 schema 多版本的能力。

class ObSysVariableSchema : public ObSchema
{
private:
  uint64_t tenant_id_;
  int64_t schema_version_;
  ObSysVarSchema *sysvar_array_[ObSysVarFactory::ALL_SYS_VARS_COUNT];
  bool read_only_;

class ObSysVarSchema : public ObSchema
{
private:
  uint64_t tenant_id_;
  common::ObString name_;
  common::ObObjType data_type_;
  common::ObString value_;
  common::ObString min_val_;
  common::ObString max_val_;
  common::ObString info_;
  common::ObZone zone_;
  int64_t schema_version_;
  int64_t flags_;

sysvar_array_ 数组在 OBServer 启动时进行初始化,首先调用的是 init_schema() 函数。

  • ObServer::init_schema()
  • ObMultiVersionSchemaService::init()
  • ObMultiVersionSchemaService::init_sys_tenant_user_schema()

先调用 load_default_system_variable() 将所有变量的默认值都加载到 sysvar_array_ 数组中,再调用 put_schema() 把这个 schema 添加到 schema_cache_ 中。

int ObMultiVersionSchemaService::init_sys_tenant_user_schema()
{
  ObSysVariableSchema sys_variable;
    
  if (OB_FAIL(sys_variable.load_default_system_variable(true))) {
    LOG_WARN("load sys tenant default system variable failed", K(ret));

  } else if (OB_FAIL(schema_cache_.put_schema(SYS_VARIABLE_SCHEMA,
                                              OB_SYS_TENANT_ID,
                                              sys_variable.get_tenant_id(),
                                              sys_variable.get_schema_version(),
                                              sys_variable))) {
  • ObSysVariableSchema::load_default_system_variable()

该函数遍历 ObSysVars 数组获取变量的默认值,然后调用 add_sysvar_schema() 函数将变量值添加到 sysvar_array_ 数组中。

int ObSysVariableSchema::load_default_system_variable(bool is_sys_tenant)
{
  int ret = OB_SUCCESS;
  ObString value;
  ObSysVarSchema sysvar;
  for (int64_t i = 0; OB_SUCC(ret) && i < ObSysVariables::get_amount(); ++i) {
    sysvar.reset();
    // 从 ObSysVariables 中获取默认数据
    if (is_sys_tenant && ObCharset::case_insensitive_equal(ObSysVariables::get_name(i), OB_SV_LOWER_CASE_TABLE_NAMES)) {
      value = ObString::make_string("2");
    } else {
      value = ObSysVariables::get_value(i);
    }
    
    } else if (OB_FAIL(sysvar.set_value(value))) {
      LOG_WARN("set sysvar value failed", K(ret), K(value));
    } else if (OB_FAIL(add_sysvar_schema(sysvar))) {
      LOG_WARN("add sysvar schema failed", K(ret));
    }
  }

session 变量:sys_vars_

ObBasicSessionInfo 是管理 session 相关信息的对象,其中的 sys_vars_ 数组存储了 session 变量的值,在建立 session 连接时进行初始化。

class ObBasicSessionInfo
{
private:
  share::ObBasicSysVar *sys_vars_[share::ObSysVarFactory::ALL_SYS_VARS_COUNT];

外部 session

外部 session 连接是指从客户端发起的连接,一般用于处理用户的 sql 请求。外部 session 的变量缓存值在初始化时,会从 schema 中取值,同时也依赖 schema 的同步机制。

  • int ObMPConnect::process()

在建立连接时,先调用 create_session() 函数创建 session,然后调用 verify_identify() 初始化变量。

  • ObMPConnect::verify_identify()
  • ObMPConnect::load_privilege_info()

通过租户ID从 schema_cache_ 中获取对应的 sys_variable_schema,然后将 sys_variable_schema 作为参数调用 load_all_sys_vars() 函数。

int ObMPConnect::load_privilege_info(ObSQLSessionInfo &session)
{

      const ObSysVariableSchema *sys_variable_schema = NULL;
    
      } else if (OB_FAIL(schema_guard.get_sys_variable_schema(conn->tenant_id_, sys_variable_schema))) {
        LOG_WARN("get sys variable schema failed", K(ret));
      } else if (OB_ISNULL(sys_variable_schema)) {
        ret = OB_ERR_UNEXPECTED;
        LOG_WARN("sys variable schema is null", K(ret));
      } else if (OB_FAIL(session.init_tenant(tenant_name_, conn->tenant_id_))) {
          LOG_WARN("failed to init_tenant", K(ret));
      } else if (OB_FAIL(session.load_all_sys_vars(*sys_variable_schema, false))) {
        LOG_WARN("load system variables failed", K(ret));
  • ObBasicSessionInfo::load_all_sys_vars()

该函数每次循环,先从 ObSysVars 数组中获取 sys_var_id,然后从传入的 sys_var_schema 中获取 sys_var 对象,然后把变量的 name 和 value 作为参数,调用 load_sys_variable() 函数。

int ObBasicSessionInfo::load_all_sys_vars(const ObSysVariableSchema &sys_var_schema, bool sys_var_created)
{
  int ret = OB_SUCCESS;
  OZ (clean_all_sys_vars());
  if (!sys_var_created) {
    OZ (sys_var_fac_.create_all_sys_vars());
  }
  OX (influence_plan_var_indexs_.reset());
  ObArenaAllocator calc_buf(ObModIds::OB_SQL_SESSION);
  for (int64_t i = 0; OB_SUCC(ret) && i < get_sys_var_count(); i++) {
    ObSysVarClassType sys_var_id = ObSysVariables::get_sys_var_id(i);
    const ObSysVarSchema *sys_var = NULL;
    OZ (sys_var_schema.get_sysvar_schema(sys_var_id, sys_var), sys_var_id, i);
    OV (OB_NOT_NULL(sys_var));
    OZ (load_sys_variable(calc_buf, sys_var->get_name(), sys_var->get_data_type(),
                          sys_var->get_value(), sys_var->get_min_val(),
                          sys_var->get_max_val(), sys_var->get_flags(), true));
  • ObBasicSessionInfo::load_sys_variable()

先调用 find_sys_var_id_by_name() 函数通过 name 获取 var_id,然后调用 create_sys_var() 函数获取(创建)变量指针 sys_var,再调用 sys_var->init() 给变量赋值,最后调用 process_session_variable() 函数给变量缓存赋值。

int ObBasicSessionInfo::load_sys_variable(ObIAllocator &calc_buf,
                                          const ObString &name,
                                          const ObObj &type,
                                          const ObObj &value,
                                          const ObObj &min_val,
                                          const ObObj &max_val,
                                          const int64_t flags,
                                          bool is_from_sys_table)
{

  if (SYS_VAR_INVALID == (var_id = ObSysVarFactory::find_sys_var_id_by_name(name, is_from_sys_table))) {
  ......
  } else if (OB_FAIL(create_sys_var(var_id, store_idx, sys_var))) {
    LOG_WARN("fail to create sys var", K(name), K(value), K(ret));
  ......
  } else if (OB_FAIL(sys_var->init(real_val, min_ptr, max_ptr, val_type.get_type(), flags))) {
    LOG_WARN("fail to init sys var", K(ret), K(sys_var->get_type()),
             K(real_val), K(name), K(value));
  } else if (OB_FAIL(process_session_variable(var_id, real_val,
                                              false /*check_timezone_valid*/,
                                              false /*is_update_sys_var*/))) {
  • ObBasicSessionInfo::create_sys_var()

初次调用 create_sys_var() 函数时,sys_vars_[store_idx] 为空,然后调用 sys_var_fac_.create_sys_var() 函数生成 sys_var,再把 sys_var 赋给 sys_vars_[store_idx]。session 刚初始化时,sys_var 指向的是变量的默认值。

再次调用 create_sys_var() 函数时,sys_vars_[store_idx] 不为空,将其值赋给 sys_var,相当于获取了当前session 的变量。

int ObBasicSessionInfo::create_sys_var(ObSysVarClassType sys_var_id,
                                       int64_t store_idx, ObBasicSysVar *&sys_var)
{
  int ret = OB_SUCCESS;
  OV (0 <= store_idx && store_idx < ObSysVarFactory::ALL_SYS_VARS_COUNT,
      OB_ERR_UNEXPECTED, sys_var_id, store_idx);
  if (OB_NOT_NULL(sys_vars_[store_idx])) {
    OV (sys_vars_[store_idx]->get_type() == sys_var_id,
        OB_ERR_UNEXPECTED, sys_var_id, store_idx, sys_vars_[store_idx]->get_type());
    OX (sys_var = sys_vars_[store_idx]);
  } else {
    OZ (sys_var_fac_.create_sys_var(sys_var_id, sys_var), sys_var_id);
    OV (OB_NOT_NULL(sys_var), OB_ERR_UNEXPECTED, sys_var_id, store_idx);
    OX (sys_vars_[store_idx] = sys_var);
  }
  return ret;
}

内部 session

内部 session 连接是指 OBServer 内部或 OBProxy 向 OBServer 发起的内部请求建立的连接,不是用户主动建立的连接。内部 session 的变量总是获取全局默认值作为初值,而不是从最新的 schema 中取值。

  • ObCommonSqlProxy::acquire()
  • ObInnerSQLConnectionPool::acquire()
  • ObInnerSQLConnection::init()
  • ObInnerSQLConnection::init_session()
  • ObInnerSQLConnection::init_session_info()
  • ObBasicSessionInfo::load_default_sys_variable()
int ObBasicSessionInfo::load_default_sys_variable(const bool print_info_log, const bool is_sys_tenant, bool is_deserialized)
{
  int ret = OB_SUCCESS;
  if (OB_FAIL(sys_var_fac_.create_all_sys_vars())) {
    LOG_WARN("fail create all sys variables", K(ret));
  } else if (OB_FAIL(init_system_variables(print_info_log, is_sys_tenant, is_deserialized))) {
    LOG_WARN("Init system variables failed !", K(ret));
  }
  return ret;
}
  • ObBasicSessionInfo::init_system_variables()

调用 ObSysVariables::get_value(i) 获取 value,也就是从全局的 ObSysVars 数组中获取默认值,然后调用 load_sys_variable() 函数将 value 的值加载到 session 中。

int ObBasicSessionInfo::init_system_variables(const bool print_info_log, const bool is_sys_tenant,
                                              bool is_deserialized)
{

  for (int64_t i = 0; OB_SUCC(ret) && i < var_amount; ++i) {
    name.assign_ptr(const_cast<char*>(ObSysVariables::get_name(i).ptr()),
                    static_cast<ObString::obstr_size_t>(strlen(ObSysVariables::get_name(i).ptr())));
    bool is_exist = false;
    if (OB_FAIL(sys_variable_exists(name, is_exist))) {
      LOG_WARN("failed to check if sys variable exists", K(name), K(ret));
    } else if (!is_exist) {
      // Note: 如果已经初始化过 base value,则下面的流程不会执行
      var_type = ObSysVariables::get_type(i);
      var_flag = ObSysVariables::get_flags(i);
      value.set_varchar(is_deserialized ? ObSysVariables::get_base_str_value(i) :ObSysVariables::get_value(i));
      value.set_collation_type(ObCharset::get_system_collation());
      min_val.set_varchar(ObSysVariables::get_min(i));
      min_val.set_collation_type(ObCharset::get_system_collation());
      max_val.set_varchar(ObSysVariables::get_max(i));
      max_val.set_collation_type(ObCharset::get_system_collation());
      type.set_type(var_type);
      if(is_sys_tenant) {
        if (OB_FAIL(process_variable_for_tenant(name, value))) {
          LOG_WARN("process system variable for tenant error",  K(name), K(value), K(ret));
        }
      }
      if (OB_SUCC(ret)) {
        if (OB_FAIL(load_sys_variable(calc_buf, name, type, value, min_val, max_val, var_flag, false))) {
  • ObBasicSessionInfo::load_sys_variable()

初次调用 create_sys_var() 函数时,创建 session 变量 sys_var,然后用传入的默认值 value 对其进行初始化。

int ObBasicSessionInfo::load_sys_variable(ObIAllocator &calc_buf,
                                          const ObString &name,
                                          const ObObj &type,
                                          const ObObj &value,
                                          const ObObj &min_val,
                                          const ObObj &max_val,
                                          const int64_t flags,
                                          bool is_from_sys_table)
{
    
  } else if (OB_FAIL(create_sys_var(var_id, store_idx, sys_var))) {
    LOG_WARN("fail to create sys var", K(name), K(value), K(ret));
    
  } else if (OB_FAIL(ObBasicSessionInfo::change_value_for_special_sys_var(
              var_id, val_ptr, real_val))) {
    LOG_WARN("fail to change value for special sys var", K(ret), K(var_id), K(val_ptr));
  } else if (OB_FAIL(sys_var->init(real_val, min_ptr, max_ptr, val_type.get_type(), flags))) {
    LOG_WARN("fail to init sys var", K(ret), K(sys_var->get_type()),
             K(real_val), K(name), K(value));
  } else if (OB_FAIL(process_session_variable(var_id, real_val,
                                              false /*check_timezone_valid*/,
                                              false /*is_update_sys_var*/))) {
    LOG_WARN("process system variable error",  K(name), K(type), K(real_val), K(value), K(ret));

变量缓存基准值:base_data_

base_data_ 表示系统变量缓存的基准数据,它是 OBServer 全局唯一的。

  • ObServer::init()
  • ObBasicSessionInfo::init_sys_vars_cache_base_values()

遍历 ObSysVarDefaultValues 数组,将变量的默认值写到 ObBasicSessionInfo::base_data_ 中,这是一个 static SysVarsCacheData[] 类型的数组。

int ObBasicSessionInfo::init_sys_vars_cache_base_values()
{
  int ret = OB_SUCCESS;
  int64_t store_idx = OB_INVALID_INDEX_INT64;
  ObBasicSessionInfo::SysVarsCache sys_vars_cache;
  int64_t var_amount = ObSysVariables::get_amount();
  for (int64_t i = 0; OB_SUCC(ret) && i < var_amount; ++i) {
    store_idx = ObSysVarsToIdxMap::get_store_idx((int64_t)ObSysVariables::get_sys_var_id(i));
    OX (fill_sys_vars_cache_base_value(
            ObSysVariables::get_sys_var_id(i),
            sys_vars_cache,
            ObSysVariables::get_default_value(store_idx) ));
  }
  return ret;
}

变量缓存增量值:inc_data_

inc_data_ 表示 session 变量的缓存值,每个 session 各有一份缓存数据。

外部session 的 inc_data_ 变量缓存值初始化为 schema 中的最新值,内部 session 的 inc_data_ 初始化为变量的默认值,也就是说变量缓存与 session 变量的初值是一致的,它们初始化的调用路径也基本相同。

变量查询

外部查询

查询 global 变量

global 变量的值是从 schema 中获取的,在执行如“show global variables like xxx”的命令时就会触发以下流程:

  • ObBasicSessionInfo::get_global_sys_variable()

先调用 get_sys_variable_schema() 函数获取 sys_variable_schema 对象,然后从该对象中获取 sysvar_schema。

int ObBasicSessionInfo::get_global_sys_variable(const uint64_t actual_tenant_id, // 为了处理租户已经切掉的情况
                                                ObIAllocator &calc_buf,
                                                const ObDataTypeCastParams &dtc_params,
                                                const ObString &var_name,
                                                ObObj &val)
{
    
  } else if (OB_FAIL(schema_guard.get_sys_variable_schema(actual_tenant_id, sys_variable_schema))) {
    LOG_WARN("get sys variable schema failed", K(ret));
  } else if (OB_ISNULL(sys_variable_schema)) {
    ret = OB_ERR_UNEXPECTED;
    LOG_WARN("sys variable schema is null", K(ret));
  } else if (OB_FAIL(sys_variable_schema->get_sysvar_schema(var_name, sysvar_schema))) {
  • ObSysVariableSchema::get_sysvar_schema(const ObString &name, const ObSysVarSchema *&sysvar_schema)
  • ObSysVariableSchema::get_sysvar_schema(ObSysVarClassType var_id, const ObSysVarSchema *&sysvar_schema)
  • ObSysVariableSchema::get_sysvar_schema(int64_t idx)

该函数根据传入的 idx 获取对应的变量,最终返回 sysvar_array_[idx]。

const ObSysVarSchema *ObSysVariableSchema::get_sysvar_schema(int64_t idx) const
{
  const ObSysVarSchema *ret = NULL;
  if (idx >= 0 && idx < get_sysvar_count()) {
    ret = sysvar_array_[idx];
  }
  return ret;
}

查询 session 变量

session 变量的值是从 session 本地数组 sys_vars_ 中获取的,执行如“show variables like xxx”的命令时会触发以下调用流程:

  • ObResultSet::inner_get_next_row()
  • ......
  • ObBasicSessionInfo::get_sys_variable_by_name()
  • ObBasicSessionInfo::inner_get_sys_var(oceanbase::common::ObString const&, oceanbase::share::ObBasicSysVar*&)
  • ObBasicSessionInfo::inner_get_sys_var(oceanbase::common::ObString const&, long&, oceanbase::share::ObBasicSysVar*&)

该函数先调用 find_sys_var_id_by_name() 函数通过 sys_var_name 获取 sys_var_id,然后调用 calc_sys_var_store_idx() 函数利用 sys_var_id 计算出 store_idx,也就是变量在数组中的下标,最后返回 sys_vars_[store_idx] 即可。

int ObBasicSessionInfo::inner_get_sys_var(const ObString &sys_var_name,
                                          int64_t &store_idx,
                                          ObBasicSysVar *&sys_var) const
{
  int ret = OB_SUCCESS;
  ObSysVarClassType sys_var_id = SYS_VAR_INVALID;
  if (OB_UNLIKELY(SYS_VAR_INVALID == (
              sys_var_id = ObSysVarFactory::find_sys_var_id_by_name(sys_var_name)))) {
    ret = OB_ERR_SYS_VARIABLE_UNKNOWN;
    LOG_WARN("fail to find sys var id by name", K(ret), K(sys_var_name), K(lbt()));
  } else if (OB_FAIL(ObSysVarFactory::calc_sys_var_store_idx(sys_var_id, store_idx))) {
    LOG_WARN("fail to calc sys var store idx", K(ret), K(sys_var_id), K(sys_var_name), K(lbt()));
  } else if (OB_UNLIKELY(store_idx < 0) ||
             OB_UNLIKELY(store_idx >= ObSysVarFactory::ALL_SYS_VARS_COUNT)) {
    ret = OB_ERR_UNEXPECTED;
    LOG_ERROR("got store_idx is invalid", K(ret), K(store_idx));
  } else if (OB_ISNULL(sys_vars_[store_idx])) {
    ret = OB_ENTRY_NOT_EXIST;
    LOG_WARN("sys var is NULL", K(ret), K(store_idx), K(sys_var_name));
  } else {
    sys_var = sys_vars_[store_idx];
  }
  return ret;
}

内部获取

获取 global 变量

内部访问 global 变量同样是通过 schema 获取,先从全局的 GCTX.schema_service_中获取 schema_guard,然后通过变量名获取 var_schema,再逐步解析得到最终的基础数据类型。

例如获取 weak_read_version_refresh_interval:

int ObRootService::check_weak_read_version_refresh_interval(int64_t refresh_interval, bool &valid)
{
    ......
      } else if (OB_FAIL(GCTX.schema_service_->get_tenant_schema_guard(tenant_id, schema_guard))) {
        LOG_WARN("get schema guard failed", KR(ret), K(tenant_id));
      } else if (OB_FAIL(schema_guard.get_tenant_system_variable(tenant_id,
                         OB_SV_MAX_READ_STALE_TIME, var_schema))) {
        LOG_WARN("get tenant system variable failed", KR(ret), K(tenant_id));
      } else if (OB_ISNULL(var_schema)) {
        ret = OB_ERR_UNEXPECTED;
        LOG_WARN("var schema is null", KR(ret), K(tenant_id));
      } else if (OB_FAIL(var_schema->get_value(NULL, NULL, obj))) {
        LOG_WARN("get value failed", KR(ret), K(tenant_id), K(obj));
      } else if (OB_FAIL(obj.get_int(session_max_stale_time))) {
        LOG_WARN("get int failed", KR(ret), K(tenant_id), K(obj));
      } else if (session_max_stale_time != share::ObSysVarFactory::INVALID_MAX_READ_STALE_TIME
                 && refresh_interval > session_max_stale_time) {

通过这种方式获取变量,通常会定义一个变量别名,方便代码中进行访问。

namespace oceanbase
{
namespace share
{
  static const char* const OB_SV_ENABLE_RICH_ERROR_MSG = "ob_enable_rich_error_msg";
  static const char* const OB_SV_LOG_ROW_VALUE_OPTIONS = "log_row_value_options";
  static const char* const OB_SV_MAX_READ_STALE_TIME = "ob_max_read_stale_time";
  ......

获取 session 变量最新值

内部查询调用 get_sys_var() 函数通过下标 idx 获取 session 变量的值。

ObBasicSysVar *ObBasicSessionInfo::get_sys_var(const int64_t idx)
{
  ObBasicSysVar *var = NULL;
  if (idx >= 0 && idx < ObSysVarFactory::ALL_SYS_VARS_COUNT) {
    var = sys_vars_[idx];
  }
  return var;
}

获取 session 变量缓存值

获取变量缓存值,就是从 ObBasicSessionInfo::sys_vars_cache_数组中取值,通常是封装了一些函数,然后再调用 sys_vars_cache_ 的 get_xxx() 接口,这些接口就是用 DEF_SYS_VAR_CACHE_FUNCS 宏定义的。

举几个例子:

class ObBasicSessionInfo
{
  int64_t get_trx_lock_timeout() const
  {
    return sys_vars_cache_.get_ob_trx_lock_timeout();
  }
  int64_t get_ob_max_read_stale_time() {
    return sys_vars_cache_.get_ob_max_read_stale_time();
  }
  int get_sql_throttle_current_priority(int64_t &sql_throttle_current_priority)
  {
    sql_throttle_current_priority = sys_vars_cache_.get_sql_throttle_current_priority();
    return common::OB_SUCCESS;
  }

只要能够访问 session,就可以访问 session 中的变量,比如获取 ob_max_read_stale_time:

session->get_ob_max_read_stale_time();

变量更新

外部修改

global 变量修改流程

更新 global 变量就是更新其对应的 schema,执行如“set global xxx = xx”的命令时触发以下调用流程:

  • ObMPQuery::do_process()
  • ObVariableSetExecutor::execute(ObExecContext &ctx, ObVariableSetStmt &stmt)
  • ObVariableSetExecutor::update_global_variables()

该函数先对更新数据做合法性检查,然后构造一个新的 sysvar_schema 加入到 rpc 参数中,最后发送更新系统变量的请求到其他节点。

int ObVariableSetExecutor::update_global_variables(ObExecContext &ctx,
                                                   ObDDLStmt &stmt,
                                                   const ObSetVar &set_var,
                                                   const ObObj &val)
{
	// 合法性检查
    } else if (set_var.var_name_ == OB_SV_MAX_READ_STALE_TIME) {
      int64_t max_read_stale_time = 0;
      if (OB_FAIL(val.get_int(max_read_stale_time))) {
        LOG_WARN("fail to get int value", K(ret), K(val));
      } else if (max_read_stale_time != ObSysVarFactory::INVALID_MAX_READ_STALE_TIME &&
                 max_read_stale_time < GCONF.weak_read_version_refresh_interval) {
        ret = OB_INVALID_ARGUMENT;
        LOG_USER_ERROR(OB_INVALID_ARGUMENT,
                       "max_read_stale_time is smaller than weak_read_version_refresh_interval");
      }

  // 构造新的 sysvar_schema,添加到参数 arg 的 sys_var_list_ 列表中
  if (OB_SUCC(ret)) {
    ......
    } else if (OB_FAIL(sysvar_schema.set_name(set_var.var_name_))) {
      LOG_WARN("set sysvar schema name failed", K(ret));
    } else if (OB_FAIL(sysvar_schema.set_value(val_str))) {
      LOG_WARN("set sysvar schema value failed", K(ret));
    } else {
      sysvar_schema.set_tenant_id(arg.tenant_id_);
      if (OB_FAIL(arg.sys_var_list_.push_back(sysvar_schema))) {
        LOG_WARN("store sys var to array failed", K(ret));
      }
    }
  }
  // 想其他节点发送更新变量的rpc请求
  if (OB_SUCC(ret)) {
    if (OB_ISNULL(task_exec_ctx = GET_TASK_EXECUTOR_CTX(ctx)) ||
        OB_ISNULL(common_rpc_proxy = task_exec_ctx->get_common_rpc())) {
      ret = OB_NOT_INIT;
      LOG_WARN("task exec ctx or common rpc proxy is NULL", K(ret), K(task_exec_ctx), K(common_rpc_proxy));
    } else if (OB_FAIL(common_rpc_proxy->modify_system_variable(arg))) {
      LOG_WARN("rpc proxy alter system variable failed", K(ret));
    } else {}
  }
  return ret;
}

session 变量修改流程

更新session变量值的同时,还会更新对应缓存数据的值,执行如“set xxx = xx”的命令时会触发以下调用流程:

  • ObBasicSysVar::session_update()
  • ObBasicSessionInfo::update_sys_variable_by_name()

通过变量名 var 获取 var_id,然后调用 update_sys_variable() 传入 val。

int ObBasicSessionInfo::update_sys_variable_by_name(const ObString &var, const ObObj &val)
{
  int ret = OB_SUCCESS;
  ObSysVarClassType var_id = SYS_VAR_INVALID;
  if (var.empty()) {
    ret = OB_INVALID_ARGUMENT;
    LOG_WARN("invalid variable name", K(var), K(val), K(ret));
  } else if (SYS_VAR_INVALID == (var_id = ObSysVarFactory::find_sys_var_id_by_name(var))) {
    ret = OB_ERR_SYS_VARIABLE_UNKNOWN;
    LOG_WARN("unknown variable", K(var), K(val), K(ret));
  } else if (OB_FAIL(update_sys_variable(var_id, val))) {
    LOG_WARN("failed to update sys variable", K(var), K(val), K(ret));
  } else {}
  return ret;
}
  • ObBasicSessionInfo::update_sys_variable()

先记录变量旧值,然后更新 session 变量缓存值,再更新 sys_var 的值,也就是 sys_vars_[store_idx] 的值。

int ObBasicSessionInfo::update_sys_variable(const ObSysVarClassType sys_var_id, const ObObj &val)
{
    
  } else if (is_track_session_info()) {
    // 记录修改变量的旧值
    if (OB_FAIL(track_sys_var(sys_var_id, sys_var->get_value()))) {
      LOG_WARN("failed to track sys var", K(ret), K(sys_var_id), K(val));
    
  if (OB_SUCC(ret)) {
    if (OB_FAIL(process_session_variable(sys_var_id, val, false /*check_timezone_valid*/,
                                        true /*is_update_sys_var*/))) {
      LOG_WARN("process system variable error",  K(sys_var_id), K(val), K(ret));

        } else {
          sys_var->set_value(val);
  • ObBasicSessionInfo::process_session_variable()

基于传入的变量类型 var,更新对应 session 变量的缓存值,即 inc_data_.xxx 。

OB_INLINE int ObBasicSessionInfo::process_session_variable(ObSysVarClassType var, const ObObj &val,
    const bool check_timezone_valid/*true*/, const bool is_update_sys_var/*false*/)
{
  int ret = OB_SUCCESS;
  switch (var) {
    case SYS_VAR_OB_MAX_READ_STALE_TIME: {
      int64_t max_read_stale_time = 0;
      if (OB_FAIL(val.get_int(max_read_stale_time))) {
        LOG_WARN("fail to get int value", K(ret), K(val));
      } else if (max_read_stale_time != ObSysVarFactory::INVALID_MAX_READ_STALE_TIME &&
                 max_read_stale_time < GCONF.weak_read_version_refresh_interval) {
        ret = OB_INVALID_ARGUMENT;
        LOG_USER_ERROR(OB_INVALID_ARGUMENT,
                       "max_read_stale_time is smaller than weak_read_version_refresh_interval");
      } else {
        sys_vars_cache_.set_ob_max_read_stale_time(max_read_stale_time);
      }
      break;
    }

内部更新

内部更新主要是指系统变量在各节点之间的同步机制。当一个节点执行 global 变量更新操作之后,其他节点刷新 schema 的流程:

  • ObServerSchemaUpdater::process_refresh_task(const ObServerSchemaTask &task)
  • ObMultiVersionSchemaService::refresh_and_add_schema()
  • ObMultiVersionSchemaService::refresh_tenant_schema(const uint64_t tenant_id)
  • ObServerSchemaService::refresh_schema(const ObRefreshSchemaStatus &schema_status)
int ObServerSchemaService::refresh_schema(
    const ObRefreshSchemaStatus &schema_status)
{
    
  } else if (is_full_schema) {

    if (OB_FAIL(refresh_full_schema(schema_status))) {
      LOG_WARN("tenant refresh full schema failed", K(ret), K(schema_status));
    }

  } else {
    if (OB_FAIL(refresh_increment_schema(schema_status))) {
      LOG_WARN("tenant refresh increment schema failed", K(ret), K(schema_status));
    }
  • ObServerSchemaService::refresh_increment_schema()

调用 get_increment_schema_operations() 获取增量 schema 操作 schema_operations,然后调用 replay_log() 回放日志,再调用 update_schema_mgr() 更新 schema。

int ObServerSchemaService::refresh_increment_schema(
    const ObRefreshSchemaStatus &schema_status)
{
      if (OB_SUCC(ret) && !core_schema_change && !sys_schema_change) {
        const int64_t fetch_version = std::max(core_schema_version, schema_version);
        if (OB_FAIL(schema_mgr_for_cache_map_.get_refactored(tenant_id, schema_mgr_for_cache))) {
          LOG_WARN("fail to get schema_mgr_for_cache", KR(ret), K(schema_status));
        } else if (OB_ISNULL(schema_mgr_for_cache)) {
          ret = OB_ERR_UNEXPECTED;
          LOG_WARN("schema mgr for cache is null", KR(ret));
        } else if (OB_FAIL(schema_service_->get_increment_schema_operations(schema_status,
            local_schema_version, fetch_version, sql_client, schema_operations))) {
          LOG_WARN("get_increment_schema_operations failed", KR(ret), K(schema_status),
              K(local_schema_version), K(fetch_version));
        } else if (schema_operations.count() > 0) {
          // new cache
          SMART_VAR(AllSchemaKeys, all_keys) {
            if (OB_FAIL(replay_log(*schema_mgr_for_cache, schema_operations, all_keys))) {
              LOG_WARN("replay_log failed", KR(ret), K(schema_status), K(schema_operations));
            } else if (OB_FAIL(update_schema_mgr(sql_client, schema_status,
                       *schema_mgr_for_cache, fetch_version, all_keys))){
              LOG_WARN("update schema mgr failed", KR(ret), K(schema_status));
            }
          }
        } else {}

        if (OB_SUCC(ret)) {
          schema_mgr_for_cache->set_schema_version(fetch_version);
          if (OB_FAIL(publish_schema(tenant_id))) {
            LOG_WARN("publish_schema failed", KR(ret), K(schema_status));
          } else {
            LOG_INFO("change schema version", K(schema_status), K(schema_version), K(core_schema_version));
            break;
          }
        }

小结

相比前面介绍过的配置项,系统变量的内部机制更加复杂,一个系统变量不仅同时具有 global 级别和 session 级别的副本,还可能在 session 上定义了一份缓存值。在新增系统变量时,要根据实际需求去定义变量的标签和缓存,在使用过程中也要注意取到的值是不是符合预期。

目前对配置项和系统变量的源码解析基本完成,前文中也偶尔提到过“合法性检查”这一概念,在下一篇文章中将会介绍它们是如何进行合法性检查的,以及相关的函数和规则是怎样定义的,欢迎大家持续关注。

参考文档

  1. 系统变量总览
  2. 什么是系统变量?如何使用系统变量?
  3. OceanBase源码

相关文章

Oracle如何使用授予和撤销权限的语法和示例
Awesome Project: 探索 MatrixOrigin 云原生分布式数据库
下载丨66页PDF,云和恩墨技术通讯(2024年7月刊)
社区版oceanbase安装
Oracle 导出CSV工具-sqluldr2
ETL数据集成丨快速将MySQL数据迁移至Doris数据库

发布评论