Add Concurrency With ConsumerSupervisor

Learn how GenStage uses ConsumerSupervisor to achieve concurrency when starting processes.

The properties of ConsumerSupervisor

Running several consumers concurrently, each processing one event at a time, is a very useful technique. To make this even easier, GenStage comes with a special supervisor called ConsumerSupervisor. It works like a consumer and can subscribe to one or more producers. It can also monitor, start, and restart child processes, just like a supervisor.

What makes ConsumerSupervisor special is that when it receives a list of events from a producer, it automatically starts one child process per event and passes the event to that process as an argument. When a child process exits successfully, ConsumerSupervisor issues new demand, and the cycle repeats. This figure illustrates how child processes are started on demand when new events are received.
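
To make the per-event start concrete, here is a minimal sketch of a child module that ConsumerSupervisor could start. The module name PageConsumerSketch and the handle_event/1 helper are hypothetical and not part of the lesson's code. The sketch assumes the child spec's start tuple carries an empty argument list, so the event arrives as the only argument to start_link/1.

```elixir
defmodule PageConsumerSketch do
  # Hypothetical child module for illustration only.
  require Logger

  # Assumption: the supervisor appends the event to the (empty) argument
  # list of the child spec, so start_link/1 receives the event itself.
  def start_link(event) do
    # Each event gets its own short-lived, linked process.
    Task.start_link(fn -> handle_event(event) end)
  end

  # Placeholder for the real work done on a single event.
  def handle_event(event) do
    Logger.info("Processing #{inspect(event)}")
    :ok
  end
end
```

Because the task finishes normally once handle_event/1 returns, a supervisor configured with restart: :transient would not restart it.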

Now, we’ll refactor some of our existing logic using ConsumerSupervisor to demonstrate how it works in practice.

Create a ConsumerSupervisor

We’ll create a new file, page_consumer_supervisor.ex, and place it in the lib directory. Here are the entire contents:

# file path -> scraper/lib/page_consumer_supervisor.ex
defmodule PageConsumerSupervisor do
  use ConsumerSupervisor
  require Logger

  def start_link(_args) do
    ConsumerSupervisor.start_link(__MODULE__, :ok)
  end

  def init(:ok) do
    Logger.info("PageConsumerSupervisor init")

    # Template for the child process started for each event.
    # The event is appended to the argument list in :start.
    children = [
      %{
        id: PageConsumer,
        start: {PageConsumer, :start_link, []},
        # Restart a child only if it exits abnormally.
        restart: :transient
      }
    ]

    opts = [
      strategy: :one_for_one,
      subscribe_to: [
        {PageProducer, max_demand: 2}
      ]
    ]

    ConsumerSupervisor.init(children, opts)
  end
end

This is a lot of new code, so let’s break it down and explain how it works.

The start_link function

We name our module PageConsumerSupervisor and, immediately after the defmodule declaration, bring in the ConsumerSupervisor logic with use ConsumerSupervisor. Since this is a process, we define the now-familiar start_link/1 function. ConsumerSupervisor does not need any meaningful state here, so we pass the :ok atom as the argument that init/1 receives.
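
The hand-off from start_link/1 to init/1 can be tried without GenStage installed. The sketch below swaps in the standard library's Supervisor as a stand-in, on the assumption that ConsumerSupervisor.start_link/2 forwards its second argument to init/1 in the same way. InitArgSketch is a hypothetical name, and the empty child list is there only to keep the example small.

```elixir
defmodule InitArgSketch do
  # Stand-in built on the standard library Supervisor, not ConsumerSupervisor.
  use Supervisor

  # The incoming args are ignored; :ok is what init/1 receives.
  def start_link(_args) do
    Supervisor.start_link(__MODULE__, :ok)
  end

  # Matching on :ok documents that no real state is expected.
  @impl true
  def init(:ok) do
    # No children here; the point is only the argument hand-off.
    Supervisor.init([], strategy: :one_for_one)
  end
end
```

Calling InitArgSketch.start_link([]) returns {:ok, pid} for a supervisor with no children.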

The init function

The ...