分库分表(4)——ShardingJDBC原理和源码分析

2023年 10月 13日 33.3k 0

一、原理

1.执行流程

image.png

ShardingJDBC的整体框架如图所示,主要分为以下5个阶段。

(1)SQL Parser: SQL解析引擎

解析过程分为词法解析和语法解析。 词法解析器用于将SQL拆解为不可再分的原子符号,称为Token。并根据不同数据库方言所提供的字典,将其归类为关键字,表达式,字面量和操作符。 再使用语法解析器将SQL转换为抽象语法树(简称AST, Abstract Syntax Tree)。

(2)SQL Router- SQL 路由引擎

根据解析上下文匹配数据库和表的分片策略,并生成路由路径。对于携带分片键的 SQL,根据分片键的不同可以划分为单片路由(分片键的操作符是等号)、多片路由(分片键的操作符是 IN)和范围路由(分片键的操作符是 BETWEEN)。不携带分片键的 SQL 则采用广播路由。

分片策略通常可以采用由数据库内置或由用户方配置。数据库内置的方案较为简单,内置的分片策略大致可分为尾数取模、哈希、范围、标签、时间等。由用户方配置的分片策略则更加灵活,可以根据使用方需求定制复合分片策略。

(3)SQL Rewriter : SQL 优化引擎

首先,在数据方言方面。Apache ShardingSphere 提供了 SQL 方言翻译的能力,能否实现数据库方言之间的自动转换。例如,用户可以使用 MySQL 客户端连接 ShardingSphere 并发送基于 MySQL 方言的 SQL,ShardingSphere 能自动识别用户协议与存储节点类型自动完成 SQL 方言转换,访问 PostgreSQL 等异构存储节点。

(4)SQL Executor : SQL执行引擎

ShardingSphere 采用一套自动化的执行引擎,负责将路由和改写完成之后的真实 SQL 安全且高效发送到底层数据源执行。它不是简单地将 SQL 通过 JDBC 直接发送至数据源执行;也并非直接将执行请求放入线程池去并发执行。它更关注平衡数据源连接创建以及内存占用所产生的消耗,以及最大限度地合理利用并发等问题。执行引擎的目标是自动化的平衡资源控制与执行效率。

(5)Result Merger: 结果归并

将从各个数据节点获取的多数据结果集,组合成为一个结果集并正确的返回至请求客户端,称为结果归并。

2.主要概念

(1)分片键

用于分片的数据库字段,是将数据库(表)水平拆分的关键字段。例:将订单表中的订单主键的尾数取模分片,则订单主键为分片字段。 SQL中如果无分片字段,将执行全路由,性能较差。 除了对单分片字段的支持,ShardingSphere也支持根据多个字段进行分片。

(2)分片算法

通过分片算法将数据分片,支持通过=、BETWEEN和IN分片。

目前提供4种分片算法。由于分片算法和业务实现紧密相关,因此并未提供内置分片算法,而是通过分片策略将各种场景提炼出来,提供更高层级的抽象,并提供接口让应用开发者自行实现分片算法。

  • 精确分片算法

对应PreciseShardingAlgorithm,用于处理使用单一键作为分片键的=与IN进行分片的场景。需要配合StandardShardingStrategy使用。

  • 范围分片算法

对应RangeShardingAlgorithm,用于处理使用单一键作为分片键的BETWEEN AND进行分片的场景。需要配合StandardShardingStrategy使用。

  • 复合分片算法

对应ComplexKeysShardingAlgorithm,用于处理使用多键作为分片键进行分片的场景,包含多个分片键的逻辑较复杂,需要应用开发者自行处理其中的复杂度。需要配合ComplexShardingStrategy使用。

  • Hint分片算法

对应HintShardingAlgorithm,用于处理使用Hint行分片的场景。需要配合HintShardingStrategy使用。

(3)分片策略

包含分片键和分片算法,由于分片算法的独立性,将其独立抽离。真正可用于分片操作的是分片键 + 分片算法,也就是分片策略。目前提供5种分片策略。

  • 标准分片策略

对应StandardShardingStrategy。提供对SQL语句中的=, IN和BETWEEN AND的分片操作支持。StandardShardingStrategy只支持单分片键,提供PreciseShardingAlgorithm和RangeShardingAlgorithm两个分片算法。PreciseShardingAlgorithm是必选的,用于处理=和IN的分片。RangeShardingAlgorithm是可选的,用于处理BETWEEN AND分片,如果不配置RangeShardingAlgorithm,SQL中的BETWEEN AND将按照全库路由处理。

  • 复合分片策略

