通过Logstash安装中使用过的实例来讲解一下logstash的使用语法
[root@test11 ~]# cat /usr/local/logstash/conf.d/nginx_access.conf 
input {
    kafka {         #读取kafka中的topic数据
        bootstrap_servers => "192.168.37.11:9092,192.168.37.12:9092,192.168.37.13:9092"
        topics => ["nginx_access_log"]
        codec => "json"
    }
}
filter {
#    mutate { add_field => { "[@metadata][debug]"=>true } }          #添加一个字段用来方便调试
    mutate {
        rename => { "[log][file][path]" => "source" }
    }
    if [fields][log_topic] == "nginx_access_log" {
        grok {
            match => ["message", "%{IP:clientip} - %{USER:user} \[%{HTTPDATE:raw_datetime}\] \"(?:%{WORD:verb} %{URIPATHPARAM:request} HTTP/%{NUMBER:httpversion})\" (?:\"%{DATA:body}\" )?(?:\"%{DATA:cookie}\" )?%{NUMBER:response} (?:%{NUMBER:bytes:int}|-) \"%{DATA:referrer}\" \"%{DATA:uagent}\" (?:(%{IP:proxy},? ?)*|-|unknown) (?:%{DATA:upstream_addr} |)%{NUMBER:request_time:float} (?:%{NUMBER:upstream_time:float}|-) %{WORD:scheme}"]
        }

        if [request] {
            urldecode {
                field => "request"
            }
            ruby {
                init => "@kname = ['url_path','url_arg']"
                code => "
                    new_event = LogStash::Event.new(Hash[@kname.zip(event.get('request').split('?'))])
                    event.append(new_event)"
            }
            if [url_arg] {
                ruby {
                    init => "@kname = ['key', 'value']"
                    code => "event.set('url_args', event.get('url_arg').split('&').collect {|i| Hash[@kname.zip(i.split('='))]})"
                }
            }
        }
        date {
            match => ["raw_datetime", "dd/MMM/YYYY:HH:mm:ss Z"]
            locale => "en"
        }

        useragent {
            source => "uagent"
            target => "ua"
        }
        if [source] !~ "cdn" and [clientip] !~ "10.10|192.168" {
            geoip {
                source => "clientip"
#               database => "/ubox/logstash/GeoLiteCity.dat"
            }
        }
        mutate {
            remove_field => [ "message", "raw_datetime", "uagent", "request", "url_arg" , "[fields][document_type]" ,"log", "@version", "ecs", "input", "agent"]
        }
    }
}

output {
    if [fields][log_topic] == "nginx_access_log" {
        if [@metadata][debug]{
                stdout{codec=>rubydebug{metadata=>true}}
        }
        else{
        stdout{codec=>dots}
        }
        elasticsearch {
            hosts => ["192.168.37.11:9200"]
            index => "nginx-access-log-%{+YYYY.MM.dd}"
        }
    }
}
Codec Plugin

作用于input和output plugin,负责将数据在原始与Logstash Event之间转换,常见的codec有:

  • plain读取原始内容
  • dots将内容简化为点进行输出
  • rubydebug将Logstash Events按照ruby格式输出,方便调试
  • line处理带有换行符的内容
  • json处理json格式的内容
  • multiline处理多行数据的内容
Filter Plugin

Filter是Logstash功能强大的主要原因,它可以对Logstash Event进行丰富的处理,比如解析数据,删除字段,类型转换等,常见的有如下几个:

  • dissect分割解析
  • grok正则匹配解析
  • mutate对字段作处理,比如重命名、删除、替换等
  • date日期解析
  • json按照json解析字段内容到指定字段中
  • geoip增加地理位置数据
  • ruby利用ruby代码来动态修改Logstash
  • useragent,添加有关用户代理(如系列,操作系统,版本和设备)的信息
dissect分割解析:

Dissect过滤器是一种拆分操作。与常规拆分操作(其中一个分隔符应用于整个字符串)不同,此操​​作将一组分隔符应用于字符串值。Dissect不使用正则表达式,速度非常快。但是,如果文本结构因行而异,则Grok更适合。有一种混合情况,其中Dissect可用于解除可靠重复的线段,然后Grok可用于剩余的场值,具有更多的正则表达式可预测性和更少的整体工作。一个分隔符是一个之间的文本}和下一个%{字符。
实例:

John Smith,Big Oaks,Wood Lane,Hambledown,Canterbury,CB34RY

以 ,号为分割符:

%{name},%{addr1},%{addr2},%{addr3},%{city},%{zip}

再参照我们上面logstash的配置文件里的:

"message", "%{IP:clientip} - %{USER:user} \[%{HTTPDATE:raw_datetime}\] \"(?:%{WORD:verb} %{URIPATHPARAM:request} HTTP/%{NUMBER:httpversion})\" (?:\"%{DATA:body}\" )?(?:\"%{DATA:cookie}\" )?%{NUMBER:response} (?:%{NUMBER:bytes:int}|-) \"%{DATA:referrer}\" \"%{DATA:uagent}\" (?:(%{IP:proxy},? ?)*|-|unknown) (?:%{DATA:upstream_addr} |)%{NUMBER:request_time:float} (?:%{NUMBER:upstream_time:float}|-) %{WORD:scheme}

上面分割的是nginx的access文件日志格式,下面以access日志中的某个记录对比以上的分割格式:

216.10.245.81 - - [28/Aug/2019:10:32:10 +0800] "GET / HTTP/1.1" 403 146 "-" "Mozilla/5.0 (Windows; U; Windows NT 6.0;en-US; rv:1.9.2) Gecko/20100115 Firefox/3.6)" - - 0.000 - http
47.89.192.12 - - [28/Aug/2019:10:44:04 +0800] "GET / HTTP/1.1" 403 146 "-" "Mozilla/5.0 zgrab/0.x" - - 0.000 - http

可见字段间是以 空格符 为分割的,当然以上的日志文件内容较为复杂,所以需要配合正则表达式来使用。

例如:{IP:clientip} IP这个是logstash定义好的正则,可以抓取到IP地址。里面的clientip(自定义的)即是把抓取到的数据存储到我们这个字段上。

当然:像上面的”Mozilla/5.0 (Windows; U; Windows NT 6.0;en-US; rv:1.9.2) Gecko/20100115 Firefox/3.6)”这样的字段实在太复杂了,我们就可以用DATA来匹配,DATA在logstash定义的正则格式是:DATA .*?,意思是匹配所有

