Canal 同步数据坑太多?来试试 Logstash!

2023年 8月 13日 21.5k 0

MySQL 数据同步 Elasticsearch - Logstash

本章节将介绍如何使用 Logstash 中间件将 MySQL 数据同步至 Elasticsearch。

🤖 Spring Boot 2.x 实践案例(代码仓库)

前言

上一篇文章已经详细介绍了如何使用Canal中间件将MySQL数据同步至ElasticSearch。然而,由于Canal已经很久没有得到维护,使用过程中可能会遇到许多问题。因此,在尝试Canal的同时,我们还可以考虑使用Logstash来实现类似的功能。本章将重点介绍如何使用Logstash将MySQL数据同步至ElasticSearch,如果你已经掌握了上一篇关于Canal的教程,可以直接从环境准备中的Logstash部分开始阅读。

环境准备

工具 版本 Linux(ARM) Windows 备注
JDK 1.8 jdk-8u371-linux-aarch64.tar.gz jdk-8u371-windows-x64.exe
MySQL 8.0.26 mysql-community-server-8.0.26-1.el7.aarch64.rpm mysql-installer-community-8.0.26.0.msi
Elasticsearch 7.17.11 elasticsearch-7.17.11-linux-aarch64.tar.gz elasticsearch-7.17.11-windows-x86_64
Logstash 7.17.10 logstash-7.17.10-linux-aarch64 logstash-7.17.10-windows-x86_64

JDK

JDK

MySQL

MySQL

Elasticsearch

Windows操作系统用户,直接解压并前往bin文件夹下运行elasticsearch.bat文件即可。

解压

tar -zxvf elasticsearch-7.17.11-linux-aarch64.tar.gz -C /usr/software

创建数据目录

cd /usr/software/elasticsearch-7.17.11/

mkdir data

修改配置文件

cd /usr/software/elasticsearch-7.17.11/

vi config/elasticsearch.yml

在配置文件中,放开相关注释,主要修改内容如下:

# 集群名称
cluster.name: xxx
# 节点名称
node.name: node-1
# 数据与日志存储目录
path.data: /usr/software/elasticsearch-7.17.11/data
path.logs: /usr/software/elasticsearch-7.17.11/logs
# 任何计算机节点访问
network.host: 0.0.0.0
# 默认端口
http.port: 9200
# 初始主节点
cluster.initial_master_nodes: ["node-1"]

WechatIMG68273.jpg

image-20230701214828528

开放端口

firewall-cmd --add-port=9300/tcp --permanent

firewall-cmd --add-port=9200/tcp --permanent

firewall-cmd --reload

systemctl restart firewalld

用户权限问题

注意:在 Linux 系统下,root 用户无法启动 Elasticsearch,所以需额外创建专属用户用来启动 Elasticsearch。

# 创建用户
useradd elastic

# 授权
chown -R elastic /usr/software/elasticsearch-7.17.11/

内存不足问题

注意:如果服务器内存足够大此步骤可以跳过,不然启动时会报内存不足错误!

cd /usr/software/elasticsearch-7.17.11/

vi config/jvm.options
# 设置堆内存大小
-Xms256m
-Xmx256m

