使用 TiKV 读改写 TiDB 数据

2024年 5月 7日 84.9k 0

一切开始的原因

由于数据开发的需要,我一度尝试将tidb 的使用范围更大话,同时目前大数据开发中,内存当做堆料,对于公司的开支也会与很大压力,那么就我就尝试将tikv 当做kafka 和redis 使用,本文章中将讲述开发的过程以及衍生品;

row_id 是什么

也许你和我在尝试使用tikv的时候会感觉网上的资料好像都是不太对劲的样子,比如:

https://tikv.github.io/client-java/examples/txnkv.html

我们看看这块代码:

import java.util.Arrays;
import java.util.List;

import org.tikv.common.BytePairWrapper;
import org.tikv.common.ByteWrapper;
import org.tikv.common.TiConfiguration;
import org.tikv.common.TiSession;
import org.tikv.common.util.BackOffer;
import org.tikv.common.util.ConcreteBackOffer;
import org.tikv.kvproto.Kvrpcpb.KvPair;
import org.tikv.shade.com.google.protobuf.ByteString;
import org.tikv.txn.KVClient;
import org.tikv.txn.TwoPhaseCommitter;

public class App {
    public static void main(String[] args) throws Exception {
        TiConfiguration conf = TiConfiguration.createDefault("127.0.0.1:2379");
        try (TiSession session = TiSession.create(conf)) {
            // two-phrase write
            long startTS = session.getTimestamp().getVersion();
            try (TwoPhaseCommitter twoPhaseCommitter = new TwoPhaseCommitter(session, startTS)) {
                BackOffer backOffer = ConcreteBackOffer.newCustomBackOff(1000);
                byte[] primaryKey = "key1".getBytes("UTF-8");
                byte[] key2 = "key2".getBytes("UTF-8");

                // first phrase: prewrite
                twoPhaseCommitter.prewritePrimaryKey(backOffer, primaryKey, "val1".getBytes("UTF-8"));
                List pairs = Arrays
                        .asList(new BytePairWrapper(key2, "val2".getBytes("UTF-8")));
                twoPhaseCommitter.prewriteSecondaryKeys(primaryKey, pairs.iterator(), 1000);

                // second phrase: commit
                long commitTS = session.getTimestamp().getVersion();
                twoPhaseCommitter.commitPrimaryKey(backOffer, primaryKey, commitTS);
                List keys = Arrays.asList(new ByteWrapper(key2));
                twoPhaseCommitter.commitSecondaryKeys(keys.iterator(), commitTS, 1000);
            }

            try (KVClient kvClient = session.createKVClient()) {
                long version = session.getTimestamp().getVersion();
                ByteString key1 = ByteString.copyFromUtf8("key1");
                ByteString key2 = ByteString.copyFromUtf8("key2");

                // get value of a single key
                ByteString val = kvClient.get(key1, version);
                System.out.println(val);

                // get value of multiple keys
                BackOffer backOffer = ConcreteBackOffer.newCustomBackOff(1000);
                List kvPairs = kvClient.batchGet(backOffer, Arrays.asList(key1, key2), version);
                System.out.println(kvPairs);

                // get value of a range of keys
                kvPairs = kvClient.scan(key1, ByteString.copyFromUtf8("key3"), version);
                System.out.println(kvPairs);
            }
        }
    }
}

tikv 存储的时候key 是什么东西呢?在使用scan api 后读取的数据全部都是encode 的数据,参考网上flink-tidb-cdc 源码,只有value 的decode , key呢?

首先我们打印一下key 长得样子

key = 

看结果好像应该是有个是table_id,一个是row_id;

我们继续找资料:

官方文档中 region 切分 有提及:

Split Table Region

表中行数据的 key 由 table_id 和 row_id 编码组成,格式如下:

t[table_id]_r[row_id]

例如,当 table_id 是 22,row_id 是 11 时:

t22_r11

同一表中行数据的 table_id 是一样的,但 row_id 肯定不一样,所以可以根据 row_id 来切分 Region。

 TIDB_DECODE_KEY

