通常,很多关系数据项目都使用 MySQL。 它对于标准的 CRUD 操作是有益的,但有时我们需要做额外的过程。 当我们搜索某些内容时,我们会消耗资源或合并多个表。 有时,即使不是,可能仍然需要复杂的 SQL 查询。 也许这不是正确的方法,但这是我们改变技术堆栈的不同方法。 对于这个堆栈,我们首先描述 Logstash。
更多阅读:“Elastic:开发者上手指南” 中的 “数据库数据同步” 章节。
我们什么时候使用 Logstash?
Logstash 用于必须从源接收数据、处理数据然后发送到另一个目的地的场景。 作为起点,Logstash 连接到 MySQL 读取数据,然后对其进行处理,最后将其发送到 Elasticsearch。 如下图所示
Logstash 的操作分为三个步骤:
在添加此代码之前,我创建了一个名为 search.conf 的 Logstash 配置文件。
1. input{
2. jdbc {
3. jdbc_driver_library => "/home/mysql-connector-java-8.0.22.jar"
4. jdbc_driver_class => "com.mysql.jdbc.Driver"
5. jdbc_connection_string => "jdbc:mysql://${MYSQL_HOST}:3306/product?zeroDateTimeBehavior=convertToNull"
6. jdbc_user => 'root'
7. jdbc_password => ''
8. statement => "Select product.*, DATE_FORMAT(updatedAt, '%Y-%m-%d %T') as lastTransaction from product where updatedAt > :sql_last_value"
9. tracking_column => "lastTransaction"
10. tracking_column_type => "timestamp"
11. use_column_value => true
12. lowercase_column_names => false
13. clean_run => true
14. schedule => "*/15 * * * * *"
15. }
16. }
此输入运行 MySQL 查询并包含一个存储最新更新日期的跟踪列。 感谢此列,我可以有效地仅检索最后更新的行,而不是获取所有行。 现在我有了数据,我可以继续处理了。
1. filter {
2. ruby {
3. code => "
4. if event.get('productStatusFK')
5. productStatusFK = event.get('productStatusFK').to_i
6. if productStatusFK == 0
7. event.set('productStatusFK', 'passive')
8. elsif productStatusFK == 1
9. event.set('productStatusFK', 'active')
10. end
11. end
12. "
13. }
14. mutate {
15. remove_field => ["@version", "@timestamp"]
16. }
17. }
下一步是将数据发送到 Elasticsearch。 我们如何执行输出操作?
1. output {
2. elasticsearch {
3. hosts => [ "http://${ELASTIC_HOST}:9200" ]
4. document_id => '%{productPK}'
5. index => "product"
6. doc_as_upsert => true
7. action => "update"
8. codec => "json"
9. manage_template => true
10. template_overwrite => true
11. }
12. }
我完成了配置文件。 之后,我为 docker-compose 准备 app.yml。 它包含 4 个服务:MySQL,Logstash,Elasticsearch 及 Kibana。 这三个服务位于同一网络上。 由于这个网络,他们能够使用他们的服务名称相互通信。 同样对于 logstash,我创建了一个管道并在文件中对其进行了描述。 该文件使用 search.conf 文件。 所有文件已添加到 Github,下面提供了链接。 详细信息在下面的代码中。
app.yml
1. version: "3.7"
3. services:
4. elasticsearch:
5. image: elasticsearch:${STACK_VERSION}
6. volumes:
7. - type: volume
8. source: es_data
9. target: /usr/share/elasticsearch/data
10. ports:
11. - "9200:9200"
12. - "9300:9300"
13. environment:
14. - discovery.type=single-node
15. - ES_JAVA_OPTS=-Xms750m -Xmx750m
16. - xpack.security.enabled=false
17. networks:
18. - elastic
20. kibana:
21. image: kibana:${STACK_VERSION}
22. container_name: kibana
23. ports:
24. - target: 5601
25. published: 5601
26. depends_on:
27. - elasticsearch
28. networks:
29. - elastic
31. logstash:
32. image: logstash:${STACK_VERSION}
33. depends_on:
34. - elasticsearch
35. volumes:
36. - ./product/pipeline/:/usr/share/logstash/pipeline/
37. - ./product/config/pipeline.yml:/usr/share/logstash/config/pipeline.yml
38. - ./mysqlConnector/mysql-connector-java-8.0.22.jar:/home/mysql-connector-java-8.0.22.jar
39. environment:
40. - ELASTIC_HOST=elasticsearch
41. - MYSQL_HOST=mysql
42. networks:
43. - elastic
45. mysql:
46. image: mysql:8
47. restart: always
48. command: --default-authentication-plugin=mysql_native_password
49. environment:
50. MYSQL_DATABASE: product
51. MYSQL_TCP_PORT: 3306
52. MYSQL_ALLOW_EMPTY_PASSWORD: "true"
53. volumes:
54. - ./database/product.sql:/docker-entrypoint-initdb.d/product.sql:ro
55. ports:
56. - "3306:3306"
57. networks:
58. - elastic
60. volumes:
61. es_data:
62. driver: local
64. networks:
65. elastic:
66. name: elastic
67. driver: bridge
我们需要在 .env 文件中指定我们需要的 Elastic Stack 版本:
1. $ pwd
2. /Users/liuxg/data/MySQL2ElasticFlow
3. $ ls -al
4. total 24
5. drwxr-xr-x 9 liuxg staff 288 Jul 9 11:00 .
6. drwxr-xr-x 184 liuxg staff 5888 Jul 9 10:54 ..
7. -rw-r--r-- 1 liuxg staff 20 Jul 9 11:00 .env
8. drwxr-xr-x 12 liuxg staff 384 Jul 9 10:54 .git
9. -rw-r--r-- 1 liuxg staff 3384 Jul 9 10:54 README.md
10. -rw-r--r-- 1 liuxg staff 1513 Jul 9 11:01 app.yml
11. drwxr-xr-x 3 liuxg staff 96 Jul 9 10:54 database
12. drwxr-xr-x 3 liuxg staff 96 Jul 9 10:54 mysqlConnector
13. drwxr-xr-x 4 liuxg staff 128 Jul 9 10:54 product
14. $ cat .env
15. STACK_VERSION=8.8.2
让我们测试一下这个结构。 使用 docker-compose 构建并运行应用程序。如果你已经有 mysqld 正在运行,你可以使用如下的命令来停止它的运行:
mysqladmin -u root shutdown -p
docker-compose -f app.yml up
我们可以使用如下的命令来查看运行的容器:
docker ps
1. $ docker ps
2. CONTAINER ID IMAGE COMMAND CREATED STATUS PORTS NAMES
3. 662fd611a7a5 mysql:8 "docker-entrypoint.s…" 2 minutes ago Up 2 minutes 0.0.0.0:3306->3306/tcp, 33060/tcp mysql
4. 3c3754292b27 logstash:8.8.1 "/usr/local/bin/dock…" 38 minutes ago Up 2 minutes 5044/tcp, 9600/tcp logstash
5. 5b6b423363b5 kibana:8.8.1 "/bin/tini -- /usr/l…" 38 minutes ago Up 2 minutes 0.0.0.0:5601->5601/tcp kibana
6. b6bd075c5189 elasticsearch:8.8.1 "/bin/tini -- /usr/l…" 38 minutes ago Up 2 minutes 0.0.0.0:9200->9200/tcp, 0.0.0.0:9300->9300/tcp elasticsearch
我们可以看到有四个正在运行的容器。我们可以通过如下的命令来查看 Logstash 的日志:
docker logs -f logstash
现在是时候在 Kibana 中进行查看了:
GET _cat/indices
yellow open product yUIb3AWPQKqvm6wnbSsoNQ 1 1 4 0 15.6kb 15.6kb
我们可以通过如下的方式来查看里面的文档:
GET product/_search
我们可以看到有四个文档:
很显然,它是我们之前在 database/product.sql 中写进去的四个文档:
1. INSERT INTO `product` (`productPK`, `productName`, `productCode`, `productStatusFK`, `active`, `updatedAt`) VALUES
2. (1, 'logitech', 'logitech', 1, 1, '2023-07-05 01:00:00'),
3. (2, 'asus', 'asus', 1, 0, '2023-07-04 01:00:00'),
4. (3, 'apple', 'apple', 1, 0, '2023-07-03 01:00:00'),
5. (4, 'hewlett packard', 'hewlett packard', 1, 1, '2023-07-02 01:00:00');
我们可以对数据进行如下的搜索:
1. GET product/_search?filter_path=**.hits
2. {
3. "query": {
4. "bool": {
5. "must": [
6. {
7. "multi_match": {
8. "query": "asu",
9. "fields": [
10. "productName",
11. "productCode"
12. ],
13. "fuzziness": "auto"
14. }
15. }
16. ]
17. }
18. }
19. }
我们可以得到如下的结果:
1. {
2. "hits": {
3. "hits": [
4. {
5. "_index": "product",
6. "_id": "2",
7. "_score": 0.87417156,
8. "_source": {
9. "productPK": 2,
10. "active": false,
11. "updatedAt": "2023-07-04T01:00:00.000Z",
12. "productStatusFK": "active",
13. "productCode": "asus",
14. "productName": "asus",
15. "lastTransaction": "2023-07-04 01:00:00"
16. }
17. }
18. ]
19. }
20. }
Hooray! 我们已经完成了从 MySQL 通过 Logstash 把文档写入到 Elasticsearch!