gRPC背压流控、压缩及JSON通信【知识笔记】

目录

  1. 一、压缩
  2. 1.Server端所有方法压缩
  3. 2.Server单独方法压缩
  4. 3.Client请求内容压缩
  5. 二、使用JSON通信
  6. 1.方法描述使用JSON编译
  7. 2.JSON编译具体过程
  8. 三、手动流量控制
  9. 1.Consuming Side
  10. 2.Producing Side
  11. 四、系列文章

本文继续整理gRPC的使用,走查解读官方给出的压缩示例、使用JSON通信以及手动流量控制。

一、压缩

1.Server端所有方法压缩
  1. server = ServerBuilder.forPort(port)
  2. .intercept(new ServerInterceptor() {
  3. @Override
  4. public <ReqT, RespT> Listener<ReqT> interceptCall(ServerCall<ReqT, RespT> call, Metadata headers,
  5. ServerCallHandler<ReqT, RespT> next) {
  6. // @1 在拦截器中设置压缩算法
  7. call.setCompression("gzip");
  8. return next.startCall(call, headers);
  9. }
  10. })
  11. .addService(new GreeterImpl())
  12. .build()
  13. .start();

备注:如果需要在Server端所有方法进行压缩,可以在ServerInterceptor拦击器中通过setCompression进行设置。

2.Server单独方法压缩

如果不想对所有的方法传输内容压缩,gPRC提供了单独方法的压缩。

  1. int port = 50051;
  2. server = ServerBuilder.forPort(port)
  3. .addService(new GreeterImpl())
  4. .build()
  5. .start();
  6. static classGreeterImplextendsGreeterGrpc.GreeterImplBase{

  7. @Override
  8. public void sayHello(HelloRequest req, StreamObserver<HelloReply> plainResponseObserver) {
  9. ServerCallStreamObserver<HelloReply> responseObserver =
  10. (ServerCallStreamObserver<HelloReply>) plainResponseObserver;
  11. // @1 对单个方法传输内容进行压缩
  12. responseObserver.setCompression("gzip");
  13. HelloReply reply = HelloReply.newBuilder().setMessage("Hello " + req.getName()).build();
  14. responseObserver.onNext(reply);
  15. responseObserver.onCompleted();
  16. }
  17. }

备注:单个方法的压缩通过ServerCallStreamObserver的setCompression进行单独设置。

3.Client请求内容压缩

