- 源码版本:PG 13.3
- 源码文件:slot.c slotfuncs.c
1. 什么是 PG 复制槽?
PG 复制槽用于记录主备流复制的状态,主要目的是防止 wal 日志被过早的删除,导致备库流复制中断。复制槽是有状态的,能够持久化到磁盘上,允许宕机、重启场景下进行恢复。在有复制槽的场景下,即使备库关闭很长时间,主库也会为其保留足够的 wal 日志,直到备库恢复接收完这些 wal 日志,主库才会将其删除。当然这也带来了新的问题,即如果备库永远不恢复,那么主库的 wal 日志就会永远保留,导致磁盘空间耗尽,这时需要人工介入处理。
2. PG 复制槽相关参数
- max_replication_slots,最大的复制槽数量,取值范围为 0 ~ 0x3FFFF,默认值为 10, 设置为 0 表示禁用复制槽。
- wal_level >= replica
3. 复制槽共享内存初始化
复制槽相关的数据结构存储在共享内存中,大小由 max_replication_slots 参数决定,每个复制槽由结构体 ReplicationSlot 表示,其成员变量 in_use 表示该复制槽元素是否正在被使用。
- ReplicationSlotsShmemSize(),计算复制槽共享内存大小
- ReplicationSlotsShmemInit(),初始化共享内存
4. PG 复制槽的实现逻辑
4.1 创建复制槽
创建复制槽的函数调用关系:
pg_create_physical_replication_slot()
create_physical_replication_slot()
ReplicationSlotCreate()
ReplicationSlotCreate() 函数实现了创建复制槽的功能,主要入参是复制槽名称,函数内部调用 ReplicationSlotValidateName() 函数对复制槽名称进行合法性检查。遍历共享内存中的 ReplicationSlot 数组,找到一个 in_use 为 0 的元素,对该元素的各成员进行初始化。遍历 ReplicationSlot 数组时,也会对复制槽名称是否出现同名进行检查,如果已有相同名称的复制槽则报错。最后调用 CreateSlotOnDisk() 函数,将复制槽的数据写入数据目录下的文件中,路径为 pg_replslot/$slot_name/state,文件内容为 ReplicationSlotOnDisk 结构体。
4.2 启动复制槽
在主库创建的复制槽需要在主备流复制启动后才能使用,备库发送 start replication 时可以指定复制槽名称来启动复制槽,函数调用
PostgresMain()
exec_replication_command()
StartReplication()
WalSndLoop()
5. 导出函数和视图
slotfuncs.c 源文件中定义了一些导出函数和视图给用户调用,用于操作复制槽。
函数:
pg_create_physical_replication_slot()
pg_create_logical_replication_slot()
pg_drop_replication_slot()
pg_get_replication_slots()
pg_replication_slot_advance()
视图:
pg_replication_slots
6. 复制槽相关问题
6.1 复制槽 dirty 表示什么?
MyReplicationSlot->just_dirtied = true;
MyReplicationSlot->dirty = true;
复制槽数据在共享内存与磁盘上都会进行存储,当共享内存中的数据发生变化,则会将 MyReplicationSlot->dirty 标记为 true,刷盘后标记为 false。相关代码可参见 ReplicationSlotPersist() 函数。
6.2 为什么复制槽能够阻止 wal 日志被清除?
ReplicationSlotsComputeRequiredLSN() 函数遍历所有复制槽,获取最小的 restart_lsn,将该值赋值给 XLogCtl->replicationSlotMinLSN。wal 日志的清除主要在 checkpoint 操作时进行,在创建 checkpoint 时,会调用 KeepLogSeg() 函数,依赖 XLogCtl->replicationSlotMinLSN 保留 wal 日志文件。
此外,ReplicationSlotsComputeRequiredXmin() 函数遍历所有复制槽,获取最小的 effective_xmin 和 effective_catalog_xmin,然后分别将其赋值给如下两个变量,这两个变量在事务快照等场景下会使用。
procArray->replication_slot_xmin
procArray->replication_slot_catalog_xmin
6.3 逻辑复制槽与物理复制槽的区别
逻辑复制槽与物理复制槽在源码上的区别是 slot->data.database 是不是为 0 。
#define SlotIsPhysical(slot) ((slot)->data.database == InvalidOid)
#define SlotIsLogical(slot) ((slot)->data.database != InvalidOid)
6.4 复制槽的 xid 和 lsn 如何更新?
在 wal sender 进程中接收备库发出的回馈信息,包含 feedbackXmin 和 feedbackCatalogXmin,这些信息被存储在 slot 的成员变量中,如下:
slot->data.xmin = feedbackXmin;
slot->effective_xmin = feedbackXmin;
slot->data.catalog_xmin = feedbackCatalogXmin;
slot->effective_catalog_xmin = feedbackCatalogXmin;
函数调用关系如下:
WalSndLoop()
ProcessRepliesIfAny()
ProcessStandbyMessage()
ProcessStandbyHSFeedbackMessage()
PhysicalReplicationSlotNewXmin()
lsn 的更新与逻辑复制有关,wal sender 进程在逻辑解码时,更新相关的 lsn,如下:
slot->candidate_catalog_xmin = xmin;
slot->candidate_xmin_lsn = current_lsn;
slot->candidate_restart_valid = current_lsn;
slot->candidate_restart_lsn = restart_lsn;
函数调用关系如下:
WalSndLoop()
XLogSendLogical()
LogicalDecodingProcessRecord()
DecodeStandbyOp()
SnapBuildProcessRunningXacts()
LogicalIncreaseXminForSlot()
LogicalIncreaseRestartDecodingForSlot()