1. Logstash 的基本原理
Logstash 是一个用于数据传输和处理的组件。通过插件的组合,Logstash 可以满足各种日志采集的场景:logstash->elasticsearchfilebeat->logstash->elasticsearchfilebeat->kafka->logstash->elasticsearchfilebeat->redis->logstash->elasticsearch
2. Logstash 的基本配置
下面是一个 Logstash 的配置格式:
1
2
3
4
5
6
7
8
9
10
11
12
|
# 数据源,例如 Kafka、MySQL
input {
}
# 过滤器,用于处理数据,可选,例如丢掉空值、添加标签等
filter {
}
# 输出,例如 ES、文件
output {
}
|
input 数据源包括:azure_event_hubs、beats、cloudwatch、couchdb_changes、dead_letter_queue、elastic_agent、elasticsearch、exec、file、ganglia、gelf、generator、github、google_cloud_storage、google_pubsub、graphite、heartbeat、http、http_poller、imap、irc、java_generator、java_stdin、jdbc、jms、jmx、kafka、kinesis、log4j、lumberjack、meetup、pipe、puppet_facter、rabbitmq、redis、relp、rss、s3、s3-sns-sqs、salesforce、snmp、snmptrap、sqlite、sqs、stdin、stomp、syslog、tcp、twitter、udp、unix、varnishlog、websocket、wmi、xmpp。参考:ES 输入插件filter 过滤插件包括:age、aggregate、alter、bytes、cidr、cipher、clone、csv、date、de_dot、dissect、dns、drop、elapsed、elasticsearch、environment、extractnumbers、fingerprint、geoip、grok、http、i18n、java_uuid、jdbc_static、jdbc_streaming、json、json_encode、kv、memcached、metricize、metrics、mutate、prune、range、ruby、sleep、split、syslog_pri、threats_classifier、throttle、tld、translate、truncate、urldecode、useragent、uuid、wurfl_device_detection、xml参考:ES 过滤插件output 输出源包括:boundary、circonus、cloudwatch、csv、datadog、datadog_metrics、dynatrace、elastic_app_search、elastic_workplace_search、elasticsearch、email、exec、file、ganglia、gelf、google_bigquery、google_cloud_storage、google_pubsub、graphite、graphtastic、http、influxdb、irc、java_stdout、juggernaut、kafka、librato、loggly、lumberjack、metriccatcher、mongodb、nagios、nagios_nsca、opentsdb、pagerduty、pipe、rabbitmq、redis、redmine、riak、riemann、s3、sink、sns、solr_http、sqs、statsd、stdout、stomp、syslog、tcp、timber、udp、webhdfs、websocket、xmpp、zabbix参考:ES 输出插件在配置时,需要根据使用场景,结合官方文档进行调试。
3. Logstash 的配置示例
3.1 标准出入到标准输出
1
2
3
4
5
6
|
input {
stdin {}
}
output {
stdout { codec => rubydebug }
}
|
3.2 文件到 ES
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
|
input {
file {
path => [ "/data/nginx/logs/nginx_access.log" ]
start_position => "beginning"
ignore_older => 0
}
}
filter {
# 使用 grok 插件对日志内容进行格式化
grok {
match => {
"message" => "%{COMBINEDAPACHELOG}"
}
}
}
output {
elasticsearch {
hosts => ["127.0.0.1:9200"]
index => "nginx-access"
}
}
|
3.3 Filebeats 到 ES
1
2
3
4
5
6
7
8
9
10
11
|
input {
beats {
port => 8088
}
}
output {
elasticsearch {
hosts => ["127.0.0.1:9200"]
index => "beats"
}
}
|
3.4 Kafka 到 ES
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
|
input {
kafka {
bootstrap_servers => "${KAFKA_URL:localhost:2187}"
topics => "${KAFKA_TOPIC:kafka-topic-test}"
group_id => "${KAFKA_GROUP_ID:logstash-es}"
consumer_threads => "${KAFKA_CONSUMER_THREADS:6}"
}
}
filter {
if ![message] {
drop { }
}
mutate {
remove_field => [ "headers", "result"]
}
}
output {
elasticsearch {
hosts => ["127.0.0.1:9200"]
index => "kafka"
}
}
|