ELK5.5TomcatAccess日志grok处理(filebeat)

2023年 7月 15日 83.8k 0

Tomcat的访问日志格式有很多字段可以调整,这里使用 %h %l %u %t [%r] %s [%{Referer}i] [%{User-Agent}i] %b %T,各字段含义如下

日志格式

%h 访问的用户IP地址
%l 访问逻辑用户名,通常返回'-'
%u 访问验证用户名,通常返回'-'
%t 访问的日期和时间
%r 访问的方式(post或者是get),访问的资源和使用的http协议版本
%s 访问返回的http状态
%b 访问资源返回的流量(响应字节数,不含HTTP头)
%T 访问所使用的时间(单位:秒)
[%{Referer}i] 请求头中的Referer(来源页面)
[%{User-Agent}i] 请求头中的User-Agent(客户端标识)

其他可参考:http://tomcat.apache.org/tomcat-8.5-doc/config/valve.html#Access_Logging

修改配置文件

[root@linuxea.com-Node117 /data/tomcat]# tail -9 conf/server.xml 
        <Valve className="org.apache.catalina.valves.AccessLogValve"
               directory="logs" prefix="access_log"
               suffix=".log" rotatable="true" resolveHosts="false"
               pattern="%h %l %u %t [%r] %s [%{Referer}i] [%{User-Agent}i] %b %T" />

      </Host>
    </Engine>
  </Service>
</Server>
[root@linuxea.com-Node117 /data/tomcat]# 

如果需要按小时切割日志,可以加上fileDateFormat,格式如下:


        <Valve className="org.apache.catalina.valves.AccessLogValve"
               directory="logs" prefix="access_log" fileDateFormat="yyyy-MM-dd.HH"
               suffix=".log" rotatable="true" resolveHosts="false"
               pattern="%h %l %u %t [%r] %s [%{Referer}i] [%{User-Agent}i] %b %T" />

那么设置后显示出的日志是这样的:

10.10.0.96 - - [04/Sep/2017:19:54:07 +0800] [GET / HTTP/1.1] 200 [-] [Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/60.0.3112.113 Safari/537.36] 5 0.104

Pattern是这样的:

[root@linuxea.com-Node49 /etc/logstash/patterns.d]# cat java 
JETTYAUDIT %{IP:client_ip} (?:-|%{USER:logic_user}) (?:-|%{USER:verification_user}) \[%{HTTPDATE:timestamp}\] \[(?:%{WORD:http_verb} %{NOTSPACE:request_url}(?: HTTP/%{NUMBER:httpversion})?|%{DATA:rawrequest})\] %{NUMBER:status} \[(?:-|%{NOTSPACE:request_url_2})\] \[%{GREEDYDATA:agent}\] (?:-|%{NUMBER:curl_size}) (?:-|%{NUMBER:responsetime})

最终收集的结果图表是这样的:tomcat-access.png

filebeat配置

[root@linuxea.com-Node117 /data/tomcat]# cat /etc/filebeat/filebeat.yml 
filebeat.prospectors:
 - input_type: log
   paths:
    - /data/logs/access_nginx.log
   document_type: nginx-access-117
 - input_type: log
   paths:
    - /data/logs/slow_log.CSV
   document_type: mysql-slow-117
 - input_type: log
   paths:
     - /data/logs/java.log
   document_type: java-117
output.redis:
  hosts: ["10.10.0.98"]
  password: "OTdmOWI4ZTM4NTY1M2M4OTZh"
  key: "default_list"
  db: 5
  timeout: 5
  keys:
    - key: "%{[type]}"
      mapping:
        "nginx-access-117": "nginx-access-117"
        "mysql-slow-117" : "mysql-slow-117"
        "java-117" : "java-117"

logstash配置

input

    redis {
         host => "10.10.0.98"
         port => "6379"
         key => "java-117"
         data_type => "list"
         password => "OTdmOWI4ZTM4NTY1M2M4OTZh"
         threads => "5"
         db => "5"
       }

