Add Flow to a GenStage Pipeline
Explore how to enhance a GenStage data-processing pipeline by integrating the Flow library in Elixir. Understand how to rewrite producer-consumer modules using Flow's abstractions, simplify code by removing manual process management, and configure supervision trees to support concurrent workflows. This lesson helps you efficiently manage event flows and improve pipeline clarity and performance.
The scraper project
The scraper project has the following components in the data-processing pipeline:
- It has one PageProducer process of the :producer type.
- It has two OnlinePageConsumerProducer processes of the :producer_consumer type.
- It has one PageConsumerSupervisor process of the :consumer type.
- It has up to two PageConsumer processes started on demand by PageConsumerSupervisor.
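As a rough illustration of how these four components could fit together under one supervisor, here is a minimal, self-contained sketch. It is not the project's actual code: the module bodies are stripped-down stand-ins, the registered names :online_one and :online_two are hypothetical, and the max_demand values are assumptions chosen so that at most two PageConsumer processes run at once.

```elixir
Mix.install([{:gen_stage, "~> 1.2"}])

defmodule PageProducer do
  use GenStage

  def start_link(_args), do: GenStage.start_link(__MODULE__, [], name: __MODULE__)

  @impl true
  def init(state), do: {:producer, state}

  # No pages are queued in this sketch, so demand is answered with no events.
  @impl true
  def handle_demand(_demand, state), do: {:noreply, [], state}
end

defmodule OnlinePageConsumerProducer do
  use GenStage

  # `name` is a hypothetical registered name, used so two copies can coexist.
  def start_link(name), do: GenStage.start_link(__MODULE__, [], name: name)

  @impl true
  def init(state) do
    {:producer_consumer, state, subscribe_to: [{PageProducer, max_demand: 2}]}
  end

  # Stand-in for the real filtering logic: events pass through unchanged.
  @impl true
  def handle_events(events, _from, state), do: {:noreply, events, state}
end

defmodule PageConsumer do
  # ConsumerSupervisor appends each event to the start arguments.
  def start_link(event), do: Task.start_link(fn -> IO.inspect(event, label: "page") end)
end

defmodule PageConsumerSupervisor do
  use ConsumerSupervisor

  def start_link(_args), do: ConsumerSupervisor.start_link(__MODULE__, :ok)

  @impl true
  def init(:ok) do
    children = [%{id: PageConsumer, start: {PageConsumer, :start_link, []}, restart: :transient}]

    # Assumption: one event in flight per upstream stage, so two consumers at most.
    subscriptions = [{:online_one, max_demand: 1}, {:online_two, max_demand: 1}]
    ConsumerSupervisor.init(children, strategy: :one_for_one, subscribe_to: subscriptions)
  end
end

children = [
  PageProducer,
  Supervisor.child_spec({OnlinePageConsumerProducer, :online_one}, id: :online_one),
  Supervisor.child_spec({OnlinePageConsumerProducer, :online_two}, id: :online_two),
  PageConsumerSupervisor
]

{:ok, supervisor} = Supervisor.start_link(children, strategy: :one_for_one)
IO.inspect(Supervisor.count_children(supervisor))
```

The children are listed in pipeline order because each stage subscribes to the one before it during init, so the upstream process must already be running.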
Flow with GenStage
To demonstrate how Flow works with GenStage, we’ll rewrite the original OnlinePageConsumerProducer implementation using Flow. Flow provides two groups of functions for working with GenStage. The first group works with stages that are already running:
- The from_stages/2 function receives events from :producer stages.
- The through_stages/3 function sends events to :producer_consumer stages and receives what they send in turn.
- The into_stages/3 function sends events to :consumer or :producer_consumer stages.
All functions in this group require a list of process IDs (PIDs) so that Flow can connect to the processes that are already running.
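To make the first group concrete, the following sketch connects a flow to one running producer and one running consumer by PID. NumberProducer, Forwarder, and Collector are hypothetical helper modules written only for this illustration, and the doubling step is an arbitrary stand-in for real processing logic.

```elixir
Mix.install([{:flow, "~> 1.2"}])

defmodule NumberProducer do
  use GenStage

  def start_link(numbers), do: GenStage.start_link(__MODULE__, numbers)

  @impl true
  def init(numbers), do: {:producer, numbers}

  # Hands out as many queued numbers as were demanded and keeps the rest.
  @impl true
  def handle_demand(demand, numbers) when demand > 0 do
    {events, rest} = Enum.split(numbers, demand)
    {:noreply, events, rest}
  end
end

defmodule Forwarder do
  use GenStage

  def start_link(target), do: GenStage.start_link(__MODULE__, target)

  @impl true
  def init(target), do: {:consumer, target}

  # Sends every batch of events to the target process for inspection.
  @impl true
  def handle_events(events, _from, target) do
    send(target, {:events, events})
    {:noreply, [], target}
  end
end

defmodule Collector do
  # Gathers forwarded events until `count` have arrived or the timeout expires.
  def take(count, acc \\ []) do
    if length(acc) >= count do
      Enum.sort(acc)
    else
      receive do
        {:events, events} -> take(count, acc ++ events)
      after
        1_000 -> Enum.sort(acc)
      end
    end
  end
end

# Both stages are started first; Flow only receives their PIDs.
{:ok, producer} = NumberProducer.start_link([1, 2, 3])
{:ok, consumer} = Forwarder.start_link(self())

{:ok, flow_pid} =
  [producer]
  |> Flow.from_stages()
  |> Flow.map(&(&1 * 2))
  |> Flow.into_stages([consumer])

doubled = Collector.take(3)
IO.inspect(doubled)
```

Because into_stages/3 starts the flow as its own process and returns immediately, the script collects the results through messages instead of enumerating the flow directly.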
The second group of functions is useful when we want Flow ...