MySQL 数据同步 Elasticsearch - Logstash
本章节将介绍如何使用 Logstash 中间件将 MySQL 数据同步至 Elasticsearch。
🤖 Spring Boot 2.x 实践案例(代码仓库)
前言
上一篇文章已经详细介绍了如何使用Canal中间件将MySQL数据同步至ElasticSearch。然而,由于Canal已经很久没有得到维护,使用过程中可能会遇到许多问题。因此,在尝试Canal的同时,我们还可以考虑使用Logstash来实现类似的功能。本章将重点介绍如何使用Logstash将MySQL数据同步至ElasticSearch,如果你已经掌握了上一篇关于Canal的教程,可以直接从环境准备中的Logstash部分开始阅读。
环境准备
工具 | 版本 | Linux(ARM) | Windows | 备注 |
---|---|---|---|---|
JDK | 1.8 | jdk-8u371-linux-aarch64.tar.gz | jdk-8u371-windows-x64.exe | |
MySQL | 8.0.26 | mysql-community-server-8.0.26-1.el7.aarch64.rpm | mysql-installer-community-8.0.26.0.msi | |
Elasticsearch | 7.17.11 | elasticsearch-7.17.11-linux-aarch64.tar.gz | elasticsearch-7.17.11-windows-x86_64 | |
Logstash | 7.17.10 | logstash-7.17.10-linux-aarch64 | logstash-7.17.10-windows-x86_64 |
JDK
MySQL
Elasticsearch
Windows操作系统用户,直接解压并前往bin文件夹下运行elasticsearch.bat文件即可。
解压
tar -zxvf elasticsearch-7.17.11-linux-aarch64.tar.gz -C /usr/software
创建数据目录
cd /usr/software/elasticsearch-7.17.11/
mkdir data
修改配置文件
cd /usr/software/elasticsearch-7.17.11/
vi config/elasticsearch.yml
在配置文件中,放开相关注释,主要修改内容如下:
# 集群名称
cluster.name: xxx
# 节点名称
node.name: node-1
# 数据与日志存储目录
path.data: /usr/software/elasticsearch-7.17.11/data
path.logs: /usr/software/elasticsearch-7.17.11/logs
# 任何计算机节点访问
network.host: 0.0.0.0
# 默认端口
http.port: 9200
# 初始主节点
cluster.initial_master_nodes: ["node-1"]
开放端口
firewall-cmd --add-port=9300/tcp --permanent
firewall-cmd --add-port=9200/tcp --permanent
firewall-cmd --reload
systemctl restart firewalld
用户权限问题
注意:在 Linux 系统下,root 用户无法启动 Elasticsearch,所以需额外创建专属用户用来启动 Elasticsearch。
# 创建用户
useradd elastic
# 授权
chown -R elastic /usr/software/elasticsearch-7.17.11/
内存不足问题
注意:如果服务器内存足够大此步骤可以跳过,不然启动时会报内存不足错误!
cd /usr/software/elasticsearch-7.17.11/
vi config/jvm.options
# 设置堆内存大小
-Xms256m
-Xmx256m
其它问题
vi /etc/sysctl.conf
# 在末尾添加以下配置
vm.max_map_count=655360
sysctl -p
vi /etc/security/limits.conf
# 在末尾添加以下配置
* soft nofile 65536
* hard nofile 131072
* soft nproc 2048
* soft nproc 4096
后台启动
su elastic
cd /usr/software/elasticsearch-7.17.11/bin/
./elasticsearch -d
可视化插件
Install Multi Elasticsearch Head (opens new window) from the Chrome Web Store.
Click the extension icon in the toolbar of your web browser.
Note that you don’t need to enable CORS (opens new window) with this method.
Logstash
注意:本教程所使用IP地址为172.16.138.130,请根据实际IP进行替换!
解压
tar -zxvf logstash-7.17.10-linux-aarch64.tar.gz -C /usr/software
创建目录
cd /usr/software/logstash-7.17.10
# 此文件夹专门存放与MySQL相关管道配置文件与驱动包
mkdir mysql
下载驱动
在Maven Repository中搜索MySQL Connector Java,选择对应MySQL版本,进行下载即可。
wget https://repo1.maven.org/maven2/mysql/mysql-connector-java/8.0.30/mysql-connector-java-8.0.30.jar
管道配置
在mysql
文件夹下创建.conf
拓展名配置文件,文件名自定义即可:
cd /usr/software/logstash-7.17.10/mysql
vi jdbc.conf
配置 | 说明 |
---|---|
input | 指定输入数据源。支持的数据源类型,请参见Input plugins。本文使用JDBC数据源,具体参数说明请参见input参数说明。 |
filter | 指定对输入数据进行过滤插件。支持的插件类型,请参见Filter plugins。 |
output | 指定目标数据源类型。支持的数据源类型,请参见Output plugins。本文需要将MySQL中的数据同步至Elasticsearch中,因此output中需要指定目标Elasticsearch的信息。 |
以下配置按照测试数据配置,在实际业务中,请按照业务需求进行合理配置:
input {
jdbc {
# 多表同步时,表类型区分,建议命名为“库名_表名”
type => "mytest_user"
# 指定JDBC连接MySQL驱动文件
jdbc_driver_library => "/usr/software/logstash-7.17.10/mysql/mysql-connector-java-8.0.30.jar"
# MySQL驱动
jdbc_driver_class => "com.mysql.cj.jdbc.Driver"
# 数据库连接信息
jdbc_connection_string => "jdbc:mysql://127.0.0.1:3306/mytest?useUnicode=true&characterEncoding=utf-8&useSSL=false&allowLoadLocalInfile=false&autoDeserialize=false"
# 数据库用户名
jdbc_user => "root"
# 数据库密码
jdbc_password => "123456"
# 是否启用分页
jdbc_paging_enabled => "true"
# 分页大小
jdbc_page_size => "500"
# 是否记录上次执行结果。如果为true,则会把上次执行到的tracking_column字段值记录下来,保存到last_run_metadata_path指定文件中。
record_last_run => true
# 指定最后运行时间文件存放地址
last_run_metadata_path => "/usr/software/logstash-7.17.10/mysql/last_run_metadata_update_time.txt"
# 指定跟踪列,该列必须是递增的,一般是MySQL主键。
tracking_column => "update_time"
# 是否需要记录某个column值。当该值设置成true时,系统会记录tracking_column参数所指定的列的最新的值,并在下一次管道执行时通过该列的值来判断需要更新的记录。
use_column_value => "true"
# 跟踪列类型,默认是numeric。
tracking_column_type => "timestamp"
# 同步频率
schedule => "*/5 * * * * *"
# 指定SQL语句
statement => "SELECT * FROM user WHERE update_time > :sql_last_value AND update_time "false"
}
}
filter {
}
output {
if [type] == "mytest_user" {
elasticsearch {
# 配置ES集群地址
hosts => ["127.0.0.1:9200"]
# 索引名称(必须小写)
index => "user"
# 用户名
user => ""
# 密码
password => ""
# 数据唯一索引(建议使用数据库主键)
document_id => "%{id}"
}
}
stdout {
codec => json_lines
}
}
注意:jdbc_driver_library与last_run_metadata_path需要写绝对路径,使用相对路径可能会提示无权限。
数据同步
终于到了数据同步操作环节,现在需求如下:将MySQL中user
表数据同步到ES中user
索引,那么就跟着我一起动手操作吧!
创建数据库及表
SET NAMES utf8mb4;
SET FOREIGN_KEY_CHECKS = 0;
-- ----------------------------
-- Table structure for user
-- ----------------------------
DROP TABLE IF EXISTS `user`;
CREATE TABLE `user` (
`id` int NOT NULL AUTO_INCREMENT,
`username` varchar(255) COLLATE utf8mb4_general_ci DEFAULT NULL COMMENT '用户名',
`age` int DEFAULT NULL COMMENT '年龄',
`gender` varchar(255) COLLATE utf8mb4_general_ci DEFAULT NULL COMMENT '性别',
`create_time` datetime(3) NOT NULL DEFAULT CURRENT_TIMESTAMP(3) COMMENT '创建时间',
`update_time` datetime(3) NOT NULL DEFAULT CURRENT_TIMESTAMP(3) ON UPDATE CURRENT_TIMESTAMP(3) COMMENT '更新时间',
PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_general_ci;
SET FOREIGN_KEY_CHECKS = 1;
启动
cd /usr/software/logstash-7.17.10
bin/logstash -f mysql/jdbc.conf
注意:管道配置文件所在路径务必保证正确!
检查是否启动成功
同步测试
往MySQL数据库中user
表添加一条记录,然后前往Elasticsearch可视化界面查看是否同步成功:
常见问题
删除数据
如果一条记录从MySQL中删除,该操作并不会同步到Elasticsearch中。为了实现删除同步操作,可以考虑使用软删除
,即逻辑删除方式:
在MySQL数据表中添加一个is_deleted
字段,用来表示记录是否有效。一旦发生更新,is_deleted
也会同步更新到Elasticsearch中。使用这个办法,在执行MySQL或Elasticsearch查询时,需要重写查询语句来过滤掉is_deleted
为 true
的记录,从而达到软删除效果。
参考链接
- Ingest data from a relational database into Elasticsearch Service
- Structure of a Config File
- 如何使用 Logstash 和 JDBC 确保 Elasticsearch 与关系型数据库保持同步