为MySQL MGR实现简单的负载均衡代理

2023年 8月 15日 29.8k 0

在多写(多节点写入)数据库(例如MySQL MGR的multi-primary mode)与应用之间,往往会加一层代理组件,通过算法调节不同节点负载,分发高并发读写请求。

要求代理工具需要具有请求转发、负载均衡、故障转移的功能。

在后端节点故障发生或者连接因为客户端异常、网络问题断开时,需要及时将故障节点及时踢出负载均衡队列或者关闭异常连接,做到故障转移。

这就是接下来介绍的主要内容,使用golang简单编写一个这样的工具,来深入学习一下负载均衡代理的实现。

1、功能一览

负载均衡

将应用端的连接请求(负载)按照既定的均衡算法转发到不同的后端节点,服务程序建立应用(客户端)与数据库节点之间的通信并保持至客户端断开连接。

故障转移

在后端节点出现故障时,能及时的检测到故障,并将故障节点踢出负载均衡队列,不再将应用请求路由到故障节点,做到应用无感知。在故障恢复后,能够检测到节点状态恢复,将其再次加入到负载均衡队列。

2、实现细节

核心功能

请求转发

代理需要做到将请求分发到不同的后端节点上去,并保持应用与对应节点的通信,直至其中一端退出(故障或者主动)。

负载均衡

对应用的负载,均衡的分发的不同的节点,需要对应的算法支持。目前通用的负载均衡算法有随机、轮询、加权轮询,代码实现了这三种算法。

此外还有动态判断后端节点负载情况,根据负载情况动态调整负载分发,这需要额外的负载监控工作,这里没有实现。

故障检测

负载均衡代理需要避免向失效的节点分发请求。故障类型无疑是很多的,如果面面俱到的对每个故障类型都照顾到,无疑增加了实现难度。

例如在分布式中,不可靠的网络增加了检测故障难度,对于数据库实例,在分布式中很难判断节点到底是crash了还是网络中断导致的。

并且节点因为负载较高无法及时响应请求,这时也是很难判断节点状态,此时进行重试可能会加剧节点的负载。

在这里并不是要模糊这种判断,而是实际情况实在是太复杂了,我并不是相关领域专家,所以在实现故障检测时,只考虑了几种确定性较高或者容易判断的情况。

过程实现

其中, 转发 实现过程是在接收到请求后,定义一个后端节点的地址,并建立一个和这个地址的连接。

在开启两个协程,一个负责将应用(客户端)发送的数据包传递给后端的连接,另一个是将后端的返回的数据传递给应用,这样就在应用与后端节点之间搭建起了通信,使之像直接通信一样交换数据,核心的步骤可以参考下面代码的实现。

sConn, err := l.Accept()
dTcpAddr, _ := net.ResolveTCPAddr("tcp4", addr)
dConn, err := net.DialTCP("tcp", nil, dTcpAddr)
go io.Copy(sConn, dConn)
go io.Copy(dConn, sConn)

在出现连接完成既定通信后断开连接,或者连接因为故障退出,需要代理将客户端的请求连接与转发向后端的连接一同关闭。

这里使用的方式是获取连接传输数据时的状态来判断,即io.Copy(sConn, dConn)在出现错误时,连接就可以关闭了。这里借助channel的阻塞特性来向主线程通知退出。所以对上述的。

go io.Copy(sConn, dConn)
go io.Copy(dConn, sConn)

代码进行修改后如下:

        // channel长度为1,任意时刻只写入一个bool值,在其中的值未被读取之前,处于阻塞状态 
exitCH := make(chan bool, 1)
        // 把客户端的的请求转发给后端 
go func(s net.Conn, d *net.TCPConn, ex chan bool) {  
_, err := io.Copy(sConn, dConn)  
if err != nil {
   Error.Println("Send data failure: ", err)  
}  
exitCH <- true 
}(sConn, dConn, exitCH)
        // 把响应的数据返回给客户端 
