redigo连接池的源码分析

2023年 9月 26日 63.4k 0

redigo连接池的源码分析

今天我们来看一看redigo(github.com/gomodule/re…)是如何实现连接池的。

概述

连接池部分的代码在redis/pool.go中,相关结构体和接口的UML图如下图所示

redigo中的连接池的源码分析-uml.png

Pool结构体定义了连接池的属性和行为,包括以下主要参数:

  • Dial func() (Conn, error):指向用于新建连接的函数,由redigo的用户指定
  • MaxIdle int:最大空闲连接数
  • MaxActive int:连接池的容量,即连接池中最多可以包含多少个连接,包括正在使用的连接和空闲连接
  • IdleTimeout time.Duration:空闲连接的最大空闲时间
  • Get() Conn:从连接池获取连接

另外,idleList是一个由空闲连接(类型为*poolConn)构成的双向链表。pushFront()popFront()popBack()这3个函数分别用于,通过将刚刚使用过的连接插入到链表头部来将其放回连接池;从链表头部取出空闲连接;从链表尾部删除长时间没有使用的空闲连接。

type idleList struct {
	count       int
	front, back *poolConn
}

实现连接池时,需要考虑以下几个问题

  • 何时新建连接?

    • 若新建连接时发现已创建的连接数到达连接池的容量上限,该如何处理?
  • 如何回收空闲时间过长的连接?

  • 如何确保连接池中的连接依然存活?

下面就带着这几个问题,重点梳理一下从连接池中获取连接的func (p *Pool) Get() Conn方法和将连接放回连接池的func (ac *activeConn) Close()方法。

问题1:如何回收空闲时间过长的连接?

先来梳理func (p *Pool) Get() Conn方法的逻辑。

func (p *Pool) Get() Conn {
	// GetContext returns errorConn in the first argument when an error occurs.
	c, _ := p.GetContext(context.Background())
	return c
}

