Updates from October, 2016

  • dvas0004 11:26 am on October 11, 2016
    Tags: NoSQL

    Elasticsearch & Java: Tips for faster re-indexing 

    Notes based on some feedback:

    • Elasticsearch seems to be pushing the REST client rather than the native Java client… to future-proof your code you may be better off going down this route.
    • Why not just use the Reindex API? Although it’s still considered experimental, this may be a good option if you don’t have to munge your data. In other words, if you don’t need to modify or perform many operations while re-indexing your data, this API is the way to go. The Reindex API does allow you to use a “script”, but this is rather limited and doesn’t perform as well. In the use case presented below, I build the basis for a framework to add/edit more fields while re-indexing. A better term for this operation might be “context addition” rather than “re-indexing”… for this use case, you’ll have more flexibility and better performance with the approach below.


    When it comes to re-indexing Elasticsearch documents, there is no doubt that the Java API is the way to go when performance is important. Using the Java API can have significant performance gains over using other APIs such as the Python Elasticsearch client.


    In this article we will present tips on writing the fastest Java Elasticsearch re-indexer possible.



    • Use the scroll API



    Unlike the python elasticsearch client, the Java API does not abstract the scroll API into a higher-level “scan” helper function. This means you should use the scroll API directly when querying large amounts of data.


    This is done as follows:
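    A minimal sketch of building and operating a scroll, assuming the 2.x-era Java TransportClient (the index name, query, and sizes below are illustrative, not taken from the original post):

```java
// Build the scrolling search request (client construction omitted).
SearchResponse scrollResp = client.prepareSearch("filebeat-*")
        .setScroll(new TimeValue(60000))          // keep the scroll context alive for 60s
        .setQuery(QueryBuilders.matchAllQuery())  // plus any queries/filters you need
        .setSize(100)                             // hits returned per shard per scroll
        .execute().actionGet();

// Operate the scroll: process each batch of hits, then fetch the next batch
// using the scroll ID, until no more hits are returned.
while (true) {
    for (SearchHit hit : scrollResp.getHits().getHits()) {
        // process each hit here
    }
    scrollResp = client.prepareSearchScroll(scrollResp.getScrollId())
            .setScroll(new TimeValue(60000))
            .execute().actionGet();
    if (scrollResp.getHits().getHits().length == 0) {
        break;  // scroll exhausted
    }
}
```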

    The first part of the snippet builds the scrolling search request, including any queries and filters. Note in particular the use of .setScroll() and .setSize().


    .setSize() simply controls the number of documents that will be returned per shard for each scroll.


    .setScroll() is important for long-running operations: it sets how long a particular scroll context should remain valid. Failing to set this appropriately will result in “No Context” errors when updating the scroll ID.


    The second part of the snippet shows how to “operate” the prepared scroll. As the documentation explains, each scroll returns a batch of hits, which we process in the “for” loop. Once processed, we update the scroll with the new scroll ID (this marks how far through the results the client has got) and loop again in the “while” loop until the scroll is exhausted.



    • Leverage multithreading, specifically using Java’s Thread Pools



    One area for performance optimization is the use of threads. A very good place to introduce them is the per-hit processing inside the scroll loop above.


    The easiest way to do this is to leverage Java’s ExecutorService as follows:


    ExecutorService executor = Executors.newFixedThreadPool(8);  // this produces a pool of 8 worker threads.


    Now that the thread pool is defined, we can start queueing Runnable tasks. Within the for loop over hits, we process each hit in a new thread, for example:


    Runnable worker = new WorkerThread(hit);
    executor.execute(worker);  // queue the task on the pool



    The WorkerThread class is basically a Java Runnable which processes each hit in the manner you require. The below sample shows a WorkerThread example:
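    A minimal sketch of such a worker, assuming the same 2.x-era client (the class layout is illustrative; “SourceAddress”, “bulk_index” and App.addToBulkRequest come from the text):

```java
// Runnable that enriches a single hit and hands an update request to the parent.
public class WorkerThread implements Runnable {
    private final SearchHit hit;

    public WorkerThread(SearchHit hit) {
        this.hit = hit;
    }

    @Override
    public void run() {
        Map<String, Object> source = hit.getSource();
        String sourceAddress = (String) source.get("SourceAddress");
        // ... enrich / munge the document as required ...
        UpdateRequest updateRequest = new UpdateRequest(hit.getIndex(), hit.getType(), hit.getId())
                .doc("bulk_index", sourceAddress);  // hypothetical new field
        App.addToBulkRequest(updateRequest);        // hand off to the shared bulk processor
    }
}
```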


    The above example doesn’t do anything other than read the “SourceAddress” field and insert a hypothetical “bulk_index” field. The new field is added using our next performance tip: the BulkProcessor. The worker thread creates an “update request” and passes it to a method in the parent class (App.addToBulkRequest), which later feeds it into a bulk processor for execution.
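    The thread-pool pattern itself can be exercised with plain JDK classes; a self-contained sketch (no Elasticsearch involved) of queueing work and waiting for it to drain:

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class PoolDemo {
    public static void main(String[] args) throws InterruptedException {
        ExecutorService executor = Executors.newFixedThreadPool(8);  // 8 worker threads
        AtomicInteger processed = new AtomicInteger();

        for (int i = 0; i < 100; i++) {
            executor.execute(processed::incrementAndGet);  // stand-in for per-hit work
        }

        executor.shutdown();                              // stop accepting new tasks
        executor.awaitTermination(10, TimeUnit.SECONDS);  // wait for queued tasks to finish
        System.out.println(processed.get());              // prints 100
    }
}
```

    Remember to shut the pool down once the scroll loop completes, otherwise the JVM will not exit.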



    • Use the Elasticsearch Java BulkProcessor API



    Unlike the python elasticsearch client, the Java bulk API has not yet been abstracted into a higher-level parallel function like Python’s “parallel_bulk”. However, we can still leverage the BulkProcessor class, as mentioned above.


    We first must prepare a bulk processor:
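    A minimal sketch of preparing such a bulk processor, assuming the 2.x-era API (the thresholds are the ones discussed in the text):

```java
// Build a BulkProcessor around the client, overriding the three listener callbacks.
BulkProcessor bulkProcessor = BulkProcessor.builder(client, new BulkProcessor.Listener() {
    @Override
    public void beforeBulk(long executionId, BulkRequest request) {
        // called just before a bulk batch is executed
    }

    @Override
    public void afterBulk(long executionId, BulkRequest request, BulkResponse response) {
        // called after a successful bulk batch
    }

    @Override
    public void afterBulk(long executionId, BulkRequest request, Throwable failure) {
        // called after a failed bulk batch
    }
})
        .setConcurrentRequests(3)                            // up to 3 bulk requests in flight
        .setBulkSize(new ByteSizeValue(1, ByteSizeUnit.GB))  // flush at 1GB...
        .setFlushInterval(TimeValue.timeValueSeconds(5))     // ...or every 5 seconds...
        .setBulkActions(10000)                               // ...or every 10,000 documents
        .build();
```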


    We first define the Elasticsearch client, and then define the bulk processor. The bulk processor has three listener callbacks that can be overridden to take actions before a bulk update, and after a successful or unsuccessful one.


    Of particular note are the last few lines, where we set the options for the bulk processor. In this example, we set the number of concurrent requests to 3, and we also set several limits (1GB, 5 seconds, or 10,000 documents), any of which, when reached, triggers a bulk update action.


    In our thread example above, the threads feed update requests into a method called “addToBulkRequest”, which has two responsibilities:


    • Check whether the update request is null. This is an important check: if a null request is added, the entire BulkProcessor will fail.
    • Add the update request to the bulk processor we defined above.
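    A minimal sketch of such a method (the class and synchronization details are illustrative; bulkProcessor is the processor defined above):

```java
// Hypothetical App.addToBulkRequest, covering the two responsibilities above.
public static synchronized void addToBulkRequest(UpdateRequest updateRequest) {
    if (updateRequest == null) {
        return;  // a null request would fail the whole BulkProcessor
    }
    bulkProcessor.add(updateRequest);
}
```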


    With regards to bulk updates in elasticsearch, it’s worth going through the following issue discussion:

  • dvas0004 9:09 am on September 13, 2016

    Elasticsearch & Python: Tips for faster re-indexing 

    Some valuable lessons learned while going through an Elasticsearch re-indexing exercise. (For the uninitiated, re-indexing basically means getting large volumes of documents from Elasticsearch, enriching or changing the data within each document, and then sending them back.) The below assumes Python and the python elasticsearch client (https://elasticsearch-py.readthedocs.io/en/master/).

    1. When retrieving data, use the “scan” helper function

    Most examples I encountered on the Internet used the “scroll” API directly. Nothing wrong with this; in fact, the scan helper function is itself a higher-level function built on the scroll API. However, scan abstracts everything away and frees the developer from having to worry about scroll IDs and so on. It’s much simpler to use; let’s see a quick example:
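    A minimal sketch, assuming the python elasticsearch client (the index name and query are illustrative):

```python
from elasticsearch import Elasticsearch
from elasticsearch.helpers import scan

es = Elasticsearch()
query = {"query": {"match_all": {}}}

# count matching documents with a size-0 search (replaces the deprecated count API)
count = es.search(index="filebeat-*", body=query, size=0)["hits"]["total"]
print "documents to re-index:", count

# scan returns an iterable over every matching document, no scroll IDs needed
res = scan(es, query=query, index="filebeat-*")
```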

    We first use the usual search API with a size of 0 to count the number of matching documents (this replaces the now-deprecated “count” API), and print the total. We then use the scan helper to return a very large number of documents. The scan function returns an iterable, which allows us to process the response (the res variable) using loops and so on, which leads us to our next point.

    2. Use python’s multiprocessing capabilities to speed up the document processing

    Since the object returned from the scan helper function is an iterable, it is ideal for splitting over multiple processes, which makes far better use of the available CPUs. The easiest way I found to do this:

    • Create a multiprocessor pool with the appropriate number of processes (usually one per CPU), and an array which will aggregate the results from the different processes
      import multiprocessing as mp
      pool = mp.Pool(processes=4) # start 4 worker processes
      result = []
    • Use python’s multiprocessing pool “map” functionality to split the iterable returned from the scan helper function over the processors
      N = 11
      while True:
          g2 = pool.map(processor, itertools.islice(res, N))
          if g2:
              result.extend(g2)
          else:
              break

      In the above code snippet, “processor” is a function which processes a single document. On each pass, “itertools.islice” takes the next N documents from the res iterator returned by the scan helper, and pool.map distributes them across the worker processes. The results from each pass are appended to the “result” array we previously initialized; when the iterator is exhausted, pool.map returns an empty list and the loop ends.

      At this stage, we have four processes in parallel munging through the documents, and emitting their results to a common array ready to be passed to the next step

    3. Use the Python Elasticsearch “Parallel Bulk” helper function

    The parallel bulk helper function again abstracts a lot of work away from the developer. It lets you very simply define the number of threads used to update Elasticsearch and so on. As input, parallel_bulk takes an iterable of “action items”, each action being a Python dictionary, an example of which can be seen below:

    {'_op_type': "update",
     '_index': "filebeat-123",
     '_type': "syslog",
     '_id': "ABSAJSGA19872972_",
     'doc': {'foo': 'bar'}}

    In our case, this dictionary is what is returned by each of our multiprocessing processors, and appended to the results array. We now simply feed this array into the parallel bulk helper function as follows:

    print 'updating results'
    # parallel_bulk returns a lazy generator, so it must be consumed for the
    # updates to actually be sent
    for success, info in parallel_bulk(es, result):
        if not success:
            print 'update failed:', info
    print 'done updating results'

    Where “es” is the elasticsearch client and “result” is the array the processors build, each element being a dictionary as described above.

    Note: In my case, the above methods increased the throughput from approximately 2 documents/sec to about 10 documents/sec, but these figures came nowhere near as fast as a Java implementation, which gave me about 80-100 documents/sec.

    PS: the throughput numbers quoted above are not meant as absolute benchmarks; I do some pretty heavy processing, including HTTP requests, so you are likely to get different results depending on your own processing. The numbers are meant as relative benchmarks to show that multiprocessing can increase your throughput by 4x-5x.

    P.P.S.: I will post an article with the Java re-indexing lessons learned (hopefully soon)
