/*
 * Decompiled with CFR 0.152.
 */
package com.flipkart.flux.taskDispatcher;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.flipkart.flux.api.core.TaskExecutionMessage;
import com.flipkart.flux.metrics.iface.MetricsClient;
import com.flipkart.flux.taskDispatcher.ExecutionNodeTaskDispatcher;
import com.google.inject.Inject;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import javax.inject.Named;
import javax.inject.Singleton;
import javax.ws.rs.core.Response;
import org.apache.http.HttpEntity;
import org.apache.http.client.HttpClient;
import org.apache.http.client.config.RequestConfig;
import org.apache.http.client.methods.CloseableHttpResponse;
import org.apache.http.client.methods.HttpPost;
import org.apache.http.client.methods.HttpUriRequest;
import org.apache.http.client.utils.HttpClientUtils;
import org.apache.http.conn.HttpClientConnectionManager;
import org.apache.http.entity.ByteArrayEntity;
import org.apache.http.entity.ContentType;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.HttpClientBuilder;
import org.apache.http.impl.conn.PoolingHttpClientConnectionManager;
import org.apache.http.util.EntityUtils;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

@Singleton
public class ExecutionNodeTaskDispatcherImpl
implements ExecutionNodeTaskDispatcher {
    private static Logger logger = LogManager.getLogger(ExecutionNodeTaskDispatcherImpl.class);
    private final CloseableHttpClient closeableHttpClient;
    private final ObjectMapper objectMapper = new ObjectMapper();
    private MetricsClient metricsClient;

    @Inject
    public ExecutionNodeTaskDispatcherImpl(@Named(value="connector.max.connections") Integer maxConnections, @Named(value="connector.max.connections.per.route") Integer maxConnectionsPerRoute, @Named(value="connector.connection.timeout") Integer connectionTimeout, @Named(value="connector.socket.timeout") Integer socketTimeOut, MetricsClient metricsClient) {
        RequestConfig clientConfig = RequestConfig.custom().setConnectTimeout(connectionTimeout.intValue()).setSocketTimeout(socketTimeOut.intValue()).setConnectionRequestTimeout(socketTimeOut.intValue()).build();
        PoolingHttpClientConnectionManager syncConnectionManager = new PoolingHttpClientConnectionManager();
        syncConnectionManager.setMaxTotal(maxConnections.intValue());
        syncConnectionManager.setDefaultMaxPerRoute(maxConnectionsPerRoute.intValue());
        this.closeableHttpClient = HttpClientBuilder.create().setDefaultRequestConfig(clientConfig).setConnectionManager((HttpClientConnectionManager)syncConnectionManager).build();
        Runtime.getRuntime().addShutdownHook(new Thread(() -> HttpClientUtils.closeQuietly((HttpClient)this.closeableHttpClient)));
        this.metricsClient = metricsClient;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public int forwardExecutionMessage(String endPoint, TaskExecutionMessage taskExecutionMessage) {
        int defaultStatusCode = -1;
        CloseableHttpResponse httpResponse = null;
        HttpPost httpPostRequest = new HttpPost(endPoint);
        try {
            ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
            this.objectMapper.writeValue((OutputStream)byteArrayOutputStream, (Object)taskExecutionMessage);
            httpPostRequest.setEntity((HttpEntity)new ByteArrayEntity(byteArrayOutputStream.toByteArray(), ContentType.APPLICATION_JSON));
            httpResponse = this.closeableHttpClient.execute((HttpUriRequest)httpPostRequest);
            defaultStatusCode = httpResponse.getStatusLine().getStatusCode();
            if (defaultStatusCode == Response.Status.ACCEPTED.getStatusCode()) {
                logger.info("Posting over http is successful. StatusCode: {} smId:{} taskId:{}", (Object)defaultStatusCode, (Object)taskExecutionMessage.getAkkaMessage().getStateMachineId(), (Object)taskExecutionMessage.getAkkaMessage().getTaskId());
            } else {
                logger.error("Did not receive a valid response from Flux core. StatusCode: {}, smId:{} taskId:{} message: {}", (Object)defaultStatusCode, (Object)taskExecutionMessage.getAkkaMessage().getStateMachineId(), (Object)taskExecutionMessage.getAkkaMessage().getTaskId(), (Object)EntityUtils.toString((HttpEntity)httpResponse.getEntity()));
            }
        }
        catch (IOException e) {
            logger.error("Posting over http errored. smId: {}, taskId:{} Message:{}  Exception: {}", (Object)taskExecutionMessage.getAkkaMessage().getStateMachineId(), (Object)taskExecutionMessage.getAkkaMessage().getTaskId(), (Object)e.getMessage(), (Object)e);
        }
        finally {
            if (defaultStatusCode >= Response.Status.OK.getStatusCode() && defaultStatusCode < Response.Status.MOVED_PERMANENTLY.getStatusCode()) {
                this.metricsClient.markMeter("stateMachine.tasks.forwardToExecutor.2xx");
            } else if (defaultStatusCode >= Response.Status.BAD_REQUEST.getStatusCode() && defaultStatusCode < Response.Status.INTERNAL_SERVER_ERROR.getStatusCode()) {
                this.metricsClient.markMeter("stateMachine.tasks.forwardToExecutor.4xx");
            } else if (defaultStatusCode >= Response.Status.INTERNAL_SERVER_ERROR.getStatusCode() && defaultStatusCode < Response.Status.HTTP_VERSION_NOT_SUPPORTED.getStatusCode()) {
                this.metricsClient.markMeter("stateMachine.tasks.forwardToExecutor.5xx");
            }
        }
        HttpClientUtils.closeQuietly((CloseableHttpResponse)httpResponse);
        return defaultStatusCode;
    }
}

