Nugget Post: Reactive Functions to parse nested objects

Note this article assumes familiarity with the Observer Pattern / Reactive Programming as described here:

Some APIs return complex nested JSON objects. For example, take this cleaned-up sample response from Elasticsearch (which incidentally is used to build the “Data Table” visualization):

Note the structure of the object. Within the top-level “aggregations” object we see a recursive nested structure: each nested object has a “buckets” field, which contains an array of objects, and each of those objects also contains a “key”. The question now is, how do we efficiently traverse the above object to extract each “key” value while retaining the parent object’s “key” as well? To further illustrate, taking a subset of the example above:



It was actually easier for me to reason about the above using imperative style programming, which would look something like this:
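A minimal sketch of that imperative style, assuming a hypothetical, simplified response with three levels of nesting. The field name `child`, the function name `collect_keys` and all sample values are illustrative assumptions, not taken from the original response:

```python
# Hypothetical, simplified response: the "child" field name and every key
# below are illustrative assumptions, not real Elasticsearch output.
response = {
    "aggregations": {
        "buckets": [
            {"key": "10.0.0.1", "child": {"buckets": [
                {"key": "tcp", "child": {"buckets": [{"key": 22}, {"key": 80}]}},
            ]}},
            {"key": "10.0.0.2", "child": {"buckets": [
                {"key": "udp", "child": {"buckets": [{"key": 53}]}},
            ]}},
        ]
    }
}


def collect_keys(aggregations):
    """Walk three levels of buckets and return one tuple of keys per leaf."""
    rows = []
    for outer in aggregations["buckets"]:
        for middle in outer["child"]["buckets"]:
            for inner in middle["child"]["buckets"]:
                # Each leaf carries the keys of both of its parents.
                rows.append((outer["key"], middle["key"], inner["key"]))
    return rows


rows = collect_keys(response["aggregations"])
for row in rows:
    print(row)
```

One loop per nesting level keeps the parent keys in scope as ordinary local variables, which is probably why this version is easy to reason about.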

However, the idea is to use ReactiveX programming to traverse the tree in order to make the code more concise. At each key, the program should “pass down” the key to its child observables, right down to the final child, which would then emit the result to a subscriber. This is what we end up with (in RxPY):


Let’s step through the code:

  • Lines 3-4: Notice that each object (which I call “aggregation” in the code) contains an object called “buckets”, which is an array. We can create observables from arrays, so this function simply grabs the “buckets” array of an arbitrary aggregation and returns an observable.
  • Lines 6-7: This is the first time we call the getAggregation function to return an observable. Now we have an observable emitting the outer objects. We need access to the next inner object, which itself contains another “buckets” array that we can turn into a “child observable”. Therefore each object (which I call “transaction” in the code) is passed into the getAggregation function once again. However, we would like to pass on the parent’s “key” value to every emission from these child observables. That is the role of the map function, which prepends the key to the actual emission.
  • At this point we have an observable of observables, which we need to flatten in order to pass it to subsequent stages. That is the role of the flat_map stage.
  • Lines 8-9 are repetitions of the same pattern described above. Note how at each stage we add the key to the emission of the child observable and flatten the observables into a single stream for the next stage.
  • Line 10: We call our final “map” to transform the results into tuples, as shown in our diagram above.
  • Line 11: A generic subscriber function.
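The “map nested in flat_map” pattern walked through above can be mimicked with the standard library alone. The following is a plain-Python analogue, not RxPY code: lazy iterators stand in for observables, and the `flat_map` helper, the `children_with_parent_keys` function, the `child` field name and the sample data are all assumptions made for illustration.

```python
from itertools import chain

# Hypothetical, simplified response; "child" and all keys are illustrative.
aggregations = {
    "buckets": [
        {"key": "a", "child": {"buckets": [
            {"key": "x", "child": {"buckets": [{"key": 1}, {"key": 2}]}},
        ]}},
        {"key": "b", "child": {"buckets": [
            {"key": "y", "child": {"buckets": [{"key": 3}]}},
        ]}},
    ]
}


def get_aggregation(aggregation):
    """Stand-in for an observable: an iterator over the "buckets" array."""
    return iter(aggregation["buckets"])


def flat_map(func, stream):
    """Map each item to a child stream, then flatten all child streams into one."""
    return chain.from_iterable(map(func, stream))


def children_with_parent_keys(item):
    """The nested map: tag every child bucket with the keys of its parents."""
    parent_keys, bucket = item
    return map(
        lambda child: (parent_keys + (child["key"],), child),
        get_aggregation(bucket["child"]),
    )


# Outer level: pair every bucket with a one-element tuple holding its own key.
level1 = map(lambda b: ((b["key"],), b), get_aggregation(aggregations))
level2 = flat_map(children_with_parent_keys, level1)
level3 = flat_map(children_with_parent_keys, level2)
# Final map: keep only the accumulated keys and drop the bucket itself.
results = list(map(lambda item: item[0], level3))

for row in results:
    print(row)
```

Replacing `flat_map` with a plain `map` at either stage would produce a stream of streams instead of a stream of tuples, which is the distinction the walkthrough relies on.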

It’s a good exercise to:

  • really understand the difference between flat_map and map
  • understand how to pass variables to child observables via the use of a “map nested in flat_map” pattern
  • consider whether it really made the code more concise, and whether you still find it easier to reason in imperative terms

How we built the CyberSift Attack Map

Recently we launched a small site called the “CyberSift Attack Map”, hosted at http://attack-map.cybersift.io. Anyone involved in the InfoSec industry will be instantly familiar with the site:


[Screenshot of the CyberSift Attack Map, taken 2017-10-12]

It’s basically a map of attacks which either trip some rule in a signature-based IPS such as Snort, or land in a honeypot. In this article we’ll list some of the libraries and techniques we used to build the site, for any devs out there who are interested.


We used the Python Flask microframework, working in unison with gevent. This was relatively straightforward, with the major point of note being our support for HTML5 server-sent events (SSE). SSE is WebSocket’s smaller brother: whereas WebSockets allow bi-directional communication between browser and server, SSE only allows unidirectional communication from server to browser. This suited our needs perfectly, since we don’t really need the browser to talk back to our server, but we do need real-time updates to flow from our server down to the browser to populate the attack details, gauges and so on. We based our SSE Flask implementation on the extremely useful snippet from Oskar Blom, which you can find here:

If you are going to go down this road, make sure you add some sort of “heartbeat” which instructs Flask to remove inactive clients from the subscriptions. As soon as our honeypot or Twitter microservice gets wind of a new attack, the attack is pushed as a message via SSE and updates the client browser in real time.
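To make the heartbeat idea concrete, here is a minimal sketch using only the standard library. It is not the implementation used on the site: the `Broker` class, `format_sse`, the queue size and the 15-second interval are all hypothetical choices, and under gevent a gevent-aware queue would be needed in place of `queue.Queue`.

```python
import queue


def format_sse(data, event=None):
    """Serialise one message in the text/event-stream wire format."""
    lines = []
    if event is not None:
        lines.append(f"event: {event}")
    # A payload containing newlines is sent as several "data:" lines.
    lines.extend(f"data: {line}" for line in data.split("\n"))
    return "\n".join(lines) + "\n\n"


class Broker:
    """Hypothetical in-memory fan-out of messages to subscribed clients."""

    def __init__(self, max_pending=5):
        self.max_pending = max_pending
        self.clients = []

    def subscribe(self):
        q = queue.Queue(maxsize=self.max_pending)
        self.clients.append(q)
        return q

    def unsubscribe(self, q):
        if q in self.clients:
            self.clients.remove(q)

    def publish(self, message):
        # Iterate over a copy so that stale clients can be removed safely.
        for q in list(self.clients):
            try:
                q.put_nowait(message)
            except queue.Full:
                # The client stopped draining its queue: treat it as gone.
                self.unsubscribe(q)

    def stream(self, q, heartbeat_seconds=15.0):
        """Yield SSE frames for one client, with a comment line as heartbeat."""
        try:
            while True:
                try:
                    yield q.get(timeout=heartbeat_seconds)
                except queue.Empty:
                    # Lines starting with ":" are comments the browser ignores.
                    yield ": heartbeat\n\n"
        finally:
            # Runs when the generator is closed, e.g. after the client has gone.
            self.unsubscribe(q)


broker = Broker()
client = broker.subscribe()
broker.publish(format_sse("new attack detected", event="attack"))
frames = broker.stream(client, heartbeat_seconds=0.01)
print(repr(next(frames)))
frames.close()
```

In Flask, a generator like `broker.stream(...)` could be returned from a view as `Response(generator, mimetype="text/event-stream")`. The periodic heartbeat forces a write even when there are no attacks to report, so a disconnected browser is noticed and its subscription is dropped instead of lingering.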


It’s amazing to see just how quickly you can build a useful site thanks to the incredibly hard work of other talented devs. In this case we used the following libraries to build the site:

If the site really catches on, we would need to consider rewriting the frontend with something like ReactJS to improve rendering performance.