跳转至

Logstash线上配置

logstash 主要依据filebeat 传入的日志做过滤,转义,格式化等,处理过后才将数据交给ES,这里列出我们现在用到的nginx日志处理,mysql json格式的审计日志拆分,还有系统操作日志的过滤

默认 Grok 过滤规则通常在程序目录中

程序目录:/usr/share/logstash/vendor/bundle/jruby/2.5.0/gems/logstash-patterns-core-4.1.2/patterns/

Bash
root@pts/0 # ls /usr/share/logstash/vendor/bundle/jruby/2.5.0/gems/logstash-patterns-core-4.1.2/patterns/
aws     bind  exim       grok-patterns  httpd  junos         maven        mcollective-patterns  nagios      rails  ruby
bacula  bro   firewalls  haproxy        java   linux-syslog  mcollective  mongodb               postgresql  redis  squid
# 进入 patterns 目录,查看文件内容
cat grok-patterns

自定义 patterns

推荐位置,可以在/usr/share/logstash目录下创建patterns目录,按照默认格式,我这里创建了三个自定义规则,注意目录及文件权限

Bash
root@pts/0 # mkdir  /usr/share/logstash/patterns/
root@pts/0 # ls /usr/share/logstash/patterns/
mysqlslow  nginx  syslog

nginx日志分割

