Server-Side Implementation of Client Streaming RPC

Write the service implementation of the addFile method.

In this lesson, we implement the server side of the client streaming addFile RPC. The server side lives in the ftp-service module. Open the FTPService class from the src/main/java/io/datajek/ftpservice/server package. This class implements the FTP service defined in the proto file. In the Setting up a gRPC Service lesson, we created the addFile method in the FTPService class as a placeholder:

@Override
public StreamObserver<AddFileRequest> addFile(final StreamObserver<Result> resultObserver) {
    StreamObserver<AddFileRequest> requestObserver = null;
    return requestObserver;
}
addFile method in the FTPService class

The addFile method handles the streaming of file data from the client to the server and adds a file to the server’s destination directory. The files are first written to the /tmp directory and then atomically moved to the destination directory. The signature of the addFile RPC in the ftp_service.proto file is shown below:
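The "write to a temporary directory, then move atomically" step described above can be sketched with the standard java.nio.file API. This is only an illustration, not the lesson's implementation: the class name AtomicFileStore and the method store are hypothetical. It also assumes that the temporary and destination directories are on the same file system, since an atomic move may otherwise fail with AtomicMoveNotSupportedException.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;

/** Hypothetical helper: stage a file in a temp directory, then move it into place. */
final class AtomicFileStore {

    /**
     * Writes content to a uniquely named file in tmpDir and atomically moves it
     * to destinationDir/fileName. Assumes both directories share a file system.
     */
    static Path store(Path tmpDir, Path destinationDir, String fileName, byte[] content)
            throws IOException {
        Files.createDirectories(destinationDir);
        // Stage under a unique name so readers of destinationDir never see partial data.
        Path staged = Files.createTempFile(tmpDir, "upload-", ".part");
        try {
            Files.write(staged, content);
            // The move either completes as a whole or throws an IOException.
            return Files.move(staged, destinationDir.resolve(fileName),
                    StandardCopyOption.ATOMIC_MOVE);
        } finally {
            // Cleans up the staged file if the write or move failed; no-op after a successful move.
            Files.deleteIfExists(staged);
        }
    }
}
```

Staging the file first means that anything reading the destination directory sees either the complete file or no file at all.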

// The FTP service definition.
service FTPService {
    // Deletes a file in the destination directory of the server.
    rpc deleteFile (MetaData) returns (Result) {};
    // Adds a file to the destination directory of the server.
    rpc addFile (stream AddFileRequest) returns (Result) {};
}
addFile RPC definition in proto file

When the protoc compiler (with the gRPC Java plugin) generates Java code for the above service, it creates a default addFile method in the generated service base class, which we need to override in the FTPService class.

@Override
public StreamObserver<AddFileRequest> addFile(final StreamObserver<Result> resultObserver) {
}
addFile service stub

If we compare the signature of the addFile RPC in the ftp_service.proto file with the Java method shown above, there are two noticeable differences:

  • Since the client can send multiple messages to the service, the method does not take an AddFileRequest parameter. Instead, the service implementation returns a StreamObserver<AddFileRequest> instance. The server processes client requests through this returned observer, which gRPC invokes whenever another message is received from the client.

  • StreamObserver<Result> is used by the server to send the response back to the client. The gRPC runtime passes it to the addFile method when the client starts the call. The service implementation invokes the onNext() method on this observer to send the response. Because the RPC returns a single Result, onNext() is called at most once, followed by onCompleted(). The Result message contains the outcome of the server’s operations in response to the client’s requests.

Here, AddFileRequest and Result are protocol buffer message types defined in the ftp_service.proto file. These message types represent the structured data exchanged between the client and the server.
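The interplay of the two observers can be sketched without any gRPC dependency. In the sketch below, the StreamObserver interface is a local stand-in that mirrors the three callbacks of io.grpc.stub.StreamObserver. String replaces the AddFileRequest and Result messages, and simulateCall is a hypothetical helper that plays the role of the gRPC runtime. None of these stand-ins come from the lesson's code; they only make the call sequence visible.

```java
import java.util.ArrayList;
import java.util.List;

/** Local stand-in for io.grpc.stub.StreamObserver (same three callbacks). */
interface StreamObserver<T> {
    void onNext(T value);
    void onError(Throwable t);
    void onCompleted();
}

final class ClientStreamingSketch {

    /** Server side: receives the response observer, returns the request observer. */
    static StreamObserver<String> addFile(final StreamObserver<String> resultObserver) {
        return new StreamObserver<String>() {
            private int received = 0;

            @Override
            public void onNext(String chunk) {
                received++; // one call per message sent by the client
            }

            @Override
            public void onError(Throwable t) {
                // The client aborted the stream; no response can be delivered.
            }

            @Override
            public void onCompleted() {
                // Client is done: send the single response, then close the call.
                resultObserver.onNext("received " + received + " messages");
                resultObserver.onCompleted();
            }
        };
    }

    /** Hypothetical driver standing in for the gRPC runtime and the client. */
    static List<String> simulateCall(String... chunks) {
        final List<String> events = new ArrayList<>();
        StreamObserver<String> responses = new StreamObserver<String>() {
            @Override public void onNext(String value) { events.add("onNext:" + value); }
            @Override public void onError(Throwable t) { events.add("onError"); }
            @Override public void onCompleted() { events.add("onCompleted"); }
        };
        StreamObserver<String> requests = addFile(responses);
        for (String chunk : chunks) {
            requests.onNext(chunk);
        }
        requests.onCompleted();
        return events;
    }
}
```

Calling ClientStreamingSketch.simulateCall("a", "b", "c") records exactly two events on the response side: one onNext carrying "received 3 messages", followed by onCompleted.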

The addFile method first logs a message indicating that a request to add a file has been received, and then returns a new StreamObserver<AddFileRequest> object that handles the incoming AddFileRequest messages. The StreamObserver interface declares three methods: onNext, onError, and onCompleted. The gRPC framework calls these methods in response to events on the stream.

  1. The StreamObserver’s onNext method is called every time the client sends a message on the stream. Here, we will process the file metadata and store file chunks in a temporary location.

  2. The StreamObserver’s onError method is invoked when the stream terminates with an error, for example when the client cancels the call.

  3. The StreamObserver’s onCompleted method is called when the client has finished sending messages. In this method, we will move the file to the server's destination directory.

We will override these methods and write custom logic to handle requests for our FTP service.

@Override
public StreamObserver<AddFileRequest> addFile(final StreamObserver<Result> resultObserver) {
    logger.info("Received request to add file.");
    return new StreamObserver<AddFileRequest>() {
        @Override
        public void onNext(AddFileRequest addFileRequest) {
            // called whenever client sends a message
        }

        @Override
        public void onError(Throwable throwable) {
            // called when an error is received from the client
        }

        @Override
        public void onCompleted() {
            // called when client is done sending request messages
        }
    };
}
addFile method implementation
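One way to keep the three callbacks short is to delegate the file handling to a small helper. The sketch below is an assumption about how that could look, not the lesson's actual code: the ChunkedUpload class and its append, abort, and complete methods are hypothetical names, and chunks are modeled as plain byte arrays rather than fields of AddFileRequest.

```java
import java.io.IOException;
import java.io.OutputStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;

/** Hypothetical helper that the three StreamObserver callbacks could delegate to. */
final class ChunkedUpload {
    private final Path stagedFile;
    private final OutputStream out;

    /** Opens a uniquely named staging file in tmpDir. */
    ChunkedUpload(Path tmpDir) throws IOException {
        stagedFile = Files.createTempFile(tmpDir, "upload-", ".part");
        out = Files.newOutputStream(stagedFile);
    }

    /** Would be called from onNext: appends one chunk to the staging file. */
    void append(byte[] chunk) throws IOException {
        out.write(chunk);
    }

    /** Would be called from onError: discards the partially written file. */
    void abort() throws IOException {
        out.close();
        Files.deleteIfExists(stagedFile);
    }

    /** Would be called from onCompleted: closes the file and moves it into place. */
    Path complete(Path destination) throws IOException {
        out.close();
        // Assumes the staging directory and destination share a file system.
        return Files.move(stagedFile, destination, StandardCopyOption.ATOMIC_MOVE);
    }
}
```

With such a helper, onNext only appends, onError only cleans up, and onCompleted performs the final move before the server sends its Result.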

The addFile method handles the streaming of file data from the client to the server. It processes each AddFileRequest, validates the message, verifies data integrity by checksum matching, writes file chunks to the temporary location, and manages the completion of the file transfer. Along the process, it keeps ...