go func(s net.Conn, d *net.TCPConn, ex chan bool) {
_, err := io.Copy(dConn, sConn)  
if err != nil {
   Error.Println("Receive data failure: ", err)  
}
  exitCH <- true 
}(sConn, dConn, exitCH)
        // channel阻塞,读取连接关闭状态 
<-exitCH 
// channel收到信息(连接终止)后,关闭连接 
_ = dConn.Close()

负载均衡 算法的实现则是在每次向后端建立连接的时候,这个后端地址是根据算法的不同,返回一个负载均衡算法推荐的后端节点的地址,然后使用这个地址建立一个连接,并与应用搭建起通信(正如上一步骤介绍的那样)。

其中随机算法较为简单,核心是随机数的获取,使用这个随机数作为下标在负载均衡队列中拿到具体的节点:

type Random struct { 
CurIndex int 
Nodes    []*node.Node
}

func (r *Random) Next() *node.Node { 
if len(r.Nodes) == 0 {  
return nil 

r.CurIndex = rand.Intn(len(r.Nodes)) 
return r.Nodes[r.CurIndex]
}

轮询算法则是每次获取后端节点信息是采取的逐个查询的方式获取需要分发请求的节点:

type RoundRobin struct { 
CurIndex int 
Nodes    []*node.Node
}

func (r *RoundRobin) Next() *node.Node { 
if len(r.Nodes) == 0 {  
return nil 

l := len(r.Nodes) 
if r.CurIndex >= l {  
r.CurIndex = 0 

currAddr := r.Nodes[r.CurIndex] 
r.CurIndex = (r.CurIndex + 1) % l 
return currAddr
}

加权轮询算法实现上相对复杂一些,为每个后端节点增加权重属性,包含三个权重属性:权重(Weight)、临时权重(CurWeight)、有效权重(EffectWeight)。

其中CurWeight、EffectWeight初始值为0,Weight值则读取配置文件设定来初始化。CurWeight每轮都会变化,EffectWeight默认与Weight相同。

实现逻辑

1、currentWeight = currentWeight + effecitveWeight
2、选中最大的 currentWeight 节点为选中节点
3、currentWeight = currentWeight - totalWeight

type WeightRoundRobin struct { 
Nodes []*node.Node
}

func (r *WeightRoundRobin) Next() *node.Node { 
var n *node.Node 
total := 0 
for i := 0; i < len(r.Nodes); i++ {  
w := r.Nodes[i]  
total += w.EffectWeight  
w.CurWeight += w.EffectWeight  
if w.EffectWeight < w.Weight {   
w.EffectWeight++  
}  
if n == nil || w.CurWeight > n.CurWeight {   
n = w  


if n == nil {  
return nil 

n.CurWeight -= total 
return n
}

故障检测 是保证负载均衡队列中的节点是可以正常访问并且提供可靠服务的前提,在检测到后端节点存在故障后,需要及时的从队列中剔除,并关闭与之对应的连接。

检测在实际实现上使用了两种基本方法。一种是基本的连通性检测,一种是利用MGR或者GreatDB提供的内部视图来判断节点是否可写。

这种在MGR中从当前节点查询本节点状态可能并不准确例如:发生网络分区,从当前节点查看状态为ONLINE,但从其他节点查看,则当前可能为ERROR状态,代码并未考虑这个情况。

后续可增加对一个节点可写状态判断需要与其他节点的状态查询综合考虑。

连通性检测:

_, err := net.DialTimeout("tcp", addr, time.Duration(dialtimeout)*time.Millisecond)

这里是借助命令行工具实现可写检测,没有使用开源的连接驱动,主要是考虑代码的简洁。

可写检测:

var CMD = "mysql"
func State(detectSql string, user string, pass string, port string, host string, cluster string) (bool, error) {
  ok, _ := CommandOk(CMD) 
  if ok {
    sqlComLine := CMD + " -u" + user + " -p" + pass + " -h" + host + " -P" + port + " -NBe '"  
    if cluster == "greatdb" {
       sqlComLine += detectSql + " WHERE HOST=" + "\"" + host + "\"" + "'"  
      } else if cluster == "mgr" {
       sqlComLine += detectSql + " WHERE MEMBER_HOST=" + "\"" + host + "\"" + "'"  
      }  
      cmd := exec.Command("bash", "-c", sqlComLine)  
      out, err := cmd.CombinedOutput()  
      rest := strings.Replace(string(out), "\n", "", -1)  
      if err == nil {
         if rest == "ONLINE" {
             return true, nil   
            } else {
                return false, errors.New("instance is exists but cannot write")   
            }  
           }
             return false, err 
            } else {  
             return false, errors.New("cannot detect instance state") 
            }
           }
           
           func CommandOk(c string) (bool, error) {
            command := "which " + c 
            cmd := exec.Command("bash", "-c", command) 
            out, err := cmd.CombinedOutput() 
            if err == nil {
              context := strings.Fields(strings.Replace(string(out), "\n", "", -1))  
              if len(context) > 2 {
                 if context[1] == "no" {    
                 return false, nil   
                 }  
                }  
               return true, nil 
              } 
             return false, err
            }

在检测到后端节点连通性有问题或者节点状态为不可写,需要将节点踢出负载均衡队列,这里通过加锁来防止并发操作队列引入新的代码错误。

然后通过channel通知主线程负载均衡队列发生了变化,需要更新。其次是通知主线程需要将各个协程在处理的与故障节点有关的连接,需要关闭。

func DelNode(n *node.Node) {
 for i := 0; i < len(nodeList); i++ {
   if nodeList[i].Ip == n.Ip && nodeList[i].Port == n.Port {
      mu.Lock()   
      nodeList = append(nodeList[:i], nodeList[i+1:]...)   
      listChange <- 1   
      connClose <- n.Ip   
      mu.Unlock()   
      Error.Println("The destination address is removed from the load balance list :", net.JoinHostPort(n.Ip, strconv.Itoa(n.Port)))  
    } 
  }
}

在心跳检测到后端节点可写状态恢复,则需要将其再次加入到负载均衡队列,新的连接会根据负载均衡算法的平衡,路由到恢复的节点上,也就是会再次分发请求到正常节点。

    // 在队列中不存在,则添加    
    if exists == false {
            mu.Lock()        
            defer mu.Unlock()        
            nodeList = append(nodeList, n)        
            ch <- 1 
  Info.Println("The destination address is added to the load balance list :", addr)    
}

3、使用问题

程序启动

目前只在CentOS 7.6上进行了简单测试,测试了后端节点被kill、机器reboot、连接异常断开等故障情况

cd easy-proxy<br>g
o build main/easyproxy<br>

修改配置,增加后端节点、端口、权重等
如果需要快速故障转移,可以配置ticktime和dialtimeout参数,单位是毫秒。

nohup ./easyproxy --cnf=conf/easy.conf &

可能问题

在使用过程可能会遇到

accept tcp [::]:3310: accept4: too many open files

或者

dial tcp 127.0.0.0:3310 socket: too many files

这是系统文件描述符的数量不够用了,解决方法是可以增加文件描述符的数量

ulimit -n 1024000

修改文件描述符后,重新启动进程,查看进程最大打开文件数:Max open files

cat /proc/18659/limits 

......
Max open files            1024000              1024000              files      
......

一点想法

后续可考虑对程序增加守护进程,保障程序一定程度的可用性,代理工具无状态,也可以进行扩展来实现HA。

这里只是简单的实现了一下请求代理和负载均衡,通过编码加深对负载均衡的理解不失为一个有效方法,测试并不充分。

代码约600行左右,没有通过DB Driver连接数据库,而是借助命令行来操作,后续会继续完善。希望能带来一些对负载均衡的思考。

源码地址:https://gitee.com/huajiashe_byte/easy-proxy

相关文章

Oracle如何使用授予和撤销权限的语法和示例
Awesome Project: 探索 MatrixOrigin 云原生分布式数据库
下载丨66页PDF,云和恩墨技术通讯(2024年7月刊)
社区版oceanbase安装
Oracle 导出CSV工具-sqluldr2
ETL数据集成丨快速将MySQL数据迁移至Doris数据库

发布评论