Files
docker-log-centralizer/logstash/indexer/pipeline/kafka_elasticsearch.conf
2017-03-17 15:35:36 +01:00

134 lines
3.1 KiB
Plaintext

# Source: consume JSON-encoded events from Kafka.
input {
  kafka {
    # Broker used for the initial connection.
    bootstrap_servers => "kafka:9092"
    # Identifier this consumer reports to the broker with its requests.
    client_id         => "logstash_indexer_1"
    # Topics subscribed to. No group_id is set here, so the plugin default applies.
    topics            => ["random", "apache", "random-forwarder", "apache-forwarder"]
    # Every Kafka record is decoded as a JSON document.
    codec             => json {}
  }
}
# Parse raw log lines into structured fields. The branch taken depends on the
# event's `type` field; events with any other type pass through unmodified.
filter {
  # --- nginx access logs ---------------------------------------------------
  if [type] == "nginx-access" {
    # Combined-log-format line; anything after it is kept in `extra_fields`.
    grok {
      match     => { "message" => "%{COMBINEDAPACHELOG}+%{GREEDYDATA:extra_fields}" }
      overwrite => [ "message" ]
    }
    # Cast numeric captures so Elasticsearch maps them as numbers.
    # Fields that are absent on an event are simply left alone.
    mutate {
      convert => {
        "response"     => "integer"
        "bytes"        => "integer"
        "responsetime" => "float"
      }
    }
    # Resolve the client address into the `geoip` field.
    geoip {
      source  => "clientip"
      target  => "geoip"
      add_tag => [ "nginx-geoip" ]
    }
    # Use the request time from the log line as the event timestamp; the
    # temporary `timestamp` field is dropped once it has been parsed.
    date {
      match        => [ "timestamp" , "dd/MMM/YYYY:HH:mm:ss Z" ]
      remove_field => [ "timestamp" ]
    }
    # Break the user-agent string (captured as `agent`) into its components.
    useragent {
      source => "agent"
    }
  }

  # --- application ("random") logs, direct and forwarded -------------------
  # Both types share the exact same line layout, so they share one branch:
  #   <timestamp> <pid> <filename> <line> <logger> <severity> <quote>
  if [type] in ["random", "random-forwarder"] {
    grok {
      match     => { "message" => "(?<timestamp>%{YEAR}[./-]%{MONTHNUM}[./-]%{MONTHDAY}[- ]%{TIME}) %{NUMBER:pid} %{GREEDYDATA:filename} %{NUMBER:line} %{GREEDYDATA:logger} %{LOGLEVEL:severity} %{GREEDYDATA:quote}" }
      overwrite => [ "message" ]
    }
    # NOTE(review): the grok pattern above accepts '.', '/' or '-' as date
    # separators, but only the dash/space/comma-millis layout is parsed here;
    # confirm no producer emits the other layouts.
    date {
      match        => [ "timestamp", "YYYY-MM-dd HH:mm:ss,SSS" ]
      remove_field => [ "timestamp" ]
    }
  }

  # --- apache access logs, direct and forwarded ----------------------------
  # Both types receive identical processing, so they share one branch.
  if [type] in ["apache", "apache-forwarder"] {
    grok {
      match     => { "message" => "%{COMBINEDAPACHELOG}" }
      overwrite => [ "message" ]
    }
    # Cast numeric captures so Elasticsearch maps them as numbers.
    # Fields that are absent on an event are simply left alone.
    mutate {
      convert => {
        "response"     => "integer"
        "bytes"        => "integer"
        "responsetime" => "float"
      }
    }
    # Resolve the client address into the `geoip` field.
    geoip {
      source  => "clientip"
      target  => "geoip"
      add_tag => [ "apache-geoip" ]
    }
    # Use the request time from the log line as the event timestamp; the
    # temporary `timestamp` field is dropped once it has been parsed.
    date {
      match        => [ "timestamp", "dd/MMM/yyyy:HH:mm:ss Z" ]
      remove_field => [ "timestamp" ]
    }
  }
}
# Ship each event to an Elasticsearch index selected by its `type`.
# Index names end in the event's year and month (%{+YYYYMM}).
# Events whose type matches none of the branches are not sent anywhere.
output {
  if [type] == "nginx-access" {
    elasticsearch {
      index => "nginx-%{+YYYYMM}"
      hosts => ["elasticsearch:9200"]
    }
  } else if [type] == "random" {
    elasticsearch {
      index => "random-%{+YYYYMM}"
      hosts => ["elasticsearch:9200"]
    }
  } else if [type] == "apache" {
    elasticsearch {
      index => "apache-%{+YYYYMM}"
      hosts => ["elasticsearch:9200"]
    }
  } else if [type] == "random-forwarder" {
    # Forwarded events are indexed and also dumped to stdout.
    elasticsearch {
      index => "randomforwarder-%{+YYYYMM}"
      hosts => ["elasticsearch:9200"]
    }
    stdout {
      codec => rubydebug
    }
  } else if [type] == "apache-forwarder" {
    # Forwarded events are indexed and also dumped to stdout.
    elasticsearch {
      index => "apacheforwarder-%{+YYYYMM}"
      hosts => ["elasticsearch:9200"]
    }
    stdout {
      codec => rubydebug
    }
  }
}