Add More Consumers and Buffer Events

Explore how to scale data processing pipelines by adding multiple consumer processes in GenStage. Understand buffering mechanisms, how to configure buffer size, and how to handle event drops. Learn to improve concurrency and load balancing to process data efficiently.

Add more consumers

Rather than working in batches within a single process, we can easily scale our data processing pipeline by running more than one consumer, each responsible for scraping one page at a time. First, let's add a second PageConsumer to the supervision tree:

Elixir
# scraper/lib/scraper/application.ex
children = [
  PageProducer,
  Supervisor.child_spec(PageConsumer, id: :consumer_a),
  Supervisor.child_spec(PageConsumer, id: :consumer_b)
]
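
For context, this children list lives inside the application's start/2 callback. A minimal sketch of the surrounding module (the supervisor name and options below follow standard Mix conventions and are assumptions, not shown in the source):

Elixir
defmodule Scraper.Application do
  # Standard OTP application callback module (sketch; assumes Mix defaults).
  use Application

  @impl true
  def start(_type, _args) do
    children = [
      PageProducer,
      # Duplicate consumers need distinct IDs in the supervision tree.
      Supervisor.child_spec(PageConsumer, id: :consumer_a),
      Supervisor.child_spec(PageConsumer, id: :consumer_b)
    ]

    opts = [strategy: :one_for_one, name: Scraper.Supervisor]
    Supervisor.start_link(children, opts)
  end
end

With :one_for_one, a crashing consumer is restarted on its own without disturbing the producer or the other consumer.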

Subscribe to PageProducer in init

Next, let's subscribe PageConsumer to PageProducer in its init function, lowering max_demand so each consumer takes only one event at a time:

Elixir
# scraper/lib/page_consumer.ex
def init(initial_state) do
  Logger.info("PageConsumer init")
  sub_opts = [{PageProducer, min_demand: 0, max_demand: 1}]
  {:consumer, initial_state, subscribe_to: sub_opts}
end

Now, our consumers take only one event at a time, but we have two consumer processes running concurrently. Once one is free, it will issue the demand to scrape another page.
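
On the producer side, GenStage buffers events whenever supply outpaces the consumers' demand, and the buffer size is configurable. A minimal sketch of what PageProducer could look like (the scrape_pages/1 helper and the :buffer_size value here are illustrative assumptions, not taken from the source):

Elixir
defmodule PageProducer do
  # Sketch of the producer stage; scrape_pages/1 and the buffer
  # settings are illustrative assumptions.
  use GenStage
  require Logger

  def start_link(_args) do
    GenStage.start_link(__MODULE__, [], name: __MODULE__)
  end

  def init(initial_state) do
    # :buffer_size caps how many events GenStage stores while all
    # consumers are busy; events beyond the cap are dropped.
    {:producer, initial_state, buffer_size: 1_000}
  end

  # Demand arrives from consumers; we produce nothing eagerly and
  # instead push events in via scrape_pages/1.
  def handle_demand(demand, state) do
    Logger.info("Received demand for #{demand} pages")
    {:noreply, [], state}
  end

  # Dispatch a list of page URLs into the pipeline.
  def scrape_pages(pages) when is_list(pages) do
    GenStage.cast(__MODULE__, {:pages, pages})
  end

  def handle_cast({:pages, pages}, state) do
    {:noreply, pages, state}
  end
end

If scrape_pages/1 is called with more events than the two consumers can absorb, GenStage holds the excess in the producer's internal buffer and delivers them as new demand comes in.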

Notice that when we add another PageConsumer, we use Supervisor.child_spec/2. Each process must have a unique ID in the supervision tree, and child_spec/2 lets us override the default ID so both consumers can run under the same supervisor.