filter

    if [type] == "java-117" {
    grok {
      patterns_dir => "/etc/logstash/patterns.d"
      match => { "message" => "%{JETTYAUDIT}" }
    }
    useragent {
        source => "agent"
        target => "userAgent"
        }
    urldecode {
        all_fields => true
        }
     mutate {
            gsub => ["agent","[\"]",""]        #将agent中的 " 换成空
            convert => [ "response","integer" ]
            convert => [ "body_bytes_sent","integer" ]
            convert => [ "bytes_sent","integer" ]
            convert => [ "upstream_response_time","float" ]
            convert => [ "upstream_status","integer" ]
            convert => [ "request_time","float" ]
            convert => [ "port","integer" ]
       }
    geoip {
        source => "client_ip"
        database => "/etc/logstash/GeoLite2-City.mmdb"
        }
    if [params] {
      kv {
        field_split => ",?"
        source => "params"
      }
    }
    if [source] =~ /\/API/ {
      mutate {
        add_field => { "mode" => "API"}
      }
    } else {
      mutate {
        add_field => { "mode" => "ENT"}
      }
    }
    date {
      match => [ "date" , "yyyy-MM-dd HH:mm:ss.SSS" ]
    }
    }

output

    if [type] == "java-117" {
    elasticsearch {
        hosts => ["10.0.1.49:9200"]
        index => "logstash-java-117-%{+YYYY.MM.dd}"
        user => "elastic"
        password => "linuxea"
    }
    }

完整的配置

[root@linuxea.com-Node49 /etc/logstash/patterns.d]# cat ../conf.d/redis-output.yml 
input {
    redis {
        host => "10.10.0.98"
        port => "6379"
        key => "nginx-access-117"
        data_type => "list"
        password => "OTdmOWI4ZTM4NTY1M2M4OTZh"
        threads => "5"
        db => "5"
         }
   redis {
        host => "10.10.0.98"
        port => "6379"
        key => "mysql-slow-117"
        data_type => "list"
        password => "OTdmOWI4ZTM4NTY1M2M4OTZh"
        threads => "5"
        db => "5"
        }
    redis {
         host => "10.10.0.98"
         port => "6379"
         key => "java-117"
         data_type => "list"
         password => "OTdmOWI4ZTM4NTY1M2M4OTZh"
         threads => "5"
         db => "5"
       }
    }
