Streaming Data Changes from MySQL to Elasticsearch

2023年 7月 19日 50.6k 0

“我正在参加「掘金·启航计划」”

MySQL Binary Log包含了针对数据库执行DDL(Data Definition Language)和DML(Data Manipulation Language)操作的完整事件,其被广泛应用于数据复制和数据恢复场景。本文所分享的就是一种基于MySQL Binary Log特性实现增量数据近实时同步到Elasticsearch的一种技术。要想实现增量数据的同步,仅仅有binary log是不够的,我们还需要一款变更数据捕获(CDC,Change Data Capture)工具,可能大家很快就会想到阿里巴巴开源的Canal。没错,但本文今天给大家分享一款新的开源工具:Debezium。Debezium构建于Kafka之上,它为MySQLMongoDBPostgreSQLOrcaleCassandra等一众数据库量身打造了一套完全适配于Kafka Connectsource connector。首先,source connector会实时获取由INSERTUPDATEDELETE操作所触发的数据变更事件;然后,将其发送到Kafka topic中;最后,我们使用sink connector将topic中的数据变更事件同步到Elasticsearch中去,从而最终实现数据的近实时流转,如下图所示。

Debezium支持StandalonePrimary and replicaHigh available clustersMulti-primary等多种拓扑结构。

1 安装MySQL

1.1 解压与配置

tar -xzvf mysql-8.0.21-el7-x86_64.tar.gz -C /root/debezium/

在mysql-8.0.21-el7-x86_64根目录下,新增my.cnf文本文件,然后将以下内容复制到my.cnf文件内。

[client]
port=3306
socket=/root/debezium/mysql-8.0.21-el7-x86_64/mysql.sock

[mysqld]
port=3306
socket=/root/debezium/mysql-8.0.21-el7-x86_64/mysql.sock
key_buffer_size=16M
max_allowed_packet=128M
basedir=/root/debezium/mysql-8.0.21-el7-x86_64
datadir=/root/debezium/mysql-8.0.21-el7-x86_64/data
server-id=101
log-bin=mysql-bin
binlog-format=ROW
binlog-row-image=FULL
binlog_expire_logs_seconds=86400

[mysqldump]
quick

1.2 初始化

./bin/mysqld --defaults-file=/root/debezium/mysql-8.0.21-el7-x86_64/my.cnf --initialize

执行上述初始化操作后,仔细观察控制台:你会发现root账号已经生成了一个临时密码。

2020-12-28T02:55:20.774965Z6 [Note] [MY-010454] [Server] Atemporarypasswordisgeneratedforroot@localhost:&,Yot7iMeT_T

1.3 启动MySQL Server

初始化操作并没有启动MySQL Server,所以你还需要手动启动MySQL Server。

./bin/mysqld --defaults-file=/root/debezium/mysql-8.0.21-el7-x86_64/my.cnf --user=root

1.4 重置'root'账号密码

USE mysql;
ALTER USER 'root'@'localhost' IDENTIFIED BY 'root账号新密码';
FLUSH PRIVILEGES;

1.5 更新远程访问权限

USE mysql;
UPDATE USER SET host = '%' WHERE user = 'root';
FLUSH PRIVILEGES;

2 安装Kafka

配置不多赘述,自行上网解决。目前Kafka依赖Zookeeper组件,故其内置了Zookeeper,我们可以直接使用,无需单独下载。

nohup ./bin/zookeeper-server-start.sh config/zookeeper.properties>/dev/null 2>&1 &
nohup ./bin/kafka-server-start.sh config/server.properties>/dev/null 2>&1 &

3 安装Debezium

3.1 解压source connector与sink connector

tar -xzvf debezium-connector-mysql-1.4.2.Final-plugin.tar.gz -C /root/debezium/connector-plugins
unzip confluentinc-kafka-connect-elasticsearch-11.0.3 -d /root/debezium/connector-plugins

3.2 Kafka Connect

为了更方便、更规范地整合Kafka与其他数据系统,Kafka提供了Kafka Connect,Kafka Connect定义了source connectorsink connector接口规范。如果想从其他数据系统传输数据到Kafka,那么就需要实现source connector接口规范;如果想从Kafka传输数据到其他数据系统,那么就需要实现sink connector接口规范。此外,Kafka Connect还暴露了一套REST API,可以更方便地对connector进行管理。

