Configure the Scraping Pipeline

Learn to configure the broadway pipeline and use the handle_message function.

Pipeline configuration

We’ll use the processors of Broadway to refactor the logic that checks each website. For this, we have to define :processors in start_link/1, and use handle_message/3:

def start_link(_args) do
  options = [
    name: ScrapingPipeline,
    producer: [
      module: {PageProducer, []},
      transformer: {ScrapingPipeline, :transform, []}
    ],
    processors: [
      default: [max_demand: 1, concurrency: 2]
    ]
  ]

  Broadway.start_link(__MODULE__, options)
end

def handle_message(_processor, message, _context) do
  if Scraper.online?(message.data) do
    # To do...
  else
    Broadway.Message.failed(message, "offline")
  end
end

We can discard offline websites using Broadway.Message.failed/2. Successful messages go to the next step, which would do the scraping work.

Define a batcher and batch processor

This is where batchers from Broadway come in handy. To maintain our previous logic, we define a batcher with :batch_size set to 1, and two batch processors in the following way:

Get hands-on with 1200+ tech skills courses.