事务机制源码解析| 事务并发控制(4)

2023年 12月 15日 44.6k 0

5.2.4 进程内多线程管理机制

简述进程内多线程管理机制相关数据结构及多版本快照计算机制。
1. 事务信息管理
数据库启动时候维护了一段共享内存,每个线程初始化的时候会从这个共享内存中获取一个槽位并将其线程信息记录到槽位中。获取快照时,需要在共享内存数组中更新槽位信息,事务结束时,需要从槽位中将其事务信息清除。计算快照时,通过遍历该全局数组,获取当前所有并发线程的事务信息,并计算出快照信息(xmin、xmax、snapshotcsn等)。事务信息管理的关键数据结构代码如下:

GTM_TransactionHandle handle; /* 单机模式无用参数 */
TransactionId xid; /* 该线程持有的xid号,如果没有则为0 */
TransactionId prepare_xid; /* 准备阶段的xid号*/

TransactionId xmin; /* 当前事务开启时最小的活跃xid,vaccum操作不会删除那些xid大于等于xmin的元组。 */
CommitSeqNo csn_min; /* 当前事务开启时最小的活跃CSN号*/
TransactionId next_xid; /* 单机模式无用参数*/
int nxids; /* 子事物个数*/
uint8 vacuumFlags; /* vacuum操作相关的flag */

bool needToSyncXid; /* 单机模式无用参数*/
bool delayChkpt; /* 如果该线程需要checkpoint线程延迟等待,此值为true
#ifdef __aarch64__ */
char padding[PG_CACHE_LINE_SIZE - PGXACT_PAD_OFFSET]; /* 为了性能考虑的结构体对齐*/
#endif
} PGXACT;