TIDB_DECODE_KEY 函数用于将 TiDB 编码的键输入解码为包含 _tidb_rowid 和 table_id 的 JSON 结构。你可以在一些系统表和日志输出中找到 TiDB 的编码键。

语法图

语法图代码

  • TableStmtTIDB_DECODE_KEY(STR)示例

以下示例中,表 t1 有一个隐藏的 rowid,该 rowid 由 TiDB 生成。语句中使用了 TIDB_DECODE_KEY 函数。结果显示,隐藏的 rowid 被解码后并输出,这是典型的非聚簇主键结果。

SELECT START_KEY, TIDB_DECODE_KEY(START_KEY) FROM information_schema.tikv_region_status WHERE table_name='t1' AND REGION_ID=2\G
*************************** 1. row ***************************
                 START_KEY: 7480000000000000FF3B5F728000000000FF1DE3F10000000000FA
TIDB_DECODE_KEY(START_KEY): {"_tidb_rowid":1958897,"table_id":"59"}
1 row in set (0.00 sec)

 

但是至此我们还是无法明白row_id 到底是怎么生成的,受什么控制:

终于我在一篇tidb 公众号文章(参考资料里)里面找到答案

文中这样写道:


至此我们已经聊完了如何将 Table 映射到 KV 上面,这里再举个简单的例子,便于大家理解,还是以上面的表结构为例。假设表中有 3 行数据:

1, "TiDB", "SQL Layer", 10

2, "TiKV", "KV Engine", 20

3, "PD", "Manager", 30

那么首先每行数据都会映射为一个 Key-Value pair,注意这个表有一个 Int 类型的 Primary Key,所以 RowID 的值即为这个 Primary Key 的值。假设这个表的 Table ID 为 10,其 Row 的数据为:

t_r_10_1  --> ["TiDB", "SQL Layer", 10]

t_r_10_2 --> ["TiKV", "KV Engine", 20]

t_r_10_3 --> ["PD", "Manager", 30]

 

至此,我们可以得到一个结论,key包含两个信息,一个是table_id,另外一个是row_id,且当主键为INT 类型时,主键就是row_id;

able_id 可以通过如下获得;

TiTableInfo table = session.getCatalog().getTable("ods", "ods_consult_order_event");
                long id = table.getId();

 写一个生成 key 的方法

我们再扒一扒tikv Client 源码:

 

使用 TiKV 读改写 TiDB 数据-1

    protected static final byte[] TBL_PREFIX = new byte[]{116};// 这个其实就是t 
    private static final byte[] REC_PREFIX_SEP = new byte[]{95, 114}; // 这个其实就是_r

private static byte[] encode(long tableId, long handle) {
        CodecDataOutput cdo = new CodecDataOutput();
        encodePrefix(cdo, tableId);
        IntegerCodec.writeLong(cdo, handle);
        return cdo.toBytes();
    }



    private static void encodePrefix(CodecDataOutput cdo, long tableId) {
        cdo.write(TBL_PREFIX);
        IntegerCodec.writeLong(cdo, tableId);
        cdo.write(REC_PREFIX_SEP);
    }

        public static void writeLong(CodecDataOutput cdo, long lVal) {
            cdo.writeLong(flipSignBit(lVal));
        }

        private static long flipSignBit(long v) {
            return v ^ -9223372036854775808L;
        }



    /**
     * Writes a long to the underlying output stream as eight
     * bytes, high byte first. In no exception is thrown, the counter
     * written is incremented by 8.
     *
     * @param      v   a long to be written.
     * @exception  IOException  if an I/O error occurs.
     * @see        java.io.FilterOutputStream#out
     */
    public final void writeLong(long v) throws IOException {
        writeBuffer[0] = (byte)(v >>> 56);
        writeBuffer[1] = (byte)(v >>> 48);
        writeBuffer[2] = (byte)(v >>> 40);
        writeBuffer[3] = (byte)(v >>> 32);
        writeBuffer[4] = (byte)(v >>> 24);
        writeBuffer[5] = (byte)(v >>> 16);
        writeBuffer[6] = (byte)(v >>>  8);
        writeBuffer[7] = (byte)(v >>>  0);
        out.write(writeBuffer, 0, 8);
        incCount(8);
    }

 