3.2.1 配置

#################### connect-distributed.properties ###################
bootstrap.servers=localhost:9092

group.id=connect-cluster

key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=true
value.converter.schemas.enable=true

# Topic to use for storing offsets.
# This topic should have many partitions and be replicated and compacted.
offset.storage.topic=connect-offsets
offset.storage.replication.factor=1

# Topic to use for storing connector and task configurations.
# This topic should be a single partition, highly replicated.
config.storage.topic=connect-configs
config.storage.replication.factor=1

# Topic to use for storing statuses. 
# This topic can have multiple partitions and should be replicated and compacted.
status.storage.topic=connect-status
status.storage.replication.factor=1
 
# Hostname & Port for the REST API to listen on. 
rest.host.name=localhost
rest.port=8083

# Set to a list of filesystem paths separated by commas (,) to enable class loading isolation for plugins
# The list should consist of top level directories. 
# source connector和sink connector的依赖路径
plugin.path=/root/debezium/connector-plugins/

3.2.2 创建topic

./bin/kafka-topics.sh --bootstrap-server localhost:9092 --create --topic connect-offsets --partitions 3 --replication-factor 1 --config cleanup.policy=compact
./bin/kafka-topics.sh --bootstrap-server localhost:9092 --create --topic connect-configs --partitions 1 --replication-factor 1 --config cleanup.policy=compact
./bin/kafka-topics.sh --bootstrap-server localhost:9092 --create --topic connect-status --partitions 3 --replication-factor 1 --config cleanup.policy=compact

3.2.3 启动

nohup ./bin/connect-distributed.sh config/connect-distributed.properties>/dev/null 2>&1 &

3.3 注册debezium source connector

参数 描述 默认值
include.schema.changes 若值为true,那么source connector会将schema变更事件发布到kakfa中;topic的命名和database.server.name一致 true
tombstones.on.delete 若值为true,那么source connector针对delete操作会额外生成一个墓碑事件 true
database.server.id 和mysql中server_id值一致
database.include.list 指定数据库名称,多个数据库以逗号分割
database.history.kafka.topic 指定保存mysql schema history的topic名称,该topic仅能由debezium自己消费
{
    "name": "debezium-mysql-source-connector",
    "config": {
        "connector.class": "io.debezium.connector.mysql.MySqlConnector",
        "tasks.max": "1",
        "include.schema.changes": true,
        "tombstones.on.delete": true,
        "database.hostname": "10.254.9.82",
        "database.port": "3306",
        "database.user": "root",
        "database.password": "Nz_3@sMw7P",
        "database.server.id": "101",
        "database.server.name": "master",
        "database.include.list": "inventory",
        "database.history.kafka.bootstrap.servers": "localhost:9092",
        "database.history.kafka.topic": "master.schema.history"
    }
}
curl -X POST -H 'Content-Type: application/json' -d '@/root/debezium/connector-configs/debezium-mysql-source-connector.json' http://localhost:8083/connectors

当source connector注册成功后,再次检查topic,你会发现两个全新的topic,它们分别是:master和master.schema.history,二者差异如下表所示。

topic名称 保存内容 topic消费方
master schema变更事件,但仅仅涉及database.include.list所指定的数据库 第三方消费者
master.schema.history schema变更事件,涉及所有数据库 debezium

3.4 注册confluent sink connector

