Nugget Post: Reactive Functions to parse nested objects

Note this article assumes familiarity with the Observer Pattern / Reactive Programming as described here: http://reactivex.io/

Some APIs return complex nested JSON objects. For example, take this cleaned up sample response from ElasticSearch (which incidentally is used to build the “Data Table” visualization):

Note the structure of the object. Within the top level “aggregations” object we see a recursive nested structure; each nested object has a “buckets” object, which contains an array of objects, and each object also contains a “key”. The question now is, how do we efficiently traverse the above object to extract each “key” value while retaining the parent’s object “key” as well? To further illustrate, taking a subset of the example above:

rxBlog1.png

 

It was actually easier for me to reason about the above using imperative style programming, which would look something like this:

However, the idea is to use ReactiveX programming to traverse the tree in order to make the code more concise. At each key, the program should “pass down” the key to it’s child observables right down to the final child, which would then emit the result to a subscriber. This is what we end up with (in RxPY):

 

Let’s step through the code:

  • Lines 3-4: If you notice, each object (which I call “aggregation” in the code) contains an object called “buckets” which is an array. We can create observables from arrays, so this function simply grabs the “buckets” array of an arbitrary aggregation and returns an observable
  • Line 6-7: First time we call the getAggregation function to return an observable. Now we have an observable emitting the outer objects. We need access to the next inner object, which itself contains another “buckets” array that we can turn into a “child observable”. Therefore each object (which I call “transaction” in the code) is passed into the getAggregation function once again. However, we would like to pass on the parent’s “key” value to every emission from these child observables. That is the role of the map function which pre-pends the key to the actual emission.
  • At this point we have an observable of observables – which we need to flatten in order to pass it to subsequent stages – that’s the role of the flat_map stage.
  • Lines 8-9 are repetitions of the same pattern described above – note how at each stage we add the key to the emission of the child observable and flatten the observables into a single stream for the next stage
  • Line 10: we call our final “map” to transform the results into tuples as shown in our diagram above
  • Line 11: generic subscriber function

It’s a good exercise to:

  • really understand the difference between flat_map and map
  • understand how to pass variables to child observables via the use of a “map nested in flat_map” pattern
  • did it really make the code more concise? Do you still find it easier to reason in terms of imperative?
Advertisements

Lessons Learned: Winlogbeat & Forwarded Events – no event description

Scenario: Shipping Azure Cloud Logs to an Elasticsearch Cluster

The Azure Log Service [AZLog ] audits events across your Azure Cloud infrastructure, and sends these to a central log collector. It leverage the Windows Event Forwarding subsystem to do this, meaning that the collector server will be able to view the AZLog alerts via the Event Viewer > Forwarded Events

This means that probably the most efficient way of getting the AZLog events from the central collector to Elasticsearch would be to use WinLogBeat. Configuration is simple enough, simply using a WinLogBeat configuration file like so would do the job [1]:

winlogbeat.event_logs:
 - name: ForwardedEvents
   ignore_older: 72h

However, once we did this we ran into a strange issue… The events in elasticsearch were missing the “message” field. Some documentation and issues pointed to the fact that using the “Events” format rather than the default “Rendered Text” could cause such issues [2][3]. The problem is that you sometime don’t have direct control over the windows event infrastructure – it’s handled by someone else. We got around this by setting “event_logs.forwarded” to false. The winlogbeat documentation states [4]:

event_logs.forwarded: A boolean flag to indicate that the log contains only events collected from remote hosts using the Windows Event Collector. The value defaults to true for the ForwardedEvents log and false for any other log. This option is only available on operating systems supporting the Windows Event Log API (Microsoft Windows Vista and newer). This settings allows Winlogbeat to optimize reads for forwarded events that are already rendered. When the value is true Winlogbeat does not attempt to render the event using message files from the host computer. The Windows Event Collector subscription should be configured to use the “RenderedText” format (this is the default) to ensure that the events are distributed with messages and descriptions

Note that it default to true for ForwardedEvents. Turning this to false in the AZLog context brought back our all-important message field…

References

[1] http://syspanda.com/index.php/2017/03/01/sending-windows-event-forwarder-server-wef-logs-to-elasticsearch/

[2] https://github.com/elastic/beats/issues/1031

[3]https://docs.microsoft.com/en-us/windows/threat-protection/use-windows-event-forwarding-to-assist-in-instrusion-detection

[4] https://www.elastic.co/guide/en/beats/winlogbeat/master/configuration-winlogbeat-options.html