func (p *Pool) GetContext(ctx context.Context) (Conn, error) {
	// Wait until there is a vacant connection in the pool.
	waited, err := p.waitVacantConn(ctx)
	if err != nil {
		return errorConn{err}, err
	}
  // ...

Get()会返回两种类型的连接,activeConnerrorConn,这两种类型都实现了Conn接口。

这里采用了称为Null Object或Special Case的设计模式,即使获取连接时发生错误,也不会产生nil,而是返回一个异常的连接。只不过在异常连接上的绝大多数操作都会返回错误。这样设计的好处一是避免了空指针异常,二是延后了错误处理的时机,或者说减少了一处需要检查错误的位置,redigo的用户可以认为Get()总会返回“有效的”连接,而在错误检查时,只需重点检查Do()等方法的返回值。

Get()调用了GetContext(),而后者又调用了waitVacantConn()waitVacantConn()有两条执行路径,我们先来看最简单的一条——若没有开启等待模式p.Wait == false或者没有设置最大连接数(连接池的容量),就直接返回。p.Wait == true时的逻辑将在后面介绍。

func (p *Pool) waitVacantConn(ctx context.Context) (waited time.Duration, err error) {
	if !p.Wait || p.MaxActive  0 {
		// ...
	}

	// Prune stale connections at the back of the idle list.
	if p.IdleTimeout > 0 {
		n := p.idle.count
		for i := 0; i < n && p.idle.back != nil && p.idle.back.t.Add(p.IdleTimeout).Before(nowFunc()); i++ {
			pc := p.idle.back	// ①
			p.idle.popBack()
			p.mu.Unlock()
			pc.c.Close()
			p.mu.Lock()
			p.active--
		}
	}

这部分代码回答了有关连接池的一个问题——如何回收空闲时间过长的连接?

redigo的实现方法是获取连接时顺带回收空闲时间过长的连接。①p.idle.back(类型为*poolConn)是指向空闲连接的双向链表尾部的指针,所指向的空闲连接的t字段记录了该连接最后一次使用的时间。如果t加上连接池参数p.IdleTimeout(最大空闲时间)在当前时间nowFunc()之前(类比食品的保质期在当前时间之前),就从双向链表p.idle中删除该连接后关闭。

由于这部分代码可能会被多个goroutine并发执行,所以在回收(=从链表中删除)空闲连接时,以及p.active计数器--时,都需要通过p.mu.Lock()加锁。redigo在这里还尽可能缩小了锁的范围:

p.mu.Lock()
// for ...
  p.mu.Unlock()
  pc.c.Close()
  p.mu.Lock()
	// ...
// }

问题2:如何确保连接池中的连接依然存活?

回收完空闲时间过长的连接后,就可以遍历空闲连接的链表,从中获取可用的空闲连接了。这部分代码同样可能会被多个goroutine并发执行,所以依然需要互斥锁p.mu的保护。

p.mu.Lock()
for p.idle.front != nil {
	pc := p.idle.front
	p.idle.popFront()
	p.mu.Unlock()
  // return an `activeConn` or check next idle connection
 	// ...
}

activeConn的结构如下

type activeConn struct {
	p     *Pool
	pc    *poolConn
	state int
}

之所以要确保空闲连接依然存活,是因为空闲连接虽然存在,但可能已经是失效的连接了。那么什么时候会出现这种情况呢?

在Redis的配置中,有一项叫做timeout,默认为0

# Close the connection after a client is idle for N seconds (0 to disable)
timeout 0

如果该选项的值不为0,且小于redigo连接池的配置项MaxIdle的值会发生什么呢?我们不妨测试一下

$ fgrep timeout -B2 /usr/local/etc/redis.conf

# Close the connection after a client is idle for N seconds (0 to disable)
timeout 5
--

$ # 重启redis 
$ # brew services restart redis
func main() {
	pool := &redis.Pool{
		MaxActive: 1,
		MaxIdle:   1,
		Dial: func() (redis.Conn, error) {
			return redis.Dial("tcp", "127.0.0.1:6379")
		},
	}

	c := pool.Get()
	reply, err := c.Do("PING")
	if err != nil {
		fmt.Println(reply, err)
	}
	c.Close() // return to pool

	time.Sleep(20 * time.Second)
	c = pool.Get()
	reply, err = c.Do("PING")
	if err != nil {
		fmt.Println(reply, err)	//  EOF
	}
}

通过Wireshark抓包,就很容易解释为什么第二次c.Do("PING")报错了,

redis-timeout-tcpdump.png

可以看到在9.40秒时,Redis关闭了与客户端之间的TCP连接。而在23.54秒左右(相对于第一次PING时的3.53秒,经历了20秒,就是time.Sleep(20 * time.Second)睡眠的时间),redigo在已关闭的空闲连接上发送PING,Redis直接通过RST标志断开了连接。

这就是空闲连接虽然存在,但已经失效的情况。

为了避免这种情况,我们不但可以根据Redis的timeout的配置,调整连接池IdleTimeout time.Duration的值,还可以在创建连接池时指定TestOnBorrow函数,例如

//  pool := &redis.Pool{
//    // Other pool configuration not shown in this example.
//    TestOnBorrow: func(c redis.Conn, t time.Time) error {
//      if time.Since(t) < time.Minute {
//        return nil
//      }
//      _, err := c.Do("PING")
//      return err
//    },
//  }

if (p.TestOnBorrow == nil || p.TestOnBorrow(pc.c, pc.t) == nil) &&
			// ...
			return &activeConn{p: p, pc: pc}, nil
		}
		pc.c.Close() // ①
		p.mu.Lock()
		p.active--

可以看到,当p.TestOnBorrow检测失败时,①空闲连接就会因无效而被关闭,避免了后续在已被Redis关闭的TCP连接上发送请求的问题。

问题3:新建连接的问题

如果空闲连接的链表为空,或者链表中没有存活着的可用连接,就不得不新建连接了。

新建连接很简单,只需要调用dial()函数,

p.mu.Lock()
// ...
p.active++
p.mu.Unlock()
c, err := p.dial(ctx)
// ...
return &activeConn{p: p, pc: &poolConn{c: c, created: nowFunc()}}, nil

dial()的实现如下,仅仅是调用了创建连接池时指定的新建连接的(Dial成员指向的)函数

func (p *Pool) dial(ctx context.Context) (Conn, error) {
	// ...
	if p.Dial != nil {
		return p.Dial()
	}
	// ...
}

但新建时需要考虑,当已创建的连接数已达到连接池的容量上限时要如何处理。

我们先来看redigo中最简单的一种处理方法,

// Handle limit for p.Wait == false.
	if !p.Wait && p.MaxActive > 0 && p.active >= p.MaxActive {
		p.mu.Unlock()
		return errorConn{ErrPoolExhausted}, ErrPoolExhausted
	}

此时,p.Wait == false,且已创建的连接数达到了连接池的容量上限(p.active >= p.MaxActive),于是redigo直接返回了表示错误的连接return errorConn{}

p.Wait == true时的处理方式稍微复杂一些,简单来说就是,当已创建的连接数达到了连接池的容量上限时,通过Pool结构体上的ch

type Pool struct {
  // ...
	ch           chan struct{} // limits open connections when p.Wait is true

让获取连接的goroutine进入等待状态。

	select {
case

相关文章

JavaScript2024新功能:Object.groupBy、正则表达式v标志
PHP trim 函数对多字节字符的使用和限制
新函数 json_validate() 、randomizer 类扩展…20 个PHP 8.3 新特性全面解析
使用HTMX为WordPress增效:如何在不使用复杂框架的情况下增强平台功能
为React 19做准备:WordPress 6.6用户指南
如何删除WordPress中的所有评论

发布评论