[gRPC] – gRPC Client Streaming

Với framework gRPC, chúng ta có thể gửi nhiều messages giữa Client và Server thông qua một kết nối TCP duy nhất. Nó được gọi là Multiplexing. Trong gRPC client streaming, Client có thể gửi nhiều request đến Server. Sau khi Client xác nhận rằng nó đã gửi tất cả các request, Server sẽ gửi

Với framework gRPC, chúng ta có thể gửi nhiều messages giữa Client và Server thông qua một kết nối TCP duy nhất. Nó được gọi là Multiplexing.

Trong gRPC client streaming, Client có thể gửi nhiều request đến Server. Sau khi Client xác nhận rằng nó đã gửi tất cả các request, Server sẽ gửi lại một response duy nhất cho Client.

Một case study ví dụ như chức năng upload file, trong đó Client upload một file lớn bằng cách chia thành nhiều phần nhỏ.

Sample Application

Để mọi thứ đơn giản, chúng ta sẽ giả sử rằng Client gửi nhiều số đến Server, Server sẽ gửi kết quả là tổng của tất cả các số khi Client thực hiện xong tất cả các request của nó.

Protobuf – Service Definition

Khi biết nghiệp vụ, request và response cần là gì, chúng ta sẽ định nghĩa service xử lý nghiệp vụ cho chúng. Phương thức sumAll được implements ở phía Server nhận kiểu dữ liệu đầu vào và trả kiểu dữ liệu đầu ra theo như mong đợi. Chúng ta sử dụng từ khóa stream để chỉ ra rằng đó sẽ là client side streaming request. File .proto định nghĩa service trong ví dụ này như sau:

syntax = "proto3";

package calculator;

option java_package = "example.calculator";
option java_multiple_files = true;

message NumberRequest {
  int32 number = 1;
}

message NumberResponse {
  int64 result = 1;
}

service CalculatorService {
  // client stream
  rpc sumAll(stream NumberRequest) returns (NumberResponse) {};
}

Khi chúng ta chạy lệnh maven dưới đây, maven sẽ tự động tạo code cho client application và server application bằng công cụ protoc.

mvn clean compile

Class CalculatorServiceImplBase là abstract class được tạo tự động khi gen code cần được phía Server implements. Tương tự CalculatorServiceStub là class được tạo tự động khi gen code mà phía client application sử dụng để gửi yêu cầu đến server.

gRPC Client Streaming – Server Side

Service Implementation: class này kế thừa abstract class CalculatorServiceImplBase để implement phương thức sumAll (triển khai nghiệp vụ) và phản hồi lại cho request gọi phương thức sumAll ở phía Client. Khi Client gọi phương thức onCompleted, nó sẽ thông báo cho Server rằng nó đã gửi hết request. Lúc đó Server sẽ thực hiện gửi lại response.

  • Success response: Server tính tổng cho tất cả các số mà Client gửi đến và response lại bằng cách sử dụng responseObserver. Dữ liệu trả về thông qua phương thức onNext tới Client đang gọi và cũng thông báo cho client rằng server đã hoàn thành công việc bằng cách gọi phương thức onCompleted.
  • Error response: Trong trường hợp có bất kỳ lỗi nào trong quá trình xử lý, server sẽ sử dụng phương thức onError để thông báo cho client. Chúng ta sẽ thảo luận về điều cách xử lý lỗi trong một bài viết khác.
public class NumberRequestObserver implements StreamObserver<NumberRequest> {

    private int sum = 0;
    private final StreamObserver<NumberResponse> responseObserver;

    public NumberRequestObserver(StreamObserver<NumberResponse> responseObserver) {
        this.responseObserver = responseObserver;
    }

    @Override
    public void onNext(NumberRequest numberRequest) {
        System.out.println("=========== client onNext ===========");
        System.out.println("numberRequest: " + numberRequest.getNumber());
        sum = sum + numberRequest.getNumber();
    }

    @Override
    public void onError(Throwable throwable) {
        System.out.println("=========== client onError ===========");
        throwable.getMessage();
        throwable.printStackTrace();
    }

    @Override
    public void onCompleted() {
        System.out.println("=========== client onCompleted ===========");
        NumberResponse response = NumberResponse.newBuilder().setResult(sum).build();
        responseObserver.onNext(response);
        responseObserver.onCompleted();
    }

}
public class CalculatorService extends CalculatorServiceGrpc.CalculatorServiceImplBase {

    @Override
    public StreamObserver<NumberRequest> sumAll(StreamObserver<NumberResponse> responseObserver) {
        return new NumberRequestObserver(responseObserver);
    }
}