参数 描述 默认值
key.ignore 若值为false,那么Elasticsearch文档ID将和MySQL保持一致 false
schema.ignore 若值为false,那么Elasticsearch将禁用动态映射特性,转而根据schema来定义文档中字段的数据类型 false
write.method 若值为UPSERT,那么Elasticsearch会根据文档是否存在来进行INSERT亦或UPDATE操作 INSERT
behavior.on.null.values 若值为DELETE,那么sink connector将会根据文档ID删除该文档 FAIL
transforms.unwrap.type ElasticsearchSinkConnector主要用于数据扁平化处理,因为Debezium所生成的数据变更事件是一种多层级的数据结构,这不利于在Elasticsearch中保存,所以需要对这种结构进行扁平化处理
transforms.unwrap.drop.tombstone 若值为false,墓碑事件不会被丢弃 true
transforms.unwrap.delete.handling.mode Debezium会为每个DELETE操作生成删除事件和墓碑事件;若值为none,那么墓碑事件将会保留 drop
transforms.key.type ExtractField$Key可以从Debezium数据变更事件的Key中抽取特定字段值
transforms.key.field 指定抽取字段
{
    "name": "confluent-elasticsearch-sink-connector",
    "config": {
        "connector.class": "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector",
        "tasks.max": "1",
        "topics": "master.inventory.customers",
        "batch.size": "500",
        "key.ignore": false,
        "schema.ignore": false,
        "write.method": "UPSERT",
        "connection.url": "http://10.254.8.14:9200",
        "connection.username": "elastic",
        "connection.password": "Qwe123!@cmss",
        "connection.timeout.ms": "3000",
        "read.timeout.ms": "5000",
        "behavior.on.null.values": "DELETE",
        "transforms": "unwrap, key",
        "transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
        "transforms.unwrap.drop.tombstones": false,
        "transforms.unwrap.delete.handling.mode": "none",
        "transforms.key.type": "org.apache.kafka.connect.transforms.ExtractField$Key",
        "transforms.key.field": "id"
    }
}
curl -X POST -H 'Content-Type: application/json' -d '@/root/debezium/connector-configs/confluent-elasticsearch-sink-connector.json' http://localhost:8083/connectors

当你完成source connector和sink connector的注册后,你可以通过通过Kafka Connect提供的REST API来查看当前已注册的连接器,具体如下:

curl --location --request GET 'http://10.254.8.14:8083/connectors'
-----------------------------------
[    "confluent-elasticsearch-sink-connector",    "debezium-mysql-source-connector"]

kafka transformation

  • Source connectors pass records through the transformation before writing to the Kafka topic.
  • Sink connectors pass records through the transformation before writing to the sink.

3.5 验证

3.5.1 插入数据

INSERT INTO `inventory`.`customers`
            (`id`,
             `first_name`,
             `last_name`,
             `email`,
             `create_time`,
             `update_time`)
VALUES      ( 1001,
              'optimus',
              'prime',
              'optimus@prime.com',
              '2021-03-03 13:55:07.000000',
              '2021-03-21 13:55:11.000000' ); 
GET /master.inventory.customers/_search
{
    "query": {
        "match_all": {}
    }
}
-----------------------------------
{
    "took": 750,
    "timed_out": false,
    "_shards": {
        "total": 1,
        "successful": 1,
        "skipped": 0,
        "failed": 0
    },
    "hits": {
        "total": {
            "value": 1,
            "relation": "eq"
        },
        "max_score": 1.0,
        "hits": [
            {
                "_index": "master.inventory.customers",
                "_type": "_doc",
                "_id": "1001",
                "_score": 1.0,
                "_source": {
                    "id": 1001,
                    "first_name": "optimus",
                    "last_name": "prime",
                    "email": "optimus@prime.com",
                    "create_time": 1614779707000000,
                    "update_time": 1616334911000000
                }
            }
        ]
    }
}
-----------------------------------

3.5.2 删除数据

DELETE FROM `inventory`.`customers` WHERE  `id` = 1002 
GET /master.inventory.customers/_search
{
    "query": {
        "match_all": {}
    }
}
-----------------------------------
{
    "took": 435,
    "timed_out": false,
    "_shards": {
        "total": 1,
        "successful": 1,
        "skipped": 0,
        "failed": 0
    },
    "hits": {
        "total": {
            "value": 0,
            "relation": "eq"
        },
        "max_score": null,
        "hits": []
    }
}
-----------------------------------

3.5.3 更新数据

