最近在生产环境中遇到许多复制相关问题,查阅网上资料发现官方文档虽然系统但是不够有深度,网上部分深度文章则直接以源码展示,不利于大家了解。所以本文则是结合前两者最终给读者以简单的方式展现MongoDB复制的整个架构。本文分为以下5个步骤:
1、MongoDB复制简介
本章节首先会给大家简单介绍一些MongoDB复制的一些基本概念,便于大家对后面内容的理解。
1.1、基本介绍
MongoDB有副本集及主从复制两种模式,今天给大家介绍的是副本集模式,因为主从模式在MongoDB 3.6也彻底废弃不使用了。MongoDB副本集有Primary、Secondary、Arbitrar三种角色。今天给大家介绍的是Primary与Secondary数据同步的内部原理。MongoDB副本集架构如下所示:
1.2、MongoDB Oplog
MongoDB Oplog是MongoDB Primary和Secondary在复制建立期间和建立完成之后的复制介质,就是Primary中所有的写入操作都会记录到MongoDB Oplog中,然后从库会来主库一直拉取Oplog并应用到自己的数据库中。这里的Oplog是MongoDB local数据库的一个集合,它是Capped collection,通俗意思就是它是固定大小,循环使用的。如下图:
MongoDB Oplog中的内容及字段介绍:
{ "ts" : Timestamp(1446011584, 2), "h" : NumberLong("1687359108795812092"), "v" : 2, "op" : "i", "ns" : "test.nosql", "o" : { "_id" : ObjectId("563062c0b085733f34ab4129"), "name" : "mongodb", "score" : "100" } } ts: 操作时间,当前timestamp + 计数器,计数器每秒都被重置 h:操作的全局唯一标识 v:oplog版本信息 op:操作类型 i:插入操作 u:更新操作 d:删除操作 c:执行命令(如createDatabase,dropDatabase) n:空操作,特殊用途 ns:操作针对的集合 o:操作内容,如果是更新操作 o2:操作查询条件,仅update操作包含该字段
1.3、MongoDB复制发展
MongoDB目前已经迭代了很多个版本,下图我汇总了目前市面上常用版本中MongoDB在复制的一些重要改进。
具体细节大家可以参考MongoDB官方Release Note:https://docs.mongodb.com/manual/release-notes/3.6/
2、MongoDB添加从库
2.1、添加从库命令
MongoDB添加从库比较简单,在安装后从库之后,直接在主库执行rs.add()或者replSetReconfig命令即可添加,这两个命令其实在最终都调用replSetReconfig命令执行。大家有兴趣可以去翻阅MongoDB客户端JS代码。
2.2、具体步骤
然后我们来看副本集加一个新从库的大致步骤,如下图,右边的Secondary是我新加的从库。
通过上图我们可以看到一共有7个步骤,下面我们看看每一个步骤MongoDB都做了什么:
注意:
3、 MongoDB复制流程详解
上面我们知道添加一个从库的大致流程,那我们现在来看主从数据同步的具体细节。当从库加入到副本集的时候,会判断自己是需要Initial Syc(全量同步)还是增量同步。那是通过什么条件判断的呢?
3.1、判断全量同步及增量同步
以上三个条件有一个条件满足就需要做全量同步。
我们可以得出在从库最开始加入到副本集的时候,只能先进行Initial Sync,下面我们来看看Initial Sync的具体流程
3.2、全量同步流程(Init sync)
1、 寻找同步源
这里先说明一点,MongoDB默认是采取级联复制的架构,就是默认不一定选择主库作为自己的同步源,如果不想让其进行级联复制,可以通过chainingAllowed参数来进行控制。在级联复制的情况下,你也可以通过replSetSyncFrom命令来指定你想复制的同步源。所以这里说的同步源其实相对于从库来说就是它的主库。那么同步源的选取流程是怎样的呢?
MongoDB从库会在副本集其他节点通过以下条件筛选符合自己的同步源。
- 如果设置了chainingAllowed 为false,那么只能选取主库为同步源
- 找到与自己ping时间最小的并且数据比自己新的节点(在副本集初始化的时候,或者新节点加入副本集的时候,新节点对副本集的其他节点至少ping两次)
- 该同步源与主库最新optime做对比,如果延迟主库超过30s,则不选择该同步源。
- 在第一次的过滤中,首先会淘汰比自己数据还旧的节点。如果第一次没有,那么第二次需要算上这些节点,防止最后没有节点可以做为同步源了。
- 最后确认该节点是否被禁止参与选举,如果是则跳过该节点。
通过上述筛选最后过滤出来的节点作为新的同步源。
其实MongoDB同步源在除了在Initial Sync和增量复制 的时候选定之后呢,并不是一直是稳定的,它可能在以下情况下进行变更同步源:
- ping不通自己的同步源
- 自己的同步源角色发生变化
- 自己的同步源与副本集任意一个节点延迟超过30s
2、 删除MongoDB中除local以外的所有数据库
3、 拉取主库存量数据
这里就到了Initial Sync的核心逻辑了,我下面以图和步骤的方式给大家展现MongoDB在做Initial Sync的具体流程。
同步流程如下:
注:以上步骤直接copy的MongoDB源码中的注释。
以上步骤在Mongo 3.4 Initial Sync 有如下改进:
上述4个新增特性提升了Initial Sync的效率并且提高了Initial Sync的可靠性,所以大家使用MongoDB最好使用最新版本MongoDB 3.4或者3.6,MongoDB 3.6 更是有一些令人兴奋的特性,这里就不在此叙述了。
全量同步完成之后,然后MongoDB会进入到增量同步的流程。
3.3、增量同步流程
上面我们介绍了Initial Sync,就是已经把同步源的存量数据拿过来了,那主库后续写入的数据怎么同步过来呢?下面还是以图跟具体的步骤来给大家介绍:
注:这里不一定是Primary,刚刚提到了同步源也可能是Secondary,这里采用Primary主要方便大家理解。
我们可以看到上述有6个步骤,那每个步骤具体做的事情如下:
上述两个条件满足一个之后,就会将数据给prefetchOps方法处理,prefetchOps方法主要将数据以database级别切分,便于后面多线程写入到数据库中。如果采用的WiredTiger引擎,那这里是以Docment ID 进行切分。
4、 MongoDB高可用
上面我们介绍MongoDB复制的数据同步,我们知道除了数据同步,复制还有一个重要的地方就是高可用,一般的数据库是需要我们自己去定制方案或者采用第三方的开源方案。MongoDB则是自己在内部已经实现了高可用方案。下面我就给大家详细介绍一下MongoDB的高可用。
4.1、触发切换场景
首先我们看那些情况会触发MongoDB执行主从切换。
- 主动执行rs.stepdown 命令
- 主库与大部分节点都无法通信的情况下
- 修改副本集配置的时候(在Mongo 2.6版本会触发,其他版本待确定)修改以下配置的时候:
- _id
- votes
- priotity
- arbiterOnly
- slaveDelay
- hidden
- buildIndexes
4.2、心跳机制
通过上面触发切换的场景,我们了解到MongoDB的心跳信息是MongoDB判断对方是否存活的重要条件,当达到一定的条件时,MongoDB主库或者从库就会触发切换。下面我给大家详细介绍一下心跳机制
我们知道MongoDB副本集所有节点都是相互保持心跳的,然后心跳频率默认是2秒一次,也可以通过heartbeatIntervalMillis来进行控制。在新节点加入进来的时候,副本集中所有的节点需要与新节点建立心跳,那心跳信息具体是什么内容呢?
心跳信息内容:
BSONObjBuilder cmdBuilder;
cmdBuilder.append("replSetHeartbeat", setName);
cmdBuilder.append("v", myCfgVersion);
cmdBuilder.append("pv", 1);
cmdBuilder.append("checkEmpty", checkEmpty);
cmdBuilder.append("from", from);
if (me > -1) {
cmdBuilder.append("fromId", me);
}
注:上述代码摘抄MongoDB 源码中构建心跳信息片段。
具体在MongoDB日志中表现如下:
command admin.$cmd command: replSetHeartbeat { replSetHeartbeat: "shard1", v: 21, pv: 1, checkEmpty: false, from: "10.13.32.244:40011", fromId: 3 } ntoreturn:1 keyUpdates:0
那副本集所有节点默认都是每2秒给其他剩余的节点发送上述信息,在其他节点收到信息后会调用ReplSetCommand命令来处理心跳信息,处理完成会返回如下信息:
result.append("set", theReplSet->name());
MemberState currentState = theReplSet->state();
result.append("state", currentState.s); // 当前节点状态
if (currentState == MemberState::RS_PRIMARY) {
result.appendDate("electionTime", theReplSet->getElectionTime().asDate());
}
result.append("e", theReplSet->iAmElectable()); //是否可以参与选举
result.append("hbmsg", theReplSet->hbmsg());
result.append("time", (long long) time(0));
result.appendDate("opTime", theReplSet->lastOpTimeWritten.asDate());
const Member *syncTarget = replset::BackgroundSync::get()->getSyncTarget();
if (syncTarget) {
result.append("syncingTo", syncTarget->fullName());
}
int v = theReplSet->config().version;
result.append("v", v);
if( v > cmdObj["v"].Int() )
result