Notes based on some feedback:
- Elasticsearch seems to be pushing the REST client rather than the native Java client… to future-proof your code, you may be better off going down that route.
- Why not just use the Reindex API? Although it’s still considered experimental, it may be a good option if you don’t have to munge your data. In other words, if you don’t need to modify your documents or perform many operations while re-indexing, this API is the way to go. The Reindex API does allow you to use a “script” (see the example after these notes), but scripting is rather limited and doesn’t perform as well. In my use case presented below, I build the basis for a framework to add/edit more fields while re-indexing. Maybe a better term for this operation would be “context addition” rather than “re-indexing”… in this use case, you’ll have more flexibility and better performance with the approach below.
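For reference, a minimal Reindex API call with a script might look like the following. Treat this as a sketch rather than a drop-in request: the index names are hypothetical, and the exact script syntax varies between Elasticsearch versions.

POST /_reindex
{
  "source": { "index": "filebeat-2016" },
  "dest": { "index": "filebeat-2016-v2" },
  "script": { "inline": "ctx._source.bulk_index = 'true'" }
}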
When it comes to re-indexing Elasticsearch documents, the Java API is the way to go when performance is important. Using the Java API can yield significant performance gains over other clients such as the Python Elasticsearch client.
In this article we will present tips on writing the fastest Java Elasticsearch re-indexer possible.
- Use the scroll API
Unlike the Python Elasticsearch client, the Java API does not abstract the scroll API into a higher level “scan” helper function. This means you should implement the scroll API yourself when querying large amounts of data.
This is done as follows:
SearchResponse scrollResp = client.prepareSearch("filebeat-*")
        // set the timeout value – important for long running scrolls
        .setScroll(TimeValue.timeValueMinutes(60))
        // example of an "OR" filter using a "should" boolean expression
        .setQuery(QueryBuilders.boolQuery()
                .should(QueryBuilders.termQuery("Type", "TRAFFIC"))
                .should(QueryBuilders.termQuery("pfsense_type", "filterlog:"))
                .minimumShouldMatch("1"))
        // example of a filter to query documents between a time range
        // (QueryFrom / QueryTo hold the caller's time range bounds)
        .setPostFilter(QueryBuilders.rangeQuery("@timestamp").from(QueryFrom).to(QueryTo))
        .setSize(1000) // 1,000 hits per shard will be returned for each scroll
        .execute().actionGet();

// Scroll until no hits are returned
while (true) {
    for (SearchHit hit : scrollResp.getHits().getHits()) {
        // process the search hit
    }
    // update the scroll ID, keeping the scroll context alive for another window
    scrollResp = client.prepareSearchScroll(scrollResp.getScrollId())
            .setScroll(TimeValue.timeValueMinutes(60))
            .execute().actionGet();
    // Break condition: no hits are returned, so the scroll is finished
    if (scrollResp.getHits().getHits().length == 0) {
        break;
    }
}
In the first half of the snippet above we see the exact syntax for building a scrolled search, including some queries and filters. In particular, note the use of .setScroll() and .setSize().
.setSize() simply controls the number of documents that will be returned per shard for each scroll.
.setScroll() is important for long running operations: it sets how long a particular scroll view should remain valid. Failing to set this value appropriately will result in “no search context” errors when updating the scroll ID.
In the second half of the snippet we see how to “operate” the prepared scroll. As the documentation explains, each scroll returns a batch of hits, which we process in the “for” loop. Once a batch is processed, we update the scroll with the new scroll ID (this marks how far through the results the client has progressed) and loop again until the scroll is finished, which is handled by the “while” loop above.
- Leverage multithreading, specifically using Java’s Thread Pools
One area for performance optimization is the use of threads. In the snippet above, a very good candidate for threading is the processing of each hit, which happens inside the “for” loop.
The easiest way to do this is to leverage Java’s ExecutorService as follows:
ExecutorService executor = Executors.newFixedThreadPool(8); // this produces a pool of 8 worker threads.
Now that the thread pool is defined, we can start queueing runnable tasks. Within the “for” loop of the scroll snippet above, we hand each hit off to a new worker, for example:
Runnable worker = new WorkerThread(hit);
executor.execute(worker);
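Putting the pieces together, here is a minimal sketch of the scroll loop handing each hit off to the pool. It assumes the ExecutorService, Executors and TimeUnit classes from java.util.concurrent, plus the client and scrollResp from the scroll snippet above. Note the shutdown step at the end: without it, the pool’s worker threads keep the JVM alive after the scroll finishes.

ExecutorService executor = Executors.newFixedThreadPool(8);

while (true) {
    for (SearchHit hit : scrollResp.getHits().getHits()) {
        // each hit is processed by one of the 8 pooled worker threads
        executor.execute(new WorkerThread(hit));
    }
    // fetch the next batch of hits and refresh the scroll context
    scrollResp = client.prepareSearchScroll(scrollResp.getScrollId())
            .setScroll(TimeValue.timeValueMinutes(60))
            .execute().actionGet();
    if (scrollResp.getHits().getHits().length == 0) {
        break;
    }
}

// stop accepting new tasks and wait for the in-flight hits to finish
executor.shutdown();
try {
    executor.awaitTermination(1, TimeUnit.HOURS);
} catch (InterruptedException e) {
    Thread.currentThread().interrupt();
}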
The “WorkerThread” class is simply a Java Runnable that processes each hit in whatever manner you require. The below sample shows a WorkerThread example:
import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;

import java.io.IOException;
import java.util.Map;

import org.elasticsearch.action.update.UpdateRequest;
import org.elasticsearch.search.SearchHit;

public class WorkerThread implements Runnable {

    private final SearchHit hit;

    public WorkerThread(SearchHit hit) {
        this.hit = hit;
    }

    @Override
    public void run() {
        // Handle the hit…
        String esId = this.hit.getId();
        String esIndex = this.hit.getIndex();
        System.out.println("Processing " + esId);

        // read a field from the document source
        Map<String, Object> source = this.hit.getSource();
        String srcIP = (String) source.get("SourceAddress");

        // build an update request that adds a field, and hand it to the bulk processor
        try {
            App.addToBulkRequest(new UpdateRequest()
                    .index(esIndex)
                    .type(this.hit.getType())
                    .id(esId)
                    .doc(jsonBuilder()
                            .startObject()
                            .field("bulk_index", "true")
                            .endObject()));
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}
The above example doesn’t do anything other than read the “SourceAddress” field and insert a hypothetical “bulk_index” field. The new field is added using our next performance tip: the Bulk Processor. In the try block, the worker thread creates an “update request”, which is later fed into a bulk processor that executes it. To do so, the worker thread calls a method on the parent class (“App.addToBulkRequest”).
- Use the Elasticsearch Java BulkProcessor API
Unlike the Python Elasticsearch client, the Java bulk API has not yet been abstracted into a higher level parallel helper like Python’s “parallel_bulk” function. However, we can still leverage the BulkProcessor class, as mentioned above.
We first must prepare a bulk processor:
// define the Elasticsearch transport client
client = TransportClient.builder().build()
        .addTransportAddress(new InetSocketTransportAddress(InetAddress.getByName("192.168.10.15"), 9300));

// define the bulk processor and its listener callbacks
bulkProcessor = BulkProcessor.builder(
        client,
        new BulkProcessor.Listener() {
            @Override
            public void beforeBulk(long executionId, BulkRequest request) {
                System.out.println("Starting bulk request");
            }

            @Override
            public void afterBulk(long executionId, BulkRequest request, BulkResponse response) {
                System.out.println("Finished bulk request");
            }

            @Override
            public void afterBulk(long executionId, BulkRequest request, Throwable failure) {
                System.out.println("Failed to finish bulk request");
                failure.printStackTrace();
            }
        })
        // flush when any one of these limits is reached
        .setBulkActions(10000)
        .setBulkSize(new ByteSizeValue(1, ByteSizeUnit.GB))
        .setFlushInterval(TimeValue.timeValueSeconds(5))
        .setConcurrentRequests(3)
        .build();
In the first two lines we simply define the Elasticsearch client, and we then build the bulk processor. The bulk processor takes a listener whose three methods can be overridden to take action before a bulk update, and after a successful or unsuccessful one.
Of particular note are the last few lines, where we set the options for the bulk processor. In this example we set the number of concurrent requests to 3, and we also set several limits (1 GB of data, 5 seconds, or 10,000 actions) which, when any one of them is reached, will trigger a bulk update action.
In our thread example above, the threads feed update requests into a method called “addToBulkRequest”, which has two responsibilities:
- Check whether the update request is null. This is an important check: if a null request is added, the entire BulkProcessor will fail.
- Add the update request to the bulk processor we defined above.
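A minimal sketch of such a method, assuming a static bulkProcessor field on the parent App class (the method name addToBulkRequest comes from the WorkerThread example above; the body is a hypothetical sketch based only on the two responsibilities just listed):

public static void addToBulkRequest(UpdateRequest request) {
    // 1. Guard against nulls – adding a null request would fail the whole BulkProcessor
    if (request == null) {
        return;
    }
    // 2. Hand the request to the bulk processor; add() is safe to call from multiple
    //    worker threads, and the processor flushes once one of its limits is reached
    bulkProcessor.add(request);
}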
With regard to bulk updates in Elasticsearch, it’s worth going through the following issue discussion:
https://github.com/elastic/elasticsearch/issues/4301#issuecomment-29604375