Nugget Post: Reactive Functions to parse nested objects

Note: this article assumes familiarity with the Observer pattern and reactive programming as described here: http://reactivex.io/

Some APIs return complex nested JSON objects. For example, take this cleaned-up sample response from Elasticsearch (which, incidentally, is used to build the “Data Table” visualization):

Note the structure of the object. Within the top-level “aggregations” object we see a recursive nested structure: each nested object has a “buckets” field holding an array of objects, and each of those objects also contains a “key”. The question now is: how do we efficiently traverse the above object to extract each “key” value while retaining the parent object’s “key” as well? To further illustrate, taking a subset of the example above:

[Figure: rxBlog1.png]

It was actually easier for me to reason about the above using imperative-style programming, which would look something like this:
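
A minimal sketch of that imperative approach, assuming a hypothetical three-level response. The aggregation names "2", "3" and "4" and all sample values are made up for illustration and are not taken from the original response:

```python
# Hypothetical, trimmed-down response; names "2", "3", "4" are assumptions.
response = {
    "aggregations": {
        "2": {
            "buckets": [
                {
                    "key": "US",
                    "3": {
                        "buckets": [
                            {"key": "GET", "4": {"buckets": [{"key": 200}, {"key": 404}]}},
                            {"key": "POST", "4": {"buckets": [{"key": 200}]}},
                        ]
                    },
                },
                {
                    "key": "MT",
                    "3": {"buckets": [{"key": "GET", "4": {"buckets": [{"key": 200}]}}]},
                },
            ]
        }
    }
}


def extract_rows(response):
    """Walk three levels of buckets with nested loops, carrying parent keys along."""
    rows = []
    for outer in response["aggregations"]["2"]["buckets"]:
        for middle in outer["3"]["buckets"]:
            for inner in middle["4"]["buckets"]:
                # Each row keeps the keys of both ancestors plus the leaf key.
                rows.append((outer["key"], middle["key"], inner["key"]))
    return rows


rows = extract_rows(response)
for row in rows:
    print(row)
```

Each extra nesting level costs one more loop, which is the part the reactive version tries to express as a repeated pipeline stage instead.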

However, the idea is to use ReactiveX to traverse the tree in order to make the code more concise. At each key, the program should “pass down” the key to its child observables, right down to the final child, which would then emit the result to a subscriber. This is what we end up with (in RxPY):

 

Let’s step through the code:

  • Lines 3-4: Each object (which I call “aggregation” in the code) contains a “buckets” array. We can create observables from arrays, so this function simply grabs the “buckets” array of an arbitrary aggregation and returns an observable.
  • Lines 6-7: This is the first time we call the getAggregation function to return an observable. Now we have an observable emitting the outer objects. We need access to the next inner object, which itself contains another “buckets” array that we can turn into a “child observable”. Therefore each object (which I call “transaction” in the code) is passed into the getAggregation function once again. However, we would like to pass on the parent’s “key” value to every emission from these child observables. That is the role of the map function, which prepends the key to the actual emission.
  • At this point we have an observable of observables, which we need to flatten in order to pass it to subsequent stages. That’s the role of the flat_map stage.
  • Lines 8-9 are repetitions of the same pattern described above. Note how at each stage we add the key to the emission of the child observable and flatten the observables into a single stream for the next stage.
  • Line 10: We call our final “map” to transform the results into tuples as shown in our diagram above.
  • Line 11: A generic subscriber function.
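
The “map nested in flat_map” pattern walked through above can also be sketched without any Rx dependency. The following is a plain-Python analogue, not RxPY: iterators stand in for observables, "flat_map" is a hypothetical helper built on itertools.chain, and the aggregation names "2", "3", "4" and the sample values are assumptions made for illustration.

```python
from itertools import chain

# Hypothetical sample; names "2", "3", "4" and values are illustrative only.
response = {
    "aggregations": {
        "2": {
            "buckets": [
                {"key": "US", "3": {"buckets": [
                    {"key": "GET", "4": {"buckets": [{"key": 200}, {"key": 404}]}},
                ]}},
                {"key": "MT", "3": {"buckets": [
                    {"key": "POST", "4": {"buckets": [{"key": 201}]}},
                ]}},
            ]
        }
    }
}


def get_aggregation(aggregation):
    """Return an iterator over the "buckets" array (stand-in for an observable)."""
    return iter(aggregation["buckets"])


def flat_map(fn, stream):
    """Apply fn to every item (fn returns a stream) and flatten into one stream."""
    return chain.from_iterable(map(fn, stream))


def extract_rows(response):
    level1 = get_aggregation(response["aggregations"]["2"])
    # map nested in flat_map: every child emission is prefixed with the parent's key.
    level2 = flat_map(
        lambda parent: map(lambda child: (parent["key"], child),
                           get_aggregation(parent["3"])),
        level1,
    )
    # Same pattern again, one level deeper.
    level3 = flat_map(
        lambda pair: map(lambda child: (pair[0], pair[1]["key"], child),
                         get_aggregation(pair[1]["4"])),
        level2,
    )
    # Final map: reduce the leaf bucket to its key, giving flat tuples.
    return map(lambda t: (t[0], t[1], t[2]["key"]), level3)


rows = list(extract_rows(response))
print(rows)
```

Using a plain map at the outer level instead of flat_map would yield a stream of streams rather than a stream of tuples, which is the difference the walkthrough points at. Note that in Rx, flat_map merges the inner observables and does not guarantee ordering for asynchronous sources, whereas this iterator version is strictly sequential.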

It’s a good exercise to:

  • really understand the difference between flat_map and map
  • understand how to pass variables to child observables via the “map nested in flat_map” pattern
  • ask yourself whether this really made the code more concise, and whether you still find it easier to reason in imperative terms

Elasticsearch REST API: JEST upsert

I’ve already written about tips and tricks when using the Elasticsearch Java API. The Elasticsearch REST API has been going from strength to strength, and it seems that going forward the Elasticsearch team will focus more on the REST API than on the native Java client. At the time of writing, however, the official Java REST library doesn’t seem to offer an abstraction for the bulk API, so I followed some advice and looked into the Jest library.

The only snag with the Jest library is that, when it comes to bulk operations, the documentation only gives examples of scripted updates. The Elasticsearch update API also allows for updates using partial documents. Jest supports this functionality, but I couldn’t find good documentation for it. Below is an example for anyone looking for this:

The important points:

  • You can still use the official Java Elasticsearch client’s “XContentFactory.jsonBuilder” helper to more easily build your JSON objects.
  • The trick is in line 26 above:

jsonBuilder().startObject().startObject("doc")

This creates a nested object with “doc” as the inner JSON object, as outlined by the Elasticsearch documentation:

{
    "doc" : {
        "name" : "new_name"
    }
}

The first startObject() creates the outer curly braces, while the second startObject("doc") creates the inner “doc” object.

  • We add content to the JSON object in lines 27-29.
  • Just like we had to use two startObject() calls, we need to close the object with two endObject() calls, as shown in line 31.
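
To make the payload shape concrete outside of Jest, here is a small Python sketch that builds the same partial-document body and wraps it as a single update action in the newline-delimited format the Elasticsearch bulk API expects. The function names, index name and document id are hypothetical. The optional "doc_as_upsert" flag comes from the Elasticsearch update API and is not taken from the original snippet.

```python
import json


def partial_update_body(fields, upsert=False):
    """Build {"doc": {...}}; the outer dict and the inner "doc" dict mirror
    the two startObject() calls described above."""
    body = {"doc": dict(fields)}
    if upsert:
        # Ask Elasticsearch to insert the partial document if none exists yet.
        body["doc_as_upsert"] = True
    return body


def bulk_update_lines(index, doc_id, fields, upsert=False):
    """Return one bulk 'update' action: an action line, then the body line.
    Every line in a bulk request body must end with a newline."""
    action = {"update": {"_index": index, "_id": doc_id}}
    body = partial_update_body(fields, upsert)
    return json.dumps(action) + "\n" + json.dumps(body) + "\n"


# Hypothetical index name and id, for illustration only.
payload = bulk_update_lines("people", "1", {"name": "new_name"}, upsert=True)
print(payload)
```

Older Elasticsearch versions may also expect a "_type" entry in the action line, so treat the action metadata here as an assumption to check against the version in use.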

The rest of the snippet deals with the actual bulk update. We pass the object we just created into an Update Builder, which gives us a “Bulkable Object” that we can pass on to the Jest bulk processor. The snippet is taken from a larger program where it resides in a loop, which explains the if/else clause in lines 37-48: it’s important to “flush” the bulk service every so often. The native Java client would do this automatically; so far, in Jest you need to account for this yourself.
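
The flush logic itself is independent of the client library. A minimal Python sketch of the idea, with a hypothetical BulkBuffer class and a "send" callback standing in for the actual bulk call (neither is part of Jest, and the batch size is an arbitrary assumption):

```python
class BulkBuffer:
    """Collects bulk actions and hands them to `send` in fixed-size batches."""

    def __init__(self, send, flush_every=500):
        self._send = send                # callable that receives a list of actions
        self._flush_every = flush_every  # batch size threshold
        self._pending = []

    def add(self, action):
        self._pending.append(action)
        # Flush as soon as the batch is full, mirroring the if/else in the loop.
        if len(self._pending) >= self._flush_every:
            self.flush()

    def flush(self):
        # Send whatever is pending; a no-op when the buffer is empty.
        if self._pending:
            self._send(list(self._pending))
            self._pending.clear()


# Usage: the list's append method plays the role of the real bulk request.
sent_batches = []
buffer = BulkBuffer(sent_batches.append, flush_every=2)
for doc_id in range(5):
    buffer.add({"id": doc_id})
buffer.flush()  # final flush after the loop, so the last partial batch is not lost
print(len(sent_batches))
```

The final flush after the loop matters as much as the periodic one: without it, any actions left over from an incomplete batch would never be sent.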