Channels
Explore how channels in Kotlin enable communication between coroutines by suspending rather than blocking execution. Understand creating typed channels, sending and receiving data, and using producers and actors. Learn about buffered channels to decouple producers from consumers and improve concurrency management.
Channels
We now know how to spawn coroutines and control them.
But, what if two coroutines need to communicate with each other?
In Java, threads communicate either through the wait(), notify(), and notifyAll() methods or through one of the many classes in the java.util.concurrent package, such as BlockingQueue.
In Kotlin, as you may have noticed, there are no wait() or notify() methods. Instead, to communicate between coroutines, Kotlin uses channels. Channels are very similar to BlockingQueue, but instead of blocking a thread, channels suspend a coroutine, which is a lot cheaper. We’ll use the following steps to create a channel and a coroutine.
- First, let’s create a channel:
val chan = Channel<Int>()
Channels are typed. This channel can only carry integers.
- Then, let’s create a coroutine that reads from this channel:
launch {
    for (c in chan) {
        println(c)
    }
}
Reading from a channel is as simple as using a for loop.
- Now, let’s send some values to this channel. This is as simple as using the send() function:
(1..10).forEach {
    chan.send(it)
}
chan.close()
- Finally, we close the channel. Once the channel is closed and its remaining values have been received, the coroutine that listens to it breaks out of the for loop, and if there’s nothing else to do, the coroutine terminates.
This style of communication is called Communicating Sequential Processes, or more simply, CSP.
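The steps above can be put together into one small program. This is a minimal sketch, assuming the kotlinx-coroutines-core library is on the classpath; the helper name relayThroughChannel() is hypothetical and exists only so that the received values can be inspected.

```kotlin
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking

// Hypothetical helper: sends 1..10 through a channel and returns
// the values in the order the consumer coroutine received them.
fun relayThroughChannel(): List<Int> = runBlocking {
    val chan = Channel<Int>()              // unbuffered channel of Int
    val received = mutableListOf<Int>()

    // Consumer: the for loop ends once the channel is closed and drained.
    val consumer = launch {
        for (c in chan) {
            received += c
        }
    }

    // Producer side: send() suspends until the consumer takes each value.
    (1..10).forEach { chan.send(it) }
    chan.close()

    consumer.join()                        // wait for the consumer to finish
    received
}

fun main() {
    relayThroughChannel().forEach { println(it) }   // prints 1 through 10, one per line
}
```

Collecting the values into a list instead of printing them inside the loop is only a convenience for checking the result; the channel usage itself follows the steps in the text.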
As we can see, channels are a convenient and type-safe way to communicate between different coroutines. But we had to define the channels manually. In the following two sections, we’ll see how this can be further simplified.
Producers
If we need a coroutine that supplies a stream of values, we can use the produce() function. This function creates a coroutine that is backed by a ReceiveChannel<T>, where T is the type the coroutine produces.
We could rewrite the example from the previous section, as follows, by using the produce() function:
val chan = produce {
    (1..10).forEach {
        send(it)
    }
}
launch {
    for (c in chan) {
        println(c)
    }
}
Note that inside the produce() block, the send() function is readily available for us to push new values to the channel.
Instead of using a for loop in our consumer coroutine, we can use a consumeEach() function:
launch {
    chan.consumeEach {
        println(it)
    }
}
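A complete version of the producer example might look like the sketch below. It assumes kotlinx-coroutines-core is available; numbers() and collectProduced() are hypothetical names introduced for illustration. In current versions of the library, produce() is marked as an experimental API, hence the opt-in annotation.

```kotlin
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.channels.ReceiveChannel
import kotlinx.coroutines.channels.consumeEach
import kotlinx.coroutines.channels.produce
import kotlinx.coroutines.runBlocking

// Hypothetical helper: a producer coroutine exposed as a ReceiveChannel<Int>.
@OptIn(ExperimentalCoroutinesApi::class)
fun CoroutineScope.numbers(): ReceiveChannel<Int> = produce {
    (1..10).forEach { send(it) }
    // The channel is closed automatically when this block completes.
}

// Hypothetical helper: consumes everything the producer sends.
fun collectProduced(): List<Int> = runBlocking {
    val received = mutableListOf<Int>()
    numbers().consumeEach { received += it }   // returns once the channel is closed
    received
}

fun main() {
    collectProduced().forEach { println(it) }  // prints 1 through 10, one per line
}
```

Note that there is no explicit close() call here: the producer closes its channel when the block finishes, which is what lets consumeEach() return.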
Now, it’s time to look at another example where a coroutine is bound to a channel.
Actors
Like produce(), actor() creates a coroutine bound to a channel. But instead of a channel going out of the coroutine, there’s a channel going into the coroutine.
Let’s look at the following example:
val actor = actor<Int> {
    channel.consumeEach {
        println(it)
    }
}
(1..10).forEach {
    actor.send(it)
}
In this example, our main function is again producing the values and the actor consumes them through the channel. This is very similar to the first example we saw, but instead of explicitly creating a channel and a separate coroutine, we have them bundled together.
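The sketch below shows one way to run an actor end to end, assuming kotlinx-coroutines-core on the JVM. Instead of printing each value, this actor adds them up so that the result can be checked; sumViaActor() and the CompletableDeferred used to hand the sum back are illustrative choices, not part of the original example. In current versions of the library, actor() is marked as an obsolete API, so the code opts in explicitly.

```kotlin
import kotlinx.coroutines.CompletableDeferred
import kotlinx.coroutines.ObsoleteCoroutinesApi
import kotlinx.coroutines.channels.actor
import kotlinx.coroutines.channels.consumeEach
import kotlinx.coroutines.runBlocking

// Hypothetical helper: sends 1..10 to an actor and returns the sum it computed.
@OptIn(ObsoleteCoroutinesApi::class)
fun sumViaActor(): Int = runBlocking {
    val result = CompletableDeferred<Int>()

    // The actor owns its state (sum); only the actor coroutine touches it.
    val summer = actor<Int> {
        var sum = 0
        channel.consumeEach { sum += it }   // ends when the actor's channel is closed
        result.complete(sum)
    }

    (1..10).forEach { summer.send(it) }
    summer.close()                          // no more messages for the actor
    result.await()
}

fun main() {
    println(sumViaActor())   // prints 55
}
```

Keeping the mutable state inside the actor means that no locking is needed, because all updates arrive through the channel one message at a time.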
Buffered channels
In all of the previous examples, whether creating channels explicitly or implicitly, we used their unbuffered version. To demonstrate what this means, let’s take a look at a slightly altered example from the previous section:
val actor = actor<Long> {
    var prev = 0L
    channel.consumeEach {
        println(it - prev)
        prev = it
        delay(100)
    }
}
Here, we have almost the same actor object, which receives timestamps and prints the difference between every two timestamps it gets. We also introduce a small delay before it can read the next value.
Instead of sending a sequence of numbers, we would send the current timestamp to this actor object:
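repeat(10) {
    actor.send(System.currentTimeMillis())
}
actor.close().also { println("Done sending") }
Now, let’s take a look at the output of our code:
> ...
> 101
> 103
> 101
> Done sending
Because the channel is unbuffered, each send() suspends until the actor is ready to receive, so the timestamps arrive about 100 milliseconds apart and "Done sending" is printed only at the end. If we give the actor’s channel a capacity, for example actor<Long>(capacity = 10), the producer doesn’t have to wait for the consumer anymore, because the channel now buffers the messages. So, the messages are sent as fast as possible and the actor is still able to consume them at its own pace.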
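To make the difference between an unbuffered and a buffered actor visible without relying on timestamps, the following sketch records the order of events instead. It assumes kotlinx-coroutines-core on the JVM; sendOrder() and the event strings are hypothetical, and the ordering described in the comments relies on everything running on the single thread of runBlocking.

```kotlin
import kotlinx.coroutines.ObsoleteCoroutinesApi
import kotlinx.coroutines.channels.actor
import kotlinx.coroutines.channels.consumeEach
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.delay
import kotlinx.coroutines.runBlocking

// Hypothetical helper: sends three values to a slow actor whose channel has the
// given capacity and returns the order in which things happened.
@OptIn(ObsoleteCoroutinesApi::class)
fun sendOrder(capacity: Int): List<String> = runBlocking {
    val events = mutableListOf<String>()
    coroutineScope {
        val slowActor = actor<Int>(capacity = capacity) {
            channel.consumeEach {
                events += "received $it"
                delay(10)                   // simulate a slow consumer
            }
        }
        repeat(3) { slowActor.send(it) }    // suspends only when there is no room
        events += "Done sending"
        slowActor.close()
    }                                       // returns after the actor has finished
    events
}

fun main() {
    // capacity = 0 is an unbuffered channel: the first event is "received 0",
    // because the sender has to wait for the actor.
    println(sendOrder(capacity = 0))
    // capacity = 3 holds all three values: "Done sending" comes first.
    println(sendOrder(capacity = 3))
}
```

With a multithreaded dispatcher the exact interleaving could differ, so this is best read as an illustration of when send() has to wait rather than as a guarantee about ordering.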
In a similar manner, capacity could be defined on the producer channel:
val chan = produce(capacity = 10) {
    (1..10).forEach {
        send(it)
    }
}
And it could be defined on the raw channel as well:
val chan = Channel<Int>(10)
Buffered channels are a very powerful concept that allows us to decouple producers from consumers. We should use them carefully, though, because the larger the capacity of the channel is, the more memory it will require.
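One way to see what capacity means in practice is to count how many values a channel accepts when nobody is receiving. The sketch below assumes kotlinx-coroutines-core in a version that provides trySend(), a non-suspending variant of send(); the helper acceptedWithoutSuspending() is hypothetical.

```kotlin
import kotlinx.coroutines.channels.Channel

// Hypothetical helper: counts how many values a channel with the given capacity
// accepts before a sender would have to suspend. There is no receiver at all.
fun acceptedWithoutSuspending(capacity: Int, attempts: Int): Int {
    val chan = Channel<Int>(capacity)
    var accepted = 0
    for (i in 1..attempts) {
        // trySend() never suspends; it fails when the buffer is full
        // and no receiver is waiting.
        if (chan.trySend(i).isSuccess) accepted++
    }
    chan.close()
    return accepted
}

fun main() {
    println(acceptedWithoutSuspending(capacity = 0, attempts = 5))   // prints 0
    println(acceptedWithoutSuspending(capacity = 3, attempts = 5))   // prints 3
}
```

Every accepted value sits in the channel's buffer until a consumer takes it, which is why a very large capacity can translate into a large amount of memory when the consumer falls behind.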
Channels are a relatively low-level concurrency construct. Next, we’ll take a look at another type of stream, which provides us with a higher level of abstraction.