openGauss数据库源码解析(三)| 公共组件源码解析(4)

2023年 11月 16日 71.5k 0

3.4 线程池技术
openGauss在多线程架构的基础上,实现了线程池。线程池机制实现了会话和处理线程分离,在大并发连接的情况下仍然能够保证系统有很好的SLA响应。另外不同的线程组可绑到不同的NUMA(non-uniform memory access,非一致性内存访问)核上,天然匹配NUMA化的CPU架构,从而提升openGauss的整体性能。
3.4.1 线程池原理
openGauss线程池机制原理如图3-2所示,图中的主要对象如表3-3所示。

图3-2 线程池机制原理

表3-3 线程池对象

这些对象相互配合实现了线程池机制,它们的主要交互过程如下。
(1) 客户端向数据库发起连接请求,Postmaster线程接收到连接请求并被唤醒。Postmaster线程创建该连接对应的socket(套接字,用于描述IP地址和端口,是一个通信链的句柄),调用ThreadPoolControler函数创建会话(session)数据结构。ThreadPoolControler函数遍历当前所有的Thread Group(线程组),找到当前活跃会话数量最少的Thread Group,并把最新的会话分发给该Thread Group,加入该Thread Group的epoll(epoll是Linux内核为处理大批量句柄而作了改进的poll(轮询),能显著提高程序在大量并发连接中只有少量活跃的情况下的系统CPU利用率)列表之中。
(2) Thread Group的listener线程负责监听epoll列表中所有的客户连接。
(3) 客户端发起任务请求,listener线程被唤醒。listener线程检查当前的Thread Group是否有空闲worker线程;如果有,则把当前会话分配给该worker线程,并唤醒该worker线程;如果没有,则把该会话放在等待队列之中。
(4) worker线程被唤醒后,读取客户端连接上的请求,执行相应请求,并返回请求结果。在一次事务结束(提交、回滚)或者事务超时退出的时候,worker线程的一次任务完成。worker线程将会话返回给listener线程,listener线程继续等待该会话的下一次请求。worker线程返还会话后,检查会话等待队列;如果存在等待响应请求的会话,则直接从该队列中取出新的会话并继续工作;如果没有等待响应的会话,则将自身标记为free(空闲)状态,等待listener线程唤醒。
(5) 客户端断开连接时,worker线程被唤醒,关闭连接,同时清理会话相关结构,释放内存和fd(文件句柄)等资源。
(6) 如果worker线程FATAL级别错误退出,退出时worker线程会从worker队列中注销掉。此时listener线程会重新启动一个新的worker线程,直到达到指定数量的worker线程。
3.4.2 线程池实现
线程池功能由GUC参数enable_thread_pool控制,该变量设置为true时才能使用线程池功能。代码主要在“openGauss-server/src/gausskernel/process/threadpool”目录中,下面介绍主要代码实现流程。
Postmaster线程在ServerLoop中判断如果启用了线程池功能,则会调用“ThreadPoolControler::Init”函数进行线程池的初始化。在线程池初始化时,会判断NUMA节点的个数进行NUMA结构处理。相关代码如下:

bool enableNumaDistribute = (g_instance.shmem_cxt.numaNodeNum > 1);
g_threadPoolControler->Init(enableNumaDistribute);
}

“ThreadPoolControler::Init”函数的主要作用是创建m_sessCtrl成员和m_groups成员对象,根据绑核策略分配线程个数,调用“ThreadPoolGroup::init”函数进行线程组的初始化,调用“ThreadPoolGroup::WaitReady”函数等待各个线程组初始化结束。创建m_scheduler成员对象,并且调用“ThreadPoolScheduler::StartUp”函数启动线程池调度线程。在“ThreadPoolGroup::init”函数中,创建m_listener对象,启动listener线程。为ThreadWorkerSentry函数分配内存,初始化每个worker的互斥量和条件变量。调用“ThreadPoolGroup::AddWorker”函数创建worker对象,启动worker线程。
Postmaster线程在ServerLoop中如果监听到有客户端链接请求,判断启用了线程池功能,则会调用“ThreadPoolControler::DispatchSession”函数进行会话分发。相关代码如下:

result = g_threadPoolControler->DispatchSession(port);
/* ThreadPoolControler::DispatchSession的代码实现如下,找到一个会话数最少的线程组,创建会话,把会话添加到线程组的监听线程中 */
int ThreadPoolControler::DispatchSession(Port* port)
{
ThreadPoolGroup* grp = NULL;
knl_session_context* sc = NULL;
grp = FindThreadGroupWithLeastSession();
if (grp == NULL) {
Assert(false);
return STATUS_ERROR;
}
sc = m_sessCtrl->CreateSession(port);
if (sc == NULL)
return STATUS_ERROR;
grp->GetListener()->AddNewSession(sc);
return STATUS_OK;
}

listener线程的主函数为“TpoolListenerMain(ThreadPoolListener* listener)”。在该函数中设置线程的名字和信号处理函数,创建epoll等待事件,通知Postmaster线程已经准备好,调用t_pool_listener_loop函数(其实是调用“ThreadPoolListener::WaitTask”函数进入等待事件状态)。如果有事件到来,调用“ThreadPoolListener::HandleConnEvent”函数找到事件对应的会话。调用“ThreadPoolListener::DispatchSession”函数,如果有空闲的worker线程,通知worker线程进行处理;如果没有空闲的worker线程,则把会话挂到等待队列中。
worker线程的主函数就是正常的SQL处理函数PostgresMain,与非线程模式相比,主要多了3处处理:
(1) worker线程准备就绪的通知。
(2) 等待会话通知。
(3) 连接退出处理。
worker线程的相关代码如下:

u_sess->proc_cxt.MyProcPort->sock = PGINVALID_SOCKET;
t_thrd.threadpool_cxt.worker->NotifyReady();
}
if (IS_THREAD_POOL_WORKER) {
t_thrd.threadpool_cxt.worker->WaitMission();
Assert(u_sess->status != KNL_SESS_FAKE);
}
case 'X':
case EOF:
RemoveTempNamespace();
InitThreadLocalWhenSessionExit();
if (IS_THREAD_POOL_WORKER) {
t_thrd.threadpool_cxt.worker->CleanUpSession(false);
break;
}

“ThreadPoolWorker::WaitMission”函数的主要作用是阻塞所有系统信号,避免系统信号比如SIGHUP等中断当前的处理。清除线程上的会话信息,保证没有上一个会话的内容,等待会话上新的请求,把会话给线程进行处理,允许系统信号中断。
“ThreadPoolWorker::CleanUpSession”函数的主要作用是清除会话,从Listener中去除会话,释放会话资源。
上面介绍了线程池的主要机制,综上所述,线程池主要是解决大并发的用户连接,在一定程度上可以起到流量控制的作用,即使用户的连接数很多,后端也不需要分配太多的线程。线程是OS的一种资源,如果线程太多,OS资源占用很多,并且大量线程的调度和切换会带来昂贵的开销。如果没有线程池,随着连接数的增多,系统的吞吐量会逐渐降低。另外一方面,把线程池划分为线程组,可以很好地匹配NUMA CPU架构的节点,提升多核情况下的访问性能。每个线程组一个监听者,避免了线程池的“惊群效应”。

相关文章

Oracle如何使用授予和撤销权限的语法和示例
Awesome Project: 探索 MatrixOrigin 云原生分布式数据库
下载丨66页PDF,云和恩墨技术通讯(2024年7月刊)
社区版oceanbase安装
Oracle 导出CSV工具-sqluldr2
ETL数据集成丨快速将MySQL数据迁移至Doris数据库

发布评论