Bash
root@pts/0 # cat  /usr/share/logstash/patterns/nginx 
# Custom grok patterns for nginx logs (file: /usr/share/logstash/patterns/nginx).
# NGACCESS: one access-log line from the custom nginx log_format — client IP,
# user, timestamp, request line (with or without an absolute URL), status,
# bytes, referer/UA, client port, XFF, server_name, rhost, upstream address,
# and the response/request timings quoted at the end.
# NOTE(review): "remote_add" and "upsteam_add" look like typos of
# "remote_addr"/"upstream_addr"; renaming would change the indexed field names
# and break existing dashboards — confirm before fixing.
NGACCESS %{IPORHOST:remote_add} %{NOTSPACE:remote_user} \[%{HTTPDATE:timestamp}\] \"%{WORD:method} (?:%{URIPROTO:uri_proto}://%{IPORHOST:uri_domain}%{URIPATHPARAM:uri}|%{URIPATHPARAM:uri}) HTTP/%{NUMBER:version}\" %{NUMBER:status} %{NUMBER:bytes} %{QS:referer} %{QS:user_agent} \"%{NUMBER:client_port}\" %{QS:http_x_forward_for} %{QS:server_name} %{QS:rhost} %{QS:upsteam_add} \"(?:%{NUMBER:response_time}|-)\" \"%{NUMBER:request_time}\"
# NGWAF: one WAF log line — real client IP, ISO8601 time, method, then five
# double-quoted fields (url, data, useragent, server_name, rhost) and the
# matched rule tag, all captured with named inline groups.
NGWAF %{IPV4:realip}\s+\[%{TIMESTAMP_ISO8601:time}\]\s+"%{WORD:method}\s+(?<url>[^"]+)"\s+"(?<data>[^"]+)"\s+"(?<useragent>[^"]+)"\s+"(?<server_name>[^"]+)"\s+"(?<rhost>[^"]+)"\s+"(?<ruletag>[^"]+)"

系统日志分割syslog

Bash
root@pts/0 # cat  /usr/share/logstash/patterns/syslog 
# Custom grok patterns for the system operation log
# (file: /usr/share/logstash/patterns/syslog).
# TTY: terminal device such as "pts/0" or "tty1" — three lowercase letters,
# a slash, then up to two digits.
TTY [a-z]{3}/[0-9]{,2}
# SYSTEMSYSLOG: one audited shell-command record — syslog timestamp and host,
# the logging user, then the KEY=VALUE fields written by the shell audit hook
# (run time/user, login user/tty/time/ip) and the executed command.
# NOTE(review): the trailing "(:S\.0)?" after LOGIN_IP looks garbled (possibly
# meant to swallow a literal suffix) — verify against a real log line.
SYSTEMSYSLOG %{TIMESTAMP_ISO8601:timestamp} %{SYSLOGHOST:syslog_hostname} %{USER:user}:[ ]? COMMAND: RUN_TIME=%{TIMESTAMP_ISO8601:run_time} RUN_USER=%{USER:run_user} LOGIN_USER=%{USER:login_user} LOGIN_TTY=%{TTY:terminal} LOGIN_TIME=%{TIMESTAMP_ISO8601:login_time} LOGIN_IP=%{IP:login_ip}(:S\.0)? CMD=%{GREEDYDATA:command}

慢查日志分割mysqlslow

Bash
root@pts/0 # cat  /usr/share/logstash/patterns/mysqlslow
# MYSQLSLOW: one complete multi-line MySQL slow-query entry — the "# Time:",
# "# User@Host:" and "# Query_time:" header lines, an optional "use <db>;",
# the "SET timestamp=...;" line, and the SQL statement itself. Numeric fields
# are cast in-pattern (":float"/":int" suffixes).
# NOTE(review): this assumes filebeat joins each slow-log entry into a single
# multi-line message (multiline config on the shipper) — confirm.
MYSQLSLOW # Time: %{TIMESTAMP_ISO8601:timestamp}\n# User@Host: %{USER:user}\[[^\]]+\] @\s+\[%{IP:client_ip}\]\s+Id: %{NUMBER:connection_id}\n# Query_time: %{NUMBER:query_time:float}\s+Lock_time: %{NUMBER:lock_time:float}\s+Rows_sent: %{NUMBER:rows_sent:int}\s+Rows_examined: %{NUMBER:rows_examined:int}\n(?:use %{DATA:database};\n)?SET timestamp=%{NUMBER:timestamp_unix:int};\n%{GREEDYDATA:sql_query}

pipeline配置

参考logstash多管道配置

Bash
root@pts/0 # ls /etc/logstash/config
mysql-log.conf  nginx-log.conf  system-log.conf

mysql-log.conf

Bash
root@pts/0 # cat mysql-log.conf 
# Pipeline for MySQL logs shipped by filebeat: a JSON audit log and a
# multi-line slow-query log, routed by the filebeat field
# [fields][document_type].
input {
  beats {
    port => 5044
  }
}

filter {
  # Audit log: each message is one JSON document.
  if [fields][document_type] == "mysql-audit-log" {
    json {
      source => "message"
    }
    # Parse the epoch-milliseconds "date" field into "timestamp".
    # NOTE(review): the target is "timestamp", not "@timestamp", so
    # @timestamp remains the ingestion time — confirm that is intended.
    date {
      match => [ "date", "UNIX_MS" ]
      target => "timestamp"
      timezone => "Asia/Shanghai"
    }
#    ruby {
#      code => "event.set('timestamp', event.get('run_time').time.localtime + 8*60*60)"
#    }
#    ruby {
#      code => "event.set('@timestamp', event.get('timestamp'))"
#    }
    mutate {
      # "type" selects the daily ES index in the output below.
      replace => { "type" => "mysql-audit-log" }
      remove_field => [ "agent", "input", "ecs", "version", "date" ]
    }
  }
  if [fields][document_type] == "mysql-slow-query" {
    # Parse the slow-log entry with the custom MYSQLSLOW pattern
    # (defined in /usr/share/logstash/patterns/mysqlslow).
    grok {
      match => { "message" => "%{MYSQLSLOW}" }
    }

    # Collapse newlines/tabs and runs of whitespace in the SQL text,
    # then cast the timing/row-count fields to numbers.
    mutate {
      gsub => [
        "message", '^\s*$', '',
        "sql_query", '[\n\r\t]+', ' ',
        "sql_query", '\s+', ' '
      ]
      convert => {
        "query_time" => "float"
        "lock_time" => "float"
        "rows_sent" => "integer"
        "rows_examined" => "integer"
      }
      replace => { "type" => "mysql-slow-query" }
    }
    # Use the slow-log header time as the event time.
    date {
      match => ["timestamp", "ISO8601"]
      timezone => "Asia/Shanghai"
      target => "@timestamp"
    }
    # First word of the statement (select/insert/update/...) -> query_type.
    grok {
      match => { "sql_query" => "^(?<query_type>\w+)" }
#      add_tag => [ "query_type" ]
    }
    # Normalize query_type to upper case.
    mutate {
      uppercase => [ "query_type" ]
    }
    # Best-effort table-name extraction after FROM/JOIN/UPDATE/INTO;
    # empty tag_on_failure suppresses _grokparsefailure when nothing matches.
    grok {
      match => { 
        "sql_query" => [
          "(?i)(?:from|join|update|into)\s+`?%{DATA:table_name}`?\s+"
        ] 
      }
      tag_on_failure => []
    }
    # Comma-separated table list -> array; strip backticks and whitespace.
    if [table_name] {
      mutate {
        split => { "table_name" => "," }
        gsub => [
          "table_name", "`", "",
          "table_name", "\s+", ""
        ]
      }
    }
    # Drop helper fields only needed during parsing ("timestamp" was already
    # consumed by the date filter above).
    mutate {
      remove_field => [ "timestamp_unix", "ecs", "version", "agent", "@version", "timestamp" ]
      strip => ["table_name", "database"]
    }
  }
}

output {
#    stdout { codec => rubydebug }
    # One daily index per "type" value set in the filter section.
    elasticsearch {
      hosts => ["172.26.32.57:9200"]
      index => "%{type}-%{+YYYY.MM.dd}"
      user => "elastic"
      password => "xxxxxxxx"
    }
}

nginx-log.conf

Bash
root@pts/0 # cat nginx-log.conf 
# Pipeline for nginx access and WAF logs shipped by filebeat, routed by the
# filebeat fields [fields][log_type] (which parser) and
# [fields][document_type] (which index).
input {
  beats {
    port => 5045
  }
}

filter {
  # Access log -> custom NGACCESS pattern (see patterns/nginx).
  if [fields][log_type] == "access" {
    grok {
      match => { "message" => "%{NGACCESS}" }
#      remove_field => ["referer" , "client_port", "input.type", "type", "log.offset", "cookie", "remote_user", "http_x_forward_for", "ecs", "agent.type", "agent.ephemeral_id", "agent.id", "agent.version"]
    }
  }

  # WAF log -> custom NGWAF pattern (see patterns/nginx).
  if [fields][log_type] == "waf" {
    grok {
      match => { "message" => "%{NGWAF}" }
    }
  }

  # Map each known document_type to "type", which selects the ES index below.
  # NOTE(review): these branches could collapse into a single
  # replace => { "type" => "%{[fields][document_type]}" }, but that would also
  # index events with unlisted document_types — kept explicit on purpose.
  if [fields][document_type] == "nginx27-log" {
    mutate { replace => { "type" => "nginx27-log" } }  
  }
  if [fields][document_type] == "nginx27waf-log" {
    mutate { replace => { "type" => "nginx27waf-log" } }
  }
  if [fields][document_type] == "nginx34-log" {
    mutate { replace => { "type" => "nginx34-log" } }
  }
  if [fields][document_type] == "nginx34waf-log" {
    mutate { replace => { "type" => "nginx34waf-log" } }
  }
  if [fields][document_type] == "tianzhiyuan-log" {
    mutate { replace => { "type" => "tianzhiyuan-log" } }
  }

  # Cast the timing fields parsed by NGACCESS; convert silently skips events
  # (e.g. WAF logs) where the field is absent.
  mutate {
    convert => { "request_time" => "float" }
    convert => { "response_time" => "float" }
  } 


}

output {
#    stdout { codec => rubydebug }
    # One daily index per "type" value set in the filter section.
    elasticsearch {
      hosts => ["172.26.32.57:9200"]
      index => "%{type}-%{+YYYY.MM.dd}"
      user => "elastic"
      password => "xxxxxxxx"
    }
}

system-log.conf

Bash
root@pts/0 # cat system-log.conf 
# Pipeline for the system operation (command audit) log shipped by filebeat.
input {
  beats {
    port => 5046
  }
}
filter {
  if [fields][document_type] == "system-message-log" {
    # "type" selects the daily ES index in the output below.
    mutate { replace => { "type" => "system-message-log" } }
    grok {
      # SYSTEMSYSLOG is the custom pattern in /usr/share/logstash/patterns/syslog.
      match => { "message" => "%{SYSTEMSYSLOG}" }
      # remove_field inside grok only runs when the match succeeds, so failed
      # events keep their metadata for debugging.
      # Fixes: filebeat ships the nested field [log][offset] — the previous
      # "log.offset" addressed a literal top-level field named "log.offset"
      # and removed nothing (Logstash nested fields need bracket syntax);
      # the duplicated "version" entry is also dropped.
      remove_field => [ "[log][offset]", "tags", "input", "@version", "ecs", "version", "offset", "agent"]
    }
  }
}
output {
#    stdout { codec => rubydebug }
    # One daily index per "type" value set in the filter section.
    elasticsearch {
      hosts => ["172.26.32.57:9200"]
      index => "%{type}-%{+YYYY.MM.dd}"
      user => "elastic"
      password => "xxxxxxxx"
    }
}

配置后启动服务即可

Bash
# Enable logstash to start on boot, start it now, then verify it is running.
systemctl enable logstash
systemctl start logstash
systemctl status logstash