Adding a FluxProcessor
Explore how to implement a FluxProcessor within Spring Boot to manage RSocket messaging patterns such as request/response, fire-and-forget, and monitor channels. Understand reactive handling of item processing, saving to MongoDB, and broadcasting to subscribers using reactive streams.
In this lesson, we’ll learn about different exchange types in RSocket, such as request/response, fire-and-forget, and monitor channel exchanges. This will also lay the basis for our RSocket.
Handling a request/response RSocket exchange
First, we need to handle an RSocket request/response.
The following methods, added to RSocketService, should illustrate how both the
itemProcessor and itemSink can be put to good use:
Here’s a breakdown of the code above:
-
In line 1, Spring Messaging’s
@MessageMappingannotation is used to route RSocket messages with a destination ofnewItems.request-response. -
In line 2, Spring Messaging listens reactively for these messages, then invokes the method with the payload. Notice that the return type is a Reactor type around the domain object,
Item. This is the signature of the response that’s expected. -
In line 3, we save
Itemto MongoDB using a reactive repository. -
In line 4,
.doOnNext()grabs the newly savedItemand ...