/*
 * Decompiled with CFR 0.152.
 */
package com.twosigma.waiter.courier;

import com.twosigma.waiter.courier.CourierGrpc;
import com.twosigma.waiter.courier.CourierReply;
import com.twosigma.waiter.courier.CourierRequest;
import com.twosigma.waiter.courier.CourierSummary;
import com.twosigma.waiter.courier.GrpcServer;
import com.twosigma.waiter.courier.Variant;
import io.grpc.CallOptions;
import io.grpc.Channel;
import io.grpc.ClientCall;
import io.grpc.ClientInterceptor;
import io.grpc.ClientInterceptors;
import io.grpc.ForwardingClientCall;
import io.grpc.ForwardingClientCallListener;
import io.grpc.ManagedChannel;
import io.grpc.ManagedChannelBuilder;
import io.grpc.Metadata;
import io.grpc.MethodDescriptor;
import io.grpc.Status;
import io.grpc.StatusRuntimeException;
import io.grpc.stub.AbstractStub;
import io.grpc.stub.MetadataUtils;
import io.grpc.stub.StreamObserver;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.CancellationException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Function;
import java.util.logging.Logger;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

public class GrpcClient {
    private static final Logger LOGGER = Logger.getLogger(GrpcServer.class.getName());
    private final Function<String, Void> logFunction;
    private final String host;
    private final int port;

    private static Variant retrieveVariant(String id) {
        if (id.contains("SEND_ERROR")) {
            return Variant.SEND_ERROR;
        }
        if (id.contains("EXIT_PRE_RESPONSE")) {
            return Variant.EXIT_PRE_RESPONSE;
        }
        if (id.contains("EXIT_POST_RESPONSE")) {
            return Variant.EXIT_POST_RESPONSE;
        }
        return Variant.NORMAL;
    }

    public GrpcClient(String host, int port) {
        this(host, port, new Function<String, Void>(){

            @Override
            public Void apply(String message) {
                LOGGER.info(message);
                return null;
            }
        });
    }

    public GrpcClient(String host, int port, Function<String, Void> logFunction) {
        this.host = host;
        this.port = port;
        this.logFunction = logFunction;
    }

    private ManagedChannel initializeChannel() {
        this.logFunction.apply("initializing plaintext client at " + this.host + ":" + this.port);
        return ManagedChannelBuilder.forAddress((String)this.host, (int)this.port).usePlaintext().build();
    }

    private void shutdownChannel(ManagedChannel channel) {
        this.logFunction.apply("shutting down channel");
        try {
            channel.shutdown().awaitTermination(1L, TimeUnit.SECONDS);
            if (channel.isShutdown()) {
                this.logFunction.apply("channel shutdown successfully");
            } else {
                this.logFunction.apply("channel shutdown timed out!");
            }
        }
        catch (Exception ex) {
            this.logFunction.apply("error in channel shutdown: " + ex.getMessage());
        }
    }

    private Metadata createRequestHeadersMetadata(Map<String, Object> headers) {
        Metadata headerMetadata = new Metadata();
        for (Map.Entry<String, Object> entry : headers.entrySet()) {
            String key = entry.getKey();
            String value = String.valueOf(entry.getValue());
            headerMetadata.put(Metadata.Key.of((String)key, (Metadata.AsciiMarshaller)Metadata.ASCII_STRING_MARSHALLER), (Object)value);
        }
        return headerMetadata;
    }