对应ComplexShardingStrategy。复合分片策略。提供对SQL语句中的=, IN和BETWEEN AND的分片操作支持。ComplexShardingStrategy支持多分片键,由于多分片键之间的关系复杂,因此并未进行过多的封装,而是直接将分片键值组合以及分片操作符透传至分片算法,完全由应用开发者实现,提供最大的灵活度。

  • 行表达式分片策略

对应InlineShardingStrategy。使用Groovy的表达式,提供对SQL语句中的=和IN的分片操作支持,只支持单分片键。对于简单的分片算法,可以通过简单的配置使用,从而避免繁琐的Java代码开发,如: t_user_$->{u_id % 8} 表示t_user表根据u_id模8,而分成8张表,表名称为t_user_0到t_user_7。

  • Hint分片策略

对应HintShardingStrategy。通过Hint而非SQL解析的方式分片的策略。

  • 不分片策略

对应NoneShardingStrategy。不分片的策略。

二、源码

ShardingJDBC的本质其实是生成一个带有分库分表功能的ShardingSphereDatasource。他就是我们经常使用的DataSource接口的实现类。因此,我们也可以尝试构建一个JDBC的应用,作为源码阅读的起点。

import com.zaxxer.hikari.HikariDataSource;
import org.apache.shardingsphere.driver.api.ShardingSphereDataSourceFactory;
import org.apache.shardingsphere.infra.config.algorithm.AlgorithmConfiguration;
import org.apache.shardingsphere.sharding.api.config.ShardingRuleConfiguration;
import org.apache.shardingsphere.sharding.api.config.rule.ShardingTableRuleConfiguration;
import org.apache.shardingsphere.sharding.api.config.strategy.keygen.KeyGenerateStrategyConfiguration;
import org.apache.shardingsphere.sharding.api.config.strategy.sharding.StandardShardingStrategyConfiguration;
import javax.sql.DataSource;
import java.sql.Connection;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;

/**
 * @author yangnk
 * @desc
 * @date 2023/10/12 08:51
 **/
public class ShardingJDBCDemo {
    public static void main(String[] args) throws SQLException {
        //=======一、配置数据库
        Map dataSourceMap = new HashMap(2);//为两个数据库的datasource
        // 配置第一个数据源
        HikariDataSource dataSource0 = new HikariDataSource();
        dataSource0.setDriverClassName("com.mysql.cj.jdbc.Driver");
        dataSource0.setJdbcUrl("jdbc:mysql://localhost:3306/coursedb?serverTimezone=GMT%2B8&useSSL=false");
        dataSource0.setUsername("root");
        dataSource0.setPassword("root");
        dataSourceMap.put("m0", dataSource0);
        // 配置第二个数据源
        HikariDataSource dataSource1 = new HikariDataSource();
        dataSource1.setDriverClassName("com.mysql.cj.jdbc.Driver");
        dataSource1.setJdbcUrl("jdbc:mysql://localhost:3306/coursedb2?serverTimezone=GMT%2B8&useSSL=false");
        dataSource1.setUsername("root");
        dataSource1.setPassword("root");
        dataSourceMap.put("m1", dataSource1);
        //=======二、配置分库分表策略
        ShardingRuleConfiguration shardingRuleConfig = createRuleConfig();
        //三、配置属性值
        Properties properties = new Properties();
        //打开日志输出 4.x版本是sql.show,5.x版本变成了sql-show
        properties.setProperty("sql-show", "true");
        //K1 创建ShardingSphere的数据源 ShardingDataSource
        DataSource dataSource = ShardingSphereDataSourceFactory.createDataSource(dataSourceMap, Collections.singleton(shardingRuleConfig), properties);


        //-------------测试部分-----------------//
        ShardingJDBCDemo test = new ShardingJDBCDemo();
        //建表
//        test.droptable(dataSource);
//        test.createtable(dataSource);
        //插入数据
//        test.addcourse(dataSource);
        //K1 调试的起点 查询数据
        test.querycourse(dataSource);
    }