struct PGPROC {
SHM_QUEUE links; /* 链表中的指针 */

PGSemaphoreData sem; /* 休眠等待的信号量 */
int waitStatus; /* 等待状态 */

Latch procLatch; /* 线程的通用闩锁 */

LocalTransactionId lxid; /* 当前线程本地顶层事务ID */
ThreadId pid; /* 线程的PID */

ThreadId sessMemorySessionid;
uint64 sessionid; /* 线程池模式下当前的会话ID */
int logictid; /* 逻辑线程ID */
TransactionId gtt_session_frozenxid; /* 会话级全局临时表的冻结XID */

int pgprocno;
int nodeno;

/* 线程启动时下面这些数据结构为0 */
BackendId backendId; /* 线程的后台ID */
Oid databaseId; /* 当前访问数据库的OID */
Oid roleId; /* 当前用户的OID */

/* 版本号,用于升级过程中新老版本的判断 */
uint32 workingVersionNum;

/*热备模式下,标记当前事务是否收到冲突信号。设置该值时需要持有ProcArray锁。 */
bool recoveryConflictPending;

/* 线程等待的轻量级锁信息. */
bool lwWaiting; /* 当等待轻量级锁时,为真 */
uint8 lwWaitMode; /* 预获取锁的模式 */
bool lwIsVictim; /* 强制放弃轻量级锁 */
dlist_node lwWaitLink; /* 等待在相同轻量级锁对象的下一个等待者 */

/* 线程等待的常规锁信息 */
LOCK* waitLock; /* 等待的常规锁对象 */
PROCLOCK* waitProcLock; /* 等待常规锁对象的持有者 */
LOCKMODE waitLockMode; /* 预获取常规锁对象的模式 */
LOCKMASK heldLocks; /* 本线程获取锁对象模式的位掩码 */

/* 等待主备机回放日志同步的信息 */
XLogRecPtr waitLSN; /* 等待的lsn*/
int syncRepState; /* 等待主备同步的状态 */
bool syncRepInCompleteQueue; /* 是否等待在完成队列中 */
SHM_QUEUE syncRepLinks; /* 指向同步队列的指针 */

DataQueuePtr waitDataSyncPoint; /* 数据页复制的数据同步点 */
int dataSyncRepState; /* 数据页复制的同步状态 */
SHM_QUEUE dataSyncRepLinks; /* 指向数据页同步队列的指针*/

MemoryContext topmcxt; /* 本线程的顶层内存上下文 */
char myProgName[64];
pg_time_t myStartTime;
syscalllock deleMemContextMutex;

SHM_QUEUE myProcLocks[NUM_LOCK_PARTITIONS];

/* 以下结构为了实现XID批量提交 */
/* 是否为XID批量提交中的成员 */
bool procArrayGroupMember;
/* XID批量提交中的下一个成员 */
pg_atomic_uint32 procArrayGroupNext;
/* 父事务XID和子事物XID中的最大者 */
TransactionId procArrayGroupMemberXid;

/* 提交序列号 */
CommitSeqNo commitCSN;

/* 以下结构为了实现CLOG批量提交 */
bool clogGroupMember; /* 是否为CLOG批量提交中的成员*/
pg_atomic_uint32 clogGroupNext; /* CLOG批量提交中的下一个成员*/
TransactionId clogGroupMemberXid; /* CLOG批量提交的事务ID */
CLogXidStatus clogGroupMemberXidStatus; /* CLOG批量提交的事务状态 */
int64 clogGroupMemberPage; /* CLOG批量提交对应的CLOG页面 */
XLogRecPtr clogGroupMemberLsn; /* CLOG批量提交成员的提交回放日志位置 */
#ifdef __aarch64__
/* 以下结构体是为了实现ARM架构下回放日志批量插入 */
bool xlogGroupMember;
pg_atomic_uint32 xlogGroupNext;
XLogRecData* xlogGrouprdata;
XLogRecPtr xlogGroupfpw_lsn;
XLogRecPtr* xlogGroupProcLastRecPtr;
XLogRecPtr* xlogGroupXactLastRecEnd;
void* xlogGroupCurrentTransactionState;
XLogRecPtr* xlogGroupRedoRecPtr;
void* xlogGroupLogwrtResult;
XLogRecPtr xlogGroupReturntRecPtr;
TimeLineID xlogGroupTimeLineID;
bool* xlogGroupDoPageWrites;
bool xlogGroupIsFPW;
uint64 snap_refcnt_bitmap;
#endif

LWLock* subxidsLock;
struct XidCache subxids; /* 子事物XID */

LWLock* backendLock; /* 每个线程的轻量级锁,用于保护以下数据结构的并发访问 */

/* Lock manager data, recording fast-path locks taken by this backend. */
uint64 fpLockBits; /* 快速路径锁的持有模式 */
FastPathTag fpRelId[FP_LOCK_SLOTS_PER_BACKEND]; /* 表对象的槽位 */
bool fpVXIDLock; /* 是否获得本地XID的快速路径锁 */
LocalTransactionId fpLocalTransactionId; /* 本地的XID */
};


图5-15 事务信息

如图5-15所示,proc_base_all_procs以及proc_base_all_xacts为全局的共享区域,每个线程启动的时候会在这个共享区域中注册一个槽位,并且将线程级指针变量t_thrd.proc以及t_thrd.pgxact指向该区域。当该线程有事务开始时,会将对应事务的xmin、xid等信息填写到pgxact结构体中。关键函数及接口如下。
(1) GetOldestXmin:返回当前多版本快照缓存的oldestXmin。(多版本快照机制见后续章节)
(2) ProcArrayAdd:线程启动时在共享区域中注册一个槽位。
(3) ProcArrayRemove:将当前线程从ProcArray数组中移除。
(4) TransactionIdIsInProgress:判断xid是否还在运行之中。
2. 多版本快照机制
因为openGauss使用一段共享内存来实现快照的获取以及各线程事务信息的管理,计算快照持有共享锁以及事务结束持有排他锁有严重的锁争抢问题。为了解决该冲突,openGauss引入了多版本快照机制解决锁冲突。每当事务结束时,持有排他锁、计算快照的一个版本,记录到一个环形缓冲区队列内存里;当别的线程获取快照时,并不持有共享锁去重新计算,而是通过原子操作到该环形队列顶端获取最新快照并将其引用计数加1;待拷贝完了快照信息后,将引用计数减1;当槽位引用计数为0时,表示可以被新的快照复用。
1) 多版本快照数据结构
多版本快照数据结构代码如下:

