Add Concurrency With ConsumerSupervisor

Explore how to enhance concurrency in Elixir data pipelines using ConsumerSupervisor. Learn to automatically start and supervise child processes for events, simplify consumer logic, and scale processing with configurable demand.

The properties of ConsumerSupervisor

Running several consumers concurrently, each processing one event at a time, is a very useful technique. To make this even easier, GenStage comes with a special supervisor called ConsumerSupervisor. It works like a consumer and can subscribe to one or more producers. It can also start, monitor, and restart child processes, just like a supervisor.

What makes ConsumerSupervisor special is that when it receives a list of events from a producer, it automatically starts one child process per event and passes the event as an argument to that process. When a child process exits successfully, ConsumerSupervisor issues new demand to the producer, and the cycle repeats. This figure illustrates how child processes are started on demand when new events are received:

Now, we’ll refactor some of our existing logic using ConsumerSupervisor to demonstrate how it works in practice.
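
Before the refactor, it may help to see the "one process per event, start the next when one finishes" cycle in isolation. The sketch below is only an analogy: it uses `Task.async_stream/3` from Elixir's standard library rather than ConsumerSupervisor, so there is no producer, no demand, and no supervision or restarts. The module name `DemandSketch` and its `run/3` function are hypothetical names chosen for illustration.

```elixir
defmodule DemandSketch do
  # Hypothetical helper, not part of GenStage.
  # Runs `fun` in a separate process for each event, keeping at most
  # `max_concurrency` of those processes alive at the same time. A new
  # process is started only when a running one finishes.
  def run(events, max_concurrency, fun) do
    events
    |> Task.async_stream(fun, max_concurrency: max_concurrency)
    |> Enum.map(fn {:ok, result} -> result end)
  end
end

# Three "events", at most two processed concurrently.
DemandSketch.run(["a", "b", "c"], 2, &String.upcase/1)
# => ["A", "B", "C"]
```

The concurrency limit here plays a role similar to the `max_demand` option of a ConsumerSupervisor subscription, which is why the sketch takes it as a parameter.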

Create a ConsumerSupervisor

We’ll create a new file, ...

Elixir
# file path -> scraper/lib/page_consumer_supervisor.ex
defmodule PageConsumerSupervisor do
  use ConsumerSupervisor
  require Logger

  def start_link(_args) do
    ConsumerSupervisor.start_link(__MODULE__, :ok)
  end

  def init(:ok) do
    Logger.info("PageConsumerSupervisor init")

    # Template for the child process started for each incoming event.
    # The event is appended to the (empty) argument list in :start.
    children = [
      %{
        id: PageConsumer,
        start: {PageConsumer, :start_link, []},
        # Restart the child only if it exits abnormally.
        restart: :transient
      }
    ]

    opts = [
      strategy: :one_for_one,
      # With max_demand: 2, at most two children run concurrently.
      subscribe_to: [
        {PageProducer, max_demand: 2}
      ]
    ]

    ConsumerSupervisor.init(children, opts)
  end
end
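
Because the child specification above uses an empty argument list, ConsumerSupervisor ends up calling `PageConsumer.start_link(event)` for every event it receives. The block below is a minimal sketch of what such a child module could look like, assuming each event is a string (for example, a URL). The `Process.sleep/1` call is a placeholder for the real work, which this section does not show.

```elixir
defmodule PageConsumer do
  require Logger

  # Called by ConsumerSupervisor once per event; `event` is the value
  # emitted by the producer (assumed here to be a string).
  def start_link(event) do
    Logger.info("PageConsumer received #{event}")

    # Placeholder work (assumption): the actual processing logic is not
    # part of this section, so the task only sleeps briefly and exits.
    Task.start_link(fn ->
      Process.sleep(100)
    end)
  end
end
```

`Task.start_link/1` returns `{:ok, pid}`, which is the return shape a supervisor expects from a child's start function. Since the task exits normally when its function returns, the `:transient` restart setting means it is not restarted after finishing successfully.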
...