作者简介:高鹏,笔名八怪。《深入理解MySQL主从原理》图书作者,同时运营个人公众号“MySQL学习”,持续分享遇到的有趣case以及代码解析!
源码版本 5.7.29 半同步部分一共分为4节,为2021年春节前夕学习,时间短主要用于后期确认问题。也希望对象熟悉半同步的朋友有所帮助。如果有误请谅解。
一、内部结构
1、全局变量
- ReplSemiSyncMaster repl_semisync:半同步插件结构,全局变量。
- Ack_receiver ack_receiver:ack_receiver全局变量。
2、ReplSemiSyncBase类
继承自Trace类 主要元素为
static const unsigned char kSyncHeader[2]; /* three byte packet header */
/* Constants in network packet header. */
static const unsigned char kPacketMagicNum;
static const unsigned char kPacketFlagSync;
其主要作用:
- 主库回调函数repl_semi_before_send_event判断是否是事务的最后一个event,如果是则会设置kPacketFlagSync为1,说明本event需要从库进行ack反馈
- 从库回调函数repl_semi_slave_queue_event通过这个标记来判断是否需要进行ack反馈
3、ReplSemiSyncMaster
继承自ReplSemiSyncBase类
ActiveTranx *active_tranxs_; 链表和hash结构用于记录当前活跃的事务
bool init_done_; 主库半同步插件是否初始化
mysql_mutex_t LOCK_binlog_; 由于ReplSemiSyncBase为全局结构体,用户线程/dump线程/ack receiver线程都需要修改他,修改需要加锁
commit_file_name_inited_/commit_file_name_/commit_file_pos_
reply_file_name_inited_/reply_file_name_/reply_file_pos_
wait_file_name_inited_/wait_file_name_/wait_file_pos_
分表代表
- 主库当前最大的提交点的binlog位点,以及进行了第一次修改
- 从库ACK的最大binlog位点,以及进行了第一次修改
- 主库中等待Ack_receiver唤醒最小的那个事务的位点 ,以及进行了第一次修改
bool state_; 非常重要的状态,用于判断是否可以不等待ACK直接变为异步,由参数
rpl_semi_sync_master_wait_for_slave_count和rpl_semi_sync_master_wait_no_slave共同控制
unsigned long wait_timeout_; 超时设置
volatile bool master_enabled_; 是否主库设置了enable参数
AckContainer ack_container_; ACK反馈信息的容器
4、AckContainer
继承自Trace类,主要元素如下
/* The greatest ack of the acks already reported to semisync master. */
AckInfo m_greatest_ack;
AckInfo *m_ack_array;
/* size of the array */
unsigned int m_size;
/* index of an empty slot, it helps improving insert speed. */
unsigned int m_empty_slot;
其中ackinfo也比较简单就是ack的信息如下:
int server_id;//从库的server id
char binlog_name[FN_REFLEN]; //ack的binlog
unsigned long long binlog_pos;//ack的位点
5、ActiveTranx
当前半同步活跃事务,是一个链表结构,且包含一个hash结构,按照事务前后排序,最前面的最早。当ack反馈会自动摘除参考回调函数repl_semi_report_binlog_sync。
TranxNodeAllocator allocator_;
/* These two record the active transaction list in sort order. */
TranxNode *trx_front_, *trx_rear_;
TranxNode **trx_htb_; /* A hash table on active transactions. */
int num_entries_; /* maximum hash table entries */
mysql_mutex_t *lock_; /* mutex lock */
其中的TranxNode 结构如下:
char log_name_[FN_REFLEN];
my_off_t log_pos_;//位点信息
mysql_cond_t cond;//这个条件变量为等待ACK的条件变量
int n_waiters;//等待次数
struct TranxNode *next_; /* the next node in the sorted list */
struct TranxNode *hash_next_; /* the next node during hash collision */
6、TranxNodeAllocator
TranxNode的分配器,用于提高性能,一次性分配16个TranxNode 叫做一个block。分配的时候会依次初始化每个TranxNode,包括其中的条件变量。下面看看block的定义:
struct Block {
Block *next;
TranxNode nodes[BLOCK_TRANX_NODES];
};
分配可参考函数TranxNodeAllocator::allocate_block
7、Poll_socket_listener
半同步的连接文件描述符注册到这个类中
8、Ack_receiver
私有,提供更改接口
enum status {ST_UP, ST_DOWN, ST_STOPPING};
uint8 m_status;//ack receiver线程状态,上面3个状态
/*
Protect m_status, m_slaves_changed and m_slaves. ack thread and other session may access the variables at the same time.
*/
mysql_mutex_t m_mutex; //mutex保护后面信息的更改
mysql_cond_t m_cond;
/* If slave list is updated(add or remove). */
bool m_slaves_changed;
Slave_vector m_slaves; //这是一个dump线程的vector数组
my_thread_handle m_pid;
其中Slave_vector 定义如下:
typedef std::vector Slave_vector;
struct Slave
{
uint32_t thread_id;
Vio vio;
uint server_id;
bool net_compress;
my_socket sock_fd() const { return vio.mysql_socket.fd; }
};
二、主库端回调函数
Trans_observer trans_observer = {
sizeof(Trans_observer), // len
repl_semi_report_before_dml, //before_dml 空
repl_semi_report_before_commit, // before_commit 空
repl_semi_report_before_rollback, // before_rollback 空
repl_semi_report_commit, // after_commit OK
repl_semi_report_rollback, // after_rollback
};
Binlog_storage_observer storage_observer = {
sizeof(Binlog_storage_observer), // len
repl_semi_report_binlog_update, // report_update OK
repl_semi_report_binlog_sync, // after_sync OK
};
Binlog_transmit_observer transmit_observer = {
sizeof(Binlog_transmit_observer), // len
repl_semi_binlog_dump_start, // start OK
repl_semi_binlog_dump_end, // stop OK
repl_semi_reserve_header, // reserve_header OK
repl_semi_before_send_event, // before_send_event OK
repl_semi_after_send_event, // after_send_event OK
repl_semi_reset_master, // reset OK
};
三、主库插件初始化
执行语句 INSTALL PLUGIN rpl_semi_sync_master SONAME 'semisync_master.so';
则进行主库插件的初始化
#0 semi_sync_master_plugin_init (p=0x6bca6e0) at /home/mysql/soft/percona-server-5.7.29-32/plugin/semisync/semisync_master_plugin.cc:591
#1 0x00000000014e7cf0 in plugin_initialize (plugin=0x6bca6e0) at /home/mysql/soft/percona-server-5.7.29-32/sql/sql_plugin.cc:1279
#2 0x00000000014ea354 in mysql_install_plugin (thd=0x7ffdec000b90, name=0x7ffdec006678, dl=0x7ffdec006688) at /home/mysql/soft/percona-server-5.7.29-32/sql/sql_plugin.cc:2279
#3 0x00000000014f0547 in Sql_cmd_install_plugin::execute (this=0x7ffdec006670, thd=0x7ffdec000b90) at /home/mysql/soft/percona-server-5.7.29-32/sql/sql_plugin.cc:4664
#4 0x00000000014c0054 in mysql_execute_command (thd=0x7ffdec000b90, first_level=true) at /home/mysql/soft/percona-server-5.7.29-32/sql/sql_parse.cc:5154
#5 0x00000000014c2025 in mysql_parse (thd=0x7ffdec000b90, parser_state=0x7fffe81524a0, update_userstat=false) at /home/mysql/soft/percona-server-5.7.29-32/sql/sql_parse.cc:5927
#6 0x00000000014b6c5f in dispatch_command (thd=0x7ffdec000b90, com_data=0x7fffe8152c90, command=COM_QUERY) at /home/mysql/soft/percona-server-5.7.29-32/sql/sql_parse.cc:1539
#7 0x00000000014b5a94 in do_command (thd=0x7ffdec000b90) at /home/mysql/soft/percona-server-5.7.29-32/sql/sql_parse.cc:1060
#8 0x00000000015e9d32 in handle_connection (arg=0x6ffa140) at /home/mysql/soft/percona-server-5.7.29-32/sql/conn_handler/connection_handler_per_thread.cc:325
#9 0x00000000018b97f2 in pfs_spawn_thread (arg=0x6fd8f10) at /home/mysql/soft/percona-server-5.7.29-32/storage/perfschema/pfs.cc:2198
#10 0x00007ffff7bc6ea5 in start_thread () from /lib64/libpthread.so.0
#11 0x00007ffff5f2b8dd in clone () from /lib64/libc.so.6
调用方式:
->ReplSemiSyncMaster::initObject
->设置init_done_为ture,表示经过了初始化
->setWaitTimeout(rpl_semi_sync_master_timeout) 设置timeout参数
->setTraceLevel(rpl_semi_sync_master_trace_level) 设置跟踪级别
->setWaitSlaveCount(rpl_semi_sync_master_wait_for_slave_count) 设置rpl_semi_sync_master_wait_for_slave_count参数
->result= ack_container_.resize(new_value, &ackinfo);
这里的new_value就是参数的指定,如下一段debug值,为设置为2的时候
AckContainer::resize (this=0x7fffd47ff7d0 <repl_semisync+1744>, size=2, ackinfo=0x7fffec10c168),根据AckContainer最小位点就能够知道,可以提交的最小位点。
->根据rpl_semi_sync_master_enabled参数设置,是否进行初始化包含
1、加锁修改lock();
2、初始化ReplSemiSyncMaster的变量
commit_file_name_inited_/reply_file_name_inited_/wait_file_name_inited_为false
3、master_enabled_设置为true
4、设置值state_,根据参数rpl_semi_sync_master_wait_for_slave_count
和rpl_semi_sync_master_wait_no_slave进行设置,如下逻辑
state_ = (rpl_semi_sync_master_wait_no_slave != 0 ||
(rpl_semi_sync_master_clients >=
rpl_semi_sync_master_wait_for_slave_count));
1、如果
rpl_semi_sync_master_wait_no_slave默认为true,并且rpl_semi_sync_master_wait_for_slave_count=1
那么这里state_返回必然为TRUE
2、如果
rpl_semi_sync_master_wait_no_slave设置为false,并且rpl_semi_sync_master_wait_for_slave_count=2
那么条件1位false 条件2在半同步从库数量小于rpl_semi_sync_master_wait_for_slave_count的时候 也将返回为FALSE,那么state_设置为FALSE
3、如果
rpl_semi_sync_master_wait_no_slave设置为false,并且rpl_semi_sync_master_wait_for_slave_count=2
那么条件1位false 条件2在半同步从库数量大于等于rpl_semi_sync_master_wait_for_slave_count的时候
将返回为TRUE,那么state_设置为TRUE
也就是说rpl_semi_sync_master_wait_no_slave为变量state_开启了可以判断的可能。
5、解锁unlock();
->ack_receiver.init()
->setTraceLevel(rpl_semi_sync_master_trace_level); 根据参数rpl_semi_sync_master_trace_level
设置trace level
->如果rpl_semi_sync_master_enabled参数未true则启动ack_receiver线程
->Ack_receiver::start
如果为ST_DOWN没有启动则设置为ST_UP状态mysql_thread_create进行启动
回调函数为ack_receive_handler,输入的值为 *this,也就是本Ack_receiver
ack_receive_handler 建立新线程Ack_receiver
reinterpret_cast<Ack_receiver *>(arg)->run(); 这里进行了回调,调用
Ack_receiver::run,启动ack_receive线程,启动后面具体介绍
->register_trans_observer 注册事务类型插件,注册进入观察者链表
->register_binlog_storage_observer
->register_binlog_transmit_observer
四、Ack_receiver线程初始化
Ack_receiver::run 建立新线程Ack_receiver
修改m_slaves_changed为ture,更改线程信息
进入循环如果m_status == ST_STOPPING,处于ST_STOPPING,则函数结束,线程结束
ACK receiver进入状态stage_waiting_for_semi_sync_ack_from_slave(ACK RECEIVER无法显示状态show processlist无法看到)
如果m_slaves_changed为ture
如果m_slaves.empty() 也就是没有slave连接
Ack_receiver::wait_for_slave_connection() 如果没有半同步从库连接则ack receiver等待在这里
等待dump线程回调函数注册semi半同步从库
->进入状态stage_waiting_for_semi_sync_slave(ACK RECEIVER无法显示)
Socket_listener.init_slave_sockets,
->循环每个连接的连接的semi从库到Poll_socket_listener类的m_fds中
m_slaves_changed设置为false
listen_on_sockets 进入poll等待设置超时时间为1秒,如果返回<=0(没有数据到来,或者错误)
进行判断是否报错Failed to wait on semi-sync dump sockets
循环每个socket文件描述符读取数据
->ReplSemiSyncMaster::reportReplyPacket
判断包的头部是否包含magic ReplSemiSyncBase::kPacketMagicNum = 0xef
报错:Read semi-sync reply magic number error
判断REPLY_BINLOG_POS_OFFSET 8字节
报错:Read semi-sync reply length error: packet is too small
判断REPLY_BINLOG_NAME_OFFSET 文件名不能大于512字节
报错:Read semi-sync reply binlog file length too large
->handleAck处理ack信息(这里非常重要将等待一个从库ACK和多个从库ACK的情况分开了)
!!如果rpl_semi_sync_master_wait_for_slave_count等于1,直接可以进入下面流程了
->ReplSemiSyncMaster::reportReplyBinlog
判断master_enabled_是否设置了,没有设置直接goto end
1、判断state_状态是否为false,如果是false可能半同步已经关闭了,这里检查是否可以开启
->try_switch_on 是否需要设置state_为true
当前从库收到的的binlog filename/pos已经是当前主库最大的commit_file_name_和commit_file_pos_
->ActiveTranx::compare当前从库的binlog filename/pos和当前主库最大的binlog filename和pos
semi_sync_on=true
如果没有获取到最大的binlog filename和pos
semi_sync_on=true
如果semi_sync_on=true
state_ = true 则说明最新的主库的binlog已经落盘需要开启半同步了。
2、判断是否需要修改reply_file_name_/reply_file_pos_
这里比较的本次当前从库收到的的binlog filename/pos和上次事务收到的
reply_file_name_/reply_file_pos_,如果大于说明有新的事务从库已经落盘了。
如果trace_level_设置为 16则会输出日志
Got reply at log_file_name,log_file_pos
3、如果有正在等待处理的session,也就是提交流程需要继续的
if (rpl_semi_sync_master_wait_sessions > 0)
进行判断是否大于了wait_file_name_/wait_file_pos_(提交事务中最小的需要等待位点)
如果大于则说明至少可以唤醒一个线程继续提交了
设置can_release_threads=true
如果can_release_threads设置为true,则进行唤醒,期间如果trace_level_设置为 16则会输出日志
"signal all waiting threads"
进行唤醒
active_tranxs_->signal_waiting_sessions_up_to(reply_file_name_, reply_file_pos_);
->循环所有当前需要ACK的事务,及trx_front_
比较每个注册到ack recevier的active_tranxs_事务,是否等待的pos是否达到了。
ActiveTranx::compare(entry->log_name_, entry->log_pos_, log_file_name, log_file_pos)
如果可以唤醒本事务 小于等于0 说明事务的pos小于当前收到的log logfile 可以唤醒了
mysql_cond_broadcast(&entry->cond);//做唤醒操作
!!如果rpl_semi_sync_master_wait_for_slave_count大于1了,那么需要借助ack_container_进行统计
因此ack_container_的元素个数是根据参数rpl_semi_sync_master_wait_for_slave_count进行初始化的
因此我们只要判定是否其中的元素最小位点即可知道可以唤醒的事务。
AckInfo* AckContainer::insert
此函数会返回当前ack_container_中共同反馈的最小位点,并且有remove_all逻辑然后进入
->ReplSemiSyncMaster::reportReplyBinlog
这个流程和上面一样了
Ack_receiver的唤醒由dump线程进行,如果没有半同步的slave,Ack_receiver线程会一直处于等待Ack_receiver::wait_for_slave_connection() ,当Ack_receiver::add_slave函数加入半同步从库后开始工作。
五、半同步插件调用方式
举例:
(RUN_HOOK(binlog_transmit, transmit_start,
(thd, m_flag, m_start_file, m_start_pos,
&m_observe_transmission)))//运行半同步插件
转换后为:
binlog_transmit_delegate->transmit_start(thd, m_flag, m_start_file, m_start_pos,&m_observe_transmission)
调用:
FOREACH_OBSERVER(ret, transmit_start, thd, (¶m, log_file, log_pos));
迭代每个观察者,回调插件函数最终调用repl_semi_binlog_dump_start函数。