Logstash is a data processing pipeline that ingests, transforms, and ships log data to various destinations. In our ELK stack, it serves as the central log processor, collecting logs from different services via GELF, parsing them, and sending them to Elasticsearch for storage and analysis.
For background on the GELF protocol, see gelf.md.
Our Logstash service is configured in Docker Compose as follows:
```yaml
logstash:
  <<: *common
  container_name: logstash
  build: ./src/elk/logstash
  profiles: ["elkprofile"]
  restart: always
  command: logstash -f /logstash_dir/logstash.conf
  ports:
    - "5000:5000/udp"
    - "9600:9600"
    - "12201:12201/udp"
  environment:
    - ES_JAVA_OPTS=-Xmx1g -Xms1g
    - xpack.monitoring.enabled=true
    - xpack.monitoring.elasticsearch.hosts=http://elasticsearch:9200
```
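For context, the other services presumably ship their logs to this endpoint through Docker's `gelf` logging driver. A hypothetical service entry might look like this (the service name, image, and tag value are illustrative, not taken from our actual compose file):

```yaml
backend:
  image: our-backend            # illustrative service/image name
  logging:
    driver: gelf
    options:
      gelf-address: "udp://localhost:12201"   # Logstash GELF input from the host's perspective
      tag: "backend"                          # surfaces as the [tag] field matched in logstash.conf
```

The `tag` option is what the pipeline's `if [tag] == "..."` conditionals key on.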
`ES_JAVA_OPTS`: limits the JVM heap to 1 GB (setting `-Xms` equal to `-Xmx` avoids heap resizing at runtime).

Our custom pipeline configuration (`logstash.conf`) handles the different log formats coming from the various services:
```conf
input {
  gelf {
    host => "0.0.0.0"
    port => 12201
    type => "docker"
  }
}

filter {
  mutate {
    rename => { "host" => "hostname" }
    gsub => [
      "message", "\r", "",
      "message", "\n", ""
    ]
  }

  # Parse backend logs (tag: "backend")
  if [tag] == "backend" {
    grok {
      match => {
        # The outer (?<timestamp>...) capture provides the "timestamp" field
        # that the date filter below expects, while keeping the individual parts.
        "message" => "%{IPORHOST:client_ip}:%{POSINT:client_port} \- \- \[(?<timestamp>%{MONTHDAY:day}/%{MONTH:month}/%{YEAR:year}:%{TIME:time})\] \"%{WORD:http_method} %{URIPATHPARAM:request_path}.*\" %{NUMBER:http_status} %{NUMBER:response_size}"
      }
      # An empty tag_on_failure suppresses "_grokparsefailure" on non-matching
      # lines (remove_tag only runs on successful matches, so it cannot do this).
      tag_on_failure => []
      add_tag => ["http_request"]
    }
    date {
      match => ["timestamp", "dd/MMM/yyyy:HH:mm:ss"]
      target => "@timestamp"
      remove_field => ["timestamp"]
    }
  }

  # Parse game logs (tag: "game")
  if [tag] == "game" {
    grok {
      match => {
        "message" => "%{GREEDYDATA:log_message}"
      }
      tag_on_failure => []
    }
  }

  # Parse redis logs (tag: "redis")
  if [tag] == "redis" {
    grok {
      match => { "message" => "%{NONNEGINT:pid}:%{WORD:log_level}\s+%{MONTHDAY:day} %{MONTH:month} %{YEAR:year} %{TIME:time} (?<redis_priority>[*#\.-]) %{GREEDYDATA:redis_message}" }
      tag_on_failure => []
      add_tag => ["redis_parsed"]
    }
  }
}

output {
  elasticsearch {
    hosts => ["elasticsearch:9200"]
    index => "docker-logs-%{+yyyy.MM.dd}"
  }
  stdout { codec => rubydebug }
}
```
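The Joda-style format strings in the `date` filter and the index name translate directly into familiar `strftime` directives. As an illustrative sketch (not part of the pipeline itself), the timestamp parsing and daily index naming can be mimicked in Python:

```python
from datetime import datetime

# "dd/MMM/yyyy:HH:mm:ss" in the date filter corresponds to this strptime format.
raw = "01/Jan/2024:12:00:00"
ts = datetime.strptime(raw, "%d/%b/%Y:%H:%M:%S")

# The elasticsearch output's "docker-logs-%{+yyyy.MM.dd}" expands the event's
# @timestamp the same way strftime does here, yielding one index per day.
index = ts.strftime("docker-logs-%Y.%m.%d")
print(index)  # docker-logs-2024.01.01
```

One index per day keeps retention simple: old days can be dropped by deleting whole indices instead of running delete-by-query jobs.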
To verify the installation, check the container status and logs:

```shell
docker compose ps logstash
docker compose logs logstash
```

Then query the Logstash monitoring API:

```shell
curl "http://localhost:9600/?pretty"
```
This should return information about the Logstash instance.
Test if the GELF server is reachable:

```shell
echo '{"version":"1.1","host":"test","short_message":"test message"}' | nc -u -w1 localhost 12201
```
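For a slightly richer smoke test than the `nc` one-liner, the same kind of GELF datagram can be sent from Python. This is a minimal sketch, assuming Logstash is reachable on `localhost:12201` as in the compose port mapping above; the `_service` field is an illustrative custom field, not something our pipeline requires:

```python
import json
import socket

# Minimal GELF 1.1 message; version, host and short_message are the required keys.
payload = {
    "version": "1.1",
    "host": "test",
    "short_message": "test message from python",
    "level": 6,                 # syslog severity: informational
    "_service": "smoke-test",   # custom fields carry a leading underscore
}

# GELF over UDP is one JSON document per datagram. sendto() succeeds even if
# nothing is listening, so confirm receipt in the Logstash stdout output or Kibana.
sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
sock.sendto(json.dumps(payload).encode("utf-8"), ("localhost", 12201))
sock.close()
```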
Check if the port is open:

```shell
sudo lsof -iUDP -nP | grep 12201
```
Verify that parsed logs are reaching Elasticsearch:

```shell
curl -X GET "http://localhost:9200/docker-logs-*/_search?pretty&size=5"
```
You can also explore the logs in Kibana at http://localhost:5601 by creating a data view that matches the `docker-logs-*` index pattern.
If you need to debug or test grok patterns, use the online Grok Debugger at https://grokdebugger.com/

Example pattern:

```
%{IPORHOST:client_ip}:%{POSINT:client_port} \- \- \[%{MONTHDAY:day}/%{MONTH:month}/%{YEAR:year}:%{TIME:time}\] \"%{WORD:http_method} %{URIPATHPARAM:request_path}.*\" %{NUMBER:http_status} %{NUMBER:response_size}
```

It will match a log line like (illustrative sample):

```
172.18.0.1:52144 - - [01/Jan/2024:12:00:00] "GET /api/health HTTP/1.1" 200 512
```
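For a quick sanity check outside the Grok Debugger, the same structure can be approximated as a plain Python regular expression. Note this is a loose sketch: each named group mirrors a grok field, but the real grok definitions (`IPORHOST`, `URIPATHPARAM`, etc.) are stricter than the character classes used here:

```python
import re

# Rough regex equivalent of the grok pattern above; group names mirror the
# grok field names, with the bracketed timestamp captured as one group.
LOG_PATTERN = re.compile(
    r'(?P<client_ip>[\w.-]+):(?P<client_port>\d+) - - '
    r'\[(?P<timestamp>\d{2}/\w{3}/\d{4}:\d{2}:\d{2}:\d{2})\] '
    r'"(?P<http_method>\w+) (?P<request_path>\S+).*" '
    r'(?P<http_status>\d+) (?P<response_size>\d+)'
)

line = '172.18.0.1:52144 - - [01/Jan/2024:12:00:00] "GET /api/health HTTP/1.1" 200 512'
match = LOG_PATTERN.match(line)
print(match.groupdict())
```

If the regex matches here but grok still fails in the pipeline, compare the raw `message` field in the `stdout` output for stray whitespace or control characters.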