Add Concurrency With ConsumerSupervisor
Learn how GenStage uses ConsumerSupervisor to achieve concurrency when starting processes.
The properties of ConsumerSupervisor
Using multiple consumers to process events one by one concurrently is a very useful technique. To make this task even easier, GenStage comes with a special supervisor called ConsumerSupervisor. It works like a consumer and can subscribe to one or more producers. It can also monitor, start, and restart child processes, just like a supervisor.
What makes ConsumerSupervisor special is that when it receives a list of events from a producer, it automatically starts a process for each event and passes the event as an argument to the process. When a child process exits successfully, ConsumerSupervisor issues new demand, and the cycle repeats. The figure illustrates how child processes are started on demand when new events are received.
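To make the "one process per event" idea concrete, here is a minimal sketch that uses only the Elixir standard library. It is not GenStage itself: the EventWorker module, the example URL, and the manual apply/3 call are assumptions made for illustration. It imitates what happens for a single event, where the event is appended to the argument list of the child's start function and the parent observes a normal exit.

```elixir
defmodule EventWorker do
  # Hypothetical child module: it receives exactly one event,
  # does its work in a task, and then exits normally.
  def start_link(event) do
    Task.start_link(fn -> process(event) end)
  end

  defp process(event), do: IO.puts("processed #{inspect(event)}")
end

# Trap exits so the child's termination arrives as a message,
# similar to how a supervisor learns that a child has finished.
Process.flag(:trap_exit, true)

# A start spec with an empty argument list, as a supervisor would hold it.
{mod, fun, args} = {EventWorker, :start_link, []}

# The event is appended to the arguments, so this calls
# EventWorker.start_link("https://example.com").
{:ok, pid} = apply(mod, fun, args ++ ["https://example.com"])

exit_reason =
  receive do
    {:EXIT, ^pid, reason} -> reason
  after
    1_000 -> :timeout
  end
```

A normal exit like this is the point at which ConsumerSupervisor would ask its producer for more events.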
Now, we’ll refactor some of our existing logic using ConsumerSupervisor to demonstrate how it works in practice.
Create a ConsumerSupervisor
We’ll create a new file, page_consumer_supervisor.ex, and place it in the lib directory. Here are the entire contents:
# file path -> scraper/lib/page_consumer_supervisor.ex
defmodule PageConsumerSupervisor do
  use ConsumerSupervisor
  require Logger

  def start_link(_args) do
    ConsumerSupervisor.start_link(__MODULE__, :ok)
  end

  def init(:ok) do
    Logger.info("PageConsumerSupervisor init")

    children = [
      %{
        id: PageConsumer,
        start: {PageConsumer, :start_link, []},
        restart: :transient
      }
    ]

    opts = [
      strategy: :one_for_one,
      subscribe_to: [{PageProducer, max_demand: 2}]
    ]

    ConsumerSupervisor.init(children, opts)
  end
end
This is a lot of new code, so let’s break it down and explain how it works.
The start_link function
We name our module PageConsumerSupervisor, and immediately after the defmodule declaration, we bring in the ConsumerSupervisor module logic. Since this is a process, we define the now-familiar start_link/1 function. The state is not relevant for ConsumerSupervisor, so we pass an :ok atom.
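To show where start_link/1 gets called, here is a small standalone script that follows the same pattern. It is a sketch under stated assumptions, not the course's project code: the Demo.Producer, Demo.Worker, and Demo.ConsumerSup modules, the :demo_collector name, and the list of integer events are all hypothetical, and the script assumes network access so that Mix.install can fetch gen_stage. A parent supervisor starts the producer and the consumer supervisor through their child specs, which invoke each module's start_link/1.

```elixir
# Assumption: run as a script (for example `elixir demo.exs`) with network access.
# Inside a Mix project, the dependency would go in mix.exs instead.
Mix.install([{:gen_stage, "~> 1.0"}])

defmodule Demo.Producer do
  use GenStage

  def start_link(events) do
    GenStage.start_link(__MODULE__, events, name: __MODULE__)
  end

  @impl true
  def init(events), do: {:producer, events}

  # Hand out as many of the remaining events as were demanded.
  @impl true
  def handle_demand(demand, events) when demand > 0 do
    {batch, rest} = Enum.split(events, demand)
    {:noreply, batch, rest}
  end
end

defmodule Demo.Worker do
  # Called once per event; the event is appended to the args in `start:`.
  def start_link(event) do
    Task.start_link(fn -> send(:demo_collector, {:done, event}) end)
  end
end

defmodule Demo.ConsumerSup do
  use ConsumerSupervisor

  # The argument is ignored; :ok is passed on to init/1.
  def start_link(_args) do
    ConsumerSupervisor.start_link(__MODULE__, :ok)
  end

  def init(:ok) do
    children = [
      %{id: Demo.Worker, start: {Demo.Worker, :start_link, []}, restart: :transient}
    ]

    opts = [strategy: :one_for_one, subscribe_to: [{Demo.Producer, max_demand: 2}]]
    ConsumerSupervisor.init(children, opts)
  end
end

# The script process collects the results reported by the workers.
Process.register(self(), :demo_collector)

{:ok, _sup} =
  Supervisor.start_link(
    [{Demo.Producer, [1, 2, 3, 4]}, {Demo.ConsumerSup, []}],
    strategy: :rest_for_one
  )

results =
  for _ <- 1..4 do
    receive do
      {:done, event} -> event
    after
      1_000 -> :timeout
    end
  end
```

Because the workers run concurrently, the order of results is not guaranteed, so any check on them should compare sorted values.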
The init function
The ...