/*
 * Decompiled with CFR 0.152.
 */
package com.twosigma.waiter.courier;

import com.twosigma.waiter.courier.CourierGrpc;
import com.twosigma.waiter.courier.CourierReply;
import com.twosigma.waiter.courier.CourierRequest;
import com.twosigma.waiter.courier.CourierSummary;
import io.grpc.BindableService;
import io.grpc.ForwardingServerCall;
import io.grpc.Metadata;
import io.grpc.Server;
import io.grpc.ServerBuilder;
import io.grpc.ServerCall;
import io.grpc.ServerCallHandler;
import io.grpc.ServerInterceptor;
import io.grpc.stub.StreamObserver;
import java.io.IOException;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.logging.Logger;

public class GrpcServer {
    private static final Logger LOGGER = Logger.getLogger(GrpcServer.class.getName());
    private Server server;

    void start(int port) throws IOException {
        LOGGER.info("starting gRPC server on port " + port);
        this.server = ServerBuilder.forPort((int)port).addService((BindableService)new CourierImpl()).intercept((ServerInterceptor)new GrpcServerInterceptor()).build().start();
        LOGGER.info("gRPC server started, listening on " + port);
        Runtime.getRuntime().addShutdownHook(new Thread(() -> {
            System.err.println("*** shutting down gRPC server since JVM is shutting down");
            this.stop();
            System.err.println("*** server shut down");
        }));
    }

    private void stop() {
        if (this.server != null) {
            this.server.shutdown();
        }
    }

    void blockUntilShutdown() throws InterruptedException {
        if (this.server != null) {
            this.server.awaitTermination();
        }
    }

    private static void sleepOnRequest(CourierRequest courierRequest) {
        try {
            long sleepDuration = courierRequest.getSleepDuration();
            if (sleepDuration > 0L) {
                LOGGER.info("Sleeping on request for " + sleepDuration + " ms");
                Thread.sleep(sleepDuration);
            }
        }
        catch (InterruptedException e) {
            LOGGER.severe("Error while sleeping on request: " + e.getMessage());
        }
    }

    private static class GrpcServerInterceptor
    implements ServerInterceptor {
        private GrpcServerInterceptor() {
        }

        public <ReqT, RespT> ServerCall.Listener<ReqT> interceptCall(ServerCall<ReqT, RespT> serverCall, final Metadata requestMetadata, ServerCallHandler<ReqT, RespT> serverCallHandler) {
            this.logMetadata(requestMetadata, "request");
            final Metadata.Key xCidKey = Metadata.Key.of((String)"x-cid", (Metadata.AsciiMarshaller)Metadata.ASCII_STRING_MARSHALLER);
            final String correlationId = (String)requestMetadata.get(xCidKey);
            ForwardingServerCall.SimpleForwardingServerCall wrapperCall = new ForwardingServerCall.SimpleForwardingServerCall<ReqT, RespT>(serverCall){

                public void sendHeaders(Metadata responseHeaders) {
                    this.logMetadata(requestMetadata, "response");
                    if (correlationId != null) {
                        LOGGER.info("response linked to cid: " + correlationId);
                        responseHeaders.put(xCidKey, (Object)correlationId);
                    }
                    super.sendHeaders(responseHeaders);
                }
            };
            return serverCallHandler.startCall((ServerCall)wrapperCall, requestMetadata);
        }

        private void logMetadata(Metadata metadata, String label) {
            Set metadataKeys = metadata.keys();
            LOGGER.info(label + " metadata keys = " + metadataKeys);
            for (String key : metadataKeys) {
                String value = (String)metadata.get(Metadata.Key.of((String)key, (Metadata.AsciiMarshaller)Metadata.ASCII_STRING_MARSHALLER));
                LOGGER.info(label + " metadata " + key + " = " + value);
            }
        }
    }

    private static class CourierImpl
    extends CourierGrpc.CourierImplBase {
        private CourierImpl() {
        }

        @Override
        public void sendPackage(CourierRequest request, StreamObserver<CourierReply> responseObserver) {
            LOGGER.info("received CourierRequest{id=" + request.getId() + ", from=" + request.getFrom() + ", message.length=" + request.getMessage().length() + "}");
            GrpcServer.sleepOnRequest(request);
            CourierReply reply = CourierReply.newBuilder().setId(request.getId()).setMessage(request.getMessage()).setResponse("received").build();
            LOGGER.info("Sending CourierReply for id=" + reply.getId());
            responseObserver.onNext((Object)reply);
            responseObserver.onCompleted();
        }

        @Override
        public StreamObserver<CourierRequest> collectPackages(final StreamObserver<CourierSummary> responseObserver) {
            return new StreamObserver<CourierRequest>(){
                private long numMessages = 0L;
                private long totalLength = 0L;

                public void onNext(CourierRequest request) {
                    LOGGER.info("Received CourierRequest id=" + request.getId());
                    GrpcServer.sleepOnRequest(request);
                    ++this.numMessages;
                    this.totalLength += (long)request.getMessage().length();
                    CourierSummary courierSummary = CourierSummary.newBuilder().setNumMessages(this.numMessages).setTotalLength(this.totalLength).build();
                    LOGGER.info("Sending CourierSummary for id=" + request.getId());
                    responseObserver.onNext((Object)courierSummary);
                }

                public void onError(Throwable throwable) {
                    LOGGER.severe("Error in collecting packages" + throwable.getMessage());
                    responseObserver.onError(throwable);
                }

                public void onCompleted() {
                    LOGGER.severe("Completed collecting packages");
                    responseObserver.onCompleted();
                }
            };
        }
    }
}