客户端对请求内容进行压缩,下面示例通过gzip进行压缩。

  1. publicvoidgreet(String name) {
  2. HelloRequest request = HelloRequest.newBuilder().setName(name).build();
  3. HelloReply response;
  4. try {
  5. // @1 对请求内容设置压缩类型
  6. response = blockingStub.withCompression("gzip").sayHello(request);
  7. } catch (StatusRuntimeException e) {
  8. logger.log(Level.WARNING, "RPC failed: {0}", e.getStatus());
  9. return;
  10. }

二、使用JSON通信

gRPC可以通过Json格式进行通信,虽然并不建议这么做,Json的效率要远低于ProtoBuf。看下示例是如何通过Json格式通信的。

1.方法描述使用JSON编译

对方法的出参和入参使用JSON适配器,示例中通过MethodDescriptor.toBuilder重写出入参数的解析格式。

  1. static final MethodDescriptor<HelloRequest, HelloReply> METHOD_SAY_HELLO =
  2. GreeterGrpc.getSayHelloMethod()
  3. .toBuilder(
  4. // @1 请求参数使用JSON编译 JsonMarshaller.jsonMarshaller(HelloRequest.getDefaultInstance()),
  5. // @2 返回参数使用JSON编译
  6. JsonMarshaller.jsonMarshaller(HelloReply.getDefaultInstance()))
  7. .build();
2.JSON编译具体过程

既然通过对方法的出入参数编译成JSON格式,看下gRPC是如何做的呢?

  1. public static <T extends Message> Marshaller<T> jsonMarshaller(final T defaultInstance) {
  2. final Parser parser = JsonFormat.parser();
  3. final Printer printer = JsonFormat.printer();
  4. return jsonMarshaller(defaultInstance, parser, printer);
  5. }
  6. public static <T extends Message> Marshaller<T> jsonMarshaller(
  7. final T defaultInstance, final Parser parser, final Printer printer) {
  8. final Charset charset = Charset.forName("UTF-8");
  9. return new Marshaller<T>() {
  10. @Override
  11. public InputStream stream(T value) {
  12. try {
  13. // @1 通过printer.print将出入参数转换为JSON格式
  14. return new ByteArrayInputStream(printer.print(value).getBytes(charset));
  15. } catch (InvalidProtocolBufferException e) {
  16. // ...
  17. }
  18. }
  19. // ....
  20. }

备注:在JsonFormat.print方法中进行具体的请求/返回参数转换为JSON的具体实现。

请求转换JSON格式截图

返回转换JSON格式截图

3.Client使用JSON格式的方法描述
  1. public HelloReply sayHello(HelloRequest request) {
  2. // @1 使用JSON格式的方法描述METHOD_SAY_HELLO
  3. return blockingUnaryCall(
  4. getChannel(), METHOD_SAY_HELLO, getCallOptions(), request);
  5. }
4.Server使用JSON格式的方法描述
  1. public ServerServiceDefinition bindService() {
  2. return ServerServiceDefinition
  3. .builder(GreeterGrpc.getServiceDescriptor().getName())
  4. // @1 使用JSON格式的方法描述METHOD_SAY_HELLO
  5. .addMethod(HelloJsonClient.HelloJsonStub.METHOD_SAY_HELLO,
  6. asyncUnaryCall(
  7. new UnaryMethod<HelloRequest, HelloReply>() {
  8. @Override
  9. publicvoidinvoke(
  10. HelloRequest request, StreamObserver<HelloReply> responseObserver) {
  11. sayHello(request, responseObserver);
  12. }
  13. }))
  14. .build();
  15. }

三、手动流量控制

gRPC的流量控制基于HTTP/2的流量控制,即背压模式。关于gRPC和HTTP/2背压模式原理和关系,请看下面摘录。

  1. At the bottom is the HTTP/2's byte-based flow control. HTTP/2 works on streams of bytes and is completely unaware of gRPC messages or reactive streams. By default, the stream consumer allocates a budget of 65536 bytes.
  2. The stream producer can send up to this many bytes before backpressure engages. As the consumer reads bytes, WINDOW_UPDATE messages are sent to the producer to increase its send budget.

  3. In the middle is the gRPC-Java message-based flow control. gRPC's flow control adapts the stream-based flow control of HTTP/2 to a message-based flow control model.
  4. Importantly, gRPC's flow control is aware of how it interacts with HTTP/2 and the network.

  5. On producing side, an on-ready handler reads a message, serializes it into bytes using protobuf, and then queues it up for transmission over the HTTP/2 byte stream.
  6. If there is insuficient room in the HTTP/2 flow control window to transmit, backpressure engages an no more messages are requested from the producer until space becomes available.

  7. On the consuming side, each time a consumer calls request(x), gRPC attempts to read and deserialize x messages from the HTTP/2 stream.
  8. Since the size of a protobuf encoded message is variable, there is not a one-to-one correlation between pulling messages from gRPC and pulling bytes over HTTP/2.

1.Consuming Side
  1. publicstaticvoidmain(String[] args) throws InterruptedException, IOException {
  2. // @1 Server端服务实现
  3. StreamingGreeterGrpc.StreamingGreeterImplBase svc = new StreamingGreeterGrpc.StreamingGreeterImplBase() {
  4. @Override
  5. public StreamObserver<HelloRequest> sayHelloStreaming(final StreamObserver<HelloReply> responseObserver) {
  6. final ServerCallStreamObserver<HelloReply> serverCallStreamObserver =
  7. (ServerCallStreamObserver<HelloReply>) responseObserver;
  8. // @2 禁止自动流控模式,开启手动流控
  9. serverCallStreamObserver.disableAutoInboundFlowControl();
  10. // @3 背压模式流控,当消费端有足够空间时将会回调OnReadyHandler
  11. // 默认空间大小为65536字节
  12. classOnReadyHandlerimplementsRunnable{
  13. private boolean wasReady = false;

  14. @Override
  15. publicvoidrun() {
  16. if (serverCallStreamObserver.isReady() && !wasReady) {
  17. wasReady = true;
  18. logger.info("READY");
  19. // @4 向HTTP/2流请求读取并解压(x)条消息
  20. // 即发信号通知发送端发送继续发消息
  21. serverCallStreamObserver.request(1);
  22. }
  23. }
  24. }
  25. final OnReadyHandler onReadyHandler = new OnReadyHandler();
  26. serverCallStreamObserver.setOnReadyHandler(onReadyHandler);
  27. // @5 处理具体进来的请求
  28. return new StreamObserver<HelloRequest>() {
  29. @Override
  30. publicvoidonNext(HelloRequest request) {
  31. try {
  32. String name = request.getName();
  33. logger.info("--> " + name);
  34. Thread.sleep(100);
  35. String message = "Hello " + name;
  36. logger.info("<-- " + message);
  37. HelloReply reply = HelloReply.newBuilder().setMessage(message).build();
  38. // @6 向Client发送请求
  39. responseObserver.onNext(reply);
  40. if (serverCallStreamObserver.isReady()) {
  41. // @7 向HTTP/2流请求读取并解压(x)条消息
  42. serverCallStreamObserver.request(1);
  43. } else {
  44. onReadyHandler.wasReady = false;
  45. }
  46. } catch (Throwable throwable) {
  47. //
  48. }
  49. }
  50. @Override
  51. publicvoidonError(Throwable t) {
  52. t.printStackTrace();
  53. responseObserver.onCompleted();
  54. }
  55. @Override
  56. publicvoidonCompleted() {
  57. logger.info("COMPLETED");
  58. responseObserver.onCompleted();
  59. }
  60. };
  61. }
  62. };

  63. final Server server = ServerBuilder
  64. .forPort(50051)
  65. .addService(svc)
  66. .build()
  67. .start();
2.Producing Side
  1. publicstaticvoidmain(String[] args) throws InterruptedException {
  2. final CountDownLatch done = new CountDownLatch(1);
  3. ManagedChannel channel = ManagedChannelBuilder
  4. .forAddress("localhost", 50051)
  5. .usePlaintext()
  6. .build();
  7. StreamingGreeterGrpc.StreamingGreeterStub stub = StreamingGreeterGrpc.newStub(channel);

  8. ClientResponseObserver<HelloRequest, HelloReply> clientResponseObserver =
  9. new ClientResponseObserver<HelloRequest, HelloReply>() {
  10. ClientCallStreamObserver<HelloRequest> requestStream;
  11. @Override
  12. publicvoidbeforeStart(final ClientCallStreamObserver<HelloRequest> requestStream) {
  13. this.requestStream = requestStream;
  14. // @1设置手动流量控制
  15. requestStream.disableAutoInboundFlowControl();
  16. // @2 当Consumer端有足够空间时自动回调
  17. // 序列化protobuf先发送到缓存区(还未到Server端)
  18. // Server端需要调用request()向Client拉取消息
  19. requestStream.setOnReadyHandler(new Runnable() {
  20. Iterator<String> iterator = names().iterator();
  21. @Override
  22. publicvoidrun() {
  23. while (requestStream.isReady()) {
  24. if (iterator.hasNext()) {
  25. String name = iterator.next();
  26. logger.info("--> " + name);
  27. HelloRequest request = HelloRequest.newBuilder().setName(name).build();
  28. // @3 将消息发送到缓存区
  29. requestStream.onNext(request);
  30. } else {
  31. // @4 标记Client发送完成
  32. requestStream.onCompleted();
  33. }
  34. }
  35. }
  36. });
  37. }

  38. @Override
  39. publicvoidonNext(HelloReply value) {
  40. // @5 接受Server端返回信息
  41. logger.info("<-- " + value.getMessage());
  42. // @6 通知Client继续发送
  43. requestStream.request(1);
  44. }

  45. @Override
  46. publicvoidonError(Throwable t) {
  47. t.printStackTrace();
  48. done.countDown();
  49. }

  50. @Override
  51. publicvoidonCompleted() {
  52. logger.info("All Done");
  53. done.countDown();
  54. }
  55. };
  56. stream processing.
  57. stub.sayHelloStreaming(clientResponseObserver);

  58. done.await();

  59. channel.shutdown();
  60. channel.awaitTermination(1, TimeUnit.SECONDS);
  61. }

四、系列文章

gRPC示例初探【实战笔记】

gRPC四种类型示例分析【知识笔记】

gRPC中Header传值与错误拦截处理【知识笔记】


「瓜农老梁  学习同行」

    

(0)

相关推荐