连接池——Jedis连接池

2023年 8月 14日 46.6k 0

  在上篇文章 连接池——数据库连接池中,我们了解了连接池的基本概念包括详细的介绍了数据库连接池的实现原理,已经以 HikariCP源码为例,讲解了具体的实现过程,今天我们看下Redis连接池的实现

Jedis连接池

  Redis连接池和数据库连接池一样,也是预先创建和管理一组连接,这样当需要与Redis服务器交互时,就可以直接复用连接。Redis的客户端JedisLettuce都实现了连接池的功能。我们本篇文章先以 Jedis为例,从连接的获取、归还、关闭、创建几个方面详细介绍具体的实现功能。

多线程的使用

  在数据库中如果多个线程复用一个连接会存在数据库事务问题,那么在Redis中我们先来看下在多线程环境下使用一个连接会产生什么问题?

  首先启动两个线程,共同操作同一个 Jedis 实例,每一个线程循环 500 次,分别读取 Key 为 a 和 b 的值

Jedis jedis = new Jedis("127.0.0.1", 6379);
     new Thread(() -> {
         for (int i = 0; i  {
         for (int i = 0; i < 500; i++) {
             String result = jedis.get("b");
             System.out.println(result);
         }
     }).start();

  执行程序多次,可以看到日志中出现了各种奇怪的异常信息,有的未知答复错误,还有的是连接关闭异常等

错误1:redis.clients.jedis.exceptions.JedisConnectionException: Unknown reply: 1

错误2:java.io.IOException: Socket Closed

  那我们先来看下 Jedis常用的(3.x)版本的源码

public class Jedis extends BinaryJedis implements JedisCommands, MultiKeyCommands,
    AdvancedJedisCommands, ScriptingCommands, BasicCommands, ClusterCommands, SentinelCommands,
    ModuleCommands{}
    
 public class BinaryJedis implements BasicCommands, BinaryJedisCommands, MultiKeyBinaryCommands,
    AdvancedBinaryJedisCommands, BinaryScriptingCommands, Closeable {

      protected final Client client;
  
  }
  public class Client extends BinaryClient implements Commands {}
  public class BinaryClient extends Connection {}
  
  public class Connection implements Closeable {
      private Socket socket;
      private RedisOutputStream outputStream;
      private RedisInputStream inputStream;
  }    

  首先Jedis 继承了 BinaryJedisBinaryJedis 中保存了单个 Client 的实例,Client最终继承了 ConnectionConnection 中保存了单个 Socket 的实例以及对应的两个读写流一个 RedisOutputStream 一个是 RedisInputStream

image.png

  BinaryClient 封装了各种 Redis 命令,其最终会调用的是 sendCommand 方法,发现其发送命令时是直接操作 RedisOutputStream 写入字节。

private static void sendCommand(final RedisOutputStream os, final byte[] command,
      final byte[]... args) {
    try {
      os.write(ASTERISK_BYTE);
      os.writeIntCrLf(args.length + 1);
      os.write(DOLLAR_BYTE);
      os.writeIntCrLf(command.length);
      os.write(command);
      os.writeCrLf();

      for (final byte[] arg : args) {
        os.write(DOLLAR_BYTE);
        os.writeIntCrLf(arg.length);
        os.write(arg);
        os.writeCrLf();
      }
    } catch (IOException e) {
      throw new JedisConnectionException(e);
    }
  }

  所以在多线程环境下使用 Jedis ,其实就是在复用RedisOutputStream。如果多个线程在执行操作,那么无法保证整条命令是原子写入 Socket。比如,写操作互相干扰,多条命令相互穿插的话,必然不是合法的 Redis 命令也就导致等等各种问题。

  这也说明了Jedis是非线程安全。但是可以通过JedisPool连接池去管理实例,在多线程情况下让每个线程有自己独立的Jedis实例,可变为线程安全。

//使用redis连接池,不会有线程安全问题
private static JedisPool jedisPool = new JedisPool("127.0.0.1", 6379);
     new Thread(() -> {
         try (Jedis jedis = jedisPool.getResource()) {
             for (int i = 0; i  {
         try (Jedis jedis = jedisPool.getResource()) {
             for (int i = 0; i < 1000; i++) {
                 String result = jedis.get("b");
                 System.out.println(result);
             }
         }
     }).start();

连接池的管理

  JedisPool的连接池是基于 Apache Commons PoolGenericObjectPool实现的。我们先来了解下 Apache Commons Pool的实现

Apache Commons Pool

  Apache Commons Pool是一个开源的通用对象池实现,它提供了对象池的基本功能,如对象的创建、销毁、借用和归还等。Apache Commons Pool有如下3个核心的组件,主要负责对象的通用配置、对象的创建、对象池的管理。

GenericObjectPoolConfig

  GenericObjectPoolConfig类是负责通用的对象池配置信息,比如最大对象数,最小空闲数量等。

image.png

  JedisPoolConfig通过继承 GenericObjectPoolConfig,设置了很多个性化的关于空闲连接检测的配置。

public class GenericObjectPoolConfig extends BaseObjectPoolConfig {

    /**
     * 对象池中最大对象数
     * @see GenericObjectPool#getMaxTotal()
     */
    public static final int DEFAULT_MAX_TOTAL = 8;

    /**
     * 对象池中最大空闲对象数
     * @see GenericObjectPool#getMaxIdle()
     */
    public static final int DEFAULT_MAX_IDLE = 8;

    /**
     * 对象池中最小空闲对象数
     * @see GenericObjectPool#getMinIdle()
     */
    public static final int DEFAULT_MIN_IDLE = 0;
    
    }
    
    
public class JedisPoolConfig extends GenericObjectPoolConfig {
  public JedisPoolConfig() {
    //空闲时是否进行对象有效性检查
    setTestWhileIdle(true);
    //连接空闲的最小时间
    setMinEvictableIdleTimeMillis(60000);
    //“空闲链接”检测线程,检测的周期,毫秒数
    setTimeBetweenEvictionRunsMillis(30000);
    //对所有连接做空闲监测
    setNumTestsPerEvictionRun(-1);
  }
} 

PooledObjectFactory

   PooledObjectFactory这个对象工厂主要负责对象的创建与销毁,它是一个接口,JedisFactory实现对应的接口功能。

image.png

public interface PooledObjectFactory {
	//"激活"对象
    void activateObject(PooledObject var1) throws Exception;
	//销毁对象
    void destroyObject(PooledObject var1) throws Exception;

    default void destroyObject(PooledObject p, DestroyMode destroyMode) throws Exception {
        this.destroyObject(p);
    }
	//创建一个新对象	
    PooledObject makeObject() throws Exception;
    // "钝化"对象,
    void passivateObject(PooledObject var1) throws Exception;
	//检测对象是否"有效"
    boolean validateObject(PooledObject var1);
}

GenericObjectPool

   GenericObjectPool主要是负责操作对象池里面的对象,从对象池获取对象、归还对象等操作。而GenericObjectPool通过持有上面的PooledObjectFactory对象工厂,然后去操作对应的对象。

image.png

public interface ObjectPool extends Closeable {
    
    //从池中获取对象
    T borrowObject() throws Exception, NoSuchElementException,
            IllegalStateException;
            
    //清除池,池可用
    void clear() throws Exception, UnsupportedOperationException;

    //关闭池,池不可用
    @Override
    void close();
    
    //将对象放回池中
    void returnObject(T obj) throws Exception;

}

小结

  以上就是Apache Commons Pool的具体的核心组件与功能,接下来我们看下JedisPool连接池如何基于它去实现具体的功能的。

获取连接

  我们使用JedisPool时候,是使用getResource()方法去获取Jedis,如下代码

Jedis jedis = jedisPool.getResource()

  我们先看下源码,getResource()最终实际调用的还是 GenericObjectPool对象池里面的borrowObject方法。

//Pool#getResource
 public T getResource() {
    try {
      return internalPool.borrowObject();
    } catch (NoSuchElementException nse) {
      if (null == nse.getCause()) { 
        //异常是连接池耗尽导致的
        throw new JedisExhaustedPoolException(
            "Could not get a resource since the pool is exhausted", nse);
      }
      //异常是 activateObject() or ValidateObject()导致的
      throw new JedisException("Could not get a resource from the pool", nse);
    } catch (Exception e) {
      throw new JedisConnectionException("Could not get a resource from the pool", e);
    }
  }

  我们先看下整体的流程,实际做的事情就是从空闲队列获取对象,没有的话就去创建对象信息,然后激活对象实例,再校验对象的合法性,最后返回对应的一个对象实例。

image.png

  接下来我们再看下具体的源码,GenericObjectPool对象池里面的borrowObject方法实现

//GenericObjectPool#borrowObject
private final LinkedBlockingDeque idleObjects;
 
public T borrowObject(final Duration borrowMaxWaitDuration) throws Exception {
   //检查对象池状态,看看是否已经被关闭了
    assertOpen();
    //清除废弃的对象
    final AbandonedConfig ac = this.abandonedConfig;
    if (ac != null && ac.getRemoveAbandonedOnBorrow() && (getNumIdle()  getMaxTotal() - 3)) {
        removeAbandoned(ac);
    }

    PooledObject p = null;
    final boolean blockWhenExhausted = getBlockWhenExhausted();

    boolean create;
    final long waitTimeMillis = System.currentTimeMillis();

    while (p == null) {
        create = false;
        //从LinkedBlockingDeque队列中拿出第一个元素
        p = idleObjects.pollFirst();
        if (p == null) {
            //创建对象
            p = create();
            if (p != null) {
                    //创建成功,创建标识置为true
                create = true;
            }
        }
        if (blockWhenExhausted) {
          //上面没有创建成功
            if (p == null) {
                    //如果maxWaitDuration设置的为负数
                if (borrowMaxWaitDuration.isNegative()) {
                    // 从空闲队列获取,但是该方法会阻塞,一直等到有可用空闲对象。
                    p = idleObjects.takeFirst();
                } else {
                    // 如果设置了一个有效的等待时间,最多等待borrowMaxWaitMillis毫秒。还取不到就返回空
                    p = idleObjects.pollFirst(borrowMaxWaitDuration);
                }
            }
            if (p == null) {
                throw new NoSuchElementException(appendStats(
                        "Timeout waiting for idle object, borrowMaxWaitDuration=" + borrowMaxWaitDuration));
            }
        } else if (p == null) {
            throw new NoSuchElementException(appendStats("Pool exhausted"));
        }
        // 如果分配失败(可认为被别人抢走了),p置为空(可以进行下一次循环遍历)
        if (!p.allocate()) {
            p = null;
        }
        if (p != null) {
            try {
            //通过对象池工厂,激活这个对象
            //jedis连接池的实现是JedisFactory,做了一个redis的select连库请求
                factory.activateObject(p);
            } catch (final Exception e) {
                try {
                 // 如果激活对象时,发生了异常,销毁对象
                    destroy(p, DestroyMode.NORMAL);
                } catch (final Exception e1) {}
                p = null;
                if (create) {
                    final NoSuchElementException nsee = new NoSuchElementException(
                            appendStats("Unable to activate object"));
                    nsee.initCause(e);
                    throw nsee;
                }
            }

            if (p != null && getTestOnBorrow()) {
                boolean validate = false;
                Throwable validationThrowable = null;
                try {
                //激活成功,开始校验对象。jedis的实现是,发一条redis的ping命令来校验连接的有效性
                    validate = factory.validateObject(p);
                } catch (final Throwable t) {
                    PoolUtils.checkRethrow(t);
                    validationThrowable = t;
                }
                if (!validate) {
                    try {
                    //校验对象失败,开始销毁对象
                        destroy(p, DestroyMode.NORMAL);
                        destroyedByBorrowValidationCount.incrementAndGet();
                    } catch (final Exception e) {
                        // Ignore - validation failure is more important
                    }
                    p = null;
                    if (create) {
                        final NoSuchElementException nsee = new NoSuchElementException(
                                appendStats("Unable to validate object"));
                        nsee.initCause(validationThrowable);
                        throw nsee;
                    }
                }
            }
        }
    }
            //更新对象池统计信息
    updateStatsBorrow(p, Duration.ofMillis(System.currentTimeMillis() - waitTimeMillis));
            //返回对象实例
    return p.getObject();
}

归还连接

  接着我们看下怎么去归还连接,主要流程其实就是把连接加入到空闲队列。
image.png
  连接归还是由Jedis里面的close()方法去触发的,实际调用的还是GenericObjectPool类里面的returnObject(),我们主要看下这个方法

//Jedis#close
public void close() {
    if (dataSource != null) {
      JedisPoolAbstract pool = this.dataSource;
      this.dataSource = null;
      if (isBroken()) {
        pool.returnBrokenResource(this);
      } else {
        pool.returnResource(this);
      }
    } else {
      super.close();
    }
  }
//GenericObjectPool#returnObject
public void returnObject(final T obj) {
    //从ConcurrentHashMap中获取原始对象的PooledObject对象
    final PooledObject p = getPooledObject(obj);
    //如果p为空,说明这个要还的对象,已经不在池子中了
    if (p == null) {
        if (!isAbandonedConfig()) {
            throw new IllegalStateException(
                    "Returned object not currently part of this pool");
        }
        return;
    }
    //使用同步锁,标记返回对象的状态
    markReturningState(p);
    //获取对象使用时间
    final Duration activeTime = p.getActiveDuration();
    //如果testOnReturn配置为true,需要校验有效性
    if (getTestOnReturn() && !factory.validateObject(p)) {
        try {
            //如果校验不通过,则销毁该对象
            destroy(p, DestroyMode.NORMAL);
        } catch (final Exception e) {
            swallowException(e);
        }
        try {
            ensureIdle(1, false);
        } catch (final Exception e) {
            swallowException(e);
        }
        updateStatsReturn(activeTime);
        return;
    }
  //钝化对象,也就是反初始化,也就是释放核心资源,JedisFactory里面是什么都没有实现的
    try {
        factory.passivateObject(p);
    } catch (final Exception e1) {
        swallowException(e1);
        try {
            destroy(p, DestroyMode.NORMAL);
        } catch (final Exception e) {
            swallowException(e);
        }
        try {
            ensureIdle(1, false);
        } catch (final Exception e) {
            swallowException(e);
        }
        updateStatsReturn(activeTime);
        return;
    }
    //变更状态为 IDLE
    if (!p.deallocate()) {
        throw new IllegalStateException(
                "Object has already been returned to this pool or is invalid");
    }
    //获取对象池配置的最大空闲对象数量
    final int maxIdleSave = getMaxIdle();
    //目前空闲对象数量已经达到规定的最大值,直接销毁对象
    if (isClosed() || maxIdleSave > -1 && maxIdleSave  localMaxTotal) {
                 
                 createCount.decrementAndGet();
                 //当前没有创建的对象数量,则无需创建对象
                 if (makeObjectCount == 0) {
                     create = Boolean.FALSE;
                 } else {
                     //否则等待对象的返回
                     makeObjectCountLock.wait(localMaxWaitTimeMillis);
                 }
             } else {
                 // 对象池未达到容量。创建新对象
                 makeObjectCount++;
                 create = Boolean.TRUE;
             }
         }

         //超过了最大等待时间
         if (create == null &&
             (localMaxWaitTimeMillis > 0 &&
              System.currentTimeMillis() - localStartTimeMillis >= localMaxWaitTimeMillis)) {
             create = Boolean.FALSE;
         }
     }

     if (!create.booleanValue()) {
         return null;
     }

     final PooledObject p;
     try {
         //创建对象调用的是JedisFactory中实现的
         p = factory.makeObject();
          //如果testOnReturn配置为true,需要校验有效性
         if (getTestOnCreate() && !factory.validateObject(p)) {
             //不合法减少创建数量,返回空
             createCount.decrementAndGet();
             return null;
         }
     } catch (final Throwable e) {
          //创建失败,减少创建数量,抛出异常
         createCount.decrementAndGet();
         throw e;
     } finally {
         //释放锁,通知其他的等待线程
         synchronized (makeObjectCountLock) {
             makeObjectCount--;
             makeObjectCountLock.notifyAll();
         }
     }
	//清除废弃的对象配置
     final AbandonedConfig ac = this.abandonedConfig;
     if (ac != null && ac.getLogAbandoned()) {
         p.setLogAbandoned(true);
         p.setRequireFullStackTrace(ac.getRequireFullStackTrace());
     }
	 //增加createdCount数量
     createdCount.incrementAndGet();
     //新的对象创建好了,需要把他添加到池子里,allObjects用的一个ConcurrentHashMap
     allObjects.put(new IdentityWrapper(p.getObject()), p);
     return p;
 }

  JedisFactory中实现的创建对象方法,实际上就是创建一个 Jedis实例

//JedisFactory#makeObject
public PooledObject makeObject() throws Exception {
    Jedis jedis = null;
    try {
        //创建redis连接
        jedis = new Jedis(jedisSocketFactory, clientConfig);
        jedis.connect();
        return new DefaultPooledObject(jedis);
    } catch (JedisException je) {
        if (jedis != null) {
            try {
                jedis.close();
            } catch (RuntimeException e) {
                logger.debug("Error while close", e);
            }
        }
        throw je;
    }
}

连接池的配置

  接下来我们看下常用的配置参数与建议。

参数 说明 默认值 建议
maxTotal 资源池中的最大连接数 8
maxIdle 资源池允许的最大空闲连接数 8
minIdle 资源池确保的最少空闲连接数 0
blockWhenExhausted 当资源池用尽后,调用者是否要等待。只有当值为true时,下面的maxWaitMillis才会生效。 true 建议默认值。
maxWaitMillis 当资源池连接用尽后,调用者的最大等待时间(单位为毫秒)。 -1(永不超时) 不建议默认值。
testOnBorrow 向资源池借用连接时是否做连接有效性检测(ping)。检测到的无效连接将会被移除。 false 业务量很大建议设置为false,减少一次ping的开销。
testOnReturn 向资源池归还连接时是否做连接有效性检测(ping)。检测到无效连接将会被移除。 false 业务量很大建议设置为false,减少一次ping的开销。
jmxEnabled 是否开启JMX监控 true 建议开启,请注意应用本身也需要开启
testWhileIdle 是否开启空闲资源检测。 false true
timeBetweenEvictionRunsMillis 空闲资源的检测周期(单位为毫秒) -1(不检测) 建议设置,周期自行选择
minEvictableIdleTimeMillis 资源池中资源的最小空闲时间(单位为毫秒),达到此值后空闲资源将被移除。 30分钟 可根据自身业务决定,一般默认值即可
numTestsPerEvictionRun 做空闲资源检测时,每次检测资源的个数。 3 可根据自身应用连接数进行微调,如果设置为 -1,就是对所有连接做空闲监测。

  其中最主要的就是最大连接数(maxTotal)。可以先根据下面的公式估算,实际情况还是根据业务总QPS和调用Redis的客户端规模整体评估每个节点所使用的连接池大小。

最大连接数=平均命令执行耗时(S)∗业务的QPS最大连接数 = 平均命令执行耗时(S) * 业务的QPS最大连接数=平均命令执行耗时(S)∗业务的QPS

  假如redis命令平均耗时约为1ms,业务期望的QPS是10000,那么理论上需要的连接数大小是
0.001/10000=100.001 / 10000 = 100.001/10000=10

总结

  JedisPool的连接池是基于 Apache Commons Pool 的 GenericObjectPool实现的,相对数据库连接池HikariCP实现起来更加简单,大家也可以使用Apache Commons Pool去实现其他的连接池技术,比如FTP连接池等等

参考

  • redis/jedis: Redis Java 客户端

  • Apache Commons Pool 2.11.1 API

相关文章

JavaScript2024新功能:Object.groupBy、正则表达式v标志
PHP trim 函数对多字节字符的使用和限制
新函数 json_validate() 、randomizer 类扩展…20 个PHP 8.3 新特性全面解析
使用HTMX为WordPress增效:如何在不使用复杂框架的情况下增强平台功能
为React 19做准备:WordPress 6.6用户指南
如何删除WordPress中的所有评论

发布评论