其它问题

  • max virtual memory areas vm.max_map_count [65530] is too low, increase to at least
  • vi /etc/sysctl.conf 
    
    # 在末尾添加以下配置
    vm.max_map_count=655360
    
    sysctl -p
    
  • max file descriptors [4096] for elasticsearch process is too low
  • vi /etc/security/limits.conf
    
    # 在末尾添加以下配置
    * soft nofile 65536
    * hard nofile 131072
    * soft nproc 2048
    * soft nproc 4096
    

    后台启动

    su elastic
    
    cd /usr/software/elasticsearch-7.17.11/bin/
    
    ./elasticsearch -d
    

    可视化插件

    Install Multi Elasticsearch Head (opens new window) from the Chrome Web Store.

    Click the extension icon in the toolbar of your web browser.

    Note that you don’t need to enable CORS (opens new window) with this method.

    Logstash

    注意:本教程所使用IP地址为172.16.138.130,请根据实际IP进行替换!

    解压

    tar -zxvf logstash-7.17.10-linux-aarch64.tar.gz -C /usr/software
    

    创建目录

    cd /usr/software/logstash-7.17.10
    
    # 此文件夹专门存放与MySQL相关管道配置文件与驱动包
    mkdir mysql
    

    下载驱动

    在Maven Repository中搜索MySQL Connector Java,选择对应MySQL版本,进行下载即可。

    image-20230712101415144

    wget https://repo1.maven.org/maven2/mysql/mysql-connector-java/8.0.30/mysql-connector-java-8.0.30.jar
    

    管道配置

    mysql文件夹下创建.conf拓展名配置文件,文件名自定义即可:

    cd /usr/software/logstash-7.17.10/mysql
    
    vi jdbc.conf
    
    配置 说明
    input 指定输入数据源。支持的数据源类型,请参见Input plugins。本文使用JDBC数据源,具体参数说明请参见input参数说明。
    filter 指定对输入数据进行过滤插件。支持的插件类型,请参见Filter plugins。
    output 指定目标数据源类型。支持的数据源类型,请参见Output plugins。本文需要将MySQL中的数据同步至Elasticsearch中,因此output中需要指定目标Elasticsearch的信息。

    以下配置按照测试数据配置,在实际业务中,请按照业务需求进行合理配置:

    input {
       jdbc {
          # 多表同步时,表类型区分,建议命名为“库名_表名”
          type => "mytest_user"
          # 指定JDBC连接MySQL驱动文件
          jdbc_driver_library => "/usr/software/logstash-7.17.10/mysql/mysql-connector-java-8.0.30.jar"
          # MySQL驱动
          jdbc_driver_class => "com.mysql.cj.jdbc.Driver"
          # 数据库连接信息
          jdbc_connection_string => "jdbc:mysql://127.0.0.1:3306/mytest?useUnicode=true&characterEncoding=utf-8&useSSL=false&allowLoadLocalInfile=false&autoDeserialize=false"
          # 数据库用户名
          jdbc_user => "root"
          # 数据库密码
          jdbc_password => "123456"
          # 是否启用分页
          jdbc_paging_enabled => "true"
          # 分页大小
          jdbc_page_size => "500"
          # 是否记录上次执行结果。如果为true,则会把上次执行到的tracking_column字段值记录下来,保存到last_run_metadata_path指定文件中。
          record_last_run => true
          # 指定最后运行时间文件存放地址
          last_run_metadata_path => "/usr/software/logstash-7.17.10/mysql/last_run_metadata_update_time.txt"
          # 指定跟踪列,该列必须是递增的,一般是MySQL主键。
          tracking_column => "update_time"
          # 是否需要记录某个column值。当该值设置成true时,系统会记录tracking_column参数所指定的列的最新的值,并在下一次管道执行时通过该列的值来判断需要更新的记录。
          use_column_value => "true"
          # 跟踪列类型,默认是numeric。
          tracking_column_type => "timestamp"
          # 同步频率
          schedule => "*/5 * * * * *"
          # 指定SQL语句
          statement => "SELECT * FROM user WHERE update_time > :sql_last_value AND update_time  "false"
      }
    }
    
    filter {
    
    }
    
    output {
       if [type] == "mytest_user" {
          elasticsearch {
             # 配置ES集群地址
             hosts => ["127.0.0.1:9200"]
             # 索引名称(必须小写)
             index => "user"
             # 用户名
             user => ""
             # 密码
             password => ""
             # 数据唯一索引(建议使用数据库主键)
             document_id => "%{id}"
          }
       }
       stdout {
          codec => json_lines
       }
    }
    

    注意:jdbc_driver_library与last_run_metadata_path需要写绝对路径,使用相对路径可能会提示无权限。

    数据同步

    终于到了数据同步操作环节,现在需求如下:将MySQL中user表数据同步到ES中user索引,那么就跟着我一起动手操作吧!

    创建数据库及表

    SET NAMES utf8mb4;
    SET FOREIGN_KEY_CHECKS = 0;
    
    -- ----------------------------
    -- Table structure for user
    -- ----------------------------
    DROP TABLE IF EXISTS `user`;
    CREATE TABLE `user` (
      `id` int NOT NULL AUTO_INCREMENT,
      `username` varchar(255) COLLATE utf8mb4_general_ci DEFAULT NULL COMMENT '用户名',
      `age` int DEFAULT NULL COMMENT '年龄',
      `gender` varchar(255) COLLATE utf8mb4_general_ci DEFAULT NULL COMMENT '性别',
      `create_time` datetime(3) NOT NULL DEFAULT CURRENT_TIMESTAMP(3) COMMENT '创建时间',
      `update_time` datetime(3) NOT NULL DEFAULT CURRENT_TIMESTAMP(3) ON UPDATE CURRENT_TIMESTAMP(3) COMMENT '更新时间',
      PRIMARY KEY (`id`)
    ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_general_ci;
    
    SET FOREIGN_KEY_CHECKS = 1;
    

    启动

    cd /usr/software/logstash-7.17.10
    
    bin/logstash -f mysql/jdbc.conf
    

    注意:管道配置文件所在路径务必保证正确!

    检查是否启动成功

    image-20230712103022515

    同步测试

    往MySQL数据库中user表添加一条记录,然后前往Elasticsearch可视化界面查看是否同步成功:

    image-20230712103313840

    image-20230712103401156

    常见问题

    删除数据

    如果一条记录从MySQL中删除,该操作并不会同步到Elasticsearch中。为了实现删除同步操作,可以考虑使用软删除,即逻辑删除方式:

    在MySQL数据表中添加一个is_deleted字段,用来表示记录是否有效。一旦发生更新,is_deleted也会同步更新到Elasticsearch中。使用这个办法,在执行MySQL或Elasticsearch查询时,需要重写查询语句来过滤掉is_deletedtrue的记录,从而达到软删除效果。

    参考链接

    • Ingest data from a relational database into Elasticsearch Service
    • Structure of a Config File
    • 如何使用 Logstash 和 JDBC 确保 Elasticsearch 与关系型数据库保持同步

    相关文章

    Oracle如何使用授予和撤销权限的语法和示例
    Awesome Project: 探索 MatrixOrigin 云原生分布式数据库
    下载丨66页PDF,云和恩墨技术通讯(2024年7月刊)
    社区版oceanbase安装
    Oracle 导出CSV工具-sqluldr2
    ETL数据集成丨快速将MySQL数据迁移至Doris数据库

    发布评论