MDB (MogDB Data Bridge): A Real-Time Synchronization Tool for MogDB/openGauss

November 22, 2023

Original author: 李宏达

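Architecture diagram

I. Install and configure MogDB and the wal2json plugin

  • wal2json installation and configuration: see "Debezium Adapt openGauss" (Section 5, "openGauss install wal2json")
  • wal2json download link
  • Database installation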

II. Start Kafka

cd ./kafka/bin
[root@mogdb-mdb-0003 bin]# ./zookeeper-server-start.sh -daemon ../config/zookeeper.properties
[root@mogdb-mdb-0003 bin]# ./kafka-server-start.sh -daemon ../config/server.properties

III. Edit the configuration file

The // annotations in the listing below are explanatory only. Strict JSON does not allow comments, so remove them before posting the file to Kafka Connect.

cat og.json

{
  "name": "mtk-connector", // The name of the connector when registered with a Kafka Connect service.
  "config": {
    "schema.include.list": "lee", // Schemas to synchronize.
    "plugin.name": "wal2json", // Logical decoding output plugin.
    "database.password": "Enmo@123",
    "database.history.kafka.bootstrap.servers": "127.0.0.1:9092", // Kafka address.
    "database.hostname": "192.168.1.11", // The address of the PostgreSQL server.
    "connector.class": "io.debezium.connector.postgresql.PostgresConnector", // The name of this PostgreSQL connector class.
    "database.user": "lee", // The name of the PostgreSQL user that has the required privileges.
    "slot.name": "wal2json1", // Replication slot name; must be unique.
    "snapshot.mode": "never",
    "database.server.name": "opengauss", // The logical name of the MogDB server/cluster, which forms a namespace and is used in all the names of the Kafka topics to which the connector writes, the Kafka Connect schema names, and the namespaces of the corresponding Avro schema when the Avro converter is used.
    "database.port": "18000", // The port number of the PostgreSQL server.
    "tasks.max": "1",
    "table.include.list": "lee.test", // A list of all tables hosted by this server that this connector will monitor. This is optional, and there are other properties for listing the schemas and tables to include or exclude from monitoring.
    "database.dbname": "postgres" // The name of the PostgreSQL database to connect to.
  }
}
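
Because the listing above carries // annotations, it cannot be parsed as JSON as written. The following is a minimal Python sketch, not part of MDB itself, of one way to strip such annotations and confirm that what remains is valid JSON. The helper name strip_line_comments and the shortened sample text are illustrative assumptions.

```python
import json


def strip_line_comments(text):
    """Remove // comments that sit outside JSON string literals."""
    out = []
    in_string = False
    escaped = False
    i = 0
    while i < len(text):
        ch = text[i]
        if in_string:
            out.append(ch)
            if escaped:
                escaped = False
            elif ch == "\\":
                escaped = True
            elif ch == '"':
                in_string = False
            i += 1
        elif ch == '"':
            in_string = True
            out.append(ch)
            i += 1
        elif text.startswith("//", i):
            # Skip to the end of the line; the newline itself is kept.
            while i < len(text) and text[i] != "\n":
                i += 1
        else:
            out.append(ch)
            i += 1
    return "".join(out)


# Shortened, hypothetical excerpt of an annotated connector file.
annotated = '''
{ "name": "mtk-connector", // connector name
  "config": {
    "plugin.name": "wal2json", // plugin
    "database.hostname": "192.168.1.11", // server address
    "database.port": "18000"
  }
}
'''

connector = json.loads(strip_line_comments(annotated))
print(connector["config"]["plugin.name"])
```

Tracking whether the scanner is inside a string literal keeps values that contain "//", such as URLs, intact.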

IV. Start Debezium and register the connector

1. Set the plugin path in the configuration file

cat /root/kafka/config/connect-distributed.properties
plugin.path=/root/kafka/connect

2. Start Debezium (DBZ)

[root@mogdb-mdb-0003 bin]# ./connect-distributed.sh ../config/connect-distributed.properties

3. Register the connector with Debezium (DBZ)

curl -X "POST" "http://21.47.30.225:8083/connectors" -H 'content-type: application/json;charset=UTF-8' -d @og.json.kafka
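
The same registration call can be sketched in Python with only the standard library. This is an illustrative sketch rather than part of the tool: the function name build_register_request, the localhost URL, and the abbreviated connector dictionary are assumptions, and the network call is left commented out so the snippet runs without a Kafka Connect service.

```python
import json
import urllib.request


def build_register_request(connect_url, connector):
    """Build (but do not send) the POST that registers a connector."""
    body = json.dumps(connector).encode("utf-8")
    return urllib.request.Request(
        url=connect_url.rstrip("/") + "/connectors",
        data=body,
        headers={"Content-Type": "application/json;charset=UTF-8"},
        method="POST",
    )


# Abbreviated placeholder configuration, for illustration only.
connector = {
    "name": "og",
    "config": {
        "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
        "plugin.name": "wal2json",
        "slot.name": "wal2json1",
    },
}

request = build_register_request("http://localhost:8083", connector)
print(request.get_method(), request.full_url)

# To actually register, run the lines below on a host that can reach Kafka Connect:
# with urllib.request.urlopen(request, timeout=10) as response:
#     print(response.status, response.read().decode("utf-8"))
```

Building the request separately from sending it makes it easy to inspect the URL, headers, and body before anything is changed on the Connect worker.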

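4. Consumption test

  • List the topics to find the topic name (insert a row on the source first so that the topic exists)

./kafka-topics.sh --bootstrap-server localhost:9092 --list

  • Consume the topic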

./kafka-console-consumer.sh --bootstrap-server localhost:9092 --from-beginning --property print.key=true --topic OPENGAUSS.lee.test_og
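
The topic name used above follows the pattern server name, schema, table, joined by dots, which matches database.server.name and the schema and table in the connector configuration. A small Python sketch of that convention follows; the helper names topic_name and split_topic are hypothetical, and the sketch assumes that none of the three parts contains characters the connector would rewrite.

```python
def topic_name(server_name, schema, table):
    """Join the three parts the way the example topic is named."""
    return f"{server_name}.{schema}.{table}"


def split_topic(topic):
    """Split a topic name back into (server_name, schema, table)."""
    server_name, schema, table = topic.split(".", 2)
    return server_name, schema, table


print(topic_name("OPENGAUSS", "lee", "test_og"))
print(split_topic("OPENGAUSS.lee.test_og"))
```

If the expected topic does not show up in the topic list, comparing the computed name with the listed names is a quick way to spot a mismatch in server name or letter case.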

5. Other

  • Delete the connector registration (substitute the name you registered)

curl -X "DELETE" "http://localhost:8083/connectors/mtk-connector1"

  • Drop the replication slot

postgres=# select slot_name from pg_get_replication_slots();
slot_name
---------------------------
standby_192.168.1.12_26001
primary_192.168.1.12_26001
wal2json1
(3 rows)

postgres=# select * from pg_drop_replication_slot('wal2json1');

  • View offsets

./kafka-consumer-groups.sh --bootstrap-server localhost:9092 --group worker-oracle --describe

  • Reset offsets

./kafka-consumer-groups.sh --bootstrap-server localhost:9092 --group worker-oracle --topic opengauss5.lee.test --reset-offsets --to-earliest --execute

  • Delete the topic

./kafka-topics.sh --delete --bootstrap-server 192.168.1.1:9092 --topic OPENGAUSS.lee.test_og

V. Configure Datawork synchronization

1. Set the Java environment variables

export JAVA_HOME=/usr/lib/jvm/java-1.8.0-openjdk-1.8.0.181-7.b13.el7.x86_64/jre
export PATH=$JAVA_HOME/bin:$PATH
export CLASSPATH=.:$JAVA_HOME/lib/dt.jar:$JAVA_HOME

2. Edit dataworker.properties

# Topic pattern; no change needed
mtk.topicPattern=og
# Consumer group ID; must differ for each process
mtk.groupId=worker-oracle
# Kafka address
mtk.bootstrapServer=127.0.0.1:9092
# Maximum number of records per poll
mtk.maxPollRecordsConfig=10000
# Maximum number of bytes fetched per partition
mtk.maxPartitionFetchBytesConfig=104857600
mtk.source=MOGDB:192.168.1.11:18000:postgres:lee:Enmo@123
mtk.target=MYSQL:192.168.1.12:3306:root:root:Enmo@123
mtk.includeSchema=lee
mtk.includeTable=lee.test

  • Supported targets: PostgreSQL, MySQL, Oracle, openGauss, MogDB
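
The mtk.source and mtk.target values pack several fields into one colon-separated string. The Python sketch below shows how such a value could be split for a quick sanity check before starting the worker. The field order (type, host, port, database, user, password) is an assumption inferred from the example values, not a documented format, and Endpoint and parse_endpoint are hypothetical names.

```python
from dataclasses import dataclass


@dataclass
class Endpoint:
    db_type: str
    host: str
    port: int
    database: str
    user: str
    password: str


def parse_endpoint(value):
    """Split TYPE:host:port:database:user:password (assumed field order)."""
    # Split at most five times so a password containing ":" stays intact.
    parts = value.split(":", 5)
    if len(parts) != 6:
        raise ValueError(f"expected 6 colon-separated fields, got {len(parts)}")
    db_type, host, port, database, user, password = parts
    return Endpoint(db_type, host, int(port), database, user, password)


source = parse_endpoint("MOGDB:192.168.1.11:18000:postgres:lee:Enmo@123")
print(source.db_type, source.host, source.port, source.database, source.user)
```

Converting the port to an integer surfaces a misplaced field early, since a non-numeric value in that position raises ValueError.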

3. Start dataworker

java -jar mtk-data-worker.jar

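VI. Port the synchronization program to another Kafka installation (optional)

1. Copy the entire connect folder and the configuration file

  • The connect folder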
  • config/connect-distributed.properties

task.shutdown.graceful.timeout.ms=10000
offset.flush.timeout.ms=5000
internal.key.converter=org.apache.kafka.connect.json.JsonConverter
plugin.path=/root/datawork/kafka/connect

VII. Consume to another Kafka (optional)

  • Modify the Debezium (DBZ) configuration file

[root@v-21-47-30-225 config]# grep -i bootstrap.servers /root/kafka/config/connect-distributed.properties
# the `bootstrap.servers` and those specifying replication factors.
bootstrap.servers=192.168.1.1:9092

  • Connector registration file
    cat og.json.kafka

{
  "name": "og",
  "config": {
    "schema.include.list": "lee",
    "plugin.name": "wal2json",
    "database.password": "Enmo@123",
    "database.history.kafka.bootstrap.servers": "192.168.1.1:9092",
    "database.hostname": "192.168.1.11",
    "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
    "database.user": "lee",
    "slot.name": "wal2json1",
    "snapshot.mode": "never",
    "database.server.name": "OPENGAUSS",
    "database.port": "26000",
    "tasks.max": "1",
    "table.include.list": "lee.test_OG",
    "database.dbname": "test"
  }
}

VIII. Selected log screenshots

  • Synchronization log