Sau khi implements xong, chúng ta cần start gRPC server để cung cấp dịch vụ cho Client.

public class CalculatorGrpcServer {

    public static void main(String[] args) throws IOException, InterruptedException {

        // build gRPC server
        Server server = ServerBuilder.forPort(6565)
                .addService(new CalculatorService())
                .build();

        // start
        server.start();

        // shutdown hook
        Runtime.getRuntime().addShutdownHook(new Thread(() -> {
            System.out.println("gRPC server is shutting down!");
            server.shutdown();
        }));

        server.awaitTermination();

    }

}

Bây giờ thì gRPC server đã sẵn sàng. Chúng ta sẽ qua Client Side.

gRPC Client Streaming – Client Side

Trên phía Client chúng ta cần thực hiện các bước sau để gửi request và nhận lại response. Bước đầu tiên để thực hiện gửi request, chúng ta cần có một implementation của StreamObserver. Điều này là để in giá trị kết quả trả về khi Client nhận được phản hồi từ Server.

public class NumberResponseObserver implements StreamObserver<NumberResponse> {

    @Override
    public void onNext(NumberResponse response) {
        System.out.println("=========== server onNext ===========");
        System.out.println("Result : " + response.getResult());
    }

    @Override
    public void onError(Throwable throwable) {
        System.out.println("=========== server onError ===========");
        throwable.getMessage();
        throwable.printStackTrace();
    }

    @Override
    public void onCompleted() {
        System.out.println("=========== server onCompleted ===========");
    }

}

Tiếp theo là tạo kết nối và gửi request:

  • Tạo kênh (create channel): client apllication phải tạo một kênh kết nối với gRPC server.
  • Stub: Client apllication sẽ sử dụng non-blocking stub để thực hiện truyền tham số và gửi request.

Trong ví dụ này chúng ta sẽ tạo một class JUnit để hoạt động như một gRPC client. Hãy lưu ý rằng client application (hay gRPC client) có thể là bất cứ ngôn ngữ gì. Nó thậm chí có thể là một microservice khác.

public class ClientStreamingTest {

    private ManagedChannel channel;
    private CalculatorServiceGrpc.CalculatorServiceStub clientStub;

    @Before
    public void setup(){
        this.channel = ManagedChannelBuilder.forAddress("localhost", 6565)
                .usePlaintext()
                .build();
        this.clientStub = CalculatorServiceGrpc.newStub(channel);
    }

    @Test
    public void clientStreamingSumTest() throws InterruptedException {

        // pass the output stream observer & receive the input stream observer
        StreamObserver<NumberRequest> inputStreamObserver = this.clientStub.sumAll(new NumberResponseObserver());

        for (int i = 0; i <= 100; i++) {
            // build the request object
            NumberRequest input = NumberRequest.newBuilder()
                    .setNumber(i)
                    .build();
            // pass the request object via input stream observer
            inputStreamObserver.onNext(input);
        }

        // client side is done. this method make the server respond with the sum value
        inputStreamObserver.onCompleted();

        // just for testing
        Thread.sleep(3000);
    }

    @After
    public void teardown(){
        this.channel.shutdown();
    }

}

Bây giờ chúng ta đã sẵn sàng để gửi request và nhận response từ Server. Client sẽ gửi 100 số từ 1 đến 100 đến Server. Khi nó gọi onCompleted, Server gửi về response với giá trị bằng tổng của 100 số gửi đến và nó được in trên console phía Client.

=========== server onNext ===========
Result : 5050
=========== server onCompleted ===========

Tổng kết

Vậy là chúng ta vừa làm quen cách tạo một ứng dụng gRPC Client Streaming. Hi vọng bài viết hữu ích với mọi người.

Nguồn:https://thenewstack.wordpress.com/2021/11/23/grpc-grpc-client-streaming/

Follow me: thenewstack.wordpress.com

Nguồn: viblo.asia

Bài viết liên quan

Sự Khác Nhau Giữa Domain và Hosting Là Gì?

Sự khác nhau giữa domain và hosting là gì? Bài này giải thích ngắn và dễ hiểu nh

Shared Hosting hay VPS Hosting: Lựa chọn nào dành cho bạn?

Bài viết giải thích rõ shared hosting và vps hosting là gì và hướng dẫn chọn lựa

Thay đổi Package Name của Android Studio dể dàng với plugin APR

Nếu bạn đang gặp khó khăn hoặc bế tắc trong việc thay đổi package name trong And

Lỗi không Update Meta_Value Khi thay thế hình ảnh cũ bằng hình ảnh mới trong WordPress

Mã dưới đây hoạt động tốt có 1 lỗi không update được postmeta ” meta_key=