通过以上我们可以写一个通过table_id,row_id 获取key 的方法:

package util;

/**
 * @Maintainer 蜡笔老舅
 * @Email 
 * @CreateDate 2023/3/17
 * @Version 1.0
 * @Comment
 */
import java.io.Serializable;
import org.tikv.common.codec.CodecDataInput;
import org.tikv.common.codec.CodecDataOutput;
import org.tikv.common.codec.Codec.IntegerCodec;
import org.tikv.common.exception.TiClientInternalException;
import org.tikv.common.exception.TiExpressionException;
import org.tikv.common.key.Key;
import org.tikv.common.key.TypedKey;

import static org.tikv.common.key.RowKey.toRowKey;

public class TidbRowKeyUtil extends Key implements Serializable {
    private static final byte[] REC_PREFIX_SEP = new byte[]{95, 114};
    private final long tableId;
    private final long handle;
    private final boolean maxHandleFlag;

    private TidbRowKeyUtil(long tableId, long handle) {
        super(encode(tableId, handle));
        this.tableId = tableId;
        this.handle = handle;
        this.maxHandleFlag = false;
    }

    public static byte[] encode(long tableId, long handle) {
        CodecDataOutput cdo = new CodecDataOutput();
        encodePrefix(cdo, tableId);
        encodeRow(cdo,handle);
        return cdo.toBytes();
    }



    private static void encodePrefix(CodecDataOutput cdo, long tableId) {
        // 增加 /t table_id 标识
        cdo.write(TBL_PREFIX);
        cdo.writeLong(tableId ^ Long.MIN_VALUE);
        // 写入一个 /r row_id 标识
        cdo.write(REC_PREFIX_SEP);
    }

    private static void encodeRow(CodecDataOutput cdo, long tableId) {
        // 对row_id 进行编码
        cdo.writeLong(tableId ^ Long.MIN_VALUE);
    }


}

 

然后我们验证:

package tidb;

import com.alibaba.fastjson.JSONObject;
import org.tikv.common.TiConfiguration;
import org.tikv.common.TiSession;
import org.tikv.common.key.RowKey;
import org.tikv.common.meta.TiColumnInfo;
import org.tikv.common.meta.TiTableInfo;
import org.tikv.kvproto.Kvrpcpb;
import org.tikv.shade.com.google.protobuf.ByteString;
import org.tikv.txn.KVClient;

import java.text.SimpleDateFormat;
import java.util.Arrays;
import java.util.Date;
import java.util.List;

import static org.tikv.common.codec.TableCodec.decodeObjects;

/**
 * @Maintainer 蜡笔老舅
 * @Email 
 * @CreateDate 2023/3/17
 * @Version 1.0
 * @Comment
 */
public class KVClientMatchTest implements Runnable{
    public static void main(String[] args) {
        //3.创建实现类的对象
        KVClientMatchTest mThread = new KVClientMatchTest();

        //4.将此对象作为参数传递到Thread类的构造器中,创建Thread类的对象
        Thread t1 = new Thread(mThread);
        t1.setName("线程1");
        t1.start();
//
//        Thread t2 = new Thread(mThread);
//        t2.setName("线程2");
//        t2.start();
//
//        Thread t3 = new Thread(mThread);
//        t3.setName("线程3");
//        t3.start();
//
//        Thread t4 = new Thread(mThread);
//        t4.setName("线程4");
//        t4.start();
//
//        Thread t5 = new Thread(mThread);
//        t5.setName("线程5");
//        t5.start();
    }

