/*
 * Decompiled with CFR 0.152.
 */
package org.commoncrawl.query;

import java.io.DataOutput;
import java.io.File;
import java.io.IOException;
import java.text.NumberFormat;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Vector;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.DataOutputBuffer;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.mapred.Counters;
import org.apache.hadoop.mapred.InputSplit;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.util.ReflectionUtils;
import org.commoncrawl.async.EventLoop;
import org.commoncrawl.async.Timer;
import org.commoncrawl.hadoop.mergeutils.MergeSortSpillWriter;
import org.commoncrawl.hadoop.mergeutils.RawKeyValueComparator;
import org.commoncrawl.hadoop.mergeutils.SequenceFileMerger;
import org.commoncrawl.hadoop.mergeutils.SequenceFileSpillWriter;
import org.commoncrawl.hadoop.mergeutils.SpillWriter;
import org.commoncrawl.query.ClientQueryInfo;
import org.commoncrawl.query.QueryCommon;
import org.commoncrawl.query.QueryCompletionCallback;
import org.commoncrawl.query.QueryController;
import org.commoncrawl.query.QueryInputSplit;
import org.commoncrawl.query.QueryProgressCallback;
import org.commoncrawl.query.QueryRequest;
import org.commoncrawl.query.QueryResult;
import org.commoncrawl.query.QueryResultFileIndex;
import org.commoncrawl.query.QuerySlaveConnection;
import org.commoncrawl.query.QuerySlaveServer;
import org.commoncrawl.query.QueryStatus;
import org.commoncrawl.query.RemoteQueryCompletionCallback;
import org.commoncrawl.query.RemoteQueryInfo;
import org.commoncrawl.rpc.BinaryProtocol;
import org.commoncrawl.rpc.MessageData;
import org.commoncrawl.rpc.OutgoingMessageContext;
import org.commoncrawl.rpc.RPCStruct;
import org.commoncrawl.util.shared.CCStringUtils;
import org.commoncrawl.util.shared.FPGenerator;
import org.commoncrawl.util.shared.FileUtils;
import org.commoncrawl.util.shared.FlexBuffer;
import org.commoncrawl.util.shared.Tuples;

public abstract class Query<DataType extends RPCStruct, ResultKeyType extends WritableComparable, ResultValueType extends Writable> {
    private static final Log LOG = LogFactory.getLog(Query.class);
    protected static final NumberFormat NUMBER_FORMAT = NumberFormat.getInstance();
    protected RemoteQueryInfo _remoteQueryInfo;
    protected ClientQueryInfo _clientQueryInfo;
    protected QueryCommon _commonInfo;
    protected DataType _queryData;
    private Object _contextObj;
    private QueryController _queryController;
    protected QueryStatus _queryStatus;
    protected Timer _queryMonitorTimer;
    protected QuerySlaveServer _slave;
    protected Thread _queryThread;
    protected boolean _cancelQuery;
    protected HashMap<Integer, ShardState> _shardStatusMap = null;
    protected RemoteQueryCompletionCallback _completionCallback;
    protected ArrayList<QueryInputSplit> _inputSplits = new ArrayList();
    protected QueryRequest _associatedRequest;
    private static final int QUERY_RETRY_DELAY = 5000;
    private static final int MAX_RETRIES = 5;
    Tuples.Pair<Long, Long> _cachedCanonicalId = null;
    public static final String MERGED_RESULTS_SUFFIX = "MERGED_RESULTS";
    private long _aggregationCompletionCount = 0L;

    public Query(DataType queryData) {
        this._queryData = queryData;
        this._commonInfo = new QueryCommon();
        this._queryStatus = new QueryStatus();
        this._queryStatus.setStatus(0);
    }

    protected void setQueryData(DataType queryData) {
        this._queryData = queryData;
    }

    protected DataType getQueryData() {
        return this._queryData;
    }

    public void initializeRemoteQuery(QuerySlaveServer slave, RemoteQueryInfo queryInfo, DataType queryData) {
        try {
            this._slave = slave;
            this._remoteQueryInfo = queryInfo;
            this._clientQueryInfo = queryInfo.getClientQueryData();
            this._commonInfo = (QueryCommon)queryInfo.getCommonInfo().clone();
            this._queryData = queryData;
            this._queryStatus = new QueryStatus();
            this._queryStatus.setQueryId(this._commonInfo.getQueryId());
            this._queryStatus.setShardId(this._remoteQueryInfo.getShardId());
        }
        catch (CloneNotSupportedException e) {
            // empty catch block
        }
    }

    public RemoteQueryInfo getRemoteQueryInfo() {
        return this._remoteQueryInfo;
    }

    public QueryStatus getQueryStatus() {
        return this._queryStatus;
    }

    public QueryCommon getCommonQueryInfo() {
        return this._commonInfo;
    }

    public ClientQueryInfo getClientQueryInfo() {
        return this._clientQueryInfo;
    }

    public void setClientQueryInfo(ClientQueryInfo info) {
        this._clientQueryInfo = info;
    }

    public ArrayList<QueryInputSplit> getSplits() {
        return this._inputSplits;
    }

