Search⌘ K
AI Features

Adding a FluxProcessor

Explore how to implement a FluxProcessor within Spring Boot to manage RSocket messaging patterns such as request/response, fire-and-forget, and monitor channels. Understand reactive handling of item processing, saving to MongoDB, and broadcasting to subscribers using reactive streams.

In this lesson, we’ll learn about different exchange types in RSocket, such as request/response, fire-and-forget, and monitor channel exchanges. This will also lay the basis for our RSocket.

Handling a request/response RSocket exchange

First, we need to handle an RSocket request/response. The following methods, added to RSocketService, should illustrate how both the itemProcessor and itemSink can be put to good use:

Java
@MessageMapping("newItems.request-response") //1
public Mono<Item> processNewItemsViaRSocketRequestResponse(Item item) { //2
return this.repository.save(item) //3
.doOnNext(savedItem -> this.itemSink.next(savedItem)); //4
}
Handling a request-response RSocket exchange

Here’s a breakdown of the code above:

  1. In line 1, Spring Messaging’s @MessageMapping annotation is used to route RSocket messages with a destination of newItems.request-response.

  2. In line 2, Spring Messaging listens reactively for these messages, then invokes the method with the payload. Notice that the return type is a Reactor type around the domain object, Item. This is the signature of the response that’s expected.

  3. In line 3, we save Item to MongoDB using a reactive repository.

  4. In line 4, .doOnNext() grabs the newly saved Item and ...