安装Logstash

在test11服务器上安装Logstash,如下:

[root@test11 ~]# tar zxf logstash-7.3.1.tar.gz -C /usr/local/
[root@test11 ~]# ln -s /usr/local/logstash-7.3.1/ /usr/local/logstash
[root@test11 ~]# cd /usr/local/logstash/
[root@test11 logstash]# mkdir conf.d
配置logstash

1、配置logstash.yml文件,开启参数如下:

[root@test11 logstash]# cat config/logstash.yml |grep -vE "^#|^$"
#path.data: /var/lib/logstash                   #(说明:此行在配置文件中是注释行,实际不会出现在上面grep的输出里)取消注释后可以修改logstash的数据存储位置,默认是在LOGSTASH_HOME/data下
pipeline.batch.size: 5000
path.config: /usr/local/logstash/conf.d/*.conf
xpack.monitoring.elasticsearch.hosts: ["http://192.168.37.11:9200"]
slowlog.threshold.warn: 2s
slowlog.threshold.info: 1s

2、根据实际需要调整一下jvm的内存参数

[root@test11 logstash]# vim config/jvm.options
-Xms4g
-Xmx4g

3、配置一下nginx的access的logstash配置文件,如下:

[root@test11 ~]# cat /usr/local/logstash/conf.d/nginx_access.conf 
# Consume nginx access-log events from the Kafka cluster.
input {
    kafka {
        # Each Kafka record is decoded as JSON into event fields.
        codec             => "json"
        topics            => [ "nginx_access_log" ]
        bootstrap_servers => "192.168.37.11:9092,192.168.37.12:9092,192.168.37.13:9092"
    }
}
# Parse nginx access-log lines into structured fields: grok the raw line,
# split the request into path and query arguments, set the event timestamp,
# and enrich with user-agent and GeoIP data.
filter {
    # Debugging aid: uncomment to set [@metadata][debug], which the output
    # section uses to switch stdout to the verbose rubydebug codec.
#    mutate { add_field => { "[@metadata][debug]"=>true } }
    mutate {
        rename => { "[log][file][path]" => "source" }
    }
    if [fields][log_topic] == "nginx_access_log" {
        grok {
            match => ["message", "%{IP:clientip} - %{USER:user} \[%{HTTPDATE:raw_datetime}\] \"(?:%{WORD:verb} %{URI:request} HTTP/%{NUMBER:httpversion})\" (?:\"%{DATA:body}\" )?(?:\"%{DATA:cookie}\" )?%{NUMBER:response} (?:%{NUMBER:bytes:int}|-) \"%{DATA:referrer}\" \"%{DATA:uagent}\" (?:(%{IP:proxy},? ?)*|-|unknown) (?:%{DATA:upstream_addr} |)%{NUMBER:request_time:float} (?:%{NUMBER:upstream_time:float}|-) %{WORD:scheme}"]
        }

        if [request] {
            # NOTE(review): decoding happens before the request is split on
            # '?', '&' and '=', so percent-encoded delimiters (%3F, %26, %3D)
            # inside a value become real delimiters. Confirm this is acceptable.
            urldecode {
                field => "request"
            }
            # Split the request into url_path and url_arg at the FIRST '?'
            # only, so a literal '?' inside the query string is preserved.
            ruby {
                init => "@kname = ['url_path','url_arg']"
                code => "
                    new_event = LogStash::Event.new(Hash[@kname.zip(event.get('request').split('?', 2))])
                    event.append(new_event)"
            }
            if [url_arg] {
                # Turn "a=1&b=2" into url_args = [{key: a, value: 1}, ...].
                # Each pair is split at the FIRST '=' only, so values that
                # themselves contain '=' (e.g. base64 padding) stay intact.
                ruby {
                    init => "@kname = ['key', 'value']"
                    code => "event.set('url_args', event.get('url_arg').split('&').collect {|i| Hash[@kname.zip(i.split('=', 2))]})"
                }
            }
        }
        # Use the access-log time as the event timestamp.
        date {
            match => ["raw_datetime", "dd/MMM/YYYY:HH:mm:ss Z"]
            locale => "en"
        }

        useragent {
            source => "uagent"
            target => "ua"
        }
        # Skip GeoIP for CDN log files and for private/loopback client
        # addresses. The pattern is anchored and the dots are escaped so that
        # public addresses such as 110.101.x.x or 8.192.168.x are not
        # mistaken for internal ones.
        if [source] !~ "cdn" and [clientip] !~ /^(10\.|172\.(1[6-9]|2[0-9]|3[01])\.|192\.168\.|127\.)/ {
            geoip {
                source => "clientip"
                database => "/usr/local/logstash/conf.d/GeoLite2-City/GeoLite2-City.mmdb"
                # The GeoLite2-City database must be downloaded from MaxMind with
                # a (free) account and license key; the former anonymous URL
                # https://geolite.maxmind.com/download/geoip/database/GeoLite2-City.tar.gz
                # was discontinued on 2019-12-30.
            }
        }
        # Always drop shipper bookkeeping fields that are not needed downstream.
        mutate {
            remove_field => [ "[fields][document_type]", "log", "@version", "ecs", "input", "agent" ]
        }
        # Drop the raw line and intermediate fields only when grok succeeded;
        # on a parse failure the original message is kept so the offending
        # line can still be inspected.
        if "_grokparsefailure" not in [tags] {
            mutate {
                remove_field => [ "message", "raw_datetime", "uagent", "request", "url_arg" ]
            }
        }
    }
}

# Ship nginx access-log events to Elasticsearch, with a console trace on stdout.
output {
    if [fields][log_topic] == "nginx_access_log" {
        # Console trace: full event dump (including @metadata) when the debug
        # flag is set, otherwise one dot per event as a progress indicator.
        if [@metadata][debug] {
            stdout {
                codec => rubydebug { metadata => true }
            }
        } else {
            stdout { codec => dots }
        }

        # One index per day, named from the event timestamp.
        elasticsearch {
            index => "nginx-access-log-%{+YYYY.MM.dd}"
            hosts => [ "192.168.37.11:9200" ]
        }
    }
}

至此,ELK的环境已经部署成功,可以到Kibana上查看是否有数据写入了!至于上面nginx日志的logstash配置文件中用到的具体语法,请参考《Logstash详解》。

文档更新时间: 2019-09-09 15:44   作者:子木