    private static void getKey(TiSession session, KVClient kvClient, Integer i) {
        long version = session.getTimestamp().getVersion();
        System.out.println("version = " + version);
        System.out.println("System.currentTimeMillis() = " + System.currentTimeMillis());
        TiTableInfo table = session.getCatalog().getTable("库名", "表名");

        byte[] encode = TidbRowKeyUtil.encode(table.getId(), i);
        ByteString key = ByteString.copyFrom(encode);
        ByteString bytes = kvClient.get(key, version);

        System.out.println("bytes.size() = " + bytes.size());

        if (bytes.size()!=0) {
            Kvrpcpb.KvPair.Builder builder = Kvrpcpb.KvPair.newBuilder();
            builder.setKey(key);
            builder.setValue(bytes);
            Kvrpcpb.KvPair build = builder.build();
            JSONObject dataJSON = getDataJSON(build, table.getColumns(), table);
            System.out.println("i = " + i);
            System.out.println("dataJSON = " + dataJSON);
        }
    }
    private static JSONObject getDataJSON(Kvrpcpb.KvPair record, List columns, TiTableInfo tiTableInfo) {
        JSONObject jsonObject = new JSONObject();
        Object[] tikvValues =
                decodeObjects(
                        record.getValue().toByteArray(),
                        RowKey.decode(record.getKey().toByteArray()).getHandle(),
                        tiTableInfo);
        List objects = Arrays.asList(tikvValues);
        for (int i = 0; i < tikvValues.length; i++) {
            String type = columns.get(i).getType().getName();
            jsonObject.put(columns.get(i).getName(),getDataObject(objects.get(i),type));
        }
        return jsonObject;
    }
    protected static Object getDataObject(Object o, String type) {

        if (o!=null) {
            switch(type.toLowerCase()) {
                case "timestamp": case "datetime@asia/shanghai":
                    return new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(o);
                case "time":
                    return new SimpleDateFormat("HH:mm:ss").format(new Date(Long.parseLong(o.toString())));
                default:
                    return o;
            }
        }
        return o;
    }

    @Override
    public void run() {
        Long startTs=System.currentTimeMillis();
        TiConfiguration conf = TiConfiguration.createDefault(Constant.TIDB_PD_ADDR_TEST);
        TiSession session = TiSession.create(conf);
        KVClient kvClient = session.createKVClient();
        for (int j = 1; j <= 10000; j++) {
            getKey(session,kvClient,j);
        }

        kvClient.close();
        try {
            session.close();
        } catch (Exception e) {
            e.printStackTrace();
        }
        Long endTs=System.currentTimeMillis();
        long l = endTs - startTs;
        System.out.println("l = " + l);
    }
}

验证后果然不出所料,的确符合我们预期;

关于 Tisession

另外:

TIsission 的源码:public class TiSession implements AutoCloseable {

引用了 AutoCloseable 接口,会自动的释放掉资源实际上Session 对象中内置了线程池的概念,在使用中不需要再自己的生成线程池;

依此开发的

基于此,我们可以开发一个tikv 读取数据的工具:

like:

gitee地址:https://gitee.com/mf499441/tikv-reader

打包教程:https://gitee.com/mf499441/tikv-reader/wikis/tikvReader%20%E6%89%93%E5%8C%85exe%20%E6%96%87%E4%BB%B6

页面效果:

使用 TiKV 读改写 TiDB 数据-2

其他有关tikv + flink 实时数据开发的后面再补充。

其他注意点

row_id 如果非数字主键,row_id 依次生成,1,2,3 与你所写的内容无关。

如果主键是数字,还需要注意另外一个问题,

数字主键索引类型只可以为CLUSTERED,时,row_id 与主键相同,当为NONCLUSTERED时,row_id 依次生成,与非数字主键一样与你的的内容无关

引用资料

https://github.com/marsishandsome/tikv-client-examples

https://tikv.github.io/client-java/examples/txnkv.html

https://docs.pingcap.com/zh/tidb/stable/tidb-storage

https://docs.pingcap.com/zh/tidb/stable/tidb-architecture

https://mp.weixin.qq.com/s/1C3Rtxu9UPGjeziiV33NvA

https://asktug.com/t/topic/273494/2

相关文章

Oracle如何使用授予和撤销权限的语法和示例
Awesome Project: 探索 MatrixOrigin 云原生分布式数据库
下载丨66页PDF,云和恩墨技术通讯(2024年7月刊)
社区版oceanbase安装
Oracle 导出CSV工具-sqluldr2
ETL数据集成丨快速将MySQL数据迁移至Doris数据库

发布评论