How do I combine Kotlin collections with coroutines and flows for asynchronous processing?

You typically combine Kotlin collections, coroutines, and Flow by using:

  • collections for in-memory data
  • coroutines for concurrency / async work
  • Flow for asynchronous streams of values

Basic idea

If you have a collection:

val ids = listOf(1, 2, 3, 4, 5)

You can turn it into a Flow:

val idFlow = ids.asFlow()

Then process each item asynchronously using Flow operators:

import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

fun main() = runBlocking {
    val ids = listOf(1, 2, 3, 4, 5)

    ids.asFlow()
        .map { id ->
            fetchUser(id)
        }
        .collect { user ->
            println(user)
        }
}

suspend fun fetchUser(id: Int): String {
    delay(500)
    return "User $id"
}

Here:

  • asFlow() converts the collection into a Flow
  • map { } applies a suspending transformation
  • collect { } starts the flow and consumes results

Sequential asynchronous processing

By default, Flow processes elements sequentially:

ids.asFlow()
    .map { id ->
        fetchUser(id)
    }
    .collect { user ->
        println(user)
    }

Even though fetchUser is suspending, each item is processed one after another.

Concurrent processing with flatMapMerge

If you want to process multiple items concurrently, use flatMapMerge:

import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

fun main() = runBlocking {
    val ids = listOf(1, 2, 3, 4, 5)

    ids.asFlow()
        .flatMapMerge(concurrency = 3) { id ->
            flow {
                emit(fetchUser(id))
            }
        }
        .collect { user ->
            println(user)
        }
}

suspend fun fetchUser(id: Int): String {
    delay(500)
    return "User $id"
}

This allows up to 3 items to be processed at the same time.

Note: flatMapMerge may emit results out of the original order.

Keeping order while doing concurrent work

If you need concurrency but want results in the original order, you can use async with a collection:

import kotlinx.coroutines.*

fun main() = runBlocking {
    val ids = listOf(1, 2, 3, 4, 5)

    val users = ids.map { id ->
        async {
            fetchUser(id)
        }
    }.awaitAll()

    println(users)
}

suspend fun fetchUser(id: Int): String {
    delay(500)
    return "User $id"
}

awaitAll() returns results in the same order as the original list.

Filtering and transforming Flow values

You can use familiar collection-like operators:

ids.asFlow()
    .filter { id ->
        id % 2 == 0
    }
    .map { id ->
        fetchUser(id)
    }
    .collect { user ->
        println(user)
    }

This is similar to collection processing, but it supports suspending operations.

Collecting a Flow back into a collection

If you need a List again:

val users: List<String> = ids.asFlow()
    .map { id -> fetchUser(id) }
    .toList()

Because toList() collects the flow, it must be called from a coroutine or suspend function.

Using flowOn for background work

You can move upstream processing to a dispatcher:

import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

fun main() = runBlocking {
    val ids = listOf(1, 2, 3, 4, 5)

    ids.asFlow()
        .map { id ->
            fetchUser(id)
        }
        .flowOn(Dispatchers.IO)
        .collect { user ->
            println(user)
        }
}

This is useful for I/O-bound work such as network or database calls.

Handling errors

Use catch to handle exceptions from upstream operators:

ids.asFlow()
    .map { id ->
        fetchUser(id)
    }
    .catch { error ->
        emit("Fallback user because of: ${error.message}")
    }
    .collect { user ->
        println(user)
    }

Example: process URLs asynchronously

import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

fun main() = runBlocking {
    val urls = listOf(
        "https://example.com/a",
        "https://example.com/b",
        "https://example.com/c"
    )

    val results = urls.asFlow()
        .flatMapMerge(concurrency = 2) { url ->
            flow {
                val content = download(url)
                emit(url to content.length)
            }
        }
        .toList()

    println(results)
}

suspend fun download(url: String): String {
    delay(1_000)
    return "Content from $url"
}

When to use what

Need Use
Simple in-memory transformation Collection operators: map, filter
Suspending work per item, sequential asFlow().map { suspendCall() }
Suspending work per item, concurrent flatMapMerge
Concurrent work while preserving order map { async { ... } }.awaitAll()
Continuous stream of values Flow
Convert Flow back to List toList()

In short:

val results = items.asFlow()
    .filter { shouldProcess(it) }
    .flatMapMerge(concurrency = 4) { item ->
        flow {
            emit(processAsync(item))
        }
    }
    .toList()

That pattern is a good starting point for asynchronous collection processing with Kotlin coroutines and flows.

Leave a Reply

This site uses Akismet to reduce spam. Learn how your comment data is processed.