源码版本:pg 14.3
源文件:src/backend/replication/syncrep.c
原文地址:https://www.mytecdb.com/blogDetail.php?id=239
1、PG同步复制简介
同步复制是 pg 9.1 版本引入的新特性,事务提交必须等待事务对应的 lsn 在同步的备库接收到之后,才能提交成功。同步复制的实现逻辑主要集中在主节点,主节点记录了哪些备节点是同步节点,需要等待事务 lsn 在这些同步的备节点上接收、刷盘甚至应用完成。
pg 同步复制的几种模式:
#define SYNC_REP_NO_WAIT (-1) // 异步复制,不需等待备节点
#define SYNC_REP_WAIT_WRITE 0 // 同步复制,等待备节点写wal日志
#define SYNC_REP_WAIT_FLUSH 1 // 同步复制,等待备节点写wal日志并刷盘
#define SYNC_REP_WAIT_APPLY 2 // 同步复制,等待备节点写wal日志并应用
commit 类型的 wal record 提供 apply 反馈,其他类型只提供 write/flush 反馈。pg 的同步复制只在 commit 阶段才会等待,因为只有 commit 类型才会影响主备事务的一致性。
2、函数调用关系
主节点 backend 进程提交事务,其函数调用关系如下:
CommitTransaction()
RecordTransactionCommit()
SyncRepWaitForLSN(),等待某个 lsn 在备库操作完成。
WaitLatch()
从 SyncRepWaitForLSN() 函数的调用看,pg 的同步复制,只在 commit 阶段才会等待备库确认,其他非 commit 类型的 wal 同步,不需要等待备库确认。在 SyncRepWaitForLSN() 函数内调用 SyncRepQueueInsert() 把当前 backend 进程放入 WalSndCtl->SyncRepQueue 等待队列,休眠等待备节点接收到 wal 日志、刷盘或者应用。
wal sender 进程负责将 wal 日志发送给备节点,根据备节点返回的消息,来唤醒相关的 backend 等待进程,其函数调用关系如下:
ProcessRepliesIfAny() // walsender.c,处理备节点发送的 reply 信息。
ProcessStandbyMessage()
ProcessStandbyReplyMessage()
SyncRepReleaseWaiters()
SyncRepWakeQueue(),唤醒等待的 backend 进程
SetLatch()
SyncRepWakeQueue() 函数中会判断 WalSndCtl->lsn 值是否小于等待队列 WalSndCtl->SyncRepQueue 里面的 backend 进程 proc->waitLSN 值,如果小于,则该 backend 进程需要继续等待,否则把该 backend 进程从等待队列里面删除。
proc->waitLSN 在事务提交时获取。WalSndCtl->lsn 在 walsender 进程接收到备节点的回复消息时获取,实际上就是 WalSndCtl->walsnds 变量保存的 lsn 信息,该变量是通过读取备库发送的网络回复消息获取到的 lsn,其实现逻辑位于 ProcessStandbyReplyMessage() 函数,相关代码如下:
static void
ProcessStandbyReplyMessage(void)
{
/* the caller already consumed the msgtype byte */
writePtr = pq_getmsgint64(&reply_message);
flushPtr = pq_getmsgint64(&reply_message);
applyPtr = pq_getmsgint64(&reply_message);
replyTime = pq_getmsgint64(&reply_message);
replyRequested = pq_getmsgbyte(&reply_message);
WalSnd *walsnd = MyWalSnd;
SpinLockAcquire(&walsnd->mutex);
walsnd->write = writePtr;
walsnd->flush = flushPtr;
walsnd->apply = applyPtr;
SpinLockRelease(&walsnd->mutex);
}
3、相关的全局变量
WalSndCtlData *WalSndCtl = NULL;
WalSndCtl 位于共享内存,用于记录主备流复制进程相关的状态信息,包括等待某个 lsn 的 backend 进程队列信息以及 walsender 进程的信息等,它是一个全局共享的变量。
SyncRepConfigData *SyncRepConfig = NULL;
SyncRepConfig 记录 GUC 参数 synchronous_standby_names 配置的相关信息,其结构如下:
typedef struct SyncRepConfigData
{
int config_size; /* total size of this struct, in bytes */
int num_sync; /* number of sync standbys that we need to
* wait for */
uint8 syncrep_method; /* method to choose sync standbys */
int nmembers; /* number of members in the following list */
/* member_names contains nmembers consecutive nul-terminated C strings */
char member_names[FLEXIBLE_ARRAY_MEMBER];
} SyncRepConfigData;
GUC 参数 synchronous_standby_names 配置的值会被转成 SyncRepConfig 变量,在 pg 内核中有一个专门的语法文件 syncrep_gram.y 用于解析该参数,该参数的配置示例如下:
synchronous_standby_names = 'FIRST 2 (s1, s2, s3)'
synchronous_standby_names = 'ANY 2 (s1, s2, s3)'