TransactionId xmin;
TransactionId xmax;
CommitSeqNo snapshotcsn;
TransactionId localxmin;
bool takenDuringRecovery;
ref_cnt_t ref_cnt[NREFCNT]; /* 该快照的引用计数,如果为0则可复用 */
} snapxid_t; /*多版本快照内容,在openGauss CSN方案下,仅需要记录xmin、xmax、snapshotcsn等关键信息即可。*/

static snapxid_t* g_snap_buffer = NULL; /* 缓冲区队列内存区指针 */
static snapxid_t* g_snap_buffer_copy = NULL; /* 缓冲区队列内存的浅拷贝 */
static size_t g_bufsz = 0;
static bool g_snap_assigned = false; /*多版本快照buffer队列是否已初始化 */

#define SNAP_SZ sizeof(snapxid_t) /* 每一个多版本快照的size大小 */
#define MaxNumSnapVersion 64 /* 多版本快照队列的大小,64个版本 */

static volatile snapxid_t* g_snap_current = NULL; /* 当前的快照指针 */
static volatile snapxid_t* g_snap_next = NULL; /* 下一个可用槽位的快照指针 */

2) buffer队列创建流程
在创建共享内存时,根据MaxNumSnapVersion函数的size生成“MaxNumSnapVersion * SNAP_SZ”大小的共享内存区。并将g_snap_current置为0号偏移,g_snap_next置为“1 * SNAP_SZ”偏移。
3) 多版本快照的计算
(1) 获取当前g_snap_next。
(2) 保证当前已持有Proc数组的排他锁,进行xmin、xmax、CSN等关键结构的计算,并存放到g_snap_next中。
(3) 寻找下一个refcount为0可复用的槽位,将g_snap_current赋值为g_snap_next,g_snap_next赋值为可复用的槽位偏移。
4) 多版本快照的获取
(1) 获取g_snap_current指针并将当前快照槽位的引用计数加1,防止并发更新快照时被复用。
(2) 将当前快中的信息拷贝到当前连接的静态快照内存中。
(3) 释放当前多版本快照,并将当前快照槽位的引用计数减1。
5) 关键函数
(1) CreateSharedRingBuffer:创建多版本快照共享内存信息。
(2) GetNextSnapXid:获取下一个多版本快照位置。函数代码如下:

{
return g_snap_buffer ? (snapxid_t*)g_snap_next : NULL;
}

(3) SetNextSnapXid:获取下一个可用的槽位,并且将当前多版本快照最新更新。函数代码如下:

{
if (g_snap_buffer != NULL) {
g_snap_current = g_snap_next; /* 将最新的多版本快照更新到最新。*/
pg_write_barrier(); /* 此处是防止buffer ring初始化时的ARM乱序问题。*/
g_snap_assigned = true;
snapxid_t* ret = (snapxid_t*)g_snap_current;
size_t idx = SNAPXID_INDEX(ret);
loop: /* 主循环,整体思路是不停遍历多版本槽位信息,一直找到一个refcout为0的可重用槽位。*/
do {
++idx;
/* 如果发生回卷,那么重头再找 */
if (idx == g_bufsz)
idx = 0;
ret = SNAPXID_AT(idx);
if (IsZeroRefCount(ret)) {
g_snap_next = ret;
return;
}
} while (ret != g_snap_next);
ereport(WARNING, (errmsg("snapshot ring buffer overflow.")));
/* 当前多版本快照个数为64个,理论上可能是会出现槽位被占满,如果没有空闲槽位,重新遍历即可。 */
goto loop;
}
}

(4) CalculateLocalLatestSnapshot:计算多版本快照信息。函数代码如下:

