Spring Boot R2DBC INSERT batching (Reactive SQL)

Batching means gathering multiple statements together and executing them over a single database connection. It improves performance because the database can better optimize the batched queries, and it also avoids opening a separate connection for every statement, which reduces the risk of connection starvation. In Postgres, connection starvation shows up as errors along the lines of “too many clients”.

Unfortunately, in the context of Spring Boot R2DBC, batching is not currently supported via high-level abstractions such as R2DBC repositories:

https://github.com/spring-projects/spring-data-r2dbc/issues/259

Instead, we have to take a lower-level approach and use the database client directly.

Pre-requisites

Whether or not you use higher-level abstractions like R2DBC data repositories, the first step is to define a configuration bean which will act as your database connection factory. For example, using Postgres would result in something similar to:

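import io.r2dbc.postgresql.PostgresqlConnectionConfiguration
import io.r2dbc.postgresql.PostgresqlConnectionFactory
import io.r2dbc.spi.ConnectionFactory
import org.springframework.context.annotation.Bean
import org.springframework.context.annotation.Configuration
import org.springframework.data.r2dbc.config.AbstractR2dbcConfiguration

@Configuration
class Database : AbstractR2dbcConfiguration() {

    // exposes the Postgres connection factory as a bean
    @Bean
    override fun connectionFactory(): ConnectionFactory {
        return PostgresqlConnectionFactory(
            PostgresqlConnectionConfiguration.builder()
                .host("localhost")
                .database("my-database")
                .username("my-username")
                .password("my-password")
                .build()
        )
    }
}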

Once that is done, you are free to use Spring Boot R2DBC connection primitives like templates or reactive repositories as described in:

https://spring.io/guides/gs/accessing-data-r2dbc/
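The batching code later in this post refers to the connection factory as databaseConnection. As a minimal sketch, assuming the configuration above is picked up by Spring, the factory can be injected by type into any bean. The BatchWriter class and its openConnection helper are hypothetical names used only for illustration:

```kotlin
import io.r2dbc.spi.Connection
import io.r2dbc.spi.ConnectionFactory
import org.springframework.stereotype.Component
import reactor.core.publisher.Mono

// Hypothetical component; the class name and the openConnection helper are illustrative only.
@Component
class BatchWriter(private val databaseConnection: ConnectionFactory) {

    // create() returns a Publisher, so nothing is opened until the Mono is subscribed to.
    fun openConnection(): Mono<Connection> = Mono.from(databaseConnection.create())
}
```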

Actually batching

So far, so good, but as noted previously, this won’t support batching. If you were to have some Flux of objects you’d like to insert into the database, these insertions would not be batched. The GitHub issue linked above has a clue on how to do this.

However, the syntax in that issue is a bit old, and it doesn’t offer much context on how to use it in a flow made up of Flux and Mono.

The final approach, which worked well, resulted in the following code:

somethingThatReturnsAFlux()     // Flux<SomeObject>
    // error handling: log the error and continue with an empty stream
    .onErrorResume { e ->
        e.printStackTrace()
        Flux.empty()
    }
    // collect the Flux into a list; unfortunately this is necessary due to the next step
    .collectList()
    .flatMap { items ->
        // databaseConnection is the connection factory built in the "Pre-requisites"
        Mono.from(databaseConnection.create())
            // create a batch; flatMap (not map) so that the inner publisher is subscribed to
            .flatMap { connection ->
                // add operations in SQL to the batch (values are interpolated: trusted input only)
                val batch = connection.createBatch()
                for (item in items) {
                    batch.add("INSERT INTO some_table(col1, col2) VALUES ('${item.col1}','${item.col2}')")
                }
                // execute the batch and consume every result
                Flux.from(batch.execute()).flatMap { it.rowsUpdated }.then().doFinally {
                    println("Closing Domain Threats Update DB Connection")
                    Mono.from(connection.close()).subscribe() // close() returns a publisher too
                }
            }
    }
    // optional: the flow can be executed on separate thread pools
    .subscribeOn(Schedulers.boundedElastic())
    .block()

A couple of extra notes about the above:

  • batch.execute() returns a publisher; it doesn’t actually execute the batch. This is difficult to catch unless you’re really paying attention to the IDE. A publisher won’t do anything until you subscribe to it, which in the example above we do by turning it into a Mono that is chained in with flatMap, so it is subscribed to when the final .block() call subscribes to the whole flow.
  • Also note the use of “doFinally”, which closes the database connection once the operation is done, so that no database connections are left dangling. connection.close() returns a publisher as well, so it only takes effect once it is subscribed to.
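Two variations on the flow above may be worth considering: binding parameters instead of interpolating values into the SQL string, and letting Reactor's usingWhen handle the connection cleanup instead of doFinally. The following is only a sketch under assumptions, not the approach used in the original code. SomeObject, insertAll and insertBatch are hypothetical names, the $1/$2 placeholders assume the Postgres driver, and the type of rowsUpdated (Int or Long) depends on the R2DBC SPI version in use:

```kotlin
import io.r2dbc.spi.Connection
import io.r2dbc.spi.ConnectionFactory
import reactor.core.publisher.Flux
import reactor.core.publisher.Mono

// Hypothetical row type standing in for SomeObject in the example above.
data class SomeObject(val col1: String, val col2: String)

// Binds one parameter set per item on a single statement and sums the affected rows.
fun insertAll(connection: Connection, items: List<SomeObject>): Mono<Long> {
    if (items.isEmpty()) return Mono.just(0L)
    // $1 and $2 are Postgres-style placeholders; other drivers use different markers.
    val statement = connection.createStatement(
        "INSERT INTO some_table(col1, col2) VALUES (\$1, \$2)"
    )
    items.forEachIndexed { index, item ->
        // add() saves the previous parameter set and starts a new one
        if (index > 0) statement.add()
        statement.bind(0, item.col1).bind(1, item.col2)
    }
    return Flux.from(statement.execute())
        .flatMap { result -> result.rowsUpdated }
        .map { it.toLong() } // Int in older SPI versions, Long in newer ones
        .reduce(0L) { total, count -> total + count }
}

// Opens a connection, runs the inserts, and closes the connection
// on completion, error, or cancellation.
fun insertBatch(databaseConnection: ConnectionFactory, items: List<SomeObject>): Mono<Long> =
    Mono.usingWhen(
        Mono.from(databaseConnection.create()),
        { connection -> insertAll(connection, items) },
        { connection -> connection.close() }
    )
```

In the flow above, insertBatch(databaseConnection, items) would take the place of the inner Mono.from(databaseConnection.create()) chain. As before, nothing runs until the returned Mono is subscribed to.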