root@pts/0 # cat mysql-log.conf
# Input stage: accept events from Beats shippers on port 5044.
input {
  beats {
    port => 5044
  }
}
# Filter stage: processing is selected per event by its [fields][document_type] value.
# Events matching neither branch pass through unchanged.
filter {
  # ---------------------------------------------------------------------------
  # MySQL audit log: the "message" field is parsed as JSON.
  # ---------------------------------------------------------------------------
  if [fields][document_type] == "mysql-audit-log" {
    json {
      source => "message"
    }
    # Parse the epoch-milliseconds "date" field into "timestamp"; @timestamp is not
    # modified by this filter.
    # NOTE(review): "date" is presumably produced by the json filter above — confirm.
    # NOTE(review): UNIX_MS is an absolute instant, so the timezone setting presumably
    # has no effect here — confirm.
    date {
      match => [ "date", "UNIX_MS" ]
      target => "timestamp"
      timezone => "Asia/Shanghai"
    }
    # Disabled alternatives, kept for reference:
    # ruby {
    #   code => "event.set('timestamp', event.get('run_time').time.localtime + 8*60*60)"
    # }
    # ruby {
    #   code => "event.set('@timestamp', event.get('timestamp'))"
    # }
    # Set the event type (used by the output index name) and drop unneeded fields,
    # including the raw "date" field that was just parsed.
    mutate {
      replace => { "type" => "mysql-audit-log" }
      remove_field => [ "agent", "input", "ecs", "version", "date" ]
    }
  }

  # ---------------------------------------------------------------------------
  # MySQL slow query log.
  # ---------------------------------------------------------------------------
  if [fields][document_type] == "mysql-slow-query" {
    # Split the raw slow-log entry into fields.
    # NOTE(review): MYSQLSLOW is not defined in this file; it has to be supplied as a
    # custom grok pattern elsewhere. The later filters assume it yields query_time,
    # lock_time, rows_sent, rows_examined, sql_query and timestamp — confirm.
    grok {
      match => { "message" => "%{MYSQLSLOW}" }
    }

    # Replace line breaks/tabs and runs of whitespace in the SQL text with a single
    # space, convert the numeric fields, and set the event type.
    mutate {
      gsub => [
        "message", '^\s*$', '',
        "sql_query", '[\n\r\t]+', ' ',
        "sql_query", '\s+', ' '
      ]
      convert => {
        "query_time" => "float"
        "lock_time" => "float"
        "rows_sent" => "integer"
        "rows_examined" => "integer"
      }
      replace => { "type" => "mysql-slow-query" }
    }

    # Date handling: use the entry's ISO8601 "timestamp" as the event's @timestamp.
    date {
      match => ["timestamp", "ISO8601"]
      timezone => "Asia/Shanghai"
      target => "@timestamp"
    }

    # Extract the SQL command type (the first word of the statement).
    # Leading whitespace is skipped because the gsub above collapses whitespace
    # but does not trim it, so the statement may still start with a space.
    grok {
      match => { "sql_query" => "^\s*(?<query_type>\w+)" }
      # add_tag => [ "query_type" ]
    }

    # Normalize query_type to upper case.
    mutate {
      uppercase => [ "query_type" ]
    }

    # Extract the table name that follows FROM / JOIN / UPDATE / INTO.
    # The name may be terminated by whitespace, a semicolon, or the end of the
    # statement, so "... FROM users;" and "... FROM users" are captured as well.
    # \b prevents the keyword from matching inside a longer identifier
    # (e.g. "date_from"). A non-matching statement is not tagged as a failure.
    grok {
      match => {
        "sql_query" => [
          "(?i)\b(?:from|join|update|into)\s+`?%{DATA:table_name}`?(?:\s+|;|$)"
        ]
      }
      tag_on_failure => []
    }

    # Handle multiple table names: remove backticks and whitespace, and split a
    # comma-separated list into separate values.
    if [table_name] {
      mutate {
        split => { "table_name" => "," }
        gsub => [
          "table_name", "`", "",
          "table_name", "\s+", ""
        ]
      }
    }

    # Final field cleanup: drop fields that are no longer needed (including the
    # already-parsed "timestamp") and trim surrounding whitespace.
    mutate {
      remove_field => [ "timestamp_unix", "ecs", "version", "agent", "@version", "timestamp" ]
      strip => ["table_name", "database"]
    }
  }
}
# Output stage: send every event to Elasticsearch.
output {
  # stdout { codec => rubydebug }
  elasticsearch {
    hosts => ["172.26.32.57:9200"]
    # NOTE(review): credentials are stored in plain text in this file — consider
    # moving them to the Logstash keystore or an environment variable.
    user => "elastic"
    password => "xxxxxxxx"
    # The index name is built from the event's "type" field plus a date suffix,
    # e.g. mysql-slow-query-<YYYY.MM.dd>.
    # NOTE(review): the filter stage in this file only sets "type" for the two known
    # document types; verify how events without it should be indexed.
    index => "%{type}-%{+YYYY.MM.dd}"
  }
}