事务机制源码解析| 锁机制(6)

5.3.6 基于鲲鹏服务器的性能优化

本章着重介绍openGauss基于硬件结构的锁相关的函数及结构体的性能优化。
1. WAL Group inset优化
数据库redo日志缓存系统指的是数据库redo日志持久化的写缓存,数据库redo日志落盘之前会写入到日志缓存中再写到磁盘进行持久化。日志缓存的写入效率是决定数据库整体吞吐量的主要因素,而各个线程之间写日志时为了保证日志顺序写存在锁争抢,锁的争抢就成为了性能的主要瓶颈点。openGauss针对鲲鹏服务器ARM CPU的特点,通过group的方式进行日志的插入,减少锁的争抢,提升WAL日志的插入效率,从而提升整个数据库的吞吐性能。group的方式主要流程如图5-20所示。


图5-20 group的方式日志插入

(1) 不需要所有线程都竞争锁。
(2) 在同一时间窗口所有线程在争抢锁之前先加入到一个group中,第一个加入group的线程作为leader。通过CAS原子操作来实现队列的管理。
(3) leader线程代表整个group去争抢锁。group中的其他线程(follower)开始睡眠,等待leader唤醒。
(4) 争抢到锁后,leader线程将group里的所有线程想要插入的日志遍历一遍得到需要空间总大小。leader线程只执行一次reserve space操作。
(5) leader线程将group中所有线程想要写入的日志都写入到日志缓冲区中。
(6) 释放锁,唤醒所有follower线程。
(7) follower线程由于需要写入的日志已经被leader写入,不需要再争抢锁,直接进入后续流程。
关键函数代码如下:

{ …/* 初始化变量以及简单校验 */ START_CRIT_SECTION(); /* 开启临界区 */ proc->xlogGroupMember = true; … proc->xlogGroupDoPageWrites = &t_thrd.xlog_cxt.doPageWrites; nextidx = pg_atomic_read_u32(&t_thrd.shemem_ptr_cxt.LocalGroupWALInsertLocks[groupnum].l.xlogGroupFirst); while (true) { pg_atomic_write_u32(&proc->xlogGroupNext, nextidx); /* 将上一个成员记录到proc结构体中 */ /* 防止ARM乱序:保证所有前面的写操作都可见 */ pg_write_barrier(); if (pg_atomic_compare_exchange_u32(&t_thrd.shemem_ptr_cxt.LocalGroupWALInsertLocks[groupnum].l.xlogGroupFirst, &nextidx, (uint32)proc->pgprocno)) { break; } /* 这一步原子操作获取上一个成员的proc no,如果是invalid,说明是leader。 */ } /* 非leader成员不去获取WAL Insert锁,仅仅进行等待,直到被leader唤醒 */ if (nextidx != INVALID_PGPROCNO) { int extraWaits = 0; for (;;) { /* 充当读屏障 */ PGSemaphoreLock(&proc->sem, false); /* 充当读屏障 */ pg_memory_barrier(); if (!proc->xlogGroupMember) { break; } extraWaits++; } while (extraWaits-- > 0) { PGSemaphoreUnlock(&proc->sem); } END_CRIT_SECTION(); return proc->xlogGroupReturntRecPtr; } /* leader成员持有锁 */ WALInsertLockAcquire(); /* 计算每个成员线程的xlog record size */ … /* leader线程将所有成员线程的xlog record插入到缓冲区 */ while (nextidx != INVALID_PGPROCNO) { localProc = g_instance.proc_base_all_procs[nextidx]; if (unlikely(localProc->xlogGroupIsFPW)) { nextidx = pg_atomic_read_u32(&localProc->xlogGroupNext); localProc->xlogGroupIsFPW = false; continue; } XLogInsertRecordNolock(localProc->xlogGrouprdata, localProc, XLogBytePosToRecPtr(StartBytePos), XLogBytePosToEndRecPtr( StartBytePos + MAXALIGN(((XLogRecord*)(localProc->xlogGrouprdata->data))->xl_tot_len)), XLogBytePosToRecPtr(PrevBytePos)); PrevBytePos = StartBytePos; StartBytePos += MAXALIGN(((XLogRecord*)(localProc->xlogGrouprdata->data))->xl_tot_len); nextidx = pg_atomic_read_u32(&localProc->xlogGroupNext); } WALInsertLockRelease(); /* 完成工作放锁并唤醒所有成员线程 */ while (wakeidx != INVALID_PGPROCNO) { PGPROC* proc = g_instance.proc_base_all_procs[wakeidx]; wakeidx = pg_atomic_read_u32(&proc->xlogGroupNext); pg_atomic_write_u32(&proc->xlogGroupNext, INVALID_PGPROCNO); proc->xlogGroupMember = false; pg_memory_barrier(); if (proc != t_thrd.proc) { PGSemaphoreUnlock(&proc->sem); } } END_CRIT_SECTION(); return proc->xlogGroupReturntRecPtr; }