Server-Side Implementation of Client Streaming RPC
Write the service implementation of the addFile method.
In this lesson we show the server side implementation of the client streaming addFile
RPC. The server side is implemented in the ftp-service
module. Open the FTPService
class from the src/main/java/io/datajek/ftpservice/server
package. This class implements the FTP service defined in the proto file. In the FTPService
class defined in Setting up a gRPC Service lesson, we created the addFile
method as follows:
@Overridepublic StreamObserver<AddFileRequest> addFile(final StreamObserver<Result> resultObserver) {StreamObserver<AddFileRequest> requestObserver= null;return requestObserver;}
addFile
method in FTPService
class
The addFile
method handles the streaming of file data from the client to the server and adds a file to the server’s destination directory. The files are first written to the /tmp
directory and then atomically moved to the destination directory. The signature of the addFile
RPC in the ftp_service.proto
file is shown below:
// The FTP service definition.service FTPService {// Deletes a file in the destination directory of the serverrpc deleteFile (MetaData) returns (Result) {};// Adds a file to the destination directory of the server.rpc addFile(stream AddFileRequest) returns(Result){};}
When the protoc
compiler generates Java code for the above service, it creates a stub method for addFile
which we need to override in the FTPService
class.
@Overridepublic StreamObserver<AddFileRequest> addFile(final StreamObserver<Result> resultObserver) {}
If we compare the signature of the addFile()
method in the ftp_service.proto
file with the one shown above, there’s a noticeable difference.
Since the client can send multiple messages to the service, the service implementation returns a
StreamObserver<AddFileRequest>
instance. The server will process client requests using the provided observer. It is invoked whenever additional messages are received from the client.StreamObserver<Result>
is used by the server to send response back to the client. It’s provided by the client when calling theaddFile
method. The service implementation invokes theonNext()
method on this observer to send a response. TheResult
message contains the outcomes of the server’s operations in response to the client’s requests.
Here AddFileRequest
Result
ftp-service.proto
file. These message types represent the structured data being sent between the client and server.
The addFile
method handles AddFileRequest
messages and returns a new StreamObserver<AddFileRequest>
object. First, we will log a message indicating that a request to add a file has been received. The StreamObserver
implementation has three methods: onNext
, onError
, and onCompleted
. These methods are called by the gRPC framework based on the events received during the streaming process.
The
StreamObserver
’sonNext
method will be called every time client makes a streaming request. Here, we will process the file metadata and store file chunks in a temporary location.The
StreamObserver
’sonError
method will be invoked when a client sends an error.The
StreamObserver
’sonCompleted
method will be called when a client completes the streaming call. In this method, we will move the file to the server's destination directory.
We will override these methods and write custom logic to handle requests for our FTP service.
@Overridepublic StreamObserver<AddFileRequest> addFile(final StreamObserver<Result> resultObserver) {logger.info("Received request to add file.");return new StreamObserver<AddFileRequest>() {@Overridepublic void onNext(AddFileRequest addFileRequest) {// called whenever client sends a message}@Overridepublic void onError(Throwable throwable) {// called when an error is received from the client}@Overridepublic void onCompleted() {// called when client is done sending request messages}};}
The addFile
method handles the streaming of file data from the client to the server. It processes each AddFileRequest
, validates the message, verifies data integrity by checksum matching, writes file chunks to the temporary location, and manages the completion of the file transfer. Along the process, it keeps ...