    private static ShardingRuleConfiguration createRuleConfig(){
        ShardingRuleConfiguration result = new ShardingRuleConfiguration();
        //spring.shardingsphere.rules.sharding.tables.course.actual-data-nodes=m$->{0..1}.course_$->{1..2}
        ShardingTableRuleConfiguration courseTableRuleConfig = new ShardingTableRuleConfiguration("course",
                "m$->{0..1}.course_$->{1..2}");
        //spring.shardingsphere.rules.sharding.key-generators.alg_snowflake.type=SNOWFLAKE
        //spring.shardingsphere.rules.sharding.key-generators.alg_snowflake.props.worker.id=1
        Properties snowflakeprop = new Properties();
        snowflakeprop.setProperty("worker.id", "123");
        result.getKeyGenerators().put("alg_snowflake", new AlgorithmConfiguration("SNOWFLAKE", snowflakeprop));
        //spring.shardingsphere.rules.sharding.tables.course.key-generate-strategy.column=cid
        //spring.shardingsphere.rules.sharding.tables.course.key-generate-strategy.key-generator-name=alg_snowflake
        courseTableRuleConfig.setKeyGenerateStrategy(new KeyGenerateStrategyConfiguration("cid","alg_snowflake"));
        //spring.shardingsphere.rules.sharding.tables.course.database-strategy.standard.sharding-column=cid
        //spring.shardingsphere.rules.sharding.tables.course.database-strategy.standard.sharding-algorithm-name=course_db_alg
        courseTableRuleConfig.setDatabaseShardingStrategy(new StandardShardingStrategyConfiguration("cid","course_db_alg"));
        //spring.shardingsphere.rules.sharding.sharding-algorithms.course_db_alg.type=MOD
        //spring.shardingsphere.rules.sharding.sharding-algorithms.course_db_alg.props.sharding-count=2
        Properties modProp = new Properties();
        modProp.put("sharding-count",2);
        result.getShardingAlgorithms().put("course_db_alg",new AlgorithmConfiguration("MOD",modProp));
        //spring.shardingsphere.rules.sharding.tables.course.table-strategy.standard.sharding-column=cid
        //spring.shardingsphere.rules.sharding.tables.course.table-strategy.standard.sharding-algorithm-name=course_tbl_alg
        courseTableRuleConfig.setTableShardingStrategy(new StandardShardingStrategyConfiguration("cid","course_tbl_alg"));
        //#spring.shardingsphere.rules.sharding.sharding-algorithms.course_tbl_alg.type=INLINE
        //#spring.shardingsphere.rules.sharding.sharding-algorithms.course_tbl_alg.props.algorithm-expression=course_$->{cid%2+1}
        Properties inlineProp = new Properties();
        inlineProp.setProperty("algorithm-expression", "course_$->{((cid+1)%4).intdiv(2)+1}");
        result.getShardingAlgorithms().put("course_tbl_alg",new AlgorithmConfiguration("INLINE",inlineProp));

        result.getTables().add(courseTableRuleConfig);
        return result;
    }

    //添加10条课程记录
    public void addcourse(DataSource dataSource) throws SQLException {
        for (int i = 1; i < 10; i++) {
            long orderId = executeAndGetGeneratedKey(dataSource, "INSERT INTO course (cname, user_id, cstatus) VALUES ('java'," + i + ", '1')");
            System.out.println("添加课程成功,课程ID:" + orderId);
        }
    }

    public void querycourse(DataSource dataSource) throws SQLException {
        Connection conn = null;
        try {
            //ShardingConnectioin
            conn = dataSource.getConnection();
            //ShardingStatement
            Statement statement = conn.createStatement();
            String sql = "SELECT cid,cname,user_id,cstatus from course where cid=851198093910081536";
            //ShardingResultSet
            ResultSet result = statement.executeQuery(sql);
            while (result.next()) {
                System.out.println("result:" + result.getLong("cid"));
            }
        } catch (SQLException e) {
            e.printStackTrace();
        } finally {
            if (null != conn) {
                conn.close();
            }
        }
    }

    private void execute(final DataSource dataSource, final String sql) throws SQLException {
        try (
                Connection conn = dataSource.getConnection();
                Statement statement = conn.createStatement()) {
            statement.execute(sql);
        }
    }

    private long executeAndGetGeneratedKey(final DataSource dataSource, final String sql) throws SQLException {
        long result = -1;
        try (
                Connection conn = dataSource.getConnection();
                Statement statement = conn.createStatement()) {
            statement.executeUpdate(sql, Statement.RETURN_GENERATED_KEYS);
            ResultSet resultSet = statement.getGeneratedKeys();
            if (resultSet.next()) {
                result = resultSet.getLong(1);
            }
        }
        return result;
    }

    /**
     * -----------------------------表初始化--------------------------------
     */
    public void droptable(DataSource dataSource) throws SQLException {
        execute(dataSource, "DROP TABLE IF EXISTS course_1");
        execute(dataSource, "DROP TABLE IF EXISTS course_2");
    }