UPDATE `inventory`.`customers`
SET    `email` = 'optimus_prime@transformers.com'
WHERE  `id` = 1001 
GET /master.inventory.customers/_search
{
    "query": {
        "match_all": {}
    }
}
-----------------------------------
{
    "took": 1,
    "timed_out": false,
    "_shards": {
        "total": 1,
        "successful": 1,
        "skipped": 0,
        "failed": 0
    },
    "hits": {
        "total": {
            "value": 1,
            "relation": "eq"
        },
        "max_score": 1.0,
        "hits": [
            {
                "_index": "master.inventory.customers",
                "_type": "_doc",
                "_id": "1001",
                "_score": 1.0,
                "_source": {
                    "id": 1001,
                    "first_name": "optimus",
                    "last_name": "prime",
                    "email": "optimus_prime@transformers.com",
                    "create_time": 1614779707000000,
                    "update_time": 1616334911000000
                }
            }
        ]
    }
}
-----------------------------------

3.5.4 更新主键

UPDATE `inventory`.`customers`
SET    `id` = 1002
WHERE  `id` = 1001 
GET /master.inventory.customers/_search
{
    "query": {
        "match_all": {}
    }
}
-----------------------------------
{
    "took": 2,
    "timed_out": false,
    "_shards": {
        "total": 1,
        "successful": 1,
        "skipped": 0,
        "failed": 0
    },
    "hits": {
        "total": {
            "value": 1,
            "relation": "eq"
        },
        "max_score": 1.0,
        "hits": [
            {
                "_index": "master.inventory.customers",
                "_type": "_doc",
                "_id": "1002",
                "_score": 1.0,
                "_source": {
                    "id": 1002,
                    "first_name": "optimus",
                    "last_name": "prime",
                    "email": "optimus_prime@transformers.com",
                    "create_time": 1614779707000000,
                    "update_time": 1616334911000000
                }
            }
        ]
    }
}
-----------------------------------

3.5.5 新增字段

ALTER TABLE `inventory`.`customers` ADD COLUMN `address` VARCHAR ( 255 ) NULL;

INSERT INTO `inventory`.`customers`
            (`id`,
             `first_name`,
             `last_name`,
             `email`,
             `create_time`,
             `update_time`,
             `address`
             )
VALUES      ( 1001,
              'optimus',
              'prime',
              'optimus@prime.com',
              '2021-03-03 13:55:07.000000',
              '2021-03-21 13:55:11.000000',
              '77 Massachusetts Avenue'); 
GET /master.inventory.customers/_search
{
    "query": {
        "match_all": {}
    }
}
-----------------------------------
{
    "took": 1,
    "timed_out": false,
    "_shards": {
        "total": 1,
        "successful": 1,
        "skipped": 0,
        "failed": 0
    },
    "hits": {
        "total": {
            "value": 1,
            "relation": "eq"
        },
        "max_score": 1.0,
        "hits": [
            {
                "_index": "master.inventory.customers",
                "_type": "_doc",
                "_id": "1001",
                "_score": 1.0,
                "_source": {
                    "id": 1001,
                    "first_name": "optimus",
                    "last_name": "prime",
                    "email": "optimus@prime.com",
                    "create_time": 1614779707000000,
                    "update_time": 1616334911000000,
                    "address": "77 Massachusetts Avenue"
                }
            }
        ]
    }
}
-----------------------------------

4 总结

本文为大家分享了一种基于Debezium实现增量数据实时流转的方案。当你通过INSERT指令向MySQL新增一行记录时,那么Elasticsearch中也会实时新增一行记录;当你通过UPDATE指令向MySQL更新一行记录时,那么Elasticsearch中也会实时对该行记录进行更新;当你通过DELETE指令向MySQL删除一条记录时,那么Elasticsearch中也会实时删除该行记录。同时,Debezium在应对主键更新亦或字段新增两种场景时,依然有较好的表现。当然,如果你想将存量数据复制到Elasticsearch中,那么建议采用Logstash配合Kafka来实现。

5 参考文档

  • debezium.io/
  • docs.confluent.io/kafka-conne…
  • downloads.apache.org/kafka/2.7.0…
  • repo1.maven.org/maven2/io/d…
  • d1i4a15mxbxib1.cloudfront.net/api/plugins…
  • 相关文章

    Oracle如何使用授予和撤销权限的语法和示例
    Awesome Project: 探索 MatrixOrigin 云原生分布式数据库
    下载丨66页PDF,云和恩墨技术通讯(2024年7月刊)
    社区版oceanbase安装
    Oracle 导出CSV工具-sqluldr2
    ETL数据集成丨快速将MySQL数据迁移至Doris数据库

    发布评论