filter {
   if [type] == "nginx-access-117" {
    grok {
        patterns_dir => [ "/etc/logstash/patterns.d" ]
        match => { "message" => "%{NGINXACCESS}" }
        overwrite => [ "message" ]
        }
    geoip {
        source => "clent_ip"
        target => "geoip"
#        database => "/etc/logstash/GeoLiteCity.dat"
        database => "/etc/logstash/GeoLite2-City.mmdb"
         }
    useragent {
        source => "User_Agent"
        target => "userAgent"
        }
    urldecode {
        all_fields => true
        }
     mutate {
            gsub => ["User_Agent","[\"]",""]        #将User_Agent中的 " 换成空
            convert => [ "response","integer" ]
            convert => [ "body_bytes_sent","integer" ]
            convert => [ "bytes_sent","integer" ]
            convert => [ "upstream_response_time","float" ]
            convert => [ "upstream_status","integer" ]
            convert => [ "request_time","float" ]
            convert => [ "port","integer" ]
       }
    date {
    match => [ "timestamp" , "dd/MMM/YYYY:HH:mm:ss Z" ]
        }
        }
#########################mysql-slow#####################################        
    if [type] == "mysql-slow-117" {
    csv {
      columns => [ "timestamp", "user_host", "query_time", "lock_time",
                   "rows_sent", "rows_examined", "db", "last_insert_id",
                   "insert_id", "server_id", "sql_text", "thread_id", "rows_affected" ]
    }
    mutate {
      convert => { "rows_sent" => "integer" }
      convert => { "rows_examined" => "integer" }
      convert => { "last_insert_id" => "integer" }
      convert => { "insert_id" => "integer" }
      convert => { "server_id" => "integer" }
      convert => { "thread_id" => "integer" }
      convert => { "rows_affected" => "integer" }
    }
    date {
      match => [ "timestamp", "yyyy-MM-dd HH:mm:ss.SSSSSS" ]
      remove_field => [ "timestamp" ]
    }
#    mutate { remove_field => [ "message" ] }
    mutate {
      gsub => [
        "query_time", "(.*\.)(\d)(\d)\d+", "\1\2\3",
        "lock_time", "(.*\.)(\d)(\d)\d+", "\1\2\3"
        ]
        }
     ruby { code => "event.set('query_time' , event.get('query_time') ? event.get('query_time').split(':').inject(0){|a, m| a = a * 60 + m.to_f} : 0)"}
     ruby { code => "event.set('lock_time' , event.get('lock_time') ? event.get('lock_time').split(':').inject(0){|a, m| a = a * 60 + m.to_f} : 0)" }
  }
#########################java#####################################
    if [type] == "java-117" {
    grok {
      patterns_dir => "/etc/logstash/patterns.d"
      match => { "message" => "%{JETTYAUDIT}" }
    }
    useragent {
        source => "agent"
        target => "userAgent"
        }
    urldecode {
        all_fields => true
        }
     mutate {
            gsub => ["agent","[\"]",""]        #将agent中的 " 换成空
            convert => [ "response","integer" ]
            convert => [ "body_bytes_sent","integer" ]
            convert => [ "bytes_sent","integer" ]
            convert => [ "upstream_response_time","float" ]
            convert => [ "upstream_status","integer" ]
            convert => [ "request_time","float" ]
            convert => [ "port","integer" ]
       }
    geoip {
        source => "client_ip"
        database => "/etc/logstash/GeoLite2-City.mmdb"
        }
    if [params] {
      kv {
        field_split => ",?"
        source => "params"
      }
    }
    if [source] =~ /\/API/ {
      mutate {
        add_field => { "mode" => "API"}
      }
    } else {
      mutate {
        add_field => { "mode" => "ENT"}
      }
    }
    date {
      match => [ "date" , "yyyy-MM-dd HH:mm:ss.SSS" ]
    }
    }
#########################java#####################################    
}
output {
    if "_grokparsefailure" in [tags] {
    file { path => "/var/log/logstash/grokparsefailure-%{[type]}-%{+YYYY.MM.dd}.log" }
    }
    if [type] == "nginx-access-117" {
    elasticsearch {
        hosts => ["10.0.1.49:9200"]
        index => "logstash-nginx-access-117-%{+YYYY.MM.dd}"
        user => "elastic"
        password => "linuxea"
    }
    }
    if [type] == "mysql-slow-117" {
    elasticsearch {
        hosts => ["10.0.1.49:9200"]
        index => "logstash-mysql-slow-117-%{+YYYY.MM.dd}"
        user => "elastic"
        password => "linuxea"
    }
    }
    if [type] == "java-117" {
    elasticsearch {
        hosts => ["10.0.1.49:9200"]
        index => "logstash-java-117-%{+YYYY.MM.dd}"
        user => "elastic"
        password => "linuxea"
    }
    }
    stdout {codec => rubydebug}
}
[root@linuxea.com-Node49 /etc/logstash/patterns.d]# 

最后收集到的日志是这样的:tomcat-access-1.jpg

相关文章

对接alertmanager创建钉钉卡片(1)
手把手教你搭建OpenFalcon监控系统
无需任何魔法即可使用 Ansible 的神奇变量“hostvars”
openobseve HA本地单集群模式
基于k8s上loggie/vector/openobserve日志收集
openobseve单节点和查询语法

发布评论