    public void setSplits(ArrayList<QueryInputSplit> splits) {
        this._inputSplits = splits;
    }

    public Object getContextObject() {
        return this._contextObj;
    }

    public void setContext(Object context) {
        this._contextObj = context;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void runShardedQuery(final String fqHostName, final FileSystem remoteFileSystem, final Configuration conf, final EventLoop eventLoop, final QuerySlaveServer server, final File queryTempDir, final QueryProgressCallback<DataType, ResultKeyType, ResultValueType> progressCallback, final RemoteQueryCompletionCallback completionCallback) throws IOException {
        Query query = this;
        synchronized (query) {
            this._queryStatus.setStatus(1);
        }
        FileUtils.recursivelyDeleteFile(queryTempDir);
        queryTempDir.mkdirs();
        final Path hdfsAttemptPath = this.getHDFSQueryAttemptFilePathForShard(this.getRemoteQueryInfo().getShardId());
        remoteFileSystem.mkdirs(hdfsAttemptPath);
        final Path mergeTemp = new Path(hdfsAttemptPath, "mergeTemp");
        remoteFileSystem.mkdirs(mergeTemp);
        final Path shardDataFile = new Path(hdfsAttemptPath, Query.getPartNameForSlave(this.getRemoteQueryInfo().getShardId()));
        this._queryThread = new Thread(new Runnable(){

            /*
             * WARNING - Removed try catching itself - possible behaviour change.
             */
            @Override
            public void run() {
                try {
                    CompressionCodec codec = null;
                    Class codecClass = conf.getClass("mapred.output.compression.codec", null, CompressionCodec.class);
                    if (codecClass != null) {
                        codec = (CompressionCodec)ReflectionUtils.newInstance((Class)codecClass, (Configuration)conf);
                    }
                    LOG.info((Object)(fqHostName + " Creating SpillWriter with output file:" + shardDataFile + " for Query:" + Query.this.getQueryId() + " Codec is:" + codecClass));
                    SequenceFileSpillWriter finalSpillWriter = new SequenceFileSpillWriter(remoteFileSystem, conf, shardDataFile, Query.this.getKeyClass(), Query.this.getValueClass(), null, codec, 2);
                    LOG.info((Object)(fqHostName + " Creating MergingSpillWriter at:" + hdfsAttemptPath + " for Query:" + Query.this.getQueryId()));
                    try {
                        MergeSortSpillWriter mergingSpillWriter = new MergeSortSpillWriter(conf, finalSpillWriter, remoteFileSystem, mergeTemp, null, Query.this.allocateRawComparator(), Query.this.getKeyClass(), Query.this.getValueClass(), null, null);
                        try {
                            LOG.info((Object)(fqHostName + " Slave Query Thread:Executing for Query:" + Query.this.getQueryId()));
                            long resultCount = Query.this.executeShardedQuery(remoteFileSystem, conf, eventLoop, server, queryTempDir, hdfsAttemptPath, mergingSpillWriter, new QueryProgressCallback<DataType, ResultKeyType, ResultValueType>(){

                                /*
                                 * WARNING - Removed try catching itself - possible behaviour change.
                                 */
                                @Override
                                public boolean updateProgress(Query<DataType, ResultKeyType, ResultValueType> theQueryObject, float percentComplete) {
                                    Query query = Query.this;
                                    synchronized (query) {
                                        if (Query.this._cancelQuery) {
                                            Query.this._queryStatus.setStatus(4);
                                            return false;
                                        }
                                        Query.this._queryStatus.setProgress(percentComplete);
                                    }
                                    return progressCallback.updateProgress(theQueryObject, percentComplete);
                                }
                            });
                            if (mergingSpillWriter != null) {
                                mergingSpillWriter.close();
                                mergingSpillWriter = null;
                            }
                            if (finalSpillWriter != null) {
                                finalSpillWriter.close();
                                finalSpillWriter = null;
                            }
                            remoteFileSystem.delete(mergeTemp, true);
                            Query query = Query.this;
                            synchronized (query) {
                                if (Query.this._queryStatus.getStatus() != 4) {
                                    FileStatus[] resultingFiles;
                                    Query.this._queryStatus.setStatus(2);
                                    Query.this._queryStatus.setOptResultCount(resultCount);
                                    Path finalResultsPath = Query.this.getHDFSQueryResultsPath();
                                    for (FileStatus resultsFile : resultingFiles = remoteFileSystem.globStatus(new Path(hdfsAttemptPath, "*"))) {
                                        boolean success = remoteFileSystem.rename(resultsFile.getPath(), new Path(finalResultsPath, resultsFile.getPath().getName()));
                                        if (!success) {
                                            LOG.error((Object)(fqHostName + " Failed to Copy Result File:" + resultsFile.getPath() + " to destination:" + finalResultsPath + " for Query:" + Query.this.getQueryId() + " Shard:" + Query.this.getRemoteQueryInfo().getShardId()));
                                            continue;
                                        }
                                        LOG.info((Object)(fqHostName + " Copied Results File:" + resultsFile.getPath() + " to destination:" + finalResultsPath + " for Query:" + Query.this.getQueryId() + " Shard:" + Query.this.getRemoteQueryInfo().getShardId()));
                                    }
                                }
                            }
                            if (eventLoop != null) {
                                final long finalResultCount = resultCount;
                                eventLoop.setTimer(new Timer(0L, false, new Timer.Callback(){

                                    @Override
                                    public void timerFired(Timer timer) {
                                        LOG.info((Object)(fqHostName + " Query:" + Query.this.getQueryId() + " Completed with:" + finalResultCount + " results."));
                                        completionCallback.queryComplete(Query.this, finalResultCount);
                                    }
                                }));
                            } else {
                                LOG.info((Object)(fqHostName + " Query:" + Query.this.getQueryId() + " Completed with:" + resultCount + " results."));
                                completionCallback.queryComplete(Query.this, resultCount);
                            }
                        }
                        finally {
                            if (mergingSpillWriter != null) {
                                try {
                                    mergingSpillWriter.close();
                                }
                                catch (IOException e) {
                                    LOG.error((Object)CCStringUtils.stringifyException((Throwable)e));
                                }
                            }
                        }
                    }
                    finally {
                        if (finalSpillWriter != null) {
                            finalSpillWriter.close();
                        }
                    }
                }
                catch (Exception e) {
                    Query query = Query.this;
                    synchronized (query) {
                        Query.this._queryStatus.setStatus(3);
                        Query.this._queryStatus.setOptErrorReason(CCStringUtils.stringifyException((Throwable)e));
                    }
                    if (eventLoop != null) {
                        eventLoop.setTimer(new Timer(0L, false, new Timer.Callback(){

                            @Override
                            public void timerFired(Timer timer) {
                                LOG.error((Object)CCStringUtils.stringifyException((Throwable)e));
                                completionCallback.queryFailed(Query.this, "Query:" + Query.this.getQueryId() + " Failed on Host:" + fqHostName + " with Exception:" + CCStringUtils.stringifyException((Throwable)e));
                            }
                        }));
                    } else {
                        LOG.error((Object)CCStringUtils.stringifyException((Throwable)e));
                        completionCallback.queryFailed(Query.this, "Query:" + Query.this.getQueryId() + " Failed on Host:" + fqHostName + " with Exception:" + CCStringUtils.stringifyException((Throwable)e));
                    }
                }
                finally {
                    FileUtils.recursivelyDeleteFile(queryTempDir);
                    try {
                        remoteFileSystem.delete(hdfsAttemptPath, true);
                    }
                    catch (IOException e) {
                        LOG.error((Object)(fqHostName + " Failed to delete attempt directory:" + hdfsAttemptPath + " for Query:" + Query.this.getQueryId() + " Shard:" + Query.this.getRemoteQueryInfo().getShardId()));
                    }
                }
            }
        });
        LOG.info((Object)(fqHostName + " Starting Slave Query Thread for Query:" + this.getQueryId()));
        this._queryThread.start();
    }

    public void startLocalQuery(final FileSystem fileSystem, final Configuration conf, final File tempFileDir, final EventLoop eventLoop, final QueryRequest<DataType, ResultKeyType, ResultValueType> queryRequest, final RemoteQueryCompletionCallback completionCallback) {
        queryRequest.getQueryStatus().setStatus(1);
        queryRequest.getQueryStatus().setProgress(0.66f);
        this._queryThread = new Thread(new Runnable(){

            @Override
            public void run() {
                LOG.info((Object)("Client Query Thread.Executing for Query:" + Query.this.getQueryId()));
                try {
                    FileUtils.recursivelyDeleteFile(tempFileDir);
                    LOG.info((Object)("Client Query Thread for:" + Query.this.getQueryId() + " creating temp file directory:" + tempFileDir.getAbsolutePath()));
                    tempFileDir.mkdirs();
                    LOG.info((Object)("Executing Local Query for Query:" + Query.this.getQueryId()));
                    final long resultCount = Query.this.executeAggregationQuery(fileSystem, conf, eventLoop, tempFileDir, queryRequest, new QueryProgressCallback<RPCStruct, WritableComparable, Writable>(){

                        @Override
                        public boolean updateProgress(Query<RPCStruct, WritableComparable, Writable> theQueryObject, float percentComplete) {
                            queryRequest.getQueryStatus().setProgress(0.66f + 0.33f * percentComplete);
                            return true;
                        }
                    });
                    queryRequest.getQueryStatus().setProgress(0.99f);
                    eventLoop.setTimer(new Timer(0L, false, new Timer.Callback(){

                        @Override
                        public void timerFired(Timer timer) {
                            LOG.info((Object)("Local QueryComplete for Query:" + Query.this.getQueryId()));
                            completionCallback.queryComplete(queryRequest.getSourceQuery(), resultCount);
                        }
                    }));
                }
                catch (IOException e) {
                    LOG.error((Object)("Query: " + Query.this.getQueryId() + " Failed on executeLocal with Error:" + CCStringUtils.stringifyException((Throwable)e)));
                    final String error = CCStringUtils.stringifyException((Throwable)e);
                    eventLoop.setTimer(new Timer(0L, false, new Timer.Callback(){

                        @Override
                        public void timerFired(Timer timer) {
                            completionCallback.queryFailed(queryRequest.getSourceQuery(), error);
                        }
                    }));
                }
            }
        });
        LOG.info((Object)("Starting Local Query Thread for Query:" + this.getQueryId()));
        this._queryThread.start();
    }

    public void startCacheQuery(final QueryController controller, final FileSystem fileSystem, final Configuration conf, final EventLoop eventLoop, final QueryRequest<DataType, ResultKeyType, ResultValueType> queryRequest, final QueryCompletionCallback<DataType, ResultKeyType, ResultValueType> completionCallback) {
        queryRequest.getQueryStatus().setStatus(1);
        queryRequest.getQueryStatus().setProgress(0.99f);
        this._queryThread = new Thread(new Runnable(){

            @Override
            public void run() {
                LOG.info((Object)("Executing Cache Query for Query:" + Query.this.getQueryId()));
                try {
                    Query.this.executeCacheQuery(fileSystem, conf, eventLoop, controller, queryRequest, new QueryCompletionCallback<DataType, ResultKeyType, ResultValueType>(){

                        @Override
                        public void queryComplete(QueryRequest<DataType, ResultKeyType, ResultValueType> request, final QueryResult<ResultKeyType, ResultValueType> queryResult) {
                            eventLoop.setTimer(new Timer(0L, false, new Timer.Callback(){

                                @Override
                                public void timerFired(Timer timer) {
                                    queryRequest.getQueryStatus().setProgress(1.0f);
                                    LOG.info((Object)("Calling queryComplete on cacheRequest for Query:" + Query.this.getQueryId()));
                                    completionCallback.queryComplete(queryRequest, queryResult);
                                }
                            }));
                        }

                        @Override
                        public void queryFailed(QueryRequest<DataType, ResultKeyType, ResultValueType> request, final String reason) {
                            eventLoop.setTimer(new Timer(0L, false, new Timer.Callback(){

                                @Override
                                public void timerFired(Timer timer) {
                                    LOG.info((Object)("Calling queryFailed on cacheRequest for Query:" + Query.this.getQueryId() + " Reason:" + reason));
                                    completionCallback.queryFailed(queryRequest, reason);
                                }
                            }));
                        }
                    });
                }
                catch (IOException e) {
                    LOG.error((Object)("Query: " + Query.this.getQueryId() + " Failed on cacheQuery with Error:" + CCStringUtils.stringifyException((Throwable)e)));
                    final String error = CCStringUtils.stringifyException((Throwable)e);
                    eventLoop.setTimer(new Timer(0L, false, new Timer.Callback(){

                        @Override
                        public void timerFired(Timer timer) {
                            completionCallback.queryFailed(queryRequest, error);
                        }
                    }));
                }
            }
        });
        LOG.info((Object)("Starting Cache Query Thread for Query:" + this.getQueryId()));
        this._queryThread.start();
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void startShardedQuery(QueryController controller, final QueryRequest request, ArrayList<QueryInputSplit> inputSplits, QueryProgressCallback<DataType, ResultKeyType, ResultValueType> progressCallback, RemoteQueryCompletionCallback completionCallback) throws IOException {
        LOG.info((Object)("Starting Remote(Master)Query for Query:" + this.getQueryId()));
        this._queryController = controller;
        this._associatedRequest = request;
        this._completionCallback = completionCallback;
        Query query = this;
        synchronized (query) {
            this._queryStatus.setStatus(1);
        }
        this._shardStatusMap = new HashMap();
        for (QueryInputSplit split : inputSplits) {
            LOG.info((Object)("Dispatching Query:" + this.getQueryId() + " to Shard:" + split.getShardId()));
            RemoteQueryInfo queryDetails = new RemoteQueryInfo();
            queryDetails.setCommonInfo(this.getCommonQueryInfo());
            queryDetails.setClientQueryData(this.getClientQueryInfo());
            queryDetails.setQueryClassType(this.getClass().getName());
            queryDetails.setQueryDataClassType(this._queryData.getClass().getName());
            DataOutputBuffer outputBuffer = new DataOutputBuffer();
            ((RPCStruct)this._queryData).serialize((DataOutput)outputBuffer, new BinaryProtocol());
            queryDetails.setQueryDataBuffer(new FlexBuffer(outputBuffer.getData(), 0, outputBuffer.getLength(), true));
            queryDetails.setShardId(split.getShardId());
            QueryStatus queryStatus = new QueryStatus();
            queryStatus.setQueryId(this.getQueryId());
            queryStatus.setShardId(split.getShardId());
            queryStatus.setAttempts(0);
            queryStatus.setStatus(0);
            ShardState statusInfo = new ShardState(queryStatus, queryDetails);
            this._shardStatusMap.put(queryStatus.getShardId(), statusInfo);
        }
        this._queryMonitorTimer = new Timer(500L, true, new Timer.Callback(){

            @Override
            public void timerFired(Timer timer) {
                try {
                    Query.this.updateRemoteQueryStatus(request);
                }
                catch (IOException e) {
                    LOG.error((Object)CCStringUtils.stringifyException((Throwable)e));
                }
            }
        });
        this._queryController.getHost().getEventLoop().setTimer(this._queryMonitorTimer);
        LOG.info((Object)("Dispatching Slave Queries for Query:" + this.getQueryId()));
        this.updateRemoteQueryStatus(request);
    }

    public void updateQueryStatus(QueryStatus statusUpdate) throws IOException {
        ShardState slaveInfo = this._shardStatusMap.get(statusUpdate.getShardId());
        if (slaveInfo != null) {
            if (statusUpdate.getStatus() == 3 && slaveInfo._queryStatus.getStatus() != 3) {
                LOG.info((Object)("Slave:" + slaveInfo.getAssignedNode().getFullyQualifiedName() + " Reported Error:" + statusUpdate.getOptErrorReason() + " for Query:" + this.getQueryId()));
            } else if (statusUpdate.getStatus() == 2 && slaveInfo._queryStatus.getStatus() != 2) {
                LOG.info((Object)("Slave:" + slaveInfo.getAssignedNode().getFullyQualifiedName() + " Reported FINISHED for Query:" + this.getQueryId() + " ResultCount:" + statusUpdate.getOptResultCount()));
            }
            try {
                statusUpdate.setFieldClean(3);
                statusUpdate.setFieldClean(4);
                statusUpdate.setFieldClean(1);
                statusUpdate.setFieldClean(2);
                slaveInfo._queryStatus.merge(statusUpdate);
            }
            catch (CloneNotSupportedException cloneNotSupportedException) {
                // empty catch block
            }
            this.updateRemoteQueryStatus(this._associatedRequest);
        } else {
            LOG.error((Object)("Query: " + this.getQueryId() + " Received Status Update from Unknown Shard:" + statusUpdate.getShardId()));
        }
    }

    private void updateRemoteQueryStatus(QueryRequest request) throws IOException {
        int completedCount = 0;
        int failedCount = 0;
        long totalResultCount = 0L;
        String failureReason = "";
        for (ShardState shardStatus : this._shardStatusMap.values()) {
            if (shardStatus._queryStatus.getStatus() == 0) {
                if (shardStatus._queryStatus.getLastAttemptTime() != 0L && System.currentTimeMillis() - shardStatus._queryStatus.getLastAttemptTime() < 5000L) continue;
                shardStatus.getQueryStatus().setAttempts(shardStatus.getQueryStatus().getAttempts() + 1);
                if (shardStatus.getQueryStatus().getAttempts() > 5) {
                    LOG.error((Object)("Failed to execute Query:" + this.getQueryId() + " for Shard:" + shardStatus.getQueryStatus().getShardId()));
                    shardStatus._queryStatus.setStatus(3);
                    ++failedCount;
                    continue;
                }
                shardStatus.getQueryStatus().setLastAttemptTime(System.currentTimeMillis());
                QuerySlaveConnection dispatchPoint = null;
                if (shardStatus.getAssignedNode() == null) {
                    String fqnHost = this._queryController.getHost().mapShardToHostFQN(this, shardStatus.getQueryStatus().getShardId());
                    if (fqnHost != null) {
                        dispatchPoint = this._queryController.getHost().mapFQNToConnection(fqnHost);
                        if (!dispatchPoint.isOnline()) {
                            dispatchPoint = null;
                        }
                        if (dispatchPoint == null) {
                            LOG.error((Object)("Failed to map fqnHost:" + fqnHost + " for shardId:" + shardStatus.getQueryStatus().getShardId() + " and Query:" + this.getQueryId() + " to valid Connection"));
                        }
                    } else {
                        LOG.error((Object)("Failed to map shardId:" + shardStatus.getQueryStatus().getShardId() + " for Query:" + this.getQueryId()));
                    }
                    if (dispatchPoint != null) {
                        shardStatus.setAssignedNode(dispatchPoint);
                    }
                }
                if (shardStatus._assignedNode == null) continue;
                LOG.info((Object)("Sending Remote Query to Slave:" + shardStatus._assignedNode.getFullyQualifiedName() + " for Shard:" + shardStatus.getQueryStatus().getShardId() + " Query:" + this.getQueryId()));
                try {
                    shardStatus._queryStatus.setStatus(1);
                    shardStatus._queryStatus.setProgress(0.0f);
                    this.dispatchQueryToSlave(shardStatus);
                }
                catch (IOException e) {
                    LOG.error((Object)("Remote RPC For Query:" + this.getQueryId() + " to Slave:" + shardStatus.getAssignedNode().getFullyQualifiedName() + " Failed with Exception:" + CCStringUtils.stringifyException((Throwable)e)));
                }
                continue;
            }
            if (shardStatus._queryStatus.getStatus() == 2) {
                totalResultCount += shardStatus._queryStatus.getOptResultCount();
                ++completedCount;
                continue;
            }
            if (shardStatus._queryStatus.getStatus() != 3) continue;
            if (shardStatus.getQueryStatus().getAttempts() < 5) {
                shardStatus._queryStatus.setStatus(0);
                LOG.info((Object)("Marking FAILED Query:" + this.getQueryId() + " for Retry. Attempts(" + shardStatus.getQueryStatus().getAttempts() + "). Last Error Was:" + shardStatus._queryStatus.getOptErrorReason()));
                continue;
            }
            failureReason = failureReason + "\n";
            failureReason = failureReason + "Failure Reason:" + shardStatus._queryStatus.getOptErrorReason();
            ++failedCount;
        }
        int shardCount = this._shardStatusMap.size();
        float queryPctComplete = (float)completedCount / (float)shardCount;
        float statusPctComplete = 0.33f * queryPctComplete;
        if (request != null) {
            request.getQueryStatus().setProgress(statusPctComplete);
        }
        if (completedCount + failedCount == this._shardStatusMap.size() || completedCount == 1 && totalResultCount == 1L && this.isSingleRequestQuery()) {
            this._associatedRequest = null;
            if (this._queryMonitorTimer != null && this._queryController != null) {
                this._queryController.getHost().getEventLoop().cancelTimer(this._queryMonitorTimer);
                this._queryMonitorTimer = null;
            }
            if (failedCount != 0) {
                LOG.info((Object)("Cancellig Query:" + this.getQueryId() + " because " + failedCount + " Shards Failed"));
                this._queryStatus.setStatus(3);
                this._queryStatus.setOptErrorReason(failureReason);
                this._completionCallback.queryFailed(this, failureReason);
            } else {
                LOG.info((Object)("Query:" + this.getQueryId() + " Successfully Completed with Result Count:" + totalResultCount));
                this._queryStatus.setStatus(2);
                this._queryStatus.setOptResultCount(totalResultCount);
                this._completionCallback.queryComplete(this, totalResultCount);
            }
            this._shardStatusMap.clear();
        }
    }

    public long getQueryId() {
        return this.getCommonQueryInfo().getQueryId();
    }

    public void setQueryId(long queryId) {
        this.getCommonQueryInfo().setQueryId(queryId);
        this._queryStatus.setQueryId(queryId);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void cancelSlaveQuery() {
        Query query = this;
        synchronized (query) {
            this._cancelQuery = true;
        }
        if (this._queryThread != null) {
            try {
                this._queryThread.join();
            }
            catch (InterruptedException interruptedException) {
                // empty catch block
            }
        }
    }

    public void waitOnQuery() {
        if (this._queryThread != null) {
            try {
                this._queryThread.join();
            }
            catch (InterruptedException interruptedException) {
                // empty catch block
            }
        }
    }

    final void dispatchQueryToSlave(final ShardState shardStatus) throws IOException {
        final String targetHostFQN = shardStatus.getAssignedNode().getFullyQualifiedName();
        shardStatus.getAssignedNode().getRemoteStub().doQuery(shardStatus.getQueryInfo(), new OutgoingMessageContext.Callback<RemoteQueryInfo, QueryStatus>(){

            @Override
            public void requestComplete(OutgoingMessageContext<RemoteQueryInfo, QueryStatus> request) {
                if (request.getStatus() != MessageData.Status.Success) {
                    LOG.error((Object)("Query:" + Query.this.getQueryId() + " To Slave:" + targetHostFQN + " Failed with RPC Error:" + (Object)((Object)request.getStatus())));
                    shardStatus._queryStatus.setStatus(0);
                } else {
                    LOG.info((Object)("Query:" + Query.this.getQueryId() + " To Slave:" + targetHostFQN + " returned Status:" + ((QueryStatus)request.getOutput()).getStatus()));
                    try {
                        ((QueryStatus)request.getOutput()).setFieldClean(3);
                        ((QueryStatus)request.getOutput()).setFieldClean(4);
                        ((QueryStatus)request.getOutput()).setFieldClean(1);
                        ((QueryStatus)request.getOutput()).setFieldClean(2);
                        shardStatus.getQueryStatus().merge((QueryStatus)request.getOutput());
                    }
                    catch (CloneNotSupportedException cloneNotSupportedException) {
                        // empty catch block
                    }
                }
            }
        });
    }

    public Tuples.Pair<Long, Long> getCanonicalQueryId() {
        if (this._cachedCanonicalId == null) {
            String domainId = this.getQueryDomainId();
            String parameters = this.getUniqueQueryParameters();
            this._cachedCanonicalId = new Tuples.Pair<Long, Long>(FPGenerator.std64.fp(domainId), FPGenerator.std64.fp(parameters));
        }
        return this._cachedCanonicalId;
    }

    public abstract String getQueryDomainId();

    public abstract String getUniqueQueryParameters();

    protected boolean cachedResultsAvailable(FileSystem remoteFS, Configuration conf, ClientQueryInfo clientQueryInfo) throws IOException {
        Path outputFileName = new Path(this.getHDFSQueryResultsPath(), MERGED_RESULTS_SUFFIX);
        boolean result = remoteFS.exists(outputFileName);
        return result;
    }

    protected void executeCacheQuery(FileSystem remoteFS, Configuration conf, EventLoop eventLoop, QueryController queryController, QueryRequest<DataType, ResultKeyType, ResultValueType> theClientRequest, QueryCompletionCallback<DataType, ResultKeyType, ResultValueType> callback) throws IOException {
        LOG.info((Object)("getCachedResults called for Query:" + this.getQueryId()));
        Path outputFileName = new Path(this.getHDFSQueryResultsPath(), MERGED_RESULTS_SUFFIX);
        Path indexFileName = QueryResultFileIndex.getIndexNameFromBaseName(outputFileName);
        QueryResultFileIndex<ResultKeyType, ResultValueType> index = new QueryResultFileIndex<ResultKeyType, ResultValueType>(remoteFS, indexFileName, this.getKeyClass(), this.getValueClass());
        QueryResult resultOut = new QueryResult();
        LOG.info((Object)("getCachedResults called for Query:" + this.getQueryId() + " Calling ReadPaginationResults"));
        index.readPaginatedResults(remoteFS, conf, theClientRequest.getClientQueryInfo().getSortOrder(), theClientRequest.getClientQueryInfo().getPaginationOffset(), theClientRequest.getClientQueryInfo().getPageSize(), resultOut);
        LOG.info((Object)("getCachedResults called for Query:" + this.getQueryId() + ". Initiating getCachedResults Callback"));
        callback.queryComplete(theClientRequest, resultOut);
    }

    protected SpillWriter<ResultKeyType, ResultValueType> createAggregatedResultSpillWriter(FileSystem remoteFileSystem, Configuration conf, Path mergeResultsPath, QueryResultFileIndex.PositionBasedIndexWriter indexWriter) throws IOException {
        return new SequenceFileSpillWriter<ResultKeyType, ResultValueType>(remoteFileSystem, conf, mergeResultsPath, this.getKeyClass(), this.getValueClass(), new QueryResultFileIndex.PositionBasedIndexWriter(remoteFileSystem, QueryResultFileIndex.getIndexNameFromBaseName(mergeResultsPath)), null, 2);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    protected long executeAggregationQuery(FileSystem remoteFileSystem, Configuration conf, EventLoop eventLoop, File tempFirDir, QueryRequest<DataType, ResultKeyType, ResultValueType> requestObject, final QueryProgressCallback progressCallback) throws IOException {
        Path mergeResultsPath = new Path(this.getHDFSQueryResultsPath(), MERGED_RESULTS_SUFFIX);
        try {
            LOG.info((Object)("Execute Local called for Query:" + this.getQueryId() + " MergeResultsPath is:" + mergeResultsPath));
            if (!remoteFileSystem.exists(mergeResultsPath)) {
                LOG.info((Object)("Execute Local for Query:" + this.getQueryId() + " Source MergeFile:" + mergeResultsPath + " Not Found. Checking for parts files"));
                Vector<Path> parts = new Vector<Path>();
                FileStatus[] fileStatusArray = remoteFileSystem.globStatus(new Path(this.getHDFSQueryResultsPath(), "part-*"));
                if (fileStatusArray.length == 0) {
                    LOG.error((Object)("Execute Local for Query:" + this.getQueryId() + " FAILED. No Parts Files Found!"));
                    throw new IOException("Remote Component Part Files Not Found");
                }
                for (FileStatus part : fileStatusArray) {
                    parts.add(part.getPath());
                }
                LOG.info((Object)("Execute Local for Query:" + this.getQueryId() + " Initializing Merger"));
                SpillWriter<ResultKeyType, ResultValueType> mergedFileSpillWriter = this.createAggregatedResultSpillWriter(remoteFileSystem, conf, mergeResultsPath, new QueryResultFileIndex.PositionBasedIndexWriter(remoteFileSystem, QueryResultFileIndex.getIndexNameFromBaseName(mergeResultsPath)));
                try {
                    SequenceFileMerger<ResultKeyType, ResultValueType> merger = new SequenceFileMerger<ResultKeyType, ResultValueType>(remoteFileSystem, conf, parts, mergedFileSpillWriter, this.getKeyClass(), this.getValueClass(), this.allocateRawComparator());
                    try {
                        LOG.info((Object)("Execute Local for Query:" + this.getQueryId() + " Running Merger"));
                        merger.mergeAndSpill(new Reporter(){

                            public void progress() {
                            }

                            public void setStatus(String status) {
                            }

                            public void incrCounter(String group, String counter, long amount) {
                            }

                            public void incrCounter(Enum<?> key, long amount) {
                                if (key == SequenceFileMerger.Counters.PCT_COMPLETED) {
                                    Query.this._aggregationCompletionCount += amount;
                                    if (progressCallback != null) {
                                        float pctComplete = (float)Query.this._aggregationCompletionCount / 100.0f;
                                        progressCallback.updateProgress(Query.this, pctComplete);
                                    }
                                }
                            }

                            public InputSplit getInputSplit() throws UnsupportedOperationException {
                                return null;
                            }

                            public Counters.Counter getCounter(String group, String name) {
                                return null;
                            }

                            public Counters.Counter getCounter(Enum<?> name) {
                                return null;
                            }
                        });
                        LOG.info((Object)("Execute Local for Query:" + this.getQueryId() + " Merge Successfull.. Deleting Merge Inputs"));
                        for (Path inputPath : parts) {
                            remoteFileSystem.delete(inputPath, false);
                        }
                    }
                    catch (IOException e) {
                        LOG.error((Object)("Execute Local for Query:" + this.getQueryId() + " Merge Failed with Exception:" + CCStringUtils.stringifyException((Throwable)e)));
                        throw e;
                    }
                    finally {
                        LOG.info((Object)"** CLOSING MERGER");
                        merger.close();
                    }
                }
                finally {
                    LOG.info((Object)"** FLUSHING SPILLWRITER");
                    mergedFileSpillWriter.close();
                }
            }
            Path indexFileName = QueryResultFileIndex.getIndexNameFromBaseName(mergeResultsPath);
            QueryResultFileIndex<ResultKeyType, ResultValueType> indexFile = new QueryResultFileIndex<ResultKeyType, ResultValueType>(remoteFileSystem, indexFileName, this.getKeyClass(), this.getValueClass());
            return indexFile.getRecordCount();
        }
        catch (Exception e) {
            LOG.error((Object)CCStringUtils.stringifyException((Throwable)e));
            throw new IOException(e);
        }
    }

    public boolean requiresShardedQuery(FileSystem remoteFS, Configuration conf, QueryController controller, QueryRequest<DataType, ResultKeyType, ResultValueType> theClientRequest, int shardCount, ArrayList<QueryInputSplit> splitsOut) throws IOException {
        FileStatus[] availableParts;
        HashSet<String> requiredParts = new HashSet<String>();
        for (int i = 0; i < shardCount; ++i) {
            requiredParts.add(Query.getPartNameForSlave(i));
        }
        Path remoteQueryPath = this.getHDFSQueryResultsPath();
        for (FileStatus part : availableParts = remoteFS.globStatus(new Path(remoteQueryPath, "part-*"))) {
            requiredParts.remove(part.getPath().getName());
        }
        if (requiredParts.size() != 0) {
            for (String part : requiredParts) {
                QueryInputSplit split = new QueryInputSplit();
                split.setShardId(Integer.parseInt(part.substring("part-".length())));
                splitsOut.add(split);
            }
            return true;
        }
        LOG.info((Object)"All parts required for query available.");
        return false;
    }

    public boolean isHighPriorityQuery() {
        return false;
    }

    protected abstract long executeShardedQuery(FileSystem var1, Configuration var2, EventLoop var3, QuerySlaveServer var4, File var5, Path var6, MergeSortSpillWriter<ResultKeyType, ResultValueType> var7, QueryProgressCallback<DataType, ResultKeyType, ResultValueType> var8) throws IOException;

    public void shardedQueryComplete(FileSystem fileSystem, Configuration conf, QueryRequest<DataType, ResultKeyType, ResultValueType> theClientRequest, long resultCount) throws IOException {
    }

    public void shardedQueryFailed(FileSystem fileSystem) {
    }

    public boolean isSingleRequestQuery() {
        return false;
    }

    public abstract Class<ResultKeyType> getKeyClass();

    public abstract Class<ResultValueType> getValueClass();

    public abstract RawKeyValueComparator<ResultKeyType, ResultValueType> allocateRawComparator();

    protected static synchronized String getPartNameForSlave(int slaveIndex) {
        return "part-" + NUMBER_FORMAT.format(slaveIndex);
    }

    protected Path getHDFSQueryResultsPath() {
        return new Path(this.getCommonQueryInfo().getQueryResultPath());
    }

    protected Path getHDFSQueryTempPath() {
        return new Path(this.getCommonQueryInfo().getQueryTempPath());
    }

    protected Path getHDFSQueryAttemptFilePathForShard(int shardIndex) {
        return new Path(this.getHDFSQueryTempPath(), Query.getPartNameForSlave(shardIndex) + "-" + System.currentTimeMillis());
    }

    static {
        NUMBER_FORMAT.setMinimumIntegerDigits(5);
        NUMBER_FORMAT.setGroupingUsed(false);
    }

    private class ShardState {
        private QuerySlaveConnection _assignedNode;
        private QueryStatus _queryStatus;
        private RemoteQueryInfo _queryInfo;
        public boolean _logged = false;

        public ShardState(QueryStatus queryStatus, RemoteQueryInfo queryInfo) {
            this._queryStatus = queryStatus;
            this._queryInfo = queryInfo;
        }

        public void setAssignedNode(QuerySlaveConnection node) {
            this._assignedNode = node;
        }

        public QuerySlaveConnection getAssignedNode() {
            return this._assignedNode;
        }

        public QueryStatus getQueryStatus() {
            return this._queryStatus;
        }

        public RemoteQueryInfo getQueryInfo() {
            return this._queryInfo;
        }
    }
}

