Note: this article assumes familiarity with the Observer pattern and reactive programming, as described at http://reactivex.io/
Some APIs return complex nested JSON objects. For example, take this cleaned-up sample response from Elasticsearch (which incidentally is used to build the “Data Table” visualization):
{
  "aggregations": {
    "transactions": {
      "buckets": [
        {
          "key": 1512720000000,
          "BySourceAddress": {
            "buckets": [
              {
                "key": "192.168.2.2",
                "ByASN": {
                  "buckets": [
                    {
                      "key": "MICROSOFT-CORP-MSN-AS-BLOCK – Microsoft Corporation, US",
                      "ByDestPort": {
                        "buckets": [
                          {
                            "key": "53"
                          }
                        ]
                      }
                    },
                    {
                      "key": "AKAMAI-ASN2, US",
                      "ByDestPort": {
                        "buckets": [
                          {
                            "key": "53"
                          }
                        ]
                      }
                    }
                  ]
                }
              },
              {
                "key": "10.40.0.40",
                "ByASN": {
                  "buckets": [
                    {
                      "key": "GOOGLE – Google LLC, US",
                      "ByDestPort": {
                        "buckets": [
                          {
                            "key": "53"
                          },
                          {
                            "key": "443"
                          }
                        ]
                      }
                    },
                    {
                      "key": "MICROSOFT-CORP-MSN-AS-BLOCK – Microsoft Corporation, US",
                      "ByDestPort": {
                        "buckets": [
                          {
                            "key": "137"
                          },
                          {
                            "key": "443"
                          }
                        ]
                      }
                    }
                  ]
                }
              }
            ]
          }
        }
      ]
    }
  }
}
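Note the structure of the object. Within the top-level “aggregations” object we see a recursively nested structure: each aggregation holds a “buckets” array, and each element of that array has a “key” and, except at the innermost level, another nested aggregation. The question now is: how do we efficiently traverse the above object to extract each “key” value while retaining the keys of its parent objects as well? To illustrate with a subset of the example above, the source address “192.168.2.2” should yield two results, one per ASN, each carrying all four keys: (1512720000000, 192.168.2.2, MICROSOFT-CORP-MSN-AS-BLOCK – Microsoft Corporation, US, 53) and (1512720000000, 192.168.2.2, AKAMAI-ASN2, US, 53).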
It was actually easier for me to reason about the above in an imperative style, which would look something like this:
# res is the parsed JSON response shown above
for bucket in res['aggregations']['transactions']['buckets']:
    key1 = bucket['key']
    for innerbucket1 in bucket['BySourceAddress']['buckets']:
        key2 = innerbucket1['key']
        for innerbucket2 in innerbucket1['ByASN']['buckets']:
            key3 = innerbucket2['key']
            for innerbucket3 in innerbucket2['ByDestPort']['buckets']:
                key4 = innerbucket3['key']
                print("({},{},{},{})".format(key1, key2, key3, key4))
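The nested loops all follow one repeating shape, so the same traversal can also be written as a recursive generator. The following is a minimal sketch and not part of the original article: `walk_keys`, `levels` and the trimmed-down `sample` dictionary are hypothetical names made up for illustration, and the order of the aggregation names is assumed to be known up front.

```python
def walk_keys(node, agg_names, prefix=()):
    """Yield one tuple of keys for every path through the nested aggregations.

    node      -- a dict holding the aggregation named agg_names[0]
    agg_names -- aggregation names in nesting order, outermost first
    prefix    -- keys collected from the parent buckets so far
    """
    if not agg_names:
        # No deeper level: the collected keys form a complete row.
        yield prefix
        return
    for bucket in node[agg_names[0]]["buckets"]:
        # Carry the current bucket's key down to the next level.
        yield from walk_keys(bucket, agg_names[1:], prefix + (bucket["key"],))


# Trimmed-down sample with the same shape as the response above (made up).
sample = {
    "aggregations": {
        "transactions": {"buckets": [{
            "key": 1512720000000,
            "BySourceAddress": {"buckets": [{
                "key": "10.40.0.40",
                "ByASN": {"buckets": [{
                    "key": "GOOGLE",
                    "ByDestPort": {"buckets": [{"key": "53"}, {"key": "443"}]},
                }]},
            }]},
        }]}
    }
}

levels = ["transactions", "BySourceAddress", "ByASN", "ByDestPort"]
rows = list(walk_keys(sample["aggregations"], levels))
for row in rows:
    print(row)
```

Each recursive call plays the role of one loop level, and `prefix` is where the parent keys are retained on the way down.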
However, the idea is to use ReactiveX programming to traverse the tree in order to make the code more concise. At each key, the program should “pass down” the key to its child observables, right down to the final child, which then emits the result to a subscriber. This is what we end up with in RxPY (the snippet uses the RxPY 1.x-style API, and assumes res holds the parsed response and DataObserver is an Observer subclass defined elsewhere):
from rx import Observable, Observer

def getAggregation(transaction, aggName):
    return Observable.from_(transaction[aggName]["buckets"])

getAggregation(res["aggregations"], "transactions") \
    .flat_map(lambda transaction: getAggregation(transaction, 'BySourceAddress').map(lambda b: (transaction["key"], b))) \
    .flat_map(lambda transaction: getAggregation(transaction[1], 'ByASN').map(lambda b: (transaction[0], transaction[1]["key"], b))) \
    .flat_map(lambda transaction: getAggregation(transaction[2], 'ByDestPort').map(lambda b: (transaction[0], transaction[1], transaction[2]["key"], b))) \
    .map(lambda t: (t[0], t[1], t[2], t[3]["key"])) \
    .subscribe(DataObserver())
Let’s step through the code:
- Lines 3-4: Each object (which I call “aggregation” in the code) contains a “buckets” array. We can create observables from arrays, so this function simply grabs the “buckets” array of the named aggregation and returns an observable that emits its elements.
- Lines 6-7: We first call getAggregation to get an observable that emits the outer objects. We need access to the next inner object, which itself contains another “buckets” array that we can turn into a “child observable”. Therefore each emitted object (which I call “transaction” in the code) is passed into getAggregation once again. However, we would like to pass on the parent’s “key” value to every emission from these child observables. That is the role of the map function, which prepends the key to the actual emission.
- At this point we have an observable of observables, which we need to flatten into a single stream before passing it to subsequent stages. That is the role of the flat_map stage.
- Lines 8-9 repeat the same pattern: at each stage we add the key to the emission of the child observable and flatten the observables into a single stream for the next stage.
- Line 10: we call a final “map” to replace the last bucket with its key, turning each result into a flat tuple of keys as illustrated above.
- Line 11: we subscribe a generic observer (DataObserver, whose definition is not shown) to receive the results.
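The “map nested in flat_map” step can be looked at in isolation, without any Rx machinery. The sketch below uses plain Python lists as a stand-in for observables. It is an analogy only: `flat_map` here is a hypothetical list-based helper written for this illustration, not the RxPY operator, and the `parents` data is made up.

```python
from itertools import chain


def flat_map(fn, items):
    """List-based stand-in for flat_map: apply fn, then flatten one level."""
    return list(chain.from_iterable(fn(item) for item in items))


# Made-up parents, each with a key and a list of child buckets.
parents = [
    {"key": "a", "buckets": [{"key": 1}, {"key": 2}]},
    {"key": "b", "buckets": [{"key": 3}]},
]

# Mapping alone gives one inner list per parent: a "list of lists".
nested = [[(p["key"], c["key"]) for c in p["buckets"]] for p in parents]
print(nested)

# The inner comprehension is the nested "map" that prepends the parent's key;
# flat_map then merges the inner lists into a single flat list.
flat = flat_map(lambda p: [(p["key"], c["key"]) for c in p["buckets"]], parents)
print(flat)
```

One caveat about the analogy: lists are flattened strictly in order, whereas an Rx flat_map merges child emissions as they arrive, so with asynchronous sources the results of different children may interleave.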
It’s a good exercise to:
- really understand the difference between flat_map and map
- understand how to pass variables to child observables via the use of a “map nested in flat_map” pattern
- ask whether it really made the code more concise, and whether you still find it easier to reason in imperative terms