Поэтому я недавно создал кластер ELK, используя этот сайт в качестве шаблона для этого. Настройка кластера ELK
Я столкнулся с проблемой, когда шаблон json на узле обработки Logstash не используется на фактических узлах данных Elasticsearch. Я вижу, что отображение было создано в HQ, но было создано другое, которое использует некоторые динамически созданные сопоставления. Правильно выполненное сопоставление называется «Sourcefire» на узлах данных, но оно также создало неправильное отображение под названием «sourcfire».
Я не могу понять этого и изучаю этот материал, поэтому любая помощь приветствуется. Ниже приведены фрагменты кода.
Logstash.conf
input {
tcp {
port => 5170
type => "sourcefire"
}
}
filter {
mutate{
split => ["message", "|"]
add_field => {
"event" => "%{message[5]}"
"eventSource" => "%{message[1]}"
}
}
kv {
include_keys => ["dhost", "dst", "dpt", "shost", "src", "spt", "rt"]
}
mutate {
rename => [ "dhost", "destinationHost" ]
rename => [ "dst", "destinationAddress" ]
rename => [ "dpt", "destinationPort" ]
rename => [ "shost", "sourceHost" ]
rename => [ "src", "sourceAddress" ]
rename => [ "spt", "sourcePort" ]
}
date {
match => ["rt","UNIX_MS"]
target => "eventDate"
}
geoip {
add_tag => [ "sourceGeo" ]
source => "src"
database => "/opt/logstash/vendor/geoip/GeoLiteCity.dat"
}
geoip {
add_tag => [ "destinationGeo" ]
source => "src"
database => "/opt/logstash/vendor/geoip/GeoLiteCity.dat"
}
}
output {
if [type] == "sourcefire" {
elasticsearch {
cluster => "XXX-cluster"
flush_size => 1
manage_template => true
template => "/opt/logstash/lib/logstash/outputs/elasticsearch/elasticsearch-sourcefire.json"
}
}
}
Шаблон json elasticsearch
{
"template": "logstash-*",
"settings": {
"index.refresh_interval": "5s"
},
"mappings": {
"Sourcefire": {
"_all": {
"enabled": true
},
"properties": {
"@timestamp": {
"type": "date",
"format": "basicDateTimeNoMillis"
},
"@version": {
"type": "string",
"index": "not_analyzed"
},
"geoip": {
"type": "object",
"dynamic": true,
"path": "full",
"properties": {
"location": {
"type": "geo_point"
}
}
},
"event": {
"type": "string",
"index": "not_analyzed"
},
"eventDate": {
"type": "date",
"format": "basicDateTimeNoMillis"
},
"destinationAddress": {
"type": "ip"
},
"destinationHost": {
"type": "string",
"index": "not_analyzed"
},
"destinationPort": {
"type": "integer",
"index": "not_analyzed"
},
"sourceAddress": {
"type": "ip"
},
"sourceHost": {
"type": "string",
"index": "not_analyzed"
},
"sourcePort": {
"type": "integer",
"index": "not_analyzed"
}
}
}
}
}