通过Logstash安装中使用过的实例来讲解一下logstash的使用语法
[root@test11 ~]# cat /usr/local/logstash/conf.d/nginx_access.conf
input {
kafka { #读取kafka中的topic数据
bootstrap_servers => "192.168.37.11:9092,192.168.37.12:9092,192.168.37.13:9092"
topics => ["nginx_access_log"]
codec => "json"
}
}
filter {
# mutate { add_field => { "[@metadata][debug]"=>true } } #添加一个字段用来方便调试
mutate {
rename => { "[log][file][path]" => "source" }
}
if [fields][log_topic] == "nginx_access_log" {
grok {
match => ["message", "%{IP:clientip} - %{USER:user} \[%{HTTPDATE:raw_datetime}\] \"(?:%{WORD:verb} %{URIPATHPARAM:request} HTTP/%{NUMBER:httpversion})\" (?:\"%{DATA:body}\" )?(?:\"%{DATA:cookie}\" )?%{NUMBER:response} (?:%{NUMBER:bytes:int}|-) \"%{DATA:referrer}\" \"%{DATA:uagent}\" (?:(%{IP:proxy},? ?)*|-|unknown) (?:%{DATA:upstream_addr} |)%{NUMBER:request_time:float} (?:%{NUMBER:upstream_time:float}|-) %{WORD:scheme}"]
}
if [request] {
urldecode {
field => "request"
}
ruby {
init => "@kname = ['url_path','url_arg']"
code => "
new_event = LogStash::Event.new(Hash[@kname.zip(event.get('request').split('?'))])
event.append(new_event)"
}
if [url_arg] {
ruby {
init => "@kname = ['key', 'value']"
code => "event.set('url_args', event.get('url_arg').split('&').collect {|i| Hash[@kname.zip(i.split('='))]})"
}
}
}
date {
match => ["raw_datetime", "dd/MMM/YYYY:HH:mm:ss Z"]
locale => "en"
}
useragent {
source => "uagent"
target => "ua"
}
if [source] !~ "cdn" and [clientip] !~ "10.10|192.168" {
geoip {
source => "clientip"
# database => "/ubox/logstash/GeoLiteCity.dat"
}
}
mutate {
remove_field => [ "message", "raw_datetime", "uagent", "request", "url_arg" , "[fields][document_type]" ,"log", "@version", "ecs", "input", "agent"]
}
}
}
output {
if [fields][log_topic] == "nginx_access_log" {
if [@metadata][debug]{
stdout{codec=>rubydebug{metadata=>true}}
}
else{
stdout{codec=>dots}
}
elasticsearch {
hosts => ["192.168.37.11:9200"]
index => "nginx-access-log-%{+YYYY.MM.dd}"
}
}
}
Codec Plugin
作用于input和output plugin,负责将数据在原始与Logstash Event之间转换,常见的codec有:
- plain读取原始内容
- dots将内容简化为点进行输出
- rubydebug将Logstash Events按照ruby格式输出,方便调试
- line处理带有换行符的内容
- json处理json格式的内容
- multiline处理多行数据的内容
Filter Plugin
Filter是Logstash功能强大的主要原因,它可以对Logstash Event进行丰富的处理,比如解析数据,删除字段,类型转换等,常见的有如下几个:
- dissect分割解析
- grok正则匹配解析
- mutate对字段作处理,比如重命名、删除、替换等
- date日期解析
- json按照json解析字段内容到指定字段中
- geoip增加地理位置数据
- ruby利用ruby代码来动态修改Logstash
- useragent,添加有关用户代理(如系列,操作系统,版本和设备)的信息
dissect分割解析:
Dissect过滤器是一种拆分操作。与常规拆分操作(其中一个分隔符应用于整个字符串)不同,此操作将一组分隔符应用于字符串值。Dissect不使用正则表达式,速度非常快。但是,如果文本结构因行而异,则Grok更适合。有一种混合情况,其中Dissect可用于解除可靠重复的线段,然后Grok可用于剩余的场值,具有更多的正则表达式可预测性和更少的整体工作。一个分隔符是一个之间的文本}和下一个%{字符。
实例:
John Smith,Big Oaks,Wood Lane,Hambledown,Canterbury,CB34RY
以 ,号为分割符:
%{name},%{addr1},%{addr2},%{addr3},%{city},%{zip}
再参照我们上面logstash的配置文件里的:
"message", "%{IP:clientip} - %{USER:user} \[%{HTTPDATE:raw_datetime}\] \"(?:%{WORD:verb} %{URIPATHPARAM:request} HTTP/%{NUMBER:httpversion})\" (?:\"%{DATA:body}\" )?(?:\"%{DATA:cookie}\" )?%{NUMBER:response} (?:%{NUMBER:bytes:int}|-) \"%{DATA:referrer}\" \"%{DATA:uagent}\" (?:(%{IP:proxy},? ?)*|-|unknown) (?:%{DATA:upstream_addr} |)%{NUMBER:request_time:float} (?:%{NUMBER:upstream_time:float}|-) %{WORD:scheme}
上面分割的是nginx的access文件日志格式,下面以access日志中的某个记录对比以上的分割格式:
216.10.245.81 - - [28/Aug/2019:10:32:10 +0800] "GET / HTTP/1.1" 403 146 "-" "Mozilla/5.0 (Windows; U; Windows NT 6.0;en-US; rv:1.9.2) Gecko/20100115 Firefox/3.6)" - - 0.000 - http
47.89.192.12 - - [28/Aug/2019:10:44:04 +0800] "GET / HTTP/1.1" 403 146 "-" "Mozilla/5.0 zgrab/0.x" - - 0.000 - http
可见字段间是以 空格符 为分割的,当然以上的日志文件内容较为复杂,所以需要配合正则表达式来使用。
例如:{IP:clientip} IP这个是logstash定义好的正则,可以抓取到IP地址。里面的clientip(自定义的)即是把抓取到的数据存储到我们这个字段上。
当然:像上面的”Mozilla/5.0 (Windows; U; Windows NT 6.0;en-US; rv:1.9.2) Gecko/20100115 Firefox/3.6)”这样的字段实在太复杂了,我们就可以用DATA来匹配,DATA在logstash定义的正则格式是:DATA .*?,意思是匹配所有
说到这里,我们当然想知道logstash中其它定义的正则规则是怎样的,别急,我们可以参考下面的github地址:https://github.com/elastic/logstash/blob/v1.4.2/patterns/grok-patterns
grok正则匹配解析
grok语法如下:
- %{SYNTAX:SEMANTIC} SYNTAX为grok pattern的名称,SEMANTIC为赋值字段名称
- %{NUMBER:duration} 例%{IP:client} 可以匹配数值类型,但是grok匹配出来的内容都是字符串类型,可以通过在最后指定为int或者float来强制转换类型。%{NUMBER:duration:float}
注:上面的dissect分割解析,其它nginx的access日志解析那个例子已经是配合上了grok正则匹配解析。
当然,如果要验证自己写的grok语法有没有问题的,可以到这个的网址去验证:http://grokdebug.herokuapp.com/
mutate解析
使用最频繁的操作,可以对字段进行各种操作,比如重命名、删除、替换、更新等,主要操作如下:
- convert类型转换
- gsub字符串替换
- split/join/merge字符串切割、数组合并为字符串、数组合并为数组
- rename字段重命名
- update/replace字段内容更新或替换
- remove_field删除字段
注:实现字段类型的转换,类型为hash,仅支持转换为integer、float、string和boolean。例:
filter{
mutate{
convert => { "age" => "integer" }
}
mutate {
rename => { "[log][file][path]" => "source" }
}
mutate {
remove_field => [ "message", "raw_datetime", "uagent", "request", "url_arg" , "[fields][document_type]" ,"log", "@version", "ecs", "input", "agent"]
}
}
useragent解析
UserAgent过滤器,添加有关用户代理(如系列,操作系统,版本和设备)的信息
useragent {
source => "uagent" #要解析的字段名
target => "ua" #解析后的存储字段,默认和uagent同级别
}
ruby利用ruby代码来动态修改Logstash
此过滤器接受内联ruby代码或ruby文件。这两个选项是互斥的。最灵活的插件,可以用ruby语言随心所欲的修改Logstash Event对象。
以上例来讲解:
if [request] {
urldecode { #用于解码被编码的字段,可以解决URL中 中文乱码的问题
field => "request" #指定request字段
}
ruby {
init => "@kname = ['url_path','url_arg']"
code => "
new_event = LogStash::Event.new(Hash[@kname.zip(event.get('request').split('?'))])
event.append(new_event)"
}
if [url_arg] {
ruby {
init => "@kname = ['key', 'value']"
code => "event.set('url_args', event.get('url_arg').split('&').collect {|i| Hash[@kname.zip(i.split('='))]})"
}
}
}
上面解析的是url,我们来举一个url的网址:
http://192.168.37.11:5601/app/kibana#/discover?_g=()&_a=(columns:!(_source),index:'9937d130-ca4f-11e9-9775-5b954fbfd30b',interval:auto,query:(language:kuery,query:''),sort:!('@timestamp',desc))
init:通过 “init” 参数预定义好由每个新字段的名字组成的数组
code:写入自己的处理逻辑,例如上面的”code” 参数指定的 Ruby 语句里通过两个数组的 zip 操作生成一个哈希并添加进数组里
split:使用分隔符把字符串分割成数组
output解析
将事件数据发送到特定目标。支持输出的方式有:https://www.elastic.co/guide/en/logstash/7.3/output-plugins.html
常见的除了上面的将输出传向ES,还可以将输出指定的路径下,如下:
output {
if [fields][log_topic] in ["nginx_access_log", "nginx_error_log"] {
file {
codec => line { format => "%{message}"}
#默认情况下,此输出以json格式每行写入一个事件。您可以使用line编解码器自定义行格式
# codec => line { format => “custom format:%{message}” } } }
path => "/app/logstash/%{[fields][document_type]}/%{source}.%{local_time}"
}
}
}