Kotlin Coroutines – Flow parallel processing

Meant as an alternative to the Kotlin Coroutines “Channel”, a “Flow” is another way of enabling communication between two coroutines. The essential difference between the two is that a channel is “hot” (i.e. it starts producing as soon as it is created), while a flow is “cold” (i.e. it only starts when collected, or “subscribed to” in reactive parlance).
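The cold behaviour is easy to demonstrate: a flow built with the flow { } builder does nothing until it is collected. A minimal sketch (the coldFlow helper and the events list are illustrative names, not part of any API):

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

// A cold flow: the builder body does not run until the flow is collected
fun coldFlow(events: MutableList<String>): Flow<Int> = flow {
    events.add("started")
    emit(1)
}

fun main() = runBlocking {
    val events = mutableListOf<String>()
    val f = coldFlow(events)          // nothing has run yet
    events.add("declared")
    f.collect { events.add("collected $it") }
    // "declared" precedes "started": the builder only ran on collect()
    println(events)                   // [declared, started, collected 1]
}
```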

Coroutines enable us to do faster processing. Essentially, coroutines are light-weight threads, and one common scenario is the classic producer/worker pipeline.

In this architecture, some sort of “producer” coroutine generates events or otherwise produces data. This data is placed onto an intermediate message bus and consumed by multiple “worker” coroutines, which typically do some CPU-intensive processing on said data.

The intermediate “message bus” can be one of many things, such as:

  • A redis / zeromq / kafka event bus
  • Your typical Java threading queue (or bounded Thread Pool to which you submit jobs)
  • A Kotlin Channel
  • A Kotlin Flow

In general, the above architecture is referred to as “fan-out”. Using channels, it’s relatively easy to fan out, as described here:

https://kotlinlang.org/docs/reference/coroutines/channels.html#fan-out
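For reference, the channel-based fan-out pattern from those docs boils down to something like this: one producer, several workers sharing the same ReceiveChannel, with each element delivered to exactly one worker (the function names below mirror the docs; the count parameter is my own addition so the example terminates):

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.*

// One producer pushes numbers into a channel...
fun CoroutineScope.produceNumbers(count: Int) = produce {
    for (x in 1..count) send(x)
}

// ...and several workers iterate over the SAME channel: each element
// is received by exactly one of them (fan-out)
fun CoroutineScope.launchProcessor(id: Int, channel: ReceiveChannel<Int>) = launch {
    for (msg in channel) println("Processor #$id received $msg")
}

fun main() = runBlocking {
    val channel = produceNumbers(10)
    repeat(3) { launchProcessor(it, channel) }
    // runBlocking waits for the workers; they exit when the channel closes
}
```

Which worker receives which number is nondeterministic, but every number is processed exactly once.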

With flows, it’s less obvious. My first attempt follows:

// !!!!!! THIS WILL WORK - BUT NOT TRULY IN PARALLEL !!!!!!
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
import kotlin.system.*

fun producerCoroutine(): Flow<String> = flow {
    for (i in 1..4) {
        emit("Hi! $i")
        delay(1000)
    }
}

suspend fun workerCoroutine(input: String) {
    delay(1000)
    val t: Thread = Thread.currentThread()
    println("${t.id}: $input")
}

fun main() {
    runBlocking {
        val time = measureTimeMillis {
            producerCoroutine().buffer().map {
                async(Dispatchers.Default) {
                    workerCoroutine(it)
                }
            }.collect { it.await() }
        }
        println("Ready in $time ms")
    }
}

The above will output something similar to:

8: Hi! 1
8: Hi! 2
9: Hi! 3
8: Hi! 4
Ready in 4096 ms

A couple of notes:

  • the producer coroutine returns a flow
  • note the use of the “buffer()” function. This “decouples” the producer from the workers. Without it, the producer suspends at every emit() until the collector has finished processing that element, so emission and collection run in lockstep. With it, the producer can keep emitting while earlier elements are still being processed
  • the main difference is the insertion of the “map” function, which grabs every data entry from the flow and wraps it in an “async” builder. The “async” builder runs coroutines asynchronously and, more importantly in this case, lets us specify which “Dispatcher” to use, which in turn controls the thread pool the worker runs on
  • We “await” each resulting Deferred in the “collect” block
  • In the output, note how the worker coroutines executed on two threads (8 and 9), meaning we achieved our aim of fanning out our workers across separate threads
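The effect of “buffer()” can be seen in isolation with a small timing sketch (slowEmitter and the 100 ms delays are arbitrary stand-ins for production and processing cost):

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
import kotlin.system.*

// Emits three elements, paying 100 ms of "production cost" per element
fun slowEmitter(): Flow<Int> = flow {
    repeat(3) {
        delay(100)
        emit(it)
    }
}

// Collects a flow, paying 100 ms of "processing cost" per element
suspend fun timedCollect(f: Flow<Int>): Long = measureTimeMillis {
    f.collect { delay(100) }
}

fun main() = runBlocking {
    // Without buffer(): emit and collect alternate, roughly 3 * (100 + 100) ms
    val sequential = timedCollect(slowEmitter())
    // With buffer(): the producer runs ahead while the collector works,
    // so the two delays overlap, roughly 3 * 100 + 100 ms
    val buffered = timedCollect(slowEmitter().buffer())
    println("sequential: $sequential ms, buffered: $buffered ms")
}
```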

However, if you beef up our first attempt with something that takes more CPU power and monitor resource usage, we don’t see the expected jump in resource consumption. This is because even though the async “map” is distributing work across various threads, it’s not running them concurrently: the subsequent “collect” awaits each and every async function in sequence, essentially rendering our operation sequential again.

In order to get around this, we need to await all our async functions at once, rather than in sequence. Our second attempt looks like so:

import java.util.concurrent.Executors

val CyberSiftThreadPool = Executors.newFixedThreadPool(20).asCoroutineDispatcher()

fun main() {
    runBlocking {
        val deferreds = ArrayList<Deferred<Unit>>()
        val time = measureTimeMillis {
            producerCoroutine()
                .buffer(1000)
                .collect {
                    val d = async(CyberSiftThreadPool) {
                        workerCoroutine(it)
                    }
                    deferreds.add(d)
                }
            deferreds.awaitAll()   // time the run until all workers are done
        }
        println("Ready in $time ms")
    }
}

Some notes:

  • The first line (CyberSiftThreadPool) is not strictly speaking necessary – however I left it here to show that it is in fact possible to define your own thread pool on which to execute your async functions
  • We increased the buffer capacity to 1000 – also not strictly necessary but good to know we can do this
  • We define a new ArrayList called deferreds – we’ll use this to collect our deferred async functions.
  • In our collect function we add each async function to the deferreds ArrayList
  • Note that we removed the “map” function because we no longer have intermediate steps. Since “collect” is a terminal operator that is required to subscribe to our flow, we simply use it to both subscribe to the flow and kick off our async execution
  • Once the flow is done, we use awaitAll() to wait on all the deferred async functions at once (in contrast to before, where we awaited them sequentially in the “collect” block)
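As an aside, kotlinx.coroutines also ships a built-in operator that covers this fan-out pattern: flatMapMerge, which collects up to “concurrency” inner flows at the same time. A minimal sketch of our example rewritten with it (flatMapMerge is still marked @FlowPreview at the time of writing, and greetings() is my own illustrative name):

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
import kotlin.system.*

// Wrap each element's work in its own inner flow; flatMapMerge
// collects up to `concurrency` of those inner flows concurrently
@OptIn(FlowPreview::class)
fun greetings(): Flow<String> =
    (1..4).asFlow()
        .flatMapMerge(concurrency = 4) { i ->
            flow {
                delay(1000)          // simulated work
                emit("Hi! $i")
            }.flowOn(Dispatchers.Default)
        }

fun main() = runBlocking {
    val time = measureTimeMillis {
        greetings().collect { println(it) }
    }
    println("Ready in $time ms")   // roughly 1000 ms rather than 4000
}
```

This achieves much the same result as our second attempt without manually juggling a list of Deferreds, at the cost of a preview API.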