    public void createtable(DataSource dataSource) throws SQLException {
        execute(dataSource, "CREATE TABLE course_1 (cid BIGINT(20) PRIMARY KEY,cname VARCHAR(50) NOT NULL,user_id BIGINT(20) NOT NULL,cstatus varchar(10) NOT NULL);");
        execute(dataSource, "CREATE TABLE course_2 (cid BIGINT(20) PRIMARY KEY,cname VARCHAR(50) NOT NULL,user_id BIGINT(20) NOT NULL,cstatus varchar(10) NOT NULL);");
    }
}

pom依赖:

    
        
        
            org.apache.shardingsphere
            shardingsphere-jdbc-core-spring-boot-starter
            5.2.1
            
                
                    snakeyaml
                    org.yaml
                
            
        
        
        
            org.yaml
            snakeyaml
            1.33
        
        
        
            org.apache.shardingsphere
            shardingsphere-transaction-xa-core
            5.2.1
            
                
                    transactions-jdbc
                    com.atomikos
                
                
                    transactions-jta
                    com.atomikos
                
            
        


        
            org.springframework.boot
            spring-boot-starter
        

        
            org.springframework.boot
            spring-boot-starter-test
        

        
            com.alibaba
            druid-spring-boot-starter
            1.1.20
        

        
            mysql
            mysql-connector-java
        







        
            com.baomidou
            mybatis-plus-boot-starter
            3.0.5
        

        
            org.projectlombok
            lombok
        





    

application.properties配置文件:

# sharding-jdbc 水平分表策略
# 配置数据源,给数据源起别名
spring.shardingsphere.datasource.names=m1

# 一个实体类对应两张表,覆盖
spring.main.allow-bean-definition-overriding=true

# 配置数据源的具体内容,包含连接池,驱动,地址,用户名,密码
spring.shardingsphere.datasource.m1.type=com.alibaba.druid.pool.DruidDataSource
spring.shardingsphere.datasource.m1.driver-class-name=com.mysql.cj.jdbc.Driver
spring.shardingsphere.datasource.m1.url=jdbc:mysql://xxxx:3306/test?serverTimezone=UTC&useSSL=false&allowPublicKeyRetrieval=true
spring.shardingsphere.datasource.m1.username=root
spring.shardingsphere.datasource.m1.password=xxxx

# 指定course表分布的情况,配置表在哪个数据库里,表的名称都是什么 m1.course_1,m1.course_2
spring.shardingsphere.sharding.tables.course.actual-data-nodes=m1.course_$->{1..2}

# 指定 course 表里面主键 cid 的生成策略 SNOWFLAKE
spring.shardingsphere.sharding.tables.course.key-generator.column=cid
spring.shardingsphere.sharding.tables.course.key-generator.type=SNOWFLAKE

# 配置分表策略    约定 cid 值偶数添加到 course_1 表,如果 cid 是奇数添加到 course_2 表
spring.shardingsphere.sharding.tables.course.table-strategy.inline.sharding-column=cid
spring.shardingsphere.sharding.tables.course.table-strategy.inline.algorithm-expression=course_$->{cid % 2 + 1}

# 打开 sql 输出日志
spring.shardingsphere.props.sql.show=true

spring.shardingsphere.mode.type=Standalone
spring.shardingsphere.mode.repository.type=File
spring.shardingsphere.mode.overwrite=true
orithms.course_tbl_alg.props.algorithm-expression=course_$->{cid%2+1}

1.主键生成策略

在application.properties配置文件中配置主键生成规则:

spring.shardingsphere.rules.sharding.key-generators.course_cid_alg.type=MYKEY

以下是主键生成策略的核心代码,调用newInstance(final AlgorithmConfiguration keyGenerateAlgorithmConfig) 会生成对应的主键生成策略。

/**
 * Key generate algorithm factory.
 */
@NoArgsConstructor(access = AccessLevel.PRIVATE)
public final class KeyGenerateAlgorithmFactory {
    
    static {
        ShardingSphereServiceLoader.register(KeyGenerateAlgorithm.class);
    }
    
    /**
     * Create new instance of key generate algorithm.
     *
     * @return created instance
     */
    public static KeyGenerateAlgorithm newInstance() {
        return RequiredSPIRegistry.getRegisteredService(KeyGenerateAlgorithm.class);
    }
    
    /**
     * Create new instance of key generate algorithm.
     *
     * @param keyGenerateAlgorithmConfig key generate algorithm configuration
     * @return created instance
     */
    public static KeyGenerateAlgorithm newInstance(final AlgorithmConfiguration keyGenerateAlgorithmConfig) {
        return ShardingSphereAlgorithmFactory.createAlgorithm(keyGenerateAlgorithmConfig, KeyGenerateAlgorithm.class);
    }
    
