Server-Side Implementation of Client Streaming RPC
Write the service implementation of the addFile method.
In this lesson, we implement the server side of the client-streaming addFile RPC. The server side lives in the ftp-service module. Open the FTPService class from the src/main/java/io/datajek/ftpservice/server package. This class implements the FTP service defined in the proto file. In the FTPService class defined in the Setting up a gRPC Service lesson, we created the addFile method as follows:
    @Override
    public StreamObserver<AddFileRequest> addFile(final StreamObserver<Result> resultObserver) {
        StreamObserver<AddFileRequest> requestObserver = null;
        return requestObserver;
    }
addFile method in FTPService class
The addFile method handles the streaming of file data from the client to the server and adds a file to the server’s destination directory. The files are first written to the /tmp directory and then atomically moved to the destination directory. The signature of the addFile RPC in the ftp_service.proto file is shown below:
    // The FTP service definition.
    service FTPService {
        // Deletes a file in the destination directory of the server.
        rpc deleteFile (MetaData) returns (Result) {};

        // Adds a file to the destination directory of the server.
        rpc addFile (stream AddFileRequest) returns (Result) {};
    }
When the protoc compiler generates Java code for the above service, it creates a stub method for addFile which we need to override in the FTPService class.
    @Override
    public StreamObserver<AddFileRequest> addFile(final StreamObserver<Result> resultObserver) {
    }
If we compare the signature of the addFile() method in the ftp_service.proto file with the one shown above, there’s a noticeable difference.
Since the client can send multiple messages to the service, the service implementation returns a StreamObserver<AddFileRequest> instance. The server processes client requests through this observer, which is invoked each time another message arrives from the client.
StreamObserver<Result> is used by the server to send the response back to the client. It is passed to the addFile method as its parameter by the gRPC framework. The service implementation invokes the onNext() method on this observer to send a response. The Result message contains the outcome of the server’s operations in response to the client’s requests.
Here, AddFileRequest and Result are message types defined in the ftp_service.proto file. These message types represent the structured data being sent between the client and server.
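To make the relationship between the two observers concrete, here is a minimal sketch that runs without gRPC. It is not the lesson's FTPService: SketchObserver is a hypothetical stand-in for io.grpc.stub.StreamObserver with the same three callbacks, plain strings stand in for the AddFileRequest and Result messages, and RecordingObserver plays the role of the client side.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for io.grpc.stub.StreamObserver so the sketch runs without gRPC.
interface SketchObserver<T> {
    void onNext(T value);
    void onError(Throwable error);
    void onCompleted();
}

class ClientStreamingShape {

    // Same shape as addFile: takes the response observer, returns the request observer.
    // Strings stand in for the AddFileRequest and Result messages (an assumption).
    static SketchObserver<String> addFile(final SketchObserver<String> resultObserver) {
        return new SketchObserver<String>() {
            private int received = 0;

            @Override
            public void onNext(String request) {
                received++; // called once per message the client streams
            }

            @Override
            public void onError(Throwable error) {
                // The stream failed, so this sketch sends no result.
            }

            @Override
            public void onCompleted() {
                // The client is done: send the single response, then close the call.
                resultObserver.onNext("received " + received + " messages");
                resultObserver.onCompleted();
            }
        };
    }

    // Records what the service sends back, playing the role of the client side.
    static final class RecordingObserver implements SketchObserver<String> {
        final List<String> events = new ArrayList<>();

        @Override
        public void onNext(String value) { events.add("result: " + value); }

        @Override
        public void onError(Throwable error) { events.add("error: " + error.getMessage()); }

        @Override
        public void onCompleted() { events.add("completed"); }
    }

    public static void main(String[] args) {
        RecordingObserver results = new RecordingObserver();
        SketchObserver<String> requests = addFile(results);
        requests.onNext("metadata");
        requests.onNext("chunk 1");
        requests.onNext("chunk 2");
        requests.onCompleted();
        System.out.println(results.events);
    }
}
```

The point of the sketch is the direction of each observer: many messages flow in through the returned observer, and exactly one response flows out through the observer that was passed in.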
The addFile method handles AddFileRequest messages and returns a new StreamObserver<AddFileRequest> object. First, we will log a message indicating that a request to add a file has been received. The StreamObserver implementation has three methods: onNext, onError, and onCompleted. These methods are called by the gRPC framework based on the events received during the streaming process.
The StreamObserver’s onNext method is called every time the client sends a message on the stream. Here, we will process the file metadata and store file chunks in a temporary location.
The StreamObserver’s onError method is invoked when the stream terminates with an error, for example when the client cancels the call.
The StreamObserver’s onCompleted method is called when the client completes the streaming call. In this method, we will move the file to the server’s destination directory.
We will override these methods and write custom logic to handle requests for our FTP service.
    @Override
    public StreamObserver<AddFileRequest> addFile(final StreamObserver<Result> resultObserver) {
        logger.info("Received request to add file.");
        return new StreamObserver<AddFileRequest>() {
            @Override
            public void onNext(AddFileRequest addFileRequest) {
                // called whenever client sends a message
            }

            @Override
            public void onError(Throwable throwable) {
                // called when an error is received from the client
            }

            @Override
            public void onCompleted() {
                // called when client is done sending request messages
            }
        };
    }
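The write-to-temporary-then-move step that onCompleted is responsible for could look roughly like the sketch below. It uses only java.nio.file; the class name, the storeAtomically helper, and the "upload-" / ".part" naming are assumptions made for illustration and are not taken from the lesson's code.

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;

class AtomicMoveSketch {

    // Hypothetical helper: writes the content to a temporary file first, then
    // moves it into the destination directory in a single step.
    static Path storeAtomically(Path tempDir, Path destinationDir, String fileName, byte[] content)
            throws IOException {
        Path tempFile = Files.createTempFile(tempDir, "upload-", ".part");
        Files.write(tempFile, content);

        Path target = destinationDir.resolve(fileName);
        // ATOMIC_MOVE throws AtomicMoveNotSupportedException when the move cannot
        // be performed atomically, which is typically the case across file systems.
        return Files.move(tempFile, target, StandardCopyOption.ATOMIC_MOVE);
    }

    public static void main(String[] args) throws IOException {
        // Both directories are created for the demo only.
        Path tempDir = Files.createTempDirectory("ftp-tmp");
        Path destinationDir = Files.createTempDirectory("ftp-dest");

        Path stored = storeAtomically(tempDir, destinationDir, "hello.txt",
                "hello".getBytes(StandardCharsets.UTF_8));
        System.out.println("Stored at " + stored);
    }
}
```

Moving the finished file in one step means a reader of the destination directory never sees a half-written file. How an existing file with the same name is handled under ATOMIC_MOVE is implementation specific, so a real service would need to decide that case explicitly.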
The addFile method handles the streaming of file data from the client to the server. It processes each AddFileRequest, validates the message, verifies data integrity by checksum matching, writes file chunks to the temporary location, and manages the completion of the file transfer. Along the process, it keeps ...
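The checksum matching mentioned above can be illustrated with a small sketch. The lesson text shown here does not say which checksum the service uses, so CRC32 from java.util.zip is an assumption, and the class and method names are hypothetical.

```java
import java.nio.charset.StandardCharsets;
import java.util.zip.CRC32;

class ChunkChecksumSketch {

    // Computes a CRC32 checksum over the given bytes (the algorithm is an assumption).
    static long crc32Of(byte[] data) {
        CRC32 crc = new CRC32();
        crc.update(data, 0, data.length);
        return crc.getValue();
    }

    // Returns true when the chunk's checksum equals the one sent along with it.
    static boolean matches(byte[] chunk, long expectedChecksum) {
        return crc32Of(chunk) == expectedChecksum;
    }

    public static void main(String[] args) {
        byte[] chunk = "file chunk".getBytes(StandardCharsets.UTF_8);
        long sentChecksum = crc32Of(chunk); // what the sender would attach to the chunk

        byte[] corrupted = chunk.clone();
        corrupted[0] ^= 0x01; // flip one bit to simulate corruption in transit

        System.out.println("intact chunk accepted: " + matches(chunk, sentChecksum));
        System.out.println("corrupted chunk accepted: " + matches(corrupted, sentChecksum));
    }
}
```

In this sketch the intact chunk is accepted and the corrupted one is rejected. A server following the same idea would reject the chunk, for example by reporting an error to the client, instead of writing it to the temporary file.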