Elasticsearch & Python: Tips for faster re-indexing

Some valuable lessons learned while going through an elasticsearch re-indexing exercise. (For the uninitiated, re-indexing basically means retrieving large volumes of documents from elasticsearch, enriching or changing the data within each document, and then sending them back.) The points below assume Python and the use of the python elasticsearch client (https://elasticsearch-py.readthedocs.io/en/master/).

1. When retrieving data, use the “scan” helper function

Most examples I encountered on the Internet used the “scroll” API directly. Nothing wrong with this; in fact the scan helper function is itself a higher-level function built on top of the scroll API. However, the scan function abstracts the mechanics away and frees the developer from having to worry about scroll IDs and so on. It’s much simpler to use; let’s see a quick example:

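A minimal sketch of the idea follows; the connection details, index pattern and query are placeholders for whatever your cluster uses:

from elasticsearch import Elasticsearch
from elasticsearch.helpers import scan

es = Elasticsearch(['localhost:9200'])   # placeholder connection details

query = {"query": {"match_all": {}}}     # placeholder query
index = "filebeat-*"                     # placeholder index pattern

# count the matching documents using a size-0 search
# (on Elasticsearch 7+ the total is a dict; use ['hits']['total']['value'])
total = es.search(index=index, body=query, size=0)['hits']['total']
print('documents to process: {}'.format(total))

# scan() returns an iterator over every matching document and handles
# the scroll context (scroll IDs, clearing the scroll) for us
res = scan(es, query=query, index=index, size=1000)
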
In the snippet above, we first use the ordinary search API with a size of 0 to count the number of matching documents (this replaces the deprecated search_type=count), which we print out. We then use the scan helper to retrieve all of the matching documents. The scan function returns an iterator, which allows us to process the response (the res variable above) using loops and so on, which leads us to our next point.

2. Use python’s multiprocessing capabilities to speed up the document processing

Since the object returned from the scan helper function is an iterator, it is well suited to being split across multiple processes, which makes much better use of the available CPUs. The easiest way I found to do this was:

  • Create a multiprocessing pool with the appropriate number of processes (usually one per CPU), and a list which will collect the results from the different processes
    import multiprocessing as mp

    pool = mp.Pool(processes=4)  # start 4 worker processes
    result = []                  # collects the processed documents
  • Use python’s multiprocessing pool “map” functionality to spread the documents returned by the scan helper over the worker processes
    import itertools
    import time

    N = 11  # number of documents to pull from the iterator on each pass

    while True:
        # take the next N documents from the scan iterator and spread
        # them across the worker processes
        g2 = pool.map(processor, itertools.islice(res, N))
        if g2:
            result.extend(g2)
            time.sleep(1)  # brief pause between batches
        else:
            break

    In the above code snippet, “processor” is a function which processes a single document (a sketch is shown after this list). Note how itertools.islice takes the next N (here 11) documents from the res iterator returned by the scan helper, and pool.map distributes those documents across the four worker processes, calling processor on each one. The return values are collected into the result list we initialized earlier.

    At this stage, we have four processes munging through the documents in parallel and appending their results to a common list, ready to be passed on to the next step.
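
For completeness, here is a rough sketch of what such a processor function might look like. The enrichment logic and field names are purely hypothetical; the important part is that it takes one document from the scan iterator and returns an action dictionary in the format described in the next section:

def processor(hit):
    # 'hit' is one document as returned by the scan helper
    source = hit['_source']

    # ... hypothetical enrichment, e.g. adding a derived field ...
    source['enriched'] = True

    # return an update action in the format parallel_bulk expects
    # (see the next section)
    return {
        '_op_type': 'update',
        '_index': hit['_index'],
        '_type': hit['_type'],
        '_id': hit['_id'],
        'doc': source,
    }

Note that with multiprocessing.Pool the processor function must be defined at module level so it can be pickled and sent to the worker processes.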

3. Use the Python Elasticsearch “Parallel Bulk” helper function

The parallel bulk helper function again abstracts a lot of work away from the developer. It allows you to very simply define the number of threads used to update elasticsearch, and so on. As input, parallel_bulk takes an iterable of “actions”, with each action being a python dictionary, an example of which can be seen below:

{
    '_op_type': 'update',
    '_index': 'filebeat-123',
    '_type': 'syslog',
    '_id': 'ABSAJSGA19872972_',
    'doc': {'foo': 'bar'}
}

In our case, this dictionary is exactly what each call to our processor function returns, and what ends up in the result list. We now simply feed that list into the parallel bulk helper function as follows:

print('updating results')
# parallel_bulk returns a generator, so it must be consumed for the updates to be sent
for success, info in parallel_bulk(es, result):
    if not success:
        print('update failed: {}'.format(info))
print('done updating results')

Where “es” is the elasticsearch client and result is the list built by the processors, with each element being a dictionary as described above.
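
If the defaults are too conservative, parallel_bulk also accepts thread_count and chunk_size parameters; the values below are only illustrative and should be tuned for your cluster and document size:

from elasticsearch.helpers import parallel_bulk

# illustrative values; tune thread_count and chunk_size for your cluster
for success, info in parallel_bulk(es, result, thread_count=4, chunk_size=500):
    if not success:
        print('update failed: {}'.format(info))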

Note: In my case, the above methods increased throughput from approximately 2 documents/sec to about 10 documents/sec, but this came nowhere near the speed of a Java implementation, which gave me about 80-100 documents/sec.

PS: the throughput numbers quoted above are not meant as absolute benchmarks; I do some pretty heavy processing, including HTTP requests, so you are likely to see faster results depending on your own processing. The numbers are meant as relative benchmarks, showing that multiprocessing can increase your throughput by 4x-5x.

P.P.S.: I will post an article with the Java re-indexing lessons learned (hopefully soon).