{
…/* 初始化变量 */

snapxid_t* snapxid = GetNextSnapXid(); /*设置下一个空闲多版本快照槽位信息 */

/* 初始化xmax为 latestCompletedXid + 1 */
xmax = t_thrd.xact_cxt.ShmemVariableCache->latestCompletedXid;
TransactionIdAdvance(xmax);

/*并不是每个事务提交都会重新计算xmin和oldestxmin,只有每1000个事务或者每隔1s才会计算,此时xmin及oldestxmin一般偏小,但是不影响可见性判断。 */
currentTimeStamp = GetCurrentTimestamp();
if (forceCalc || ((++snapshotPendingCnt == MAX_PENDING_SNAPSHOT_CNT) ||
(TimestampDifferenceExceeds(snapshotTimeStamp, currentTimeStamp, CALC_SNAPSHOT_TIMEOUT)))) {
snapshotPendingCnt = 0;
snapshotTimeStamp = currentTimeStamp;

/* 初始化xmin */
globalxmin = xmin = xmax;

int* pgprocnos = arrayP->pgprocnos;
int numProcs;

/*
循环遍历proc并计算快照相应值
*/
numProcs = arrayP->numProcs;
/*主要流程,遍历proc_base_all_xacts,将其中pgxact->xid的最小值记为xmin,其中pgxact->xmin的最小值记为oldestxmin。 */
for (index = 0; index vacuumFlags & PROC_IN_LOGICAL_DECODING)
continue;

/* 对于autovacuum的xmin,跳过,避免长VACUUM阻塞脏元组回收 */
if (pgxact->vacuumFlags & PROC_IN_VACUUM)
continue;

/* 用最小的xmin来更新globalxmin */
xid = pgxact->xmin;

if (TransactionIdIsNormal(xid) && TransactionIdPrecedes(xid, globalxmin))
globalxmin = xid;

xid = pgxact->xid;

if (!TransactionIdIsNormal(xid))
xid = pgxact->next_xid;

if (!TransactionIdIsNormal(xid) || !TransactionIdPrecedes(xid, xmax))
continue;

if (TransactionIdPrecedes(xid, xmin))
xmin = xid;
}

if (TransactionIdPrecedes(xmin, globalxmin))
globalxmin = xmin;

t_thrd.xact_cxt.ShmemVariableCache->xmin = xmin;
t_thrd.xact_cxt.ShmemVariableCache->recentLocalXmin = globalxmin;
}
/* 此处给多版本快照信息赋值,xmin、oldestxmin因为不是及时计算故可能偏小,xmax、CSN号都是当前的准确值,注意计算快照的时候必须持有排他锁。 */
snapxid->xmin = t_thrd.xact_cxt.ShmemVariableCache->xmin;
snapxid->xmax = xmax;
snapxid->localxmin = t_thrd.xact_cxt.ShmemVariableCache->recentLocalXmin;
snapxid->snapshotcsn = t_thrd.xact_cxt.ShmemVariableCache->nextCommitSeqNo;
snapxid->takenDuringRecovery = RecoveryInProgress();
SetNextSnapXid(); /*设置当前多版本快照 */
}

(5) GetLocalSnapshotData:获取最新的多版本快照供事务使用。函数代码如下:

{
/* 检查是否有多版本快照。在recover启动之前,是没有计算出多版本快照的,此时直接返回。 */
if (!g_snap_assigned || (g_snap_buffer == NULL)) {
ereport(DEBUG1, (errmsg("Falling back to origin GetSnapshotData: not assigned yet or during shutdownn")));
return NULL;
}
pg_read_barrier(); /*为了防止ringBuffer初始化时的ARM乱序问题*/
snapxid_t* snapxid = GetCurrentSnapXid(); /* 将当前的多版本快照refcount++,避免被并发计算新快照的事务重用。 */

snapshot->user_data = snapxid;

… /* 此处将多版本快照snapxid中的信息赋值给快照,注意此处是深拷贝,因为多版本快照仅有几个变量的关键信息,直接赋值即可,之后就可以将相应的多版本快照refcount释放。 */
u_sess->utils_cxt.RecentXmin = snapxid->xmin;
snapshot->xmin = snapxid->xmin;
snapshot->xmax = snapxid->xmax;
snapshot->snapshotcsn = snapxid->snapshotcsn;

ReleaseSnapshotData(snapshot); /* 将多版本快照的refcount释放,以便可以被重用。 */
return snapshot;
}

相关文章

Oracle如何使用授予和撤销权限的语法和示例
Awesome Project: 探索 MatrixOrigin 云原生分布式数据库
下载丨66页PDF,云和恩墨技术通讯(2024年7月刊)
社区版oceanbase安装
Oracle 导出CSV工具-sqluldr2
ETL数据集成丨快速将MySQL数据迁移至Doris数据库

发布评论