One common requirement for users of Elasticsearch is to have automatic alerts sent out whenever some query is matched, or when some other condition is satisfied. In fact, Yelp has come up with a Python-based solution for this in the form of ElastAlert, which at the time of writing is extremely popular, with over 5.5K stars.
But did you know you can achieve much of the same functionality as ElastAlert with Apache NiFi? We’ve previously seen how to use Apache NiFi as a very convenient way of getting data INTO Elasticsearch via syslog, or even NetFlow – but you can also use it to get data OUT of Elasticsearch. If you’re already using this brilliant data flow tool to get data in, you might as well save some resources and time by using it for your alerts too. In this blog post we’ll explore two templates (a link to the actual template is in the appendix):
- Sending an email alert when a Lucene/Elastic query is matched (the same type of queries you use in the Kibana search bar).
- Sending an email alert when an Elasticsearch aggregation value goes over a certain amount.
Alert on query match

As the above screenshot shows, there isn’t much to it: drag and drop two processors, ScrollElasticsearchHttp and PutEmail. The most important thing to note is the choice of ScrollElasticsearchHttp over any of the other options. Scrolls are used in Elasticsearch when you need to traverse large datasets by splitting results into pages, each containing a fixed number of documents. NiFi puts each page entirely in memory, so it’s important to use scrolls when dealing with large datasets to avoid your server keeling over.
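To make the scrolling mechanism concrete, here is a hedged sketch of the two request shapes the scroll API uses (the index pattern, query, and page size below are arbitrary examples, not values from the template):

```python
def initial_scroll_request(index, query, page_size):
    """Build the first search request; scroll=1m keeps the scroll context alive."""
    path = f"/{index}/_search?scroll=1m"
    body = {
        "size": page_size,          # documents per page -- each page is held in memory
        "sort": ["_doc"],           # fastest sort order for scrolls
        "query": {"query_string": {"query": query}},
    }
    return path, body

def next_scroll_request(scroll_id):
    """Build the follow-up request that fetches the next page of results."""
    return "/_search/scroll", {"scroll": "1m", "scroll_id": scroll_id}

path, body = initial_scroll_request("filebeat-*", "@timestamp:[now-5m TO *]", 100)
print(path)          # /filebeat-*/_search?scroll=1m
print(body["size"])  # 100
```

Each follow-up request returns the next page plus a fresh scroll ID, so memory use stays bounded by the page size no matter how large the full result set is.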
UPDATE 09/2019:
A reader pointed out that using ScrollElasticsearchHttp is not the best way to accomplish this. ScrollElasticsearchHttp is designed to run only once. After the query is completed, the processor records this in its local state and will not run again even if triggered to do so:

We therefore have to replace the ScrollElasticsearchHttp processor with QueryElasticsearchHttp, which thankfully has the same functionality and the same configuration. The template provided at the end of the article has been updated to reflect this. The original article continues below.
Looking closer at the Elasticsearch HTTP processor we see:

Apart from entering your Elasticsearch URL, a few pointers (you will obviously need to modify these appropriately for your setup):
- You can enter any Kibana-style query here. For better performance, write more specific queries. For example, you can include time ranges like @timestamp:[now-5m TO *] to query data for just the last 5 minutes.
- The page size is important. As mentioned before, NiFi will load the entire page into memory, so make sure this is small enough to fit comfortably in server memory.
- It’s also a good idea to limit the index you are querying, for performance reasons. In the above example, we use NiFi Expression Language as follows (which builds a string similar to filebeat-2019.04.11):
${now():format('yyyy.MM.dd'):prepend('filebeat-')}
- The “type” is optional, but again the more specific you are, the better for query performance
- Another performance tip is to set the “sort” option to “_doc”. As the Elasticsearch docs mention:
“Scroll requests have optimizations that make them faster when the sort order is _doc.”
https://www.elastic.co/guide/en/elasticsearch/reference/current/search-request-scroll.html
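As a quick sanity check, the date-based index name built by the expression in the list above can be reproduced in Python (the filebeat- prefix comes from the article’s example):

```python
from datetime import datetime

def daily_index(prefix="filebeat-"):
    # Python equivalent of ${now():format('yyyy.MM.dd'):prepend('filebeat-')}
    return prefix + datetime.now().strftime("%Y.%m.%d")

print(daily_index())  # e.g. filebeat-2019.04.11
```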
One last tip on this processor: modify the “scheduling” tab as follows:

Note that “Run Schedule” is set to “60 sec”. This means the query is run once every minute rather than continuously, one after another. Combining this with a query that includes a time range like @timestamp:[now-1m TO *] will make sure this is a lightweight query.
The PutEmail processor is super simple – just enter your SMTP server settings, To/From email addresses, a subject line and a message, and you’re good to go!
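For reference, the message PutEmail assembles is an ordinary email; a rough Python equivalent looks like this (the addresses and subject are placeholders, not values from the template):

```python
from email.message import EmailMessage

def build_alert_email(to_addr, from_addr, subject, body):
    # Rough equivalent of what a configured PutEmail processor sends;
    # a real setup would hand this to an SMTP server.
    msg = EmailMessage()
    msg["To"] = to_addr
    msg["From"] = from_addr
    msg["Subject"] = subject
    msg.set_content(body)
    return msg

msg = build_alert_email("ops@example.com", "nifi@example.com",
                        "Elasticsearch alert", "Query matched in the last minute")
print(msg["Subject"])  # Elasticsearch alert
```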
Query on aggregation calculation
That first example was quite simple (and a good thing too!). Let’s move on to something more complex. Let’s say you have the following requirement (true story):
“We’d like to get alerted if a particular host on the network sends data (on average) over a certain threshold”
First off, if you’re already an ELK user, you’ll notice the above is a very simple aggregation. Hint: one of my favorite ways to quickly build an aggregation query is using Kibana: simply build a visualization like a Data Table, then use the small arrow icon at the bottom (1) to reveal the “Requests” tab (2):
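For orientation, a query similar to what Kibana generates for this kind of requirement might look like the sketch below (shown as a Python dict; the field names "source.ip" and "network.bytes" are assumptions, so copy the real query out of Kibana instead):

```python
# Sketch of an "average bytes per host" aggregation: a terms bucket per
# source IP, with an avg metric inside each bucket. Kibana names the
# aggregations with numeric keys like "2" and "1", which is why those
# keys reappear in the response shown further below.
agg_query = {
    "size": 0,  # we only want the aggregation, not the matching documents
    "aggs": {
        "2": {
            "terms": {"field": "source.ip"},
            "aggs": {"1": {"avg": {"field": "network.bytes"}}},
        }
    },
}
```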

Once you have your JSON query from above, we need to filter the results to pick out the value we’d like to report on. The EvaluateJsonPath processor does exactly that. It allows us to use JSONPath expressions to extract a value from the results and place it in an attribute. For example, if we send Elasticsearch an aggregation query like the one shown above, we’ll get an answer similar to:
"aggregations": {
  "2": {
    "doc_count_error_upper_bound": 0,
    "sum_other_doc_count": 0,
    "buckets": [
      {
        "1": {
          "value": 882
        },
        "key": "92.251.55.112",
        "doc_count": 1
      }
    ]
  }
}
We’d like the value 882, which we can extract using the JSONPath expression:
$.2.buckets[0].1.value
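To see what that extraction does, here is the equivalent lookup in plain Python against the response shown above (a sketch of the path EvaluateJsonPath walks, not the processor itself):

```python
import json

# The aggregation response from the article, trimmed to the relevant fields
response = json.loads("""
{
  "aggregations": {
    "2": {
      "buckets": [
        {"1": {"value": 882}, "key": "92.251.55.112", "doc_count": 1}
      ]
    }
  }
}
""")

# Walk into bucket "2", take the first bucket, then metric "1"'s value
avg = response["aggregations"]["2"]["buckets"][0]["1"]["value"]
print(avg)  # 882
```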
So now we have an attribute in our flow file which is set to the value we got from our Elasticsearch aggregation. Our last step is to check whether this value went above a certain hardcoded threshold. This is easy using RouteOnAttribute: use NiFi Expression Language to build an expression similar to:
${avg:gt(100)}
In the above we’re using the “gt” function to compare the value of the “avg” attribute (which we got from the previous step) and see if it went over 100. If it does, we can send an email just like before, or log to a file. Putting it all together, we get a template like so:
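The routing decision itself is just a numeric comparison; a minimal Python equivalent of the ${avg:gt(100)} expression looks like this (attribute names and the threshold mirror the example above):

```python
def route_on_attribute(attrs, threshold=100):
    # Python equivalent of the NiFi expression ${avg:gt(100)}:
    # route the flow file onward only when "avg" exceeds the threshold.
    # NiFi attributes are strings, hence the float() conversion.
    return float(attrs.get("avg", 0)) > threshold

print(route_on_attribute({"avg": "882"}))  # True  -> send the alert email
print(route_on_attribute({"avg": "42"}))   # False -> drop the flow file
```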

Appendix
Full template code:
https://gist.github.com/dvas0004/3fd0e7c24c6d06f5095471e7b72b329b
Aside: to me, scenarios like this are why open source projects like the amazing ELK stack and NiFi are the future – they enable you to come up with combinations of projects that fit your needs exactly, something no commercial software will be able to do as quickly and cheaply.