Implement handle_message/3
Explore how to implement the handle_message 3 callback in Broadway to process incoming messages concurrently. Understand its role in data ingestion, error isolation, and integrating business logic with Elixir's concurrency tools.
We'll cover the following...
Callback function handle_message/3
Incoming messages sent by the broker are processed by handle_message/3. Within this callback, we can look at the given message and its data. We can use it to perform any work.
The handle_message/3 callback is special. All callbacks that we’ve seen before when using GenServer and GenStage run within the current process. However, code written in handle_message/3 is executed by a processor. Processors concurrently run the processes, which are started by Broadway to perform the work in handle_message/3. They also isolate any potential errors. The pipeline is restarted and quickly brought back to a working state if an exception happens.
There are three arguments given to handle_message/3:
-
We have the current processor group, which is the
:defaultatom for now. -
We have the message itself, in the form of a
%Broadway.Message{}struct. ...