    private Channel wrapResponseLogger(ManagedChannel channel) {
        return ClientInterceptors.intercept((Channel)channel, (ClientInterceptor[])new ClientInterceptor[]{new ClientInterceptor(){

            public <ReqT, RespT> ClientCall<ReqT, RespT> interceptCall(MethodDescriptor<ReqT, RespT> method, CallOptions callOptions, Channel next) {
                return new ForwardingClientCall.SimpleForwardingClientCall<ReqT, RespT>(next.newCall(method, callOptions)){

                    public void start(ClientCall.Listener<RespT> responseListener, Metadata headers) {
                        super.start((ClientCall.Listener)new ForwardingClientCallListener.SimpleForwardingClientCallListener<RespT>(responseListener){

                            public void onHeaders(Metadata headers) {
                                GrpcClient.this.logFunction.apply("headers received from server:" + headers);
                                super.onHeaders(headers);
                            }

                            public void onClose(Status status, Metadata trailers) {
                                GrpcClient.this.logFunction.apply("status received from server:" + status);
                                GrpcClient.this.logFunction.apply("trailers received from server:" + trailers);
                                super.onClose(status, trailers);
                            }
                        }, headers);
                    }
                };
            }
        }});
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public RpcResult<CourierReply> sendPackage(Map<String, Object> headers, String id, String from, String message) {
        ManagedChannel channel = this.initializeChannel();
        try {
            CourierReply reply;
            Channel wrappedChannel = this.wrapResponseLogger(channel);
            Metadata headerMetadata = this.createRequestHeadersMetadata(headers);
            CourierGrpc.CourierFutureStub rawStub = CourierGrpc.newFutureStub(wrappedChannel);
            CourierGrpc.CourierFutureStub futureStub = (CourierGrpc.CourierFutureStub)MetadataUtils.attachHeaders((AbstractStub)rawStub, (Metadata)headerMetadata);
            this.logFunction.apply("will try to send package from " + from + " ...");
            CourierRequest request = CourierRequest.newBuilder().setId(id).setFrom(from).setMessage(message).setVariant(GrpcClient.retrieveVariant(id)).build();
            AtomicReference<Status> status = new AtomicReference<Status>();
            AtomicReference<CourierReply> response = new AtomicReference<CourierReply>();
            try {
                reply = (CourierReply)futureStub.sendPackage(request).get();
                status.set(Status.OK);
                response.set(reply);
            }
            catch (StatusRuntimeException ex) {
                Status errorStatus = ex.getStatus();
                this.logFunction.apply("RPC failed, status: " + errorStatus);
                status.set(errorStatus);
            }
            catch (ExecutionException ex) {
                Status errorStatus = Status.fromThrowable((Throwable)ex.getCause());
                this.logFunction.apply("RPC execution failed: " + errorStatus);
                status.set(errorStatus);
            }
            catch (Throwable th) {
                this.logFunction.apply("RPC failed, message: " + th.getMessage());
                status.set(Status.UNKNOWN.withDescription(th.getMessage()));
            }
            if (response.get() != null) {
                reply = (CourierReply)response.get();
                this.logFunction.apply("received response CourierReply{id=" + reply.getId() + ", response=" + reply.getResponse() + ", message.length=" + reply.getMessage().length() + "}");
                this.logFunction.apply("messages equal = " + message.equals(reply.getMessage()));
            }
            RpcResult<CourierReply> rpcResult = new RpcResult<CourierReply>(response.get(), (Status)status.get());
            return rpcResult;
        }
        finally {
            this.shutdownChannel(channel);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public RpcResult<List<CourierSummary>> collectPackages(Map<String, Object> headers, List<String> ids, String from, List<String> messages, int interMessageSleepMs, final boolean lockStepMode, int cancelThreshold) {
        ManagedChannel channel = this.initializeChannel();
        AtomicBoolean awaitChannelTermination = new AtomicBoolean(true);
        try {
            final Semaphore lockStep = new Semaphore(1);
            final AtomicBoolean errorSignal = new AtomicBoolean(false);
            Channel wrappedChannel = this.wrapResponseLogger(channel);
            Metadata headerMetadata = this.createRequestHeadersMetadata(headers);
            CourierGrpc.CourierStub rawStub = CourierGrpc.newStub(wrappedChannel);
            CourierGrpc.CourierStub futureStub = (CourierGrpc.CourierStub)MetadataUtils.attachHeaders((AbstractStub)rawStub, (Metadata)headerMetadata);
            this.logFunction.apply("will try to send package from " + from + " ...");
            final AtomicReference<Status> status = new AtomicReference<Status>();
            AtomicReference response = new AtomicReference();
            final CompletableFuture responsePromise = new CompletableFuture();
            try {
                StreamObserver<CourierRequest> collector = futureStub.collectPackages(new StreamObserver<CourierSummary>(){
                    final List<CourierSummary> resultList = new ArrayList<CourierSummary>();

                    public void onNext(CourierSummary response) {
                        GrpcClient.this.logFunction.apply("received response CourierSummary{count=" + response.getNumMessages() + ", length=" + response.getTotalLength() + "}");
                        this.resultList.add(response);
                        if (lockStepMode) {
                            GrpcClient.this.logFunction.apply("releasing semaphore after receiving response");
                            lockStep.release();
                        }
                    }

                    public void onError(Throwable th) {
                        GrpcClient.this.logFunction.apply("error in collecting summaries " + th);
                        errorSignal.compareAndSet(false, true);
                        if (lockStepMode) {
                            GrpcClient.this.logFunction.apply("releasing semaphore after receiving error");
                            lockStep.release();
                        }
                        if (th instanceof StatusRuntimeException) {
                            StatusRuntimeException exception = (StatusRuntimeException)th;
                            status.set(exception.getStatus());
                        } else {
                            status.set(Status.UNKNOWN.withDescription(th.getMessage()));
                        }
                        this.resolveResponsePromise();
                    }

                    public void onCompleted() {
                        GrpcClient.this.logFunction.apply("completed collecting summaries");
                        status.set(Status.OK);
                        this.resolveResponsePromise();
                    }

                    private void resolveResponsePromise() {
                        GrpcClient.this.logFunction.apply("client result has " + this.resultList.size() + " entries");
                        responsePromise.complete(this.resultList);
                    }
                });
                for (int i = 0; i < messages.size(); ++i) {
                    if (i >= cancelThreshold) {
                        this.logFunction.apply("cancelling sending messages");
                        awaitChannelTermination.set(false);
                        throw new CancellationException("Cancel threshold reached: " + cancelThreshold);
                    }
                    if (errorSignal.get()) {
                        this.logFunction.apply("aborting sending messages as error was discovered");
                        break;
                    }
                    String requestId = ids.get(i);
                    if (lockStepMode) {
                        this.logFunction.apply("acquiring semaphore before sending request " + requestId);
                        lockStep.acquire();
                    }
                    CourierRequest request = CourierRequest.newBuilder().setId(requestId).setFrom(from).setMessage(messages.get(i)).setVariant(GrpcClient.retrieveVariant(requestId)).build();
                    this.logFunction.apply("sending message CourierRequest{id=" + request.getId() + ", from=" + request.getFrom() + ", message.length=" + request.getMessage().length() + "}");
                    collector.onNext((Object)request);
                    Thread.sleep(interMessageSleepMs);
                }
                this.logFunction.apply("completed sending packages");
                collector.onCompleted();
                response.set(responsePromise.get());
            }
            catch (StatusRuntimeException ex) {
                this.logFunction.apply("RPC failed, status: " + ex.getStatus());
                status.set(ex.getStatus());
            }
            catch (Exception ex) {
                this.logFunction.apply("RPC failed, message: " + ex.getMessage());
                status.set(Status.UNKNOWN.withDescription(ex.getMessage()));
            }
            RpcResult<List<CourierSummary>> rpcResult = new RpcResult<List<CourierSummary>>(response.get(), (Status)status.get());
            return rpcResult;
        }
        finally {
            if (awaitChannelTermination.get()) {
                this.shutdownChannel(channel);
            } else {
                channel.shutdownNow();
            }
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public RpcResult<CourierSummary> aggregatePackages(Map<String, Object> headers, List<String> ids, String from, List<String> messages, int interMessageSleepMs, int cancelThreshold) {
        ManagedChannel channel = this.initializeChannel();
        AtomicBoolean awaitChannelTermination = new AtomicBoolean(true);
        try {
            final AtomicBoolean errorSignal = new AtomicBoolean(false);
            Channel wrappedChannel = this.wrapResponseLogger(channel);
            Metadata headerMetadata = this.createRequestHeadersMetadata(headers);
            CourierGrpc.CourierStub rawStub = CourierGrpc.newStub(wrappedChannel);
            CourierGrpc.CourierStub futureStub = (CourierGrpc.CourierStub)MetadataUtils.attachHeaders((AbstractStub)rawStub, (Metadata)headerMetadata);
            this.logFunction.apply("will try to agggreate package from " + from + " ...");
            final AtomicReference<Status> status = new AtomicReference<Status>();
            final AtomicReference response = new AtomicReference();
            final CompletableFuture responsePromise = new CompletableFuture();
            try {
                StreamObserver<CourierRequest> collector = futureStub.aggregatePackages(new StreamObserver<CourierSummary>(){

                    public void onNext(CourierSummary summary) {
                        GrpcClient.this.logFunction.apply("received response CourierSummary{count=" + summary.getNumMessages() + ", length=" + summary.getTotalLength() + "}");
                        response.set(summary);
                    }

                    public void onError(Throwable th) {
                        GrpcClient.this.logFunction.apply("error in aggregating summaries " + th);
                        errorSignal.compareAndSet(false, true);
                        if (th instanceof StatusRuntimeException) {
                            StatusRuntimeException exception = (StatusRuntimeException)th;
                            status.set(exception.getStatus());
                        } else {
                            status.set(Status.UNKNOWN.withDescription(th.getMessage()));
                        }
                        this.resolveResponsePromise();
                    }

                    public void onCompleted() {
                        GrpcClient.this.logFunction.apply("completed aggregating summaries");
                        status.set(Status.OK);
                        this.resolveResponsePromise();
                    }

                    private void resolveResponsePromise() {
                        CourierSummary courierSummary = (CourierSummary)response.get();
                        GrpcClient.this.logFunction.apply("client result: " + courierSummary);
                        responsePromise.complete(courierSummary);
                    }
                });
                for (int i = 0; i < messages.size(); ++i) {
                    if (i >= cancelThreshold) {
                        this.logFunction.apply("cancelling sending messages");
                        awaitChannelTermination.set(false);
                        throw new CancellationException("Cancel threshold reached: " + cancelThreshold);
                    }
                    if (errorSignal.get()) {
                        this.logFunction.apply("aborting sending messages as error was discovered");
                        break;
                    }
                    String requestId = ids.get(i);
                    CourierRequest request = CourierRequest.newBuilder().setId(requestId).setFrom(from).setMessage(messages.get(i)).setVariant(GrpcClient.retrieveVariant(requestId)).build();
                    this.logFunction.apply("sending message CourierRequest{id=" + request.getId() + ", from=" + request.getFrom() + ", message.length=" + request.getMessage().length() + "}");
                    collector.onNext((Object)request);
                    Thread.sleep(interMessageSleepMs);
                }
                this.logFunction.apply("completed sending packages");
                collector.onCompleted();
                responsePromise.get();
            }
            catch (StatusRuntimeException ex) {
                this.logFunction.apply("RPC failed, status: " + ex.getStatus());
                status.set(ex.getStatus());
            }
            catch (Exception ex) {
                this.logFunction.apply("RPC failed, message: " + ex.getMessage());
                status.set(Status.UNKNOWN.withDescription(ex.getMessage()));
            }
            RpcResult<CourierSummary> rpcResult = new RpcResult<CourierSummary>(response.get(), (Status)status.get());
            return rpcResult;
        }
        finally {
            if (awaitChannelTermination.get()) {
                this.shutdownChannel(channel);
            } else {
                channel.shutdownNow();
            }
        }
    }

    public static void main(String ... args) throws Exception {
        String host = "localhost";
        int port = 8080;
        GrpcClient client = new GrpcClient("localhost", 8080);
    }

    private static void runSendPackageSuccess(GrpcClient client) {
        String id = UUID.randomUUID().toString();
        String user = "Jim";
        StringBuilder sb = new StringBuilder();
        for (int i = 0; i < 100000; ++i) {
            sb.append("a");
            if (i % 1000 != 0) continue;
            sb.append(".");
        }
        HashMap<String, Object> headers = new HashMap<String, Object>();
        headers.put("x-cid", "cid-send-package." + System.currentTimeMillis());
        RpcResult<CourierReply> rpcResult = client.sendPackage(headers, id, "Jim", sb.toString());
        CourierReply courierReply = rpcResult.result();
        client.logFunction.apply("sendPackage response = " + courierReply);
        Status status = rpcResult.status();
        client.logFunction.apply("sendPackage status = " + status);
    }

    private static void runSendPackageSendError(GrpcClient client) {
        String id = UUID.randomUUID().toString() + ".SEND_ERROR";
        String user = "Jim";
        StringBuilder sb = new StringBuilder();
        for (int i = 0; i < 100000; ++i) {
            sb.append("a");
            if (i % 1000 != 0) continue;
            sb.append(".");
        }
        HashMap<String, Object> headers = new HashMap<String, Object>();
        headers.put("x-cid", "cid-send-package." + System.currentTimeMillis());
        RpcResult<CourierReply> rpcResult = client.sendPackage(headers, id, "Jim", sb.toString());
        CourierReply courierReply = rpcResult.result();
        client.logFunction.apply("sendPackage response = " + courierReply);
        Status status = rpcResult.status();
        client.logFunction.apply("sendPackage status = " + status);
    }

    private static void runCollectPackagesSuccess(GrpcClient client) {
        HashMap<String, Object> headers = new HashMap<String, Object>();
        headers.put("x-cid", "cid-collect-packages-success." + System.currentTimeMillis());
        List<String> ids = IntStream.range(0, 10).mapToObj(i -> "id-" + i).collect(Collectors.toList());
        List<String> messages = IntStream.range(0, 10).mapToObj(i -> "message-" + i).collect(Collectors.toList());
        RpcResult<List<CourierSummary>> rpcResult = client.collectPackages(headers, ids, "User", messages, 100, true, messages.size() + 1);
        List<CourierSummary> courierSummaries = rpcResult.result();
        client.logFunction.apply("collectPackages[success] summary = " + courierSummaries);
        Status status = rpcResult.status();
        client.logFunction.apply("collectPackages[success] status = " + status);
    }

    private static void runCollectPackagesSendError(GrpcClient client) {
        HashMap<String, Object> headers = new HashMap<String, Object>();
        headers.put("x-cid", "cid-collect-packages-server-error." + System.currentTimeMillis());
        List<String> ids = IntStream.range(0, 10).mapToObj(i -> "id-" + i).collect(Collectors.toList());
        ids.set(5, (String)ids.get(5) + ".SEND_ERROR");
        List<String> messages = IntStream.range(0, 10).mapToObj(i -> "message-" + i).collect(Collectors.toList());
        RpcResult<List<CourierSummary>> rpcResult = client.collectPackages(headers, ids, "User", messages, 100, true, messages.size() + 1);
        List<CourierSummary> courierSummaries = rpcResult.result();
        client.logFunction.apply("collectPackages[cancel] summary = " + courierSummaries);
        Status status = rpcResult.status();
        client.logFunction.apply("collectPackages[cancel] status = " + status);
    }

    private static void runCollectPackagesExitPreResponse(GrpcClient client) {
        HashMap<String, Object> headers = new HashMap<String, Object>();
        headers.put("x-cid", "cid-collect-packages-server-pre-cancel." + System.currentTimeMillis());
        List<String> ids = IntStream.range(0, 10).mapToObj(i -> "id-" + i).collect(Collectors.toList());
        ids.set(5, (String)ids.get(5) + ".EXIT_PRE_RESPONSE");
        List<String> messages = IntStream.range(0, 10).mapToObj(i -> "message-" + i).collect(Collectors.toList());
        RpcResult<List<CourierSummary>> rpcResult = client.collectPackages(headers, ids, "User", messages, 100, true, messages.size() + 1);
        List<CourierSummary> courierSummaries = rpcResult.result();
        client.logFunction.apply("collectPackages[cancel] summary = " + courierSummaries);
        Status status = rpcResult.status();
        client.logFunction.apply("collectPackages[cancel] status = " + status);
    }

    private static void runCollectPackagesExitPostResponse(GrpcClient client) {
        HashMap<String, Object> headers = new HashMap<String, Object>();
        headers.put("x-cid", "cid-collect-packages-server-post-cancel." + System.currentTimeMillis());
        List<String> ids = IntStream.range(0, 10).mapToObj(i -> "id-" + i).collect(Collectors.toList());
        ids.set(5, (String)ids.get(5) + ".EXIT_POST_RESPONSE");
        List<String> messages = IntStream.range(0, 10).mapToObj(i -> "message-" + i).collect(Collectors.toList());
        RpcResult<List<CourierSummary>> rpcResult = client.collectPackages(headers, ids, "User", messages, 100, true, messages.size() + 1);
        List<CourierSummary> courierSummaries = rpcResult.result();
        client.logFunction.apply("collectPackages[cancel] summary = " + courierSummaries);
        Status status = rpcResult.status();
        client.logFunction.apply("collectPackages[cancel] status = " + status);
    }

    private static void runAggregatePackagesSuccess(GrpcClient client) {
        HashMap<String, Object> headers = new HashMap<String, Object>();
        headers.put("x-cid", "cid-aggregate-packages-success." + System.currentTimeMillis());
        List<String> ids = IntStream.range(0, 10).mapToObj(i -> "id-" + i).collect(Collectors.toList());
        List<String> messages = IntStream.range(0, 10).mapToObj(i -> "message-" + i).collect(Collectors.toList());
        RpcResult<CourierSummary> rpcResult = client.aggregatePackages(headers, ids, "User", messages, 100, messages.size() + 1);
        CourierSummary courierSummary = rpcResult.result();
        client.logFunction.apply("aggregatePackages[success] summary = " + courierSummary);
        Status status = rpcResult.status();
        client.logFunction.apply("aggregatePackages[success] status = " + status);
    }

    private static void runAggregatePackagesSendError(GrpcClient client) {
        HashMap<String, Object> headers = new HashMap<String, Object>();
        headers.put("x-cid", "cid-aggregate-packages-server-error." + System.currentTimeMillis());
        List<String> ids = IntStream.range(0, 10).mapToObj(i -> "id-" + i).collect(Collectors.toList());
        ids.set(5, (String)ids.get(5) + ".SEND_ERROR");
        List<String> messages = IntStream.range(0, 10).mapToObj(i -> "message-" + i).collect(Collectors.toList());
        RpcResult<CourierSummary> rpcResult = client.aggregatePackages(headers, ids, "User", messages, 100, messages.size() + 1);
        CourierSummary courierSummary = rpcResult.result();
        client.logFunction.apply("aggregatePackages[cancel] summary = " + courierSummary);
        Status status = rpcResult.status();
        client.logFunction.apply("aggregatePackages[cancel] status = " + status);
    }

    private static void runAggregatePackagesExitPreResponse(GrpcClient client) {
        HashMap<String, Object> headers = new HashMap<String, Object>();
        headers.put("x-cid", "cid-aggregate-packages-server-pre-cancel." + System.currentTimeMillis());
        List<String> ids = IntStream.range(0, 10).mapToObj(i -> "id-" + i).collect(Collectors.toList());
        ids.set(5, (String)ids.get(5) + ".EXIT_PRE_RESPONSE");
        List<String> messages = IntStream.range(0, 10).mapToObj(i -> "message-" + i).collect(Collectors.toList());
        RpcResult<CourierSummary> rpcResult = client.aggregatePackages(headers, ids, "User", messages, 100, messages.size() + 1);
        CourierSummary courierSummary = rpcResult.result();
        client.logFunction.apply("aggregatePackages[cancel] summary = " + courierSummary);
        Status status = rpcResult.status();
        client.logFunction.apply("aggregatePackages[cancel] status = " + status);
    }

    public static final class RpcResult<Result> {
        private final Result result;
        private final Status status;

        private RpcResult(Result result, Status status) {
            this.result = result;
            this.status = status;
        }

        public Result result() {
            return this.result;
        }

        public Status status() {
            return this.status;
        }
    }
}

