Implement handle_message/3

Learn to process incoming messages through the handle_message/3 callback function.

Callback function handle_message/3

Incoming messages sent by the broker are processed by handle_message/3. Within this callback, we can inspect the given message and its data, and use them to perform whatever work the pipeline requires.

The handle_message/3 callback is special. All the callbacks we’ve seen so far when using GenServer and GenStage run within the current process. However, code written in handle_message/3 is executed by a processor. Processors are processes that Broadway starts and runs concurrently to perform the work in handle_message/3. They also isolate any potential errors: if an exception happens, the pipeline is restarted and quickly brought back to a working state.

There are three arguments given to handle_message/3:

  • We have the current processor group, which is the :default atom for now.

  • We have the message itself, in the form of a %Broadway.Message{} struct.

  • We have the context value, which is optional and can be set through the :context option in start_link/2.

The callback must return a %Broadway.Message{} struct. We’ll see what this struct contains in just a moment.

Let’s implement handle_message/3 in the following way:
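
As a minimal sketch, assuming the broadway dependency is available, the callback might simply inspect its three arguments and return the message unchanged. The MyPipeline module name, the use of Broadway.DummyProducer as a stand-in producer, and the contents of the context map are all hypothetical and chosen only for illustration:

```elixir
defmodule MyPipeline do
  use Broadway

  # Hypothetical setup: DummyProducer emits nothing on its own and
  # stands in for a real broker producer.
  def start_link(_args) do
    Broadway.start_link(__MODULE__,
      name: __MODULE__,
      producer: [module: {Broadway.DummyProducer, []}],
      # A single processor group named :default.
      processors: [default: []],
      # Optional value passed as the third argument of handle_message/3.
      context: %{source: "example"}
    )
  end

  @impl true
  def handle_message(processor, %Broadway.Message{} = message, context) do
    # processor is the group name (:default here), message.data holds the
    # payload, and context is whatever was given to start_link/2.
    IO.inspect({processor, message.data, context}, label: "handle_message/3")

    # The callback must return the message struct.
    message
  end
end
```

Once the pipeline is running, calling Broadway.test_message(MyPipeline, "hello") should push a single message through it, which is a convenient way to try the callback without a real broker.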