说到这里,我们当然想知道logstash中其它定义的正则规则是怎样的,别急,我们可以参考下面的github地址:https://github.com/elastic/logstash/blob/v1.4.2/patterns/grok-patterns

grok正则匹配解析

grok语法如下:

  • %{SYNTAX:SEMANTIC} SYNTAX为grok pattern的名称,SEMANTIC为赋值字段名称
  • %{NUMBER:duration} 例%{IP:client} 可以匹配数值类型,但是grok匹配出来的内容都是字符串类型,可以通过在最后指定为int或者float来强制转换类型。%{NUMBER:duration:float}

注:上面的dissect分割解析,其它nginx的access日志解析那个例子已经是配合上了grok正则匹配解析。

当然,如果要验证自己写的grok语法有没有问题的,可以到这个的网址去验证:http://grokdebug.herokuapp.com/

mutate解析

使用最频繁的操作,可以对字段进行各种操作,比如重命名、删除、替换、更新等,主要操作如下:

  • convert类型转换
  • gsub字符串替换
  • split/join/merge字符串切割、数组合并为字符串、数组合并为数组
  • rename字段重命名
  • update/replace字段内容更新或替换
  • remove_field删除字段

注:实现字段类型的转换,类型为hash,仅支持转换为integer、float、string和boolean。例:

filter{
  mutate{
    convert => { "age" => "integer" }
  }
  mutate {
    rename => { "[log][file][path]" => "source" }
    }
  mutate {
    remove_field => [ "message", "raw_datetime", "uagent", "request", "url_arg" , "[fields][document_type]" ,"log", "@version", "ecs", "input", "agent"]
    }
}
useragent解析

UserAgent过滤器,添加有关用户代理(如系列,操作系统,版本和设备)的信息

useragent {
            source => "uagent"        #要解析的字段名
            target => "ua"            #解析后的存储字段,默认和uagent同级别
        }
ruby利用ruby代码来动态修改Logstash

此过滤器接受内联ruby代码或ruby文件。这两个选项是互斥的。最灵活的插件,可以用ruby语言随心所欲的修改Logstash Event对象。
以上例来讲解:

 if [request] {
            urldecode {              #用于解码被编码的字段,可以解决URL中 中文乱码的问题
                field => "request"   #指定request字段
            }
            ruby {
                init => "@kname = ['url_path','url_arg']"
                code => "
                    new_event = LogStash::Event.new(Hash[@kname.zip(event.get('request').split('?'))])
                    event.append(new_event)"
            }
            if [url_arg] {
                ruby {
                    init => "@kname = ['key', 'value']"
                    code => "event.set('url_args', event.get('url_arg').split('&').collect {|i| Hash[@kname.zip(i.split('='))]})"
                }
            }
        }

上面解析的是url,我们来举一个url的网址:

http://192.168.37.11:5601/app/kibana#/discover?_g=()&_a=(columns:!(_source),index:'9937d130-ca4f-11e9-9775-5b954fbfd30b',interval:auto,query:(language:kuery,query:''),sort:!('@timestamp',desc))

init:通过 “init” 参数预定义好由每个新字段的名字组成的数组
code:写入自己的处理逻辑,例如上面的”code” 参数指定的 Ruby 语句里通过两个数组的 zip 操作生成一个哈希并添加进数组里
split:使用分隔符把字符串分割成数组

output解析

将事件数据发送到特定目标。支持输出的方式有:https://www.elastic.co/guide/en/logstash/7.3/output-plugins.html
常见的除了上面的将输出传向ES,还可以将输出指定的路径下,如下:

output {
    if [fields][log_topic] in ["nginx_access_log", "nginx_error_log"] {
        file {
            codec => line { format => "%{message}"} 
#默认情况下,此输出以json格式每行写入一个事件。您可以使用line编解码器自定义行格式
#           codec => line { format => “custom format:%{message}” } } }  
            path => "/app/logstash/%{[fields][document_type]}/%{source}.%{local_time}"
        }
    }
}
文档更新时间: 2019-09-02 16:24   作者:子木