Filtering out nil values in Logstash
PATCHing data in Elasticsearch with Logstash.
Let’s imagine you are trying to aggregate data about an entity in Elasticsearch (ES from here on). The naive way to do it is to use the Elasticsearch output with the update action. But then you realize that the entity gets overwritten after each request, leaving null values in ES. So you lose your data!
Let’s assume we would like to end up with the following entity after collecting and patching all the pieces of data:
{
  "merchant": {
    "name": "TV shop",
    "url": "https://tv-shop.example.com"
  },
  "user": {
    "first_name": "Joe",
    "last_name": "Philips"
  },
  "order": {
    "description": "New TV",
    "amount": 150,
    "currency": "USD"
  },
  "transaction": {
    "status": "success",
    "completed_at": "2019-02-17T17:46:11.557"
  }
}
If you have a microservice infrastructure, you probably never have all of this data available at once, because each service operates on its own domain (merchant, order, transaction, etc.).
In our company we use Kafka to collect all kinds of data for further visualization, statistics and investigation.
Okay, talk is cheap, show me the code!
Naive attempt (wrong!):
input {
  # ...
}
output {
  elasticsearch {
    index => "transactions"
    document_id => "%{id}"
    action => update
    hosts => "es"
  }
}
This produces the behavior described at the beginning of the article: fields that arrive as nil overwrite the values already stored in ES.
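To make the problem concrete, here is a minimal Ruby sketch that imitates how a partial update merges an incoming document into a stored one. The `merge_update` helper and the sample data are hypothetical and only approximate what ES does; the sketch assumes each service sends the full schema with nil for the fields it does not own.

```ruby
# Hypothetical helper: recursively merges `incoming` into `stored`,
# roughly the way a partial document update does. Nested hashes are
# merged key by key; any other value, including nil, replaces the old one.
def merge_update(stored, incoming)
  stored.merge(incoming) do |_key, old_value, new_value|
    if old_value.is_a?(Hash) && new_value.is_a?(Hash)
      merge_update(old_value, new_value)
    else
      new_value
    end
  end
end

# What is already stored for this entity (sample data).
stored = {
  "user"  => { "first_name" => "Joe", "last_name" => "Philips" },
  "order" => { "description" => "New TV", "amount" => 150, "currency" => "USD" }
}

# An event from the transaction service: full schema, nil for unknown fields.
incoming = {
  "user"        => { "first_name" => nil, "last_name" => nil },
  "order"       => nil,
  "transaction" => { "status" => "success" }
}

merged = merge_update(stored, incoming)
# The user names and the whole order are now nil in `merged`.
```

The transaction data arrives as expected, but everything the transaction service did not know about is wiped out.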
Our next thought is to use filters. Unfortunately, Logstash has no built-in way to remove nil values from events.
Solution:
There is an awesome Logstash filter called the Ruby filter. With it we can programmatically remove all nil values using the following Ruby code:
def remove_nils!(hash)
  hash.each do |k, v|
    if v.nil?
      hash.delete(k)
    elsif v.is_a?(Hash)
      if v.empty?
        hash.delete(k)
      else
        result = remove_nils!(v)
        if result.empty?
          hash.delete(k)
        else
          hash[k] = result
        end
      end
    end
  end
end
event_hash = event.to_hash
event.cancel
new_event_block.call(LogStash::Event.new(remove_nils!(event_hash)))
It recursively traverses the Event object and removes all nil values, along with any nested objects that end up empty, so that only existing key-value pairs reach the output section.
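The same idea can be tried outside Logstash on a plain hash. The sketch below is a non-mutating variant written for illustration: `without_nils` and the sample event are hypothetical names and data, not part of the filter above. Like the original, it only looks into nested hashes, so nil values inside arrays are left alone.

```ruby
# Returns a new hash with nil values and empty nested hashes removed.
# Unlike remove_nils!, it does not modify its argument.
def without_nils(hash)
  hash.each_with_object({}) do |(key, value), result|
    # Clean nested hashes first, so a hash holding only nils becomes empty.
    value = without_nils(value) if value.is_a?(Hash)
    next if value.nil? || (value.is_a?(Hash) && value.empty?)
    result[key] = value
  end
end

# Sample event from the transaction service (hypothetical data).
event_hash = {
  "id"          => "42",
  "user"        => { "first_name" => nil, "last_name" => nil },
  "order"       => nil,
  "transaction" => { "status" => "success", "completed_at" => nil }
}

cleaned = without_nils(event_hash)
# cleaned == { "id" => "42", "transaction" => { "status" => "success" } }
```

Only the id and the known transaction status survive, which is exactly what we want to send as a partial update.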
Here is a complete Logstash pipeline to achieve this:
input {
  kafka {
    bootstrap_servers => "kafka1,kafka2"
    group_id => "logstash-transaction"
    topics => "transactions"
    consumer_threads => "2"
  }
}
filter {
  ruby {
    code => "
      # Recursively remove `nil`s and empty objects from event
      def remove_nils!(hash)
        hash.each do |k, v|
          if v.nil?
            hash.delete(k)
          elsif v.is_a?(Hash)
            if v.empty?
              hash.delete(k)
            else
              result = remove_nils!(v)
              if result.empty?
                hash.delete(k)
              else
                hash[k] = result
              end
            end
          end
        end
      end
      event_hash = event.to_hash
      event.cancel
      # This one could be improved, stay tuned!
      new_event_block.call(
        LogStash::Event.new(
          remove_nils!(event_hash)
        )
      )
    "
  }
}
output {
  elasticsearch {
    index => "transactions"
    document_id => "%{id}"
    doc_as_upsert => true
    action => "update"
    retry_on_conflict => "3"
    hosts => "es"
  }
}
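The comment in the filter notes that cancelling the event and emitting a new one could be improved. One possible direction, offered as an untested sketch and not necessarily what I will end up doing, is to collect the field references of all nil values and remove them from the event in place. The `nil_field_refs` helper is hypothetical; the commented-out usage line assumes the Ruby filter's `event.remove` call.

```ruby
# Hypothetical helper: collects Logstash-style field references
# (for example "[user][first_name]") for every nil value in a nested hash.
def nil_field_refs(hash, prefix = "")
  hash.flat_map do |key, value|
    ref = "#{prefix}[#{key}]"
    if value.nil?
      [ref]
    elsif value.is_a?(Hash)
      nil_field_refs(value, ref)
    else
      []
    end
  end
end

# Sample data standing in for event.to_hash.
sample = {
  "user"  => { "first_name" => nil, "last_name" => "Philips" },
  "order" => nil
}

refs = nil_field_refs(sample)
# refs == ["[user][first_name]", "[order]"]

# Inside a ruby filter this might be used as follows (assumption, not tested here):
#   nil_field_refs(event.to_hash).each { |ref| event.remove(ref) }
```

Note that this variant leaves behind nested objects that become empty, so it is not a drop-in replacement for the filter above.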
I am about to submit this code as a new Logstash filter for easier reuse, so stay tuned!
P.S. This is my first article on Medium, so if you have any suggestions or notes, I am open to discussion in the comments.
Thanks for reading.