    /**
     * Judge whether contains key generate algorithm.
     *
     * @param keyGenerateAlgorithmType key generate algorithm type
     * @return contains key generate algorithm or not
     */
    public static boolean contains(final String keyGenerateAlgorithmType) {
        return TypedSPIRegistry.findRegisteredService(KeyGenerateAlgorithm.class, keyGenerateAlgorithmType).isPresent();
    }
}

2.分片策略

在application.properties配置文件中配置分片策略:

spring.shardingsphere.rules.sharding.sharding-algorithms.course_tbl_alg.type=MYCOMPLEX

以下是生成分片策略的工厂类,核心是调用newInstance()方法创建分片策略。

import lombok.AccessLevel;
import lombok.NoArgsConstructor;
import org.apache.shardingsphere.infra.config.algorithm.AlgorithmConfiguration;
import org.apache.shardingsphere.infra.config.algorithm.ShardingSphereAlgorithmFactory;
import org.apache.shardingsphere.sharding.spi.ShardingAlgorithm;
import org.apache.shardingsphere.infra.util.spi.ShardingSphereServiceLoader;
import org.apache.shardingsphere.infra.util.spi.type.typed.TypedSPIRegistry;

/**
 * Sharding algorithm factory.
 */
@NoArgsConstructor(access = AccessLevel.PRIVATE)
public final class ShardingAlgorithmFactory {
    
    static {
        ShardingSphereServiceLoader.register(ShardingAlgorithm.class);
    }
    
    /**
     * Create new instance of sharding algorithm.
     * 
     * @param shardingAlgorithmConfig sharding algorithm configuration
     * @return created instance
     */
    public static ShardingAlgorithm newInstance(final AlgorithmConfiguration shardingAlgorithmConfig) {
        return ShardingSphereAlgorithmFactory.createAlgorithm(shardingAlgorithmConfig, ShardingAlgorithm.class);
    }
    
    /**
     * Judge whether contains sharding algorithm.
     *
     * @param shardingAlgorithmType sharding algorithm type
     * @return contains sharding algorithm or not
     */
    public static boolean contains(final String shardingAlgorithmType) {
        return TypedSPIRegistry.findRegisteredService(ShardingAlgorithm.class, shardingAlgorithmType).isPresent();
    }
}

3.SPI机制

以上的分布式主键生成和分片策略生成都可以自定义实现,该自定义实现是通过SPI机制实现的。

SPI机制指系统里抽象的各个模块,往往有很多不同的实现方案,比如日志模块的方案,xml解析模块、jdbc模块的方案等。面向的对象的设计里,我们一般推荐模块之间基于接口编程,模块之间不对实现类进行硬编码。

Java SPI 的具体约定为:当服务的提供者,提供了服务接口的一种实现之后,在jar包的 META-INF/services/ 目录里同时创建一个以服务接口命名的文件。该文件里就是实现该服务接口的具体实现类。

而当外部程序装配这个模块的时候,就能通过该jar包**META-INF/services/**里的配置文件找到具体的实现类名,并装载实例化,完成模块的注入。

基于这样一个约定就能很好的找到服务接口的实现类,而不需要再代码里制定。jdk提供服务实现查找的一个工具类:java.util.ServiceLoader。在主键生成策略和分片策略中的源码中都能看到ServiceLoader.load(serviceInterface)这样的方法,这就是通过SPI来加载自定义策略。

import lombok.AccessLevel;
import lombok.NoArgsConstructor;
import lombok.SneakyThrows;
import org.apache.shardingsphere.infra.util.spi.annotation.SingletonSPI;

import java.util.Collection;
import java.util.Collections;
import java.util.LinkedList;
import java.util.Map;
import java.util.ServiceLoader;
import java.util.concurrent.ConcurrentHashMap;

/**
* ShardingSphere service loader.
*/
@NoArgsConstructor(access = AccessLevel.PRIVATE)
public final class ShardingSphereServiceLoader {

private static final Map

相关文章

JavaScript2024新功能:Object.groupBy、正则表达式v标志
PHP trim 函数对多字节字符的使用和限制
新函数 json_validate() 、randomizer 类扩展…20 个PHP 8.3 新特性全面解析
使用HTMX为WordPress增效:如何在不使用复杂框架的情况下增强平台功能
为React 19做准备:WordPress 6.6用户指南
如何删除WordPress中的所有评论

发布评论