本专题的前几篇文章已经介绍过配置项和系统变量的基本用法,也对配置项的源码进行了解析,一些同学可能还想知道系统变量是如何实现的,以及想跟配置项一样自定义一个新的系统变量。
本文将通过探究“如何新增系统变量”这一问题,结合系统变量的源码实现,讲解系统变量的定义、初始化、内部访问和同步机制。
如何新增系统变量?
一个系统变量可以同时具有 global 级别和 session 级别的数据,在代码中存在多个副本。这与配置项(参数)不同,一个配置项要么是集群级别,要么是租户级别。
系统变量的定义
要增加一个新的变量,首先在 src/share/system_variable/ob_system_variable_init.json 文件中按照下面的格式编写一段代码。
"ob_query_timeout": {
"id": 10005,
"name": "ob_query_timeout",
"default_value": "10000000",
"base_value": "10000000",
"data_type": "int",
"info": "Query timeout in microsecond(us)",
"flags": "GLOBAL | SESSION | NEED_SERIALIZE",
"on_check_and_convert_func": "ObSysVarOnCheckFuncs::check_and_convert_timeout_too_large",
"publish_version": "",
"info_cn": "",
"background_cn": "",
"ref_url": ""
},
其中每一项的含义如下:
2参数 | 说明 | 举例 |
对象名(括号外) | 数据结构的名字,与括号内的变量名一致 | ob_query_timeout |
id | 变量的序号,按顺序手动分配 | 0, 1, 95, 10001, 10037 |
name | 变量名,与括号外的对象名一致 | ob_query_timeout |
default_value | 默认值 | 0, 1000, disabled, binary |
base_value | 基准值,初始与默认值相同,用于不同session间的变量同步 | 0, 1000, disabled, binary |
data_type | 数据类型 | int, varchar, enum, bool |
info | 描述信息,描述变量的主要功能 | "Query timeout in microsecond(us)" |
flags | 标签,包括级别、是否序列化、是否可见等 | GLOBAL | SESSION | NEED_SERIALIZE | ORACLE_ONLY | INVISIBLE |
on_check_and_convert_func | 合法性检查和格式转换函数(可选) | ObSysVarOnCheckFuncs::check_and_convert_timeout_too_large |
min_val | 最小值(可选) | 1 |
max_val | 最大值(可选) | 10000 |
enum_names | 枚举类型的取值列表(可选) | "enum_names": ["OFF", "AUTO", "FORCE"], |
publish_version | 发布版本号 | "227", "420"(即2.2.7,4.2.0) |
info_cn | 中文描述 | "查询超时时间(微秒)" |
background_cn | 添加原因/背景 | "避免死锁、一直重试等问题导致的sql阻塞的情况" |
ref_url | 变量申请链接 | "xxx" |
然后执行 ./gen_ob_sys_variables.py 脚本,新增的变量会出现在 ob_system_variable_init.cpp 文件中。
[&] (){
ObSysVars[100].default_value_ = "10000000" ;
ObSysVars[100].info_ = "Query timeout in microsecond(us)" ;
ObSysVars[100].name_ = "ob_query_timeout" ;
ObSysVars[100].data_type_ = ObIntType ;
ObSysVars[100].flags_ = ObSysVarFlag::GLOBAL_SCOPE | ObSysVarFlag::SESSION_SCOPE | ObSysVarFlag::NEED_SERIALIZE ;
ObSysVars[100].on_check_and_convert_func_ = "ObSysVarOnCheckFuncs::check_and_convert_timeout_too_large" ;
ObSysVars[100].id_ = SYS_VAR_OB_QUERY_TIMEOUT ;
cur_max_var_id = MAX(cur_max_var_id, static_cast<int64_t>(SYS_VAR_OB_QUERY_TIMEOUT)) ;
ObSysVarsIdToArrayIdx[SYS_VAR_OB_QUERY_TIMEOUT] = 100 ;
ObSysVars[100].base_value_ = "10000000" ;
ObSysVars[100].alias_ = "OB_SV_QUERY_TIMEOUT" ;
}();
这些变量的初始值都会存储在以下数组中。其中 ObSysVars 数组保存了从 json 文件生成的最原始的变量信息;ObSysVarDefaultValues 数组存储了系统变量的默认值;ObSysVarBaseValues 数组存储了系统变量的基准值。每新增一个系统变量,这些数组中就会多一个元素。
namespace oceanbase
{
namespace share
{
// 变量的初始信息
static ObSysVarFromJson ObSysVars[ObSysVarFactory::ALL_SYS_VARS_COUNT];
// 变量的默认值
static ObObj ObSysVarDefaultValues[ObSysVarFactory::ALL_SYS_VARS_COUNT];
// 变量的基准值
static ObObj ObSysVarBaseValues[ObSysVarFactory::ALL_SYS_VARS_COUNT];
系统变量缓存的定义
在某些场景为满足功能或性能的需求,需要额外为系统变量增加一份缓存(按需定义,不是必须的)。系统变量的缓存通过 DEF_SYS_VAR_CACHE_FUNCS 和 DEF_SYS_VAR_CACHE_FUNCS_STR 宏定义。
class ObBasicSessionInfo
{
private:
#define DEF_SYS_VAR_CACHE_FUNCS(SYS_VAR_TYPE, SYS_VAR_NAME) \
void set_##SYS_VAR_NAME(SYS_VAR_TYPE value) \
{ \
inc_data_.SYS_VAR_NAME##_ = (value); \
inc_##SYS_VAR_NAME##_ = true; \
} \
void set_base_##SYS_VAR_NAME(SYS_VAR_TYPE value) \
{ \
base_data_.SYS_VAR_NAME##_ = (value); \
} \
const SYS_VAR_TYPE &get_##SYS_VAR_NAME() const \
{ \
return get_##SYS_VAR_NAME(inc_##SYS_VAR_NAME##_); \
} \
const SYS_VAR_TYPE &get_##SYS_VAR_NAME(bool is_inc) const \
{ \
return is_inc ? inc_data_.SYS_VAR_NAME##_ : base_data_.SYS_VAR_NAME##_; \
}
class SysVarsCache
{
public:
DEF_SYS_VAR_CACHE_FUNCS(int64_t, ob_max_read_stale_time);
DEF_SYS_VAR_CACHE_FUNCS(bool, tx_read_only);
DEF_SYS_VAR_CACHE_FUNCS_STR(nls_date_format);
}
private:
static SysVarsCacheData base_data_;
SysVarsCacheData inc_data_;
union {
uint64_t inc_flags_;
struct {
bool inc_auto_increment_increment_:1;
bool inc_sql_throttle_current_priority_:1;
......
其中 base_data_ 表示基准数据,它是全局唯一的,与“global变量”对应;inc_data_ 表示最新数据,每个session各有一份,与“session变量”对应。
以 ob_max_read_stale_time 变量为例,上面的宏展开后生成以下代码。
void set_ob_max_read_stale_time(int64_t value)
{
inc_data_.ob_max_read_stale_time_ = (value);
inc_ob_max_read_stale_time_ = true;
}
void set_base_ob_max_read_stale_time(int64_t value)
{
base_data_.ob_max_read_stale_time_ = (value);
}
const int64_t &get_ob_max_read_stale_time() const
{
return get_ob_max_read_stale_time(inc_ob_max_read_stale_time_);
}
const int64_t &get_ob_max_read_stale_time(bool is_inc) const
{
return is_inc ? inc_data_.ob_max_read_stale_time_ : base_data_.ob_max_read_stale_time_;
}
其中 set_ob_max_read_stale_time() 函数会修改 inc_ob_max_read_stale_time_ 为 true 并更新 inc_data_;然后调用 get_ob_max_read_stale_time() 函数会返回修改过的数据 inc_data_.ob_max_read_stale_time_,未修改则会返回基准数据 base_data_.ob_max_read_stale_time_。
变量初始化
定义上面的一系列数据结构后,接下来在 OBServer 启动时系统变量会进行初始化。
变量基础数据
ObSysVars 数组保存了最原始的变量信息,在静态对象的构造函数中进行初始化。
namespace oceanbase
{
namespace share
{
// 变量的初始信息
static ObSysVarFromJson ObSysVars[ObSysVarFactory::ALL_SYS_VARS_COUNT];
// 变量的默认值
static ObObj ObSysVarDefaultValues[ObSysVarFactory::ALL_SYS_VARS_COUNT];
// 变量的基准值
static ObObj ObSysVarBaseValues[ObSysVarFactory::ALL_SYS_VARS_COUNT];
static struct VarsInit{
VarsInit(){
// 保存当前系统变量的最大的id
int64_t cur_max_var_id = 0;
// ObSysVarsIdToArrayIdx数组默认初始值为-1,-1表示无效索引
memset(ObSysVarsIdToArrayIdx, -1, sizeof(ObSysVarsIdToArrayIdx));
[&] (){
ObSysVars[0].default_value_ = "1" ;
ObSysVars[0].info_ = "" ;
ObSysVars[0].name_ = "auto_increment_increment" ;
ObSysVars[0].data_type_ = ObUInt64Type ;
ObSysVars[0].min_val_ = "1" ;
ObSysVars[0].max_val_ = "65535" ;
ObSysVars[0].flags_ = ObSysVarFlag::GLOBAL_SCOPE | ObSysVarFlag::SESSION_SCOPE | ObSysVarFlag::NEED_SERIALIZE ;
ObSysVars[0].id_ = SYS_VAR_AUTO_INCREMENT_INCREMENT ;
cur_max_var_id = MAX(cur_max_var_id, static_cast<int64_t>(SYS_VAR_AUTO_INCREMENT_INCREMENT)) ;
ObSysVarsIdToArrayIdx[SYS_VAR_AUTO_INCREMENT_INCREMENT] = 0 ;
ObSysVars[0].base_value_ = "1" ;
ObSysVars[0].alias_ = "OB_SV_AUTO_INCREMENT_INCREMENT" ;
}();
ObSysVarDefaultValues 数组保存了系统变量的默认值,ObSysVarBaseValues 数组保存了系统变量的基准值。在 OBServer 启动时,会初始化这两个数组:
- ObServer::init()
- ObPreProcessSysVars::init_sys_var()
- ObSysVariables::init_default_values()
该函数遍历 ObSysVars 数组,将每个变量的 default_value_ 存储到 ObSysVarDefaultValues 数组中,将每个变量的 base_value_ 存储到 ObSysVarBaseValues 数组中。
int ObSysVariables::init_default_values()
{
int ret = OB_SUCCESS;
int64_t sys_var_count = get_amount();
for (int64_t i = 0; OB_SUCC(ret) && i < sys_var_count; ++i) {
const ObString &sys_var_val_str = ObSysVariables::get_value(i);
const ObString &base_sys_var_val_str = ObSysVariables::get_base_str_value(i);
......
ObSysVarDefaultValues[i] = out_obj;
ObSysVarBaseValues[i] = base_out_obj;
global 变量:sysvar_array_
ObSysVariableSchema 是管理系统变量的 schema 对象,其中的 sysvar_array_ 数组存储了 global 变量的值,因此系统变量也具有 schema 多版本的能力。
class ObSysVariableSchema : public ObSchema
{
private:
uint64_t tenant_id_;
int64_t schema_version_;
ObSysVarSchema *sysvar_array_[ObSysVarFactory::ALL_SYS_VARS_COUNT];
bool read_only_;
class ObSysVarSchema : public ObSchema
{
private:
uint64_t tenant_id_;
common::ObString name_;
common::ObObjType data_type_;
common::ObString value_;
common::ObString min_val_;
common::ObString max_val_;
common::ObString info_;
common::ObZone zone_;
int64_t schema_version_;
int64_t flags_;
sysvar_array_ 数组在 OBServer 启动时进行初始化,首先调用的是 init_schema() 函数。
- ObServer::init_schema()
- ObMultiVersionSchemaService::init()
- ObMultiVersionSchemaService::init_sys_tenant_user_schema()
先调用 load_default_system_variable() 将所有变量的默认值都加载到 sysvar_array_ 数组中,再调用 put_schema() 把这个 schema 添加到 schema_cache_ 中。
int ObMultiVersionSchemaService::init_sys_tenant_user_schema()
{
ObSysVariableSchema sys_variable;
if (OB_FAIL(sys_variable.load_default_system_variable(true))) {
LOG_WARN("load sys tenant default system variable failed", K(ret));
} else if (OB_FAIL(schema_cache_.put_schema(SYS_VARIABLE_SCHEMA,
OB_SYS_TENANT_ID,
sys_variable.get_tenant_id(),
sys_variable.get_schema_version(),
sys_variable))) {
- ObSysVariableSchema::load_default_system_variable()
该函数遍历 ObSysVars 数组获取变量的默认值,然后调用 add_sysvar_schema() 函数将变量值添加到 sysvar_array_ 数组中。
int ObSysVariableSchema::load_default_system_variable(bool is_sys_tenant)
{
int ret = OB_SUCCESS;
ObString value;
ObSysVarSchema sysvar;
for (int64_t i = 0; OB_SUCC(ret) && i < ObSysVariables::get_amount(); ++i) {
sysvar.reset();
// 从 ObSysVariables 中获取默认数据
if (is_sys_tenant && ObCharset::case_insensitive_equal(ObSysVariables::get_name(i), OB_SV_LOWER_CASE_TABLE_NAMES)) {
value = ObString::make_string("2");
} else {
value = ObSysVariables::get_value(i);
}
} else if (OB_FAIL(sysvar.set_value(value))) {
LOG_WARN("set sysvar value failed", K(ret), K(value));
} else if (OB_FAIL(add_sysvar_schema(sysvar))) {
LOG_WARN("add sysvar schema failed", K(ret));
}
}
session 变量:sys_vars_
ObBasicSessionInfo 是管理 session 相关信息的对象,其中的 sys_vars_ 数组存储了 session 变量的值,在建立 session 连接时进行初始化。
class ObBasicSessionInfo
{
private:
share::ObBasicSysVar *sys_vars_[share::ObSysVarFactory::ALL_SYS_VARS_COUNT];
外部 session
外部 session 连接是指从客户端发起的连接,一般用于处理用户的 sql 请求。外部 session 的变量缓存值在初始化时,会从 schema 中取值,同时也依赖 schema 的同步机制。
- int ObMPConnect::process()
在建立连接时,先调用 create_session() 函数创建 session,然后调用 verify_identify() 初始化变量。
- ObMPConnect::verify_identify()
- ObMPConnect::load_privilege_info()
通过租户ID从 schema_cache_ 中获取对应的 sys_variable_schema,然后将 sys_variable_schema 作为参数调用 load_all_sys_vars() 函数。
int ObMPConnect::load_privilege_info(ObSQLSessionInfo &session)
{
const ObSysVariableSchema *sys_variable_schema = NULL;
} else if (OB_FAIL(schema_guard.get_sys_variable_schema(conn->tenant_id_, sys_variable_schema))) {
LOG_WARN("get sys variable schema failed", K(ret));
} else if (OB_ISNULL(sys_variable_schema)) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("sys variable schema is null", K(ret));
} else if (OB_FAIL(session.init_tenant(tenant_name_, conn->tenant_id_))) {
LOG_WARN("failed to init_tenant", K(ret));
} else if (OB_FAIL(session.load_all_sys_vars(*sys_variable_schema, false))) {
LOG_WARN("load system variables failed", K(ret));
- ObBasicSessionInfo::load_all_sys_vars()
该函数每次循环,先从 ObSysVars 数组中获取 sys_var_id,然后从传入的 sys_var_schema 中获取 sys_var 对象,然后把变量的 name 和 value 作为参数,调用 load_sys_variable() 函数。
int ObBasicSessionInfo::load_all_sys_vars(const ObSysVariableSchema &sys_var_schema, bool sys_var_created)
{
int ret = OB_SUCCESS;
OZ (clean_all_sys_vars());
if (!sys_var_created) {
OZ (sys_var_fac_.create_all_sys_vars());
}
OX (influence_plan_var_indexs_.reset());
ObArenaAllocator calc_buf(ObModIds::OB_SQL_SESSION);
for (int64_t i = 0; OB_SUCC(ret) && i < get_sys_var_count(); i++) {
ObSysVarClassType sys_var_id = ObSysVariables::get_sys_var_id(i);
const ObSysVarSchema *sys_var = NULL;
OZ (sys_var_schema.get_sysvar_schema(sys_var_id, sys_var), sys_var_id, i);
OV (OB_NOT_NULL(sys_var));
OZ (load_sys_variable(calc_buf, sys_var->get_name(), sys_var->get_data_type(),
sys_var->get_value(), sys_var->get_min_val(),
sys_var->get_max_val(), sys_var->get_flags(), true));
- ObBasicSessionInfo::load_sys_variable()
先调用 find_sys_var_id_by_name() 函数通过 name 获取 var_id,然后调用 create_sys_var() 函数获取(创建)变量指针 sys_var,再调用 sys_var->init() 给变量赋值,最后调用 process_session_variable() 函数给变量缓存赋值。
int ObBasicSessionInfo::load_sys_variable(ObIAllocator &calc_buf,
const ObString &name,
const ObObj &type,
const ObObj &value,
const ObObj &min_val,
const ObObj &max_val,
const int64_t flags,
bool is_from_sys_table)
{
if (SYS_VAR_INVALID == (var_id = ObSysVarFactory::find_sys_var_id_by_name(name, is_from_sys_table))) {
......
} else if (OB_FAIL(create_sys_var(var_id, store_idx, sys_var))) {
LOG_WARN("fail to create sys var", K(name), K(value), K(ret));
......
} else if (OB_FAIL(sys_var->init(real_val, min_ptr, max_ptr, val_type.get_type(), flags))) {
LOG_WARN("fail to init sys var", K(ret), K(sys_var->get_type()),
K(real_val), K(name), K(value));
} else if (OB_FAIL(process_session_variable(var_id, real_val,
false /*check_timezone_valid*/,
false /*is_update_sys_var*/))) {
- ObBasicSessionInfo::create_sys_var()
初次调用 create_sys_var() 函数时,sys_vars_[store_idx] 为空,然后调用 sys_var_fac_.create_sys_var() 函数生成 sys_var,再把 sys_var 赋给 sys_vars_[store_idx]。session 刚初始化时,sys_var 指向的是变量的默认值。
再次调用 create_sys_var() 函数时,sys_vars_[store_idx] 不为空,将其值赋给 sys_var,相当于获取了当前session 的变量。
int ObBasicSessionInfo::create_sys_var(ObSysVarClassType sys_var_id,
int64_t store_idx, ObBasicSysVar *&sys_var)
{
int ret = OB_SUCCESS;
OV (0 <= store_idx && store_idx < ObSysVarFactory::ALL_SYS_VARS_COUNT,
OB_ERR_UNEXPECTED, sys_var_id, store_idx);
if (OB_NOT_NULL(sys_vars_[store_idx])) {
OV (sys_vars_[store_idx]->get_type() == sys_var_id,
OB_ERR_UNEXPECTED, sys_var_id, store_idx, sys_vars_[store_idx]->get_type());
OX (sys_var = sys_vars_[store_idx]);
} else {
OZ (sys_var_fac_.create_sys_var(sys_var_id, sys_var), sys_var_id);
OV (OB_NOT_NULL(sys_var), OB_ERR_UNEXPECTED, sys_var_id, store_idx);
OX (sys_vars_[store_idx] = sys_var);
}
return ret;
}
内部 session
内部 session 连接是指 OBServer 内部或 OBProxy 向 OBServer 发起的内部请求建立的连接,不是用户主动建立的连接。内部 session 的变量总是获取全局默认值作为初值,而不是从最新的 schema 中取值。
- ObCommonSqlProxy::acquire()
- ObInnerSQLConnectionPool::acquire()
- ObInnerSQLConnection::init()
- ObInnerSQLConnection::init_session()
- ObInnerSQLConnection::init_session_info()
- ObBasicSessionInfo::load_default_sys_variable()
int ObBasicSessionInfo::load_default_sys_variable(const bool print_info_log, const bool is_sys_tenant, bool is_deserialized)
{
int ret = OB_SUCCESS;
if (OB_FAIL(sys_var_fac_.create_all_sys_vars())) {
LOG_WARN("fail create all sys variables", K(ret));
} else if (OB_FAIL(init_system_variables(print_info_log, is_sys_tenant, is_deserialized))) {
LOG_WARN("Init system variables failed !", K(ret));
}
return ret;
}
- ObBasicSessionInfo::init_system_variables()
调用 ObSysVariables::get_value(i) 获取 value,也就是从全局的 ObSysVars 数组中获取默认值,然后调用 load_sys_variable() 函数将 value 的值加载到 session 中。
int ObBasicSessionInfo::init_system_variables(const bool print_info_log, const bool is_sys_tenant,
bool is_deserialized)
{
for (int64_t i = 0; OB_SUCC(ret) && i < var_amount; ++i) {
name.assign_ptr(const_cast<char*>(ObSysVariables::get_name(i).ptr()),
static_cast<ObString::obstr_size_t>(strlen(ObSysVariables::get_name(i).ptr())));
bool is_exist = false;
if (OB_FAIL(sys_variable_exists(name, is_exist))) {
LOG_WARN("failed to check if sys variable exists", K(name), K(ret));
} else if (!is_exist) {
// Note: 如果已经初始化过 base value,则下面的流程不会执行
var_type = ObSysVariables::get_type(i);
var_flag = ObSysVariables::get_flags(i);
value.set_varchar(is_deserialized ? ObSysVariables::get_base_str_value(i) :ObSysVariables::get_value(i));
value.set_collation_type(ObCharset::get_system_collation());
min_val.set_varchar(ObSysVariables::get_min(i));
min_val.set_collation_type(ObCharset::get_system_collation());
max_val.set_varchar(ObSysVariables::get_max(i));
max_val.set_collation_type(ObCharset::get_system_collation());
type.set_type(var_type);
if(is_sys_tenant) {
if (OB_FAIL(process_variable_for_tenant(name, value))) {
LOG_WARN("process system variable for tenant error", K(name), K(value), K(ret));
}
}
if (OB_SUCC(ret)) {
if (OB_FAIL(load_sys_variable(calc_buf, name, type, value, min_val, max_val, var_flag, false))) {
- ObBasicSessionInfo::load_sys_variable()
初次调用 create_sys_var() 函数时,创建 session 变量 sys_var,然后用传入的默认值 value 对其进行初始化。
int ObBasicSessionInfo::load_sys_variable(ObIAllocator &calc_buf,
const ObString &name,
const ObObj &type,
const ObObj &value,
const ObObj &min_val,
const ObObj &max_val,
const int64_t flags,
bool is_from_sys_table)
{
} else if (OB_FAIL(create_sys_var(var_id, store_idx, sys_var))) {
LOG_WARN("fail to create sys var", K(name), K(value), K(ret));
} else if (OB_FAIL(ObBasicSessionInfo::change_value_for_special_sys_var(
var_id, val_ptr, real_val))) {
LOG_WARN("fail to change value for special sys var", K(ret), K(var_id), K(val_ptr));
} else if (OB_FAIL(sys_var->init(real_val, min_ptr, max_ptr, val_type.get_type(), flags))) {
LOG_WARN("fail to init sys var", K(ret), K(sys_var->get_type()),
K(real_val), K(name), K(value));
} else if (OB_FAIL(process_session_variable(var_id, real_val,
false /*check_timezone_valid*/,
false /*is_update_sys_var*/))) {
LOG_WARN("process system variable error", K(name), K(type), K(real_val), K(value), K(ret));
变量缓存基准值:base_data_
base_data_ 表示系统变量缓存的基准数据,它是 OBServer 全局唯一的。
- ObServer::init()
- ObBasicSessionInfo::init_sys_vars_cache_base_values()
遍历 ObSysVarDefaultValues 数组,将变量的默认值写到 ObBasicSessionInfo::base_data_ 中,这是一个 static SysVarsCacheData[] 类型的数组。
int ObBasicSessionInfo::init_sys_vars_cache_base_values()
{
int ret = OB_SUCCESS;
int64_t store_idx = OB_INVALID_INDEX_INT64;
ObBasicSessionInfo::SysVarsCache sys_vars_cache;
int64_t var_amount = ObSysVariables::get_amount();
for (int64_t i = 0; OB_SUCC(ret) && i < var_amount; ++i) {
store_idx = ObSysVarsToIdxMap::get_store_idx((int64_t)ObSysVariables::get_sys_var_id(i));
OX (fill_sys_vars_cache_base_value(
ObSysVariables::get_sys_var_id(i),
sys_vars_cache,
ObSysVariables::get_default_value(store_idx) ));
}
return ret;
}
变量缓存增量值:inc_data_
inc_data_ 表示 session 变量的缓存值,每个 session 各有一份缓存数据。
外部session 的 inc_data_ 变量缓存值初始化为 schema 中的最新值,内部 session 的 inc_data_ 初始化为变量的默认值,也就是说变量缓存与 session 变量的初值是一致的,它们初始化的调用路径也基本相同。
变量查询
外部查询
查询 global 变量
global 变量的值是从 schema 中获取的,在执行如“show global variables like xxx”的命令时就会触发以下流程:
- ObBasicSessionInfo::get_global_sys_variable()
先调用 get_sys_variable_schema() 函数获取 sys_variable_schema 对象,然后从该对象中获取 sysvar_schema。
int ObBasicSessionInfo::get_global_sys_variable(const uint64_t actual_tenant_id, // 为了处理租户已经切掉的情况
ObIAllocator &calc_buf,
const ObDataTypeCastParams &dtc_params,
const ObString &var_name,
ObObj &val)
{
} else if (OB_FAIL(schema_guard.get_sys_variable_schema(actual_tenant_id, sys_variable_schema))) {
LOG_WARN("get sys variable schema failed", K(ret));
} else if (OB_ISNULL(sys_variable_schema)) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("sys variable schema is null", K(ret));
} else if (OB_FAIL(sys_variable_schema->get_sysvar_schema(var_name, sysvar_schema))) {
- ObSysVariableSchema::get_sysvar_schema(const ObString &name, const ObSysVarSchema *&sysvar_schema)
- ObSysVariableSchema::get_sysvar_schema(ObSysVarClassType var_id, const ObSysVarSchema *&sysvar_schema)
- ObSysVariableSchema::get_sysvar_schema(int64_t idx)
该函数根据传入的 idx 获取对应的变量,最终返回 sysvar_array_[idx]。
const ObSysVarSchema *ObSysVariableSchema::get_sysvar_schema(int64_t idx) const
{
const ObSysVarSchema *ret = NULL;
if (idx >= 0 && idx < get_sysvar_count()) {
ret = sysvar_array_[idx];
}
return ret;
}
查询 session 变量
session 变量的值是从 session 本地数组 sys_vars_ 中获取的,执行如“show variables like xxx”的命令时会触发以下调用流程:
- ObResultSet::inner_get_next_row()
- ......
- ObBasicSessionInfo::get_sys_variable_by_name()
- ObBasicSessionInfo::inner_get_sys_var(oceanbase::common::ObString const&, oceanbase::share::ObBasicSysVar*&)
- ObBasicSessionInfo::inner_get_sys_var(oceanbase::common::ObString const&, long&, oceanbase::share::ObBasicSysVar*&)
该函数先调用 find_sys_var_id_by_name() 函数通过 sys_var_name 获取 sys_var_id,然后调用 calc_sys_var_store_idx() 函数利用 sys_var_id 计算出 store_idx,也就是变量在数组中的下标,最后返回 sys_vars_[store_idx] 即可。
int ObBasicSessionInfo::inner_get_sys_var(const ObString &sys_var_name,
int64_t &store_idx,
ObBasicSysVar *&sys_var) const
{
int ret = OB_SUCCESS;
ObSysVarClassType sys_var_id = SYS_VAR_INVALID;
if (OB_UNLIKELY(SYS_VAR_INVALID == (
sys_var_id = ObSysVarFactory::find_sys_var_id_by_name(sys_var_name)))) {
ret = OB_ERR_SYS_VARIABLE_UNKNOWN;
LOG_WARN("fail to find sys var id by name", K(ret), K(sys_var_name), K(lbt()));
} else if (OB_FAIL(ObSysVarFactory::calc_sys_var_store_idx(sys_var_id, store_idx))) {
LOG_WARN("fail to calc sys var store idx", K(ret), K(sys_var_id), K(sys_var_name), K(lbt()));
} else if (OB_UNLIKELY(store_idx < 0) ||
OB_UNLIKELY(store_idx >= ObSysVarFactory::ALL_SYS_VARS_COUNT)) {
ret = OB_ERR_UNEXPECTED;
LOG_ERROR("got store_idx is invalid", K(ret), K(store_idx));
} else if (OB_ISNULL(sys_vars_[store_idx])) {
ret = OB_ENTRY_NOT_EXIST;
LOG_WARN("sys var is NULL", K(ret), K(store_idx), K(sys_var_name));
} else {
sys_var = sys_vars_[store_idx];
}
return ret;
}
内部获取
获取 global 变量
内部访问 global 变量同样是通过 schema 获取,先从全局的 GCTX.schema_service_中获取 schema_guard,然后通过变量名获取 var_schema,再逐步解析得到最终的基础数据类型。
例如获取 weak_read_version_refresh_interval:
int ObRootService::check_weak_read_version_refresh_interval(int64_t refresh_interval, bool &valid)
{
......
} else if (OB_FAIL(GCTX.schema_service_->get_tenant_schema_guard(tenant_id, schema_guard))) {
LOG_WARN("get schema guard failed", KR(ret), K(tenant_id));
} else if (OB_FAIL(schema_guard.get_tenant_system_variable(tenant_id,
OB_SV_MAX_READ_STALE_TIME, var_schema))) {
LOG_WARN("get tenant system variable failed", KR(ret), K(tenant_id));
} else if (OB_ISNULL(var_schema)) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("var schema is null", KR(ret), K(tenant_id));
} else if (OB_FAIL(var_schema->get_value(NULL, NULL, obj))) {
LOG_WARN("get value failed", KR(ret), K(tenant_id), K(obj));
} else if (OB_FAIL(obj.get_int(session_max_stale_time))) {
LOG_WARN("get int failed", KR(ret), K(tenant_id), K(obj));
} else if (session_max_stale_time != share::ObSysVarFactory::INVALID_MAX_READ_STALE_TIME
&& refresh_interval > session_max_stale_time) {
通过这种方式获取变量,通常会定义一个变量别名,方便代码中进行访问。
namespace oceanbase
{
namespace share
{
static const char* const OB_SV_ENABLE_RICH_ERROR_MSG = "ob_enable_rich_error_msg";
static const char* const OB_SV_LOG_ROW_VALUE_OPTIONS = "log_row_value_options";
static const char* const OB_SV_MAX_READ_STALE_TIME = "ob_max_read_stale_time";
......
获取 session 变量最新值
内部查询调用 get_sys_var() 函数通过下标 idx 获取 session 变量的值。
ObBasicSysVar *ObBasicSessionInfo::get_sys_var(const int64_t idx)
{
ObBasicSysVar *var = NULL;
if (idx >= 0 && idx < ObSysVarFactory::ALL_SYS_VARS_COUNT) {
var = sys_vars_[idx];
}
return var;
}
获取 session 变量缓存值
获取变量缓存值,就是从 ObBasicSessionInfo::sys_vars_cache_数组中取值,通常是封装了一些函数,然后再调用 sys_vars_cache_ 的 get_xxx() 接口,这些接口就是用 DEF_SYS_VAR_CACHE_FUNCS 宏定义的。
举几个例子:
class ObBasicSessionInfo
{
int64_t get_trx_lock_timeout() const
{
return sys_vars_cache_.get_ob_trx_lock_timeout();
}
int64_t get_ob_max_read_stale_time() {
return sys_vars_cache_.get_ob_max_read_stale_time();
}
int get_sql_throttle_current_priority(int64_t &sql_throttle_current_priority)
{
sql_throttle_current_priority = sys_vars_cache_.get_sql_throttle_current_priority();
return common::OB_SUCCESS;
}
只要能够访问 session,就可以访问 session 中的变量,比如获取 ob_max_read_stale_time:
session->get_ob_max_read_stale_time();
变量更新
外部修改
global 变量修改流程
更新 global 变量就是更新其对应的 schema,执行如“set global xxx = xx”的命令时触发以下调用流程:
- ObMPQuery::do_process()
- ObVariableSetExecutor::execute(ObExecContext &ctx, ObVariableSetStmt &stmt)
- ObVariableSetExecutor::update_global_variables()
该函数先对更新数据做合法性检查,然后构造一个新的 sysvar_schema 加入到 rpc 参数中,最后发送更新系统变量的请求到其他节点。
int ObVariableSetExecutor::update_global_variables(ObExecContext &ctx,
ObDDLStmt &stmt,
const ObSetVar &set_var,
const ObObj &val)
{
// 合法性检查
} else if (set_var.var_name_ == OB_SV_MAX_READ_STALE_TIME) {
int64_t max_read_stale_time = 0;
if (OB_FAIL(val.get_int(max_read_stale_time))) {
LOG_WARN("fail to get int value", K(ret), K(val));
} else if (max_read_stale_time != ObSysVarFactory::INVALID_MAX_READ_STALE_TIME &&
max_read_stale_time < GCONF.weak_read_version_refresh_interval) {
ret = OB_INVALID_ARGUMENT;
LOG_USER_ERROR(OB_INVALID_ARGUMENT,
"max_read_stale_time is smaller than weak_read_version_refresh_interval");
}
// 构造新的 sysvar_schema,添加到参数 arg 的 sys_var_list_ 列表中
if (OB_SUCC(ret)) {
......
} else if (OB_FAIL(sysvar_schema.set_name(set_var.var_name_))) {
LOG_WARN("set sysvar schema name failed", K(ret));
} else if (OB_FAIL(sysvar_schema.set_value(val_str))) {
LOG_WARN("set sysvar schema value failed", K(ret));
} else {
sysvar_schema.set_tenant_id(arg.tenant_id_);
if (OB_FAIL(arg.sys_var_list_.push_back(sysvar_schema))) {
LOG_WARN("store sys var to array failed", K(ret));
}
}
}
// 想其他节点发送更新变量的rpc请求
if (OB_SUCC(ret)) {
if (OB_ISNULL(task_exec_ctx = GET_TASK_EXECUTOR_CTX(ctx)) ||
OB_ISNULL(common_rpc_proxy = task_exec_ctx->get_common_rpc())) {
ret = OB_NOT_INIT;
LOG_WARN("task exec ctx or common rpc proxy is NULL", K(ret), K(task_exec_ctx), K(common_rpc_proxy));
} else if (OB_FAIL(common_rpc_proxy->modify_system_variable(arg))) {
LOG_WARN("rpc proxy alter system variable failed", K(ret));
} else {}
}
return ret;
}
session 变量修改流程
更新session变量值的同时,还会更新对应缓存数据的值,执行如“set xxx = xx”的命令时会触发以下调用流程:
- ObBasicSysVar::session_update()
- ObBasicSessionInfo::update_sys_variable_by_name()
通过变量名 var 获取 var_id,然后调用 update_sys_variable() 传入 val。
int ObBasicSessionInfo::update_sys_variable_by_name(const ObString &var, const ObObj &val)
{
int ret = OB_SUCCESS;
ObSysVarClassType var_id = SYS_VAR_INVALID;
if (var.empty()) {
ret = OB_INVALID_ARGUMENT;
LOG_WARN("invalid variable name", K(var), K(val), K(ret));
} else if (SYS_VAR_INVALID == (var_id = ObSysVarFactory::find_sys_var_id_by_name(var))) {
ret = OB_ERR_SYS_VARIABLE_UNKNOWN;
LOG_WARN("unknown variable", K(var), K(val), K(ret));
} else if (OB_FAIL(update_sys_variable(var_id, val))) {
LOG_WARN("failed to update sys variable", K(var), K(val), K(ret));
} else {}
return ret;
}
- ObBasicSessionInfo::update_sys_variable()
先记录变量旧值,然后更新 session 变量缓存值,再更新 sys_var 的值,也就是 sys_vars_[store_idx] 的值。
int ObBasicSessionInfo::update_sys_variable(const ObSysVarClassType sys_var_id, const ObObj &val)
{
} else if (is_track_session_info()) {
// 记录修改变量的旧值
if (OB_FAIL(track_sys_var(sys_var_id, sys_var->get_value()))) {
LOG_WARN("failed to track sys var", K(ret), K(sys_var_id), K(val));
if (OB_SUCC(ret)) {
if (OB_FAIL(process_session_variable(sys_var_id, val, false /*check_timezone_valid*/,
true /*is_update_sys_var*/))) {
LOG_WARN("process system variable error", K(sys_var_id), K(val), K(ret));
} else {
sys_var->set_value(val);
- ObBasicSessionInfo::process_session_variable()
基于传入的变量类型 var,更新对应 session 变量的缓存值,即 inc_data_.xxx 。
OB_INLINE int ObBasicSessionInfo::process_session_variable(ObSysVarClassType var, const ObObj &val,
const bool check_timezone_valid/*true*/, const bool is_update_sys_var/*false*/)
{
int ret = OB_SUCCESS;
switch (var) {
case SYS_VAR_OB_MAX_READ_STALE_TIME: {
int64_t max_read_stale_time = 0;
if (OB_FAIL(val.get_int(max_read_stale_time))) {
LOG_WARN("fail to get int value", K(ret), K(val));
} else if (max_read_stale_time != ObSysVarFactory::INVALID_MAX_READ_STALE_TIME &&
max_read_stale_time < GCONF.weak_read_version_refresh_interval) {
ret = OB_INVALID_ARGUMENT;
LOG_USER_ERROR(OB_INVALID_ARGUMENT,
"max_read_stale_time is smaller than weak_read_version_refresh_interval");
} else {
sys_vars_cache_.set_ob_max_read_stale_time(max_read_stale_time);
}
break;
}
内部更新
内部更新主要是指系统变量在各节点之间的同步机制。当一个节点执行 global 变量更新操作之后,其他节点刷新 schema 的流程:
- ObServerSchemaUpdater::process_refresh_task(const ObServerSchemaTask &task)
- ObMultiVersionSchemaService::refresh_and_add_schema()
- ObMultiVersionSchemaService::refresh_tenant_schema(const uint64_t tenant_id)
- ObServerSchemaService::refresh_schema(const ObRefreshSchemaStatus &schema_status)
int ObServerSchemaService::refresh_schema(
const ObRefreshSchemaStatus &schema_status)
{
} else if (is_full_schema) {
if (OB_FAIL(refresh_full_schema(schema_status))) {
LOG_WARN("tenant refresh full schema failed", K(ret), K(schema_status));
}
} else {
if (OB_FAIL(refresh_increment_schema(schema_status))) {
LOG_WARN("tenant refresh increment schema failed", K(ret), K(schema_status));
}
- ObServerSchemaService::refresh_increment_schema()
调用 get_increment_schema_operations() 获取增量 schema 操作 schema_operations,然后调用 replay_log() 回放日志,再调用 update_schema_mgr() 更新 schema。
int ObServerSchemaService::refresh_increment_schema(
const ObRefreshSchemaStatus &schema_status)
{
if (OB_SUCC(ret) && !core_schema_change && !sys_schema_change) {
const int64_t fetch_version = std::max(core_schema_version, schema_version);
if (OB_FAIL(schema_mgr_for_cache_map_.get_refactored(tenant_id, schema_mgr_for_cache))) {
LOG_WARN("fail to get schema_mgr_for_cache", KR(ret), K(schema_status));
} else if (OB_ISNULL(schema_mgr_for_cache)) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("schema mgr for cache is null", KR(ret));
} else if (OB_FAIL(schema_service_->get_increment_schema_operations(schema_status,
local_schema_version, fetch_version, sql_client, schema_operations))) {
LOG_WARN("get_increment_schema_operations failed", KR(ret), K(schema_status),
K(local_schema_version), K(fetch_version));
} else if (schema_operations.count() > 0) {
// new cache
SMART_VAR(AllSchemaKeys, all_keys) {
if (OB_FAIL(replay_log(*schema_mgr_for_cache, schema_operations, all_keys))) {
LOG_WARN("replay_log failed", KR(ret), K(schema_status), K(schema_operations));
} else if (OB_FAIL(update_schema_mgr(sql_client, schema_status,
*schema_mgr_for_cache, fetch_version, all_keys))){
LOG_WARN("update schema mgr failed", KR(ret), K(schema_status));
}
}
} else {}
if (OB_SUCC(ret)) {
schema_mgr_for_cache->set_schema_version(fetch_version);
if (OB_FAIL(publish_schema(tenant_id))) {
LOG_WARN("publish_schema failed", KR(ret), K(schema_status));
} else {
LOG_INFO("change schema version", K(schema_status), K(schema_version), K(core_schema_version));
break;
}
}
小结
相比前面介绍过的配置项,系统变量的内部机制更加复杂,一个系统变量不仅同时具有 global 级别和 session 级别的副本,还可能在 session 上定义了一份缓存值。在新增系统变量时,要根据实际需求去定义变量的标签和缓存,在使用过程中也要注意取到的值是不是符合预期。
目前对配置项和系统变量的源码解析基本完成,前文中也偶尔提到过“合法性检查”这一概念,在下一篇文章中将会介绍它们是如何进行合法性检查的,以及相关的函数和规则是怎样定义的,欢迎大家持续关注。
参考文档
- 系统变量总览
- 什么是系统变量?如何使用系统变量?
- OceanBase源码