数据集成的强大联盟:Elasticsearch、Kibana、Logstash、MySQL

2023年 7月 12日 36.1k 0

通常,很多关系数据项目都使用 MySQL。 它对于标准的 CRUD 操作是有益的,但有时我们需要做额外的过程。 当我们搜索某些内容时,我们会消耗资源或合并多个表。 有时,即使不是,可能仍然需要复杂的 SQL 查询。 也许这不是正确的方法,但这是我们改变技术堆栈的不同方法。 对于这个堆栈,我们首先描述 Logstash。

更多阅读:“Elastic:开发者上手指南” 中的 “数据库数据同步” 章节。

我们什么时候使用 Logstash?

Logstash 用于必须从源接收数据、处理数据然后发送到另一个目的地的场景。 作为起点,Logstash 连接到 MySQL 读取数据,然后对其进行处理,最后将其发送到 Elasticsearch。 如下图所示

Logstash 的操作分为三个步骤:

  • 输入 - input
  • 过滤器 - filter
  • 输出 - output
  • 在添加此代码之前,我创建了一个名为 search.conf 的 Logstash 配置文件。

    
    
    1.  input{
    2.          jdbc {
    3.              jdbc_driver_library => "/home/mysql-connector-java-8.0.22.jar"
    4.              jdbc_driver_class => "com.mysql.jdbc.Driver"
    5.              jdbc_connection_string => "jdbc:mysql://${MYSQL_HOST}:3306/product?zeroDateTimeBehavior=convertToNull"
    6.              jdbc_user => 'root'
    7.              jdbc_password => ''
    8.              statement => "Select product.*, DATE_FORMAT(updatedAt, '%Y-%m-%d %T') as lastTransaction from product where updatedAt > :sql_last_value"
    9.              tracking_column => "lastTransaction"
    10.              tracking_column_type => "timestamp"
    11.              use_column_value => true
    12.              lowercase_column_names => false
    13.              clean_run => true
    14.              schedule => "*/15 * * * * *"
    15.          }
    16.  }
    
    
    

    此输入运行 MySQL 查询并包含一个存储最新更新日期的跟踪列。 感谢此列,我可以有效地仅检索最后更新的行,而不是获取所有行。 现在我有了数据,我可以继续处理了。

    
    
    1.  filter {
    2.      ruby {
    3.          code => "
    4.              if event.get('productStatusFK')
    5.                  productStatusFK = event.get('productStatusFK').to_i
    6.                  if productStatusFK == 0
    7.                      event.set('productStatusFK', 'passive')
    8.                  elsif productStatusFK == 1
    9.                      event.set('productStatusFK', 'active')
    10.                  end
    11.              end
    12.          "
    13.      }
    14.      mutate {
    15.          remove_field => ["@version", "@timestamp"]
    16.      }
    17.  }
    
    
    

    下一步是将数据发送到 Elasticsearch。 我们如何执行输出操作?

    
    
    1.  output { 
    2.      elasticsearch {
    3.          hosts => [ "http://${ELASTIC_HOST}:9200" ]
    4.          document_id => '%{productPK}'
    5.          index => "product"
    6.          doc_as_upsert => true
    7.          action => "update"
    8.          codec => "json"
    9.          manage_template => true
    10.          template_overwrite => true
    11.      }
    12.  }
    
    
    

    我完成了配置文件。 之后,我为 docker-compose 准备 app.yml。 它包含 4 个服务:MySQL,Logstash,Elasticsearch 及 Kibana。 这三个服务位于同一网络上。 由于这个网络,他们能够使用他们的服务名称相互通信。 同样对于 logstash,我创建了一个管道并在文件中对其进行了描述。 该文件使用 search.conf 文件。 所有文件已添加到 Github,下面提供了链接。 详细信息在下面的代码中。

    app.yml

    
    
    1.  version: "3.7"
    
    3.  services:
    4.    elasticsearch:
    5.      image: elasticsearch:${STACK_VERSION}
    6.      volumes:
    7.        - type: volume
    8.          source: es_data
    9.          target: /usr/share/elasticsearch/data
    10.      ports:
    11.        - "9200:9200"
    12.        - "9300:9300"
    13.      environment:
    14.        - discovery.type=single-node
    15.        - ES_JAVA_OPTS=-Xms750m -Xmx750m
    16.        - xpack.security.enabled=false
    17.      networks:
    18.        - elastic
    
    20.    kibana:
    21.      image: kibana:${STACK_VERSION}
    22.      container_name: kibana
    23.      ports:
    24.        - target: 5601
    25.          published: 5601
    26.      depends_on:
    27.        - elasticsearch
    28.      networks:
    29.        - elastic  
    
    31.    logstash:
    32.      image: logstash:${STACK_VERSION}
    33.      depends_on:
    34.        - elasticsearch
    35.      volumes:
    36.        - ./product/pipeline/:/usr/share/logstash/pipeline/
    37.        - ./product/config/pipeline.yml:/usr/share/logstash/config/pipeline.yml
    38.        - ./mysqlConnector/mysql-connector-java-8.0.22.jar:/home/mysql-connector-java-8.0.22.jar
    39.      environment:
    40.        - ELASTIC_HOST=elasticsearch
    41.        - MYSQL_HOST=mysql
    42.      networks:
    43.        - elastic
    
    45.    mysql:
    46.      image: mysql:8
    47.      restart: always
    48.      command: --default-authentication-plugin=mysql_native_password
    49.      environment:
    50.        MYSQL_DATABASE: product
    51.        MYSQL_TCP_PORT: 3306
    52.        MYSQL_ALLOW_EMPTY_PASSWORD: "true"
    53.      volumes:
    54.        - ./database/product.sql:/docker-entrypoint-initdb.d/product.sql:ro
    55.      ports:
    56.        - "3306:3306"
    57.      networks:
    58.        - elastic
    
    60.  volumes:
    61.    es_data:
    62.      driver: local
    
    64.  networks:
    65.    elastic:
    66.      name: elastic
    67.      driver: bridge
    
    
    

    我们需要在 .env 文件中指定我们需要的 Elastic Stack 版本:

    
    
    1.  $ pwd
    2.  /Users/liuxg/data/MySQL2ElasticFlow
    3.  $ ls -al
    4.  total 24
    5.  drwxr-xr-x    9 liuxg  staff   288 Jul  9 11:00 .
    6.  drwxr-xr-x  184 liuxg  staff  5888 Jul  9 10:54 ..
    7.  -rw-r--r--    1 liuxg  staff    20 Jul  9 11:00 .env
    8.  drwxr-xr-x   12 liuxg  staff   384 Jul  9 10:54 .git
    9.  -rw-r--r--    1 liuxg  staff  3384 Jul  9 10:54 README.md
    10.  -rw-r--r--    1 liuxg  staff  1513 Jul  9 11:01 app.yml
    11.  drwxr-xr-x    3 liuxg  staff    96 Jul  9 10:54 database
    12.  drwxr-xr-x    3 liuxg  staff    96 Jul  9 10:54 mysqlConnector
    13.  drwxr-xr-x    4 liuxg  staff   128 Jul  9 10:54 product
    14.  $ cat .env
    15.  STACK_VERSION=8.8.2
    
    
    

    让我们测试一下这个结构。 使用 docker-compose 构建并运行应用程序。如果你已经有 mysqld 正在运行,你可以使用如下的命令来停止它的运行:

    mysqladmin -u root shutdown -p
    
    docker-compose -f app.yml up
    

    我们可以使用如下的命令来查看运行的容器:

    docker ps
    
    
    
    1.  $ docker ps
    2.  CONTAINER ID   IMAGE                 COMMAND                  CREATED          STATUS         PORTS                                            NAMES
    3.  662fd611a7a5   mysql:8               "docker-entrypoint.s…"   2 minutes ago    Up 2 minutes   0.0.0.0:3306->3306/tcp, 33060/tcp                mysql
    4.  3c3754292b27   logstash:8.8.1        "/usr/local/bin/dock…"   38 minutes ago   Up 2 minutes   5044/tcp, 9600/tcp                               logstash
    5.  5b6b423363b5   kibana:8.8.1          "/bin/tini -- /usr/l…"   38 minutes ago   Up 2 minutes   0.0.0.0:5601->5601/tcp                           kibana
    6.  b6bd075c5189   elasticsearch:8.8.1   "/bin/tini -- /usr/l…"   38 minutes ago   Up 2 minutes   0.0.0.0:9200->9200/tcp, 0.0.0.0:9300->9300/tcp   elasticsearch
    
    
    

    我们可以看到有四个正在运行的容器。我们可以通过如下的命令来查看 Logstash 的日志:

    docker logs -f logstash
    

    现在是时候在 Kibana 中进行查看了:

    GET _cat/indices
    
    yellow open product yUIb3AWPQKqvm6wnbSsoNQ 1 1 4 0 15.6kb 15.6kb
    

    我们可以通过如下的方式来查看里面的文档:

    GET product/_search
    

    我们可以看到有四个文档:

    很显然,它是我们之前在 database/product.sql 中写进去的四个文档:

    
    
    1.  INSERT INTO `product` (`productPK`, `productName`, `productCode`, `productStatusFK`, `active`, `updatedAt`) VALUES
    2.  (1, 'logitech', 'logitech', 1, 1, '2023-07-05 01:00:00'),
    3.  (2, 'asus', 'asus', 1, 0, '2023-07-04 01:00:00'),
    4.  (3, 'apple', 'apple', 1, 0, '2023-07-03 01:00:00'),
    5.  (4, 'hewlett packard', 'hewlett packard', 1, 1, '2023-07-02 01:00:00');
    
    
    

    我们可以对数据进行如下的搜索:

    
    
    1.  GET product/_search?filter_path=**.hits
    2.  {
    3.    "query": {
    4.      "bool": {
    5.        "must": [
    6.          {
    7.            "multi_match": {
    8.              "query": "asu",
    9.              "fields": [
    10.                "productName",
    11.                "productCode"
    12.              ],
    13.              "fuzziness": "auto"
    14.            }
    15.          }
    16.        ]
    17.      }
    18.    }
    19.  }
    
    
    

    我们可以得到如下的结果:

    
    
    1.  {
    2.    "hits": {
    3.      "hits": [
    4.        {
    5.          "_index": "product",
    6.          "_id": "2",
    7.          "_score": 0.87417156,
    8.          "_source": {
    9.            "productPK": 2,
    10.            "active": false,
    11.            "updatedAt": "2023-07-04T01:00:00.000Z",
    12.            "productStatusFK": "active",
    13.            "productCode": "asus",
    14.            "productName": "asus",
    15.            "lastTransaction": "2023-07-04 01:00:00"
    16.          }
    17.        }
    18.      ]
    19.    }
    20.  }
    
    
    

    Hooray! 我们已经完成了从 MySQL 通过 Logstash 把文档写入到 Elasticsearch!

    相关文章

    JavaScript2024新功能:Object.groupBy、正则表达式v标志
    PHP trim 函数对多字节字符的使用和限制
    新函数 json_validate() 、randomizer 类扩展…20 个PHP 8.3 新特性全面解析
    使用HTMX为WordPress增效:如何在不使用复杂框架的情况下增强平台功能
    为React 19做准备:WordPress 6.6用户指南
    如何删除WordPress中的所有评论

    发布评论