/*
 * Decompiled with CFR 0.152.
 */
package com.linkedin.databus.client.consumer;

import com.linkedin.databus.client.consumer.ConsumerCallable;
import com.linkedin.databus.client.consumer.ConsumerCallbackFactory;
import com.linkedin.databus.client.consumer.DatabusV2ConsumerRegistration;
import com.linkedin.databus.client.consumer.LoggingConsumer;
import com.linkedin.databus.client.consumer.StartDataEventSequenceCallable;
import com.linkedin.databus.client.consumer.StartSourceCallable;
import com.linkedin.databus.client.pub.ConsumerCallbackResult;
import com.linkedin.databus.client.pub.DatabusCombinedConsumer;
import com.linkedin.databus.client.pub.DatabusStreamConsumer;
import com.linkedin.databus.client.pub.DbusEventDecoder;
import com.linkedin.databus.client.pub.SCN;
import com.linkedin.databus.client.pub.mbean.ConsumerCallbackStats;
import com.linkedin.databus.client.pub.mbean.UnifiedClientStats;
import com.linkedin.databus.core.DbusEvent;
import com.linkedin.databus.core.data_model.DatabusSubscription;
import com.linkedin.databus.core.util.IdNamePair;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.PriorityQueue;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import org.apache.avro.Schema;
import org.apache.log4j.Logger;

/**
 * Stream consumer that fans each callback out to the consumers of all registrations.
 *
 * <p>Every callback is wrapped in a {@link ConsumerCallable} obtained from the callback factory,
 * collected into the current batch, submitted to the executor service, and then tracked as a
 * {@code TimestampedFuture} in a priority queue until its result has been collected.
 */
public class MultiConsumerCallback
implements DatabusStreamConsumer {
    /** Fully-qualified class name; used as the logger name when the constructor is not given a logger. */
    public final String MODULE = MultiConsumerCallback.class.getName();
    /** Logger used for all diagnostics of this instance. */
    public Logger _log;
    /** Registrations whose consumers receive the callbacks. */
    private final List<DatabusV2ConsumerRegistration> _registrations;
    /** Executor that runs the callables built for each callback. */
    private final ExecutorService _executorService;
    /** Callables accumulated for the callback being processed; submitted and cleared by submitBatch. */
    private final List<ConsumerCallable<ConsumerCallbackResult>> _currentBatch;
    /** Factory producing the callable for each callback type. */
    private final ConsumerCallbackFactory<DatabusCombinedConsumer> _callbackFactory;
    /** Per-call time budget in nanoseconds; a non-positive value is treated as "no timeout". */
    private final long _timeBudgetNanos;
    /** Maps a source id to its id/name pair; must be set before onDataEvent is called. */
    private Map<Long, IdNamePair> _sourceMap;
    /** Counter bumped once per submitBatch invocation and once per submitted call (supplies call numbers). */
    private long _runCallsCounter;
    /** Submitted calls whose results have not been collected yet, ordered by TimestampedFutureComparator. */
    private final PriorityQueue<TimestampedFuture<ConsumerCallbackResult>> _submittedCalls;
    /** Lock held by dequeueTopFuture while it polls the queue and ends the call. */
    private final Lock _lock = new ReentrantLock();
    /** Optional logging consumer that receives every callback in addition to the registered consumers; may be null. */
    private final LoggingConsumer _loggingConsumer;
    /** Optional per-consumer statistics collector; may be null. */
    private final ConsumerCallbackStats _consumerStats;
    /** Optional unified client statistics collector; may be null. */
    private final UnifiedClientStats _unifiedClientStats;

    /**
     * Creates a callback without statistics collectors, logging consumer or custom logger.
     *
     * @param registrations   registrations whose consumers receive the callbacks
     * @param executorService executor that runs the consumer callables
     * @param timeBudgetMs    per-call time budget in milliseconds; non-positive disables timeouts
     * @param callbackFactory factory producing the callable for each callback type
     */
    public MultiConsumerCallback(List<DatabusV2ConsumerRegistration> registrations, ExecutorService executorService, long timeBudgetMs, ConsumerCallbackFactory<DatabusCombinedConsumer> callbackFactory) {
        this(registrations, executorService, timeBudgetMs, callbackFactory, null, null, null, null);
    }

    /**
     * Creates a fully configured callback.
     *
     * @param registrations      registrations whose consumers receive the callbacks
     * @param executorService    executor that runs the consumer callables
     * @param timeBudgetMs       per-call time budget in milliseconds; non-positive disables timeouts
     * @param callbackFactory    factory producing the callable for each callback type
     * @param consumerStats      optional consumer statistics collector, may be {@code null}
     * @param unifiedClientStats optional unified statistics collector, may be {@code null}
     * @param loggingConsumer    optional consumer that additionally receives every callback, may be {@code null}
     * @param log                logger to use; when {@code null} a logger named after this class is used
     */
    public MultiConsumerCallback(List<DatabusV2ConsumerRegistration> registrations, ExecutorService executorService, long timeBudgetMs, ConsumerCallbackFactory<DatabusCombinedConsumer> callbackFactory, ConsumerCallbackStats consumerStats, UnifiedClientStats unifiedClientStats, LoggingConsumer loggingConsumer, Logger log) {
        this._registrations = registrations;
        this._executorService = executorService;
        this._timeBudgetNanos = timeBudgetMs * 1000000L;
        this._currentBatch = new ArrayList<ConsumerCallable<ConsumerCallbackResult>>(2048);
        this._callbackFactory = callbackFactory;
        this._runCallsCounter = 0L;
        // Fully parameterized (no raw types) so the compiler checks the queue element type.
        this._submittedCalls = new PriorityQueue<TimestampedFuture<ConsumerCallbackResult>>(100, new TimestampedFutureComparator<ConsumerCallbackResult>());
        this._consumerStats = consumerStats;
        this._unifiedClientStats = unifiedClientStats;
        this._loggingConsumer = loggingConsumer;
        this._log = null != log ? log : Logger.getLogger(this.MODULE);
    }

    /**
     * Submits the callables accumulated in the current batch to the executor.
     *
     * <p>The batch is always emptied before this method returns, including when the pre-barrier
     * flush or the executor throws, so that a failed batch is never resubmitted together with
     * the callables of the next callback.
     *
     * @param curNanos      reference time in nanoseconds; a non-positive value means "read the clock now"
     * @param barrierBefore when {@code true}, all previously submitted calls are flushed first and the
     *                      batch is submitted only if that flush succeeded
     * @param barrierAfter  when {@code true}, waits for the submitted calls to complete; otherwise only
     *                      finished or timed-out calls are cleaned up
     * @return the combined result; {@code ERROR} if an internal runtime exception occurred
     */
    private ConsumerCallbackResult submitBatch(long curNanos, boolean barrierBefore, boolean barrierAfter) {
        ++this._runCallsCounter;
        ConsumerCallbackResult retValue = ConsumerCallbackResult.SUCCESS;
        if (0L >= curNanos) {
            curNanos = System.nanoTime();
        }
        try {
            try {
                if (barrierBefore) {
                    retValue = this.flushCallQueue(curNanos);
                }
                if (ConsumerCallbackResult.isSuccess(retValue)) {
                    // All calls of a batch are labelled with the simple class name of the first callable.
                    String batchName = this._currentBatch.size() > 0 ? this._currentBatch.get(0).getClass().getSimpleName() : "";
                    for (ConsumerCallable<ConsumerCallbackResult> call : this._currentBatch) {
                        Future<ConsumerCallbackResult> future = this._executorService.submit(call);
                        this._submittedCalls.add(new TimestampedFuture<ConsumerCallbackResult>(call, future, batchName, ++this._runCallsCounter));
                    }
                }
            }
            finally {
                // Drop the batch even on failure; otherwise stale (possibly already submitted)
                // callables would be submitted again by the next callback.
                this._currentBatch.clear();
            }
            if (ConsumerCallbackResult.isSuccess(retValue)) {
                ConsumerCallbackResult retValue2 = barrierAfter ? this.flushCallQueue(curNanos) : this.cleanUpCallQueue(curNanos);
                retValue = ConsumerCallbackResult.max(retValue, retValue2);
            }
        }
        catch (RuntimeException e) {
            this._log.error("internal callback error: " + e.getMessage(), e);
            retValue = ConsumerCallbackResult.ERROR;
        }
        return retValue;
    }

    /**
     * Removes calls from the head of the queue that have either finished or exceeded the time
     * budget, without blocking on calls that are still within budget.
     *
     * <p>Timed-out calls are cancelled and counted as {@code ERROR}. As soon as the combined result
     * is a failure, all outstanding calls are cancelled.
     *
     * @param curNanos reference time in nanoseconds; a negative value means "read the clock now"
     * @return the combined result of all calls that were removed
     */
    private ConsumerCallbackResult cleanUpCallQueue(long curNanos) {
        ConsumerCallbackResult result = ConsumerCallbackResult.SUCCESS;
        if (0L > curNanos) {
            curNanos = System.nanoTime();
        }
        final boolean hasBudget = this._timeBudgetNanos > 0L;
        TimestampedFuture<ConsumerCallbackResult> top;
        while ((top = this._submittedCalls.peek()) != null) {
            // Compare elapsed time against the budget (difference form) and only when a budget
            // is configured, so that non-positive timestamps never look like a timeout.
            final long elapsedNanos = curNanos - top.getTimestamp();
            final boolean timedOut = hasBudget && elapsedNanos >= this._timeBudgetNanos;
            if (!timedOut && !top.getFuture().isDone()) {
                break;
            }
            ConsumerCallbackResult callRes;
            if (top.getFuture().isDone()) {
                callRes = this.getCallResult(top.getFuture(), top.getCallType(), -1L);
            } else {
                callRes = ConsumerCallbackResult.ERROR;
                top.getFuture().cancel(true);
                // Runtime is "now - creation time"; the reverse order would log a negative value.
                this._log.error("callback timeout: " + top.getCallType() + "; runtime = " + elapsedNanos / 1000000L + " ms; try increasing client.connectionDefaults.consumerTimeBudgetMs");
            }
            result = ConsumerCallbackResult.max(result, callRes);
            if (ConsumerCallbackResult.isFailure(result)) {
                this._log.error("error detected; cancelling all " + this._submittedCalls.size() + " outstanding callbacks ");
                this.cancelCalls();
            }
            this.dequeueTopFuture(result);
        }
        return result;
    }

    /**
     * Removes the head of the submitted-call queue, if any, and notifies its callable that the
     * call has ended with the given result. The lock is held for the duration of both steps.
     *
     * @param result result passed to the callable's {@code endCall}
     */
    private void dequeueTopFuture(ConsumerCallbackResult result) {
        _lock.lock();
        try {
            final TimestampedFuture<ConsumerCallbackResult> head = _submittedCalls.poll();
            if (head != null) {
                head.getCallable().endCall(result);
            }
        }
        finally {
            _lock.unlock();
        }
    }

    /**
     * Obtains the result of a submitted callback and normalizes every failure mode to
     * {@code ERROR}.
     *
     * @param future       future of the submitted callback
     * @param callType     call type label used in log messages
     * @param timeoutNanos negative: wait indefinitely; zero: budget already exhausted, fail
     *                     immediately; positive: wait at most this many nanoseconds
     * @return the callback's result, or {@code ERROR} if it returned {@code null}, threw,
     *         timed out, or the waiting thread was interrupted
     */
    private ConsumerCallbackResult getCallResult(Future<ConsumerCallbackResult> future, String callType, long timeoutNanos) {
        try {
            if (timeoutNanos == 0L) {
                this._log.error("Exhausted time budget of " + this._timeBudgetNanos / 1000000L + "ms. Skipping remaining callbacks of type " + callType);
                throw new TimeoutException("No time remaining in a timeout budget of " + this._timeBudgetNanos / 1000000L + " ms");
            }
            ConsumerCallbackResult result = timeoutNanos < 0L ? future.get() : future.get(timeoutNanos, TimeUnit.NANOSECONDS);
            if (result == null) {
                result = ConsumerCallbackResult.ERROR;
                this._log.error("Client application callback (" + callType + ") returned null");
            } else if (!ConsumerCallbackResult.isSuccess(result)) {
                this._log.error("Client application callback (" + callType + ") returned error:" + result);
            }
            return result;
        }
        catch (ExecutionException ee) {
            // Report the exception thrown by the callback itself; fall back to the wrapper if no
            // cause is attached so that logging can never fail with a NullPointerException.
            Throwable cause = null != ee.getCause() ? ee.getCause() : ee;
            this._log.error("Uncaught exception in client application callback (" + callType + "): " + cause, cause);
        }
        catch (InterruptedException ie) {
            this._log.warn("Client application callback (" + callType + ") interrupted");
            // Restore the interrupt status so callers further up the stack can observe it.
            Thread.currentThread().interrupt();
        }
        catch (TimeoutException te) {
            this._log.error("Client application timed out handling callback: " + callType + "; Try increasing client.connectionDefaults.consumerTimeBudgetMs " + " or client.connectionDefaults.bstConsumerTimeBudgetMs");
        }
        return ConsumerCallbackResult.ERROR;
    }

    /**
     * Empties the submitted-call queue: every call that is not yet done is cancelled (with
     * interruption) and every callable is told that its call ended with {@code ERROR}.
     * Runtime exceptions raised while handling one call are logged and do not stop the drain.
     */
    private void cancelCalls() {
        for (TimestampedFuture<ConsumerCallbackResult> pending = _submittedCalls.poll();
             pending != null;
             pending = _submittedCalls.poll()) {
            try {
                final boolean stillRunning = !pending.getFuture().isDone();
                if (stillRunning) {
                    pending.getFuture().cancel(true);
                }
                pending.getCallable().endCall(ConsumerCallbackResult.ERROR);
            }
            catch (RuntimeException e) {
                _log.error("unable to cancel call: " + pending.getCallType() + ": " + e.getMessage(), e);
            }
        }
    }

    /**
     * Waits for every submitted call, in queue order, and combines their results.
     *
     * <p>For a call that is not done yet, the wait is bounded by the remaining time budget (or is
     * unbounded when no positive budget is configured). Once the combined result is a failure, all
     * outstanding calls are cancelled. When debug logging is enabled, the run time of successfully
     * finished start-source and start-data-event-sequence calls is logged.
     *
     * @param curTime reference time in nanoseconds; a non-positive value means "read the clock now"
     * @return the combined result of all calls that were in the queue
     */
    public ConsumerCallbackResult flushCallQueue(long curTime) {
        if (curTime <= 0L) {
            curTime = System.nanoTime();
        }
        ConsumerCallbackResult overall = ConsumerCallbackResult.SUCCESS;
        while (_submittedCalls.size() > 0) {
            final TimestampedFuture<ConsumerCallbackResult> head = _submittedCalls.peek();
            final Future<ConsumerCallbackResult> headFuture = head.getFuture();
            ConsumerCallbackResult headResult = null;

            if (!headFuture.isDone()) {
                if (curTime <= 0L) {
                    curTime = System.nanoTime();
                }
                final long remaining = getEstimatedTimeout(_timeBudgetNanos, curTime, head);
                final long waitNanos;
                if (_timeBudgetNanos <= 0L) {
                    // No budget configured: wait without a timeout.
                    waitNanos = -1L;
                } else if (remaining > 0L) {
                    waitNanos = remaining;
                } else {
                    waitNanos = 0L;
                }
                headResult = getCallResult(headFuture, head.getCallType(), waitNanos);
                // We may have blocked; force a fresh clock read for the next pending call.
                curTime = -1L;
            }
            if (headFuture.isDone() && headResult == null) {
                headResult = getCallResult(headFuture, head.getCallType(), -1L);
            }
            if (headResult == null) {
                headResult = ConsumerCallbackResult.ERROR;
            }

            overall = ConsumerCallbackResult.max(overall, headResult);
            if (ConsumerCallbackResult.isFailure(overall)) {
                _log.error("error detected; cancelling all " + _submittedCalls.size() + " outstanding callbacks");
                cancelCalls();
            }

            if (headFuture.isDone() && overall != ConsumerCallbackResult.ERROR) {
                final boolean debug = _log.isDebugEnabled();
                final String callType = head.getCallType();
                if (callType.equals("StartSourceCallable")) {
                    final long runTimeMs = head.getCallable().getNanoRunTime() / 1000000L;
                    if (debug) {
                        final StartSourceCallable startSource = (StartSourceCallable)head.getCallable();
                        _log.debug("StartSourceCallable time taken for source " + startSource.getSource() + " = " + runTimeMs);
                    }
                } else if (callType.equals("StartDataEventSequenceCallable")) {
                    final long runTimeMs = head.getCallable().getNanoRunTime() / 1000000L;
                    if (debug) {
                        final StartDataEventSequenceCallable startWindow = (StartDataEventSequenceCallable)head.getCallable();
                        _log.debug("StartDataEventSequenceCallable time taken for source " + startWindow.getSCN() + " = " + runTimeMs);
                    }
                }
            }
            dequeueTopFuture(overall);
        }
        return overall;
    }

    /**
     * Dispatches a checkpoint callback to every consumer of every registration and, if configured,
     * to the logging consumer. Previously submitted calls are flushed first and the new calls are
     * awaited before returning.
     *
     * @param checkpointScn SCN passed to the checkpoint callables
     * @return the combined result of the dispatched callbacks
     */
    public ConsumerCallbackResult onCheckpoint(SCN checkpointScn) {
        final long startNanos = System.nanoTime();
        for (DatabusV2ConsumerRegistration registration : _registrations) {
            for (DatabusCombinedConsumer target : registration.getConsumers()) {
                _currentBatch.add(_callbackFactory.createCheckpointCallable(startNanos, checkpointScn, target, true));
                if (_consumerStats != null) {
                    _consumerStats.registerEventsReceived(1);
                }
            }
        }
        if (null != _loggingConsumer) {
            _currentBatch.add(_callbackFactory.createCheckpointCallable(startNanos, checkpointScn, _loggingConsumer, false));
        }
        if (_log.isDebugEnabled()) {
            final long finishNanos = System.nanoTime();
            _log.debug("Time spent in databus clientlib by onCheckpoint = " + (finishNanos - startNanos) / 1000000L + "ms");
        }
        return submitBatch(startNanos, true, true);
    }

    /**
     * Dispatches a data event to the consumer of every registration subscribed to the event's
     * source and logical partition and, if configured, to the logging consumer. The calls are
     * submitted without barriers, i.e. this method does not wait for them.
     *
     * @param e            the data event
     * @param eventDecoder decoder handed to the consumers together with the event
     * @return {@code ERROR} if no source map is set or the event's source id is unknown;
     *         otherwise the result of submitting the batch
     */
    public ConsumerCallbackResult onDataEvent(DbusEvent e, DbusEventDecoder eventDecoder) {
        final boolean debug = _log.isDebugEnabled();
        final long startNanos = System.nanoTime();

        if (_sourceMap == null) {
            _log.error("No sources map specified");
            if (_consumerStats != null) {
                _consumerStats.registerSrcErrors();
            }
            return ConsumerCallbackResult.ERROR;
        }

        final long sourceId = e.srcId();
        final short partitionId = e.logicalPartitionId();
        final IdNamePair source = _sourceMap.get(sourceId);
        if (source == null) {
            _log.error("Unknown source");
            if (_consumerStats != null) {
                _consumerStats.registerSrcErrors();
            }
            return ConsumerCallbackResult.ERROR;
        }

        for (DatabusV2ConsumerRegistration registration : _registrations) {
            final DatabusSubscription subscription = DatabusSubscription.createSubscription(source, partitionId);
            if (debug) {
                _log.debug("event source=" + source + " lpart=" + partitionId);
            }
            if (registration.checkSourceSubscription(subscription)) {
                if (debug) {
                    _log.debug("consumer matches:" + registration.getConsumer());
                }
                _currentBatch.add(_callbackFactory.createDataEventCallable(startNanos, e, eventDecoder, registration.getConsumer(), true));
                if (_consumerStats != null) {
                    _consumerStats.registerDataEventReceived(e);
                }
                if (_unifiedClientStats != null) {
                    _unifiedClientStats.registerDataEventReceived(e);
                }
            }
        }

        if (null != _loggingConsumer) {
            _currentBatch.add(_callbackFactory.createDataEventCallable(startNanos, e, eventDecoder, _loggingConsumer, false));
        }
        if (debug) {
            final long finishNanos = System.nanoTime();
            _log.debug("Time spent in databus clientlib by onDataEvent = " + (finishNanos - startNanos) / 1000000L + "ms");
        }
        return submitBatch(startNanos, false, false);
    }

    /**
     * Dispatches an end-of-data-event-sequence callback to every consumer of every registration
     * and, if configured, to the logging consumer. Previously submitted calls are flushed first
     * and the new calls are awaited before returning.
     *
     * @param endScn SCN passed to the end-of-sequence callables
     * @return the combined result of the dispatched callbacks
     */
    public ConsumerCallbackResult onEndDataEventSequence(SCN endScn) {
        final long startNanos = System.nanoTime();
        for (DatabusV2ConsumerRegistration registration : _registrations) {
            for (DatabusCombinedConsumer target : registration.getConsumers()) {
                _currentBatch.add(_callbackFactory.createEndDataEventSequenceCallable(startNanos, endScn, target, true));
                if (_consumerStats != null) {
                    _consumerStats.registerSystemEventReceived();
                }
            }
        }
        if (null != _loggingConsumer) {
            _currentBatch.add(_callbackFactory.createEndDataEventSequenceCallable(startNanos, endScn, _loggingConsumer, false));
        }
        if (_log.isDebugEnabled()) {
            final long finishNanos = System.nanoTime();
            _log.debug("Time spent in databus clientlib by onEndDataEventSequence = " + (finishNanos - startNanos) / 1000000L + "ms");
        }
        return submitBatch(startNanos, true, true);
    }

    /**
     * Dispatches an end-source callback to every consumer of every registration and, if
     * configured, to the logging consumer. Previously submitted calls are flushed first and the
     * new calls are awaited before returning.
     *
     * @param source       name of the source passed to the callables
     * @param sourceSchema schema of the source passed to the callables
     * @return the combined result of the dispatched callbacks
     */
    public ConsumerCallbackResult onEndSource(String source, Schema sourceSchema) {
        final long startNanos = System.nanoTime();
        for (DatabusV2ConsumerRegistration registration : _registrations) {
            for (DatabusCombinedConsumer target : registration.getConsumers()) {
                _currentBatch.add(_callbackFactory.createEndSourceCallable(startNanos, source, sourceSchema, target, true));
                if (_consumerStats != null) {
                    _consumerStats.registerEventsReceived(1);
                }
            }
        }
        if (null != _loggingConsumer) {
            _currentBatch.add(_callbackFactory.createEndSourceCallable(startNanos, source, sourceSchema, _loggingConsumer, false));
        }
        if (_log.isDebugEnabled()) {
            final long finishNanos = System.nanoTime();
            _log.debug("Time spent in databus clientlib by onEndSource = " + (finishNanos - startNanos) / 1000000L + "ms");
        }
        return submitBatch(startNanos, true, true);
    }

    /**
     * Dispatches a rollback callback to every consumer of every registration and, if configured,
     * to the logging consumer. Previously submitted calls are flushed first and the new calls are
     * awaited before returning.
     *
     * @param startScn SCN passed to the rollback callables
     * @return the combined result of the dispatched callbacks
     */
    public ConsumerCallbackResult onRollback(SCN startScn) {
        final long startNanos = System.nanoTime();
        for (DatabusV2ConsumerRegistration registration : _registrations) {
            for (DatabusCombinedConsumer target : registration.getConsumers()) {
                _currentBatch.add(_callbackFactory.createRollbackCallable(startNanos, startScn, target, true));
                if (_consumerStats != null) {
                    _consumerStats.registerEventsReceived(1);
                }
            }
        }
        if (null != _loggingConsumer) {
            _currentBatch.add(_callbackFactory.createRollbackCallable(startNanos, startScn, _loggingConsumer, false));
        }
        if (_log.isDebugEnabled()) {
            final long finishNanos = System.nanoTime();
            _log.debug("Time spent in databus clientlib by onRollback = " + (finishNanos - startNanos) / 1000000L + "ms");
        }
        return submitBatch(startNanos, true, true);
    }

    /**
     * Dispatches a start-of-data-event-sequence callback to every consumer of every registration
     * and, if configured, to the logging consumer. Previously submitted calls are flushed first
     * and the new calls are awaited before returning.
     *
     * @param startScn SCN passed to the start-of-sequence callables
     * @return the combined result of the dispatched callbacks
     */
    public ConsumerCallbackResult onStartDataEventSequence(SCN startScn) {
        final long startNanos = System.nanoTime();
        for (DatabusV2ConsumerRegistration registration : _registrations) {
            for (DatabusCombinedConsumer target : registration.getConsumers()) {
                _currentBatch.add(_callbackFactory.createStartDataEventSequenceCallable(startNanos, startScn, target, true));
                if (_consumerStats != null) {
                    _consumerStats.registerEventsReceived(1);
                }
            }
        }
        if (null != _loggingConsumer) {
            _currentBatch.add(_callbackFactory.createStartDataEventSequenceCallable(startNanos, startScn, _loggingConsumer, false));
        }
        if (_log.isDebugEnabled()) {
            final long finishNanos = System.nanoTime();
            _log.debug("Time spent in databus clientlib by onStartDataEventSequence = " + (finishNanos - startNanos) / 1000000L + "ms");
        }
        return submitBatch(startNanos, true, true);
    }

    /**
     * Dispatches a start-source callback to every consumer of every registration and, if
     * configured, to the logging consumer. No flush is performed before submitting, but the new
     * calls are awaited before returning.
     *
     * @param source       name of the source passed to the callables
     * @param sourceSchema schema of the source passed to the callables
     * @return the combined result of the dispatched callbacks
     */
    public ConsumerCallbackResult onStartSource(String source, Schema sourceSchema) {
        final long startNanos = System.nanoTime();
        for (DatabusV2ConsumerRegistration registration : _registrations) {
            for (DatabusCombinedConsumer target : registration.getConsumers()) {
                _currentBatch.add(_callbackFactory.createStartSourceCallable(startNanos, source, sourceSchema, target, true));
                if (_consumerStats != null) {
                    _consumerStats.registerEventsReceived(1);
                }
            }
        }
        if (null != _loggingConsumer) {
            _currentBatch.add(_callbackFactory.createStartSourceCallable(startNanos, source, sourceSchema, _loggingConsumer, false));
        }
        if (_log.isDebugEnabled()) {
            final long finishNanos = System.nanoTime();
            _log.debug("Time spent in databus clientlib by onStartSource = " + (finishNanos - startNanos) / 1000000L + "ms");
        }
        return submitBatch(startNanos, false, true);
    }

    /**
     * Dispatches a start-consumption callback to every consumer of every registration and, if
     * configured, to the logging consumer. No flush is performed before submitting, but the new
     * calls are awaited before returning.
     *
     * @return the combined result of the dispatched callbacks
     */
    public ConsumerCallbackResult onStartConsumption() {
        final long startNanos = System.nanoTime();
        for (DatabusV2ConsumerRegistration registration : _registrations) {
            for (DatabusCombinedConsumer target : registration.getConsumers()) {
                _currentBatch.add(_callbackFactory.createStartConsumptionCallable(startNanos, target, true));
                if (_consumerStats != null) {
                    _consumerStats.registerEventsReceived(1);
                }
            }
        }
        if (null != _loggingConsumer) {
            _currentBatch.add(_callbackFactory.createStartConsumptionCallable(startNanos, _loggingConsumer, false));
        }
        if (_log.isDebugEnabled()) {
            final long finishNanos = System.nanoTime();
            _log.debug("Time spent in databus clientlib by onStartConsumption = " + (finishNanos - startNanos) / 1000000L + "ms");
        }
        return submitBatch(startNanos, false, true);
    }

    /**
     * Dispatches an end-consumption callback to every consumer of every registration and, if
     * configured, to the logging consumer. Previously submitted calls are flushed first and the
     * new calls are awaited before returning.
     *
     * @return the combined result of the dispatched callbacks
     */
    public ConsumerCallbackResult onStopConsumption() {
        final long startNanos = System.nanoTime();
        for (DatabusV2ConsumerRegistration registration : _registrations) {
            for (DatabusCombinedConsumer target : registration.getConsumers()) {
                _currentBatch.add(_callbackFactory.createEndConsumptionCallable(startNanos, target, true));
                if (_consumerStats != null) {
                    _consumerStats.registerEventsReceived(1);
                }
            }
        }
        if (null != _loggingConsumer) {
            _currentBatch.add(_callbackFactory.createEndConsumptionCallable(startNanos, _loggingConsumer, false));
        }
        if (_log.isDebugEnabled()) {
            final long finishNanos = System.nanoTime();
            _log.debug("Time spent in databus clientlib by onStopConsumption = " + (finishNanos - startNanos) / 1000000L + "ms");
        }
        return submitBatch(startNanos, true, true);
    }

    /**
     * Replaces the source-id lookup table consulted by {@code onDataEvent}.
     *
     * @param sourceMap mapping from source id to the source's id/name pair
     */
    public void setSourceMap(Map<Long, IdNamePair> sourceMap) {
        _sourceMap = sourceMap;
    }

    /**
     * @return the source-id lookup table currently in use, or {@code null} if none has been set
     */
    public Map<Long, IdNamePair> getSourceMap() {
        return _sourceMap;
    }

    /**
     * Dispatches an error callback to every consumer of every registration and, if configured, to
     * the logging consumer. Previously submitted calls are flushed first and the new calls are
     * awaited before returning.
     *
     * @param err the error passed to the callables
     * @return the combined result of the dispatched callbacks
     */
    public ConsumerCallbackResult onError(Throwable err) {
        final long startNanos = System.nanoTime();
        for (DatabusV2ConsumerRegistration registration : _registrations) {
            for (DatabusCombinedConsumer target : registration.getConsumers()) {
                _currentBatch.add(_callbackFactory.createOnErrorCallable(startNanos, err, target, true));
                if (_consumerStats != null) {
                    _consumerStats.registerErrorEventsProcessed(1);
                }
            }
        }
        if (null != _loggingConsumer) {
            _currentBatch.add(_callbackFactory.createOnErrorCallable(startNanos, err, _loggingConsumer, false));
        }
        if (_log.isDebugEnabled()) {
            final long finishNanos = System.nanoTime();
            _log.debug("Time spent in databus clientlib by onError = " + (finishNanos - startNanos) / 1000000L + "ms");
        }
        return submitBatch(startNanos, true, true);
    }

    /**
     * @return the consumer statistics collector given at construction, possibly {@code null}
     */
    public ConsumerCallbackStats getStats() {
        return _consumerStats;
    }

    /**
     * Removes the given registration from the list of registrations that receive callbacks.
     *
     * @param reg registration to remove
     */
    public void removeRegistration(DatabusV2ConsumerRegistration reg) {
        _registrations.remove(reg);
    }

    /**
     * Computes how much of the time budget is left for a submitted call.
     *
     * @param timeBudget total budget in nanoseconds
     * @param curTime    current time in nanoseconds
     * @param top        the call whose timestamp marks the start of its budget
     * @return the budget minus the time elapsed since the call's timestamp; may be zero or negative
     */
    protected long getEstimatedTimeout(long timeBudget, long curTime, TimestampedFuture<ConsumerCallbackResult> top) {
        final long elapsed = curTime - top.getTimestamp();
        return timeBudget - elapsed;
    }

    /**
     * Orders {@link TimestampedFuture}s by ascending timestamp, so the oldest call sits at the
     * head of the priority queue.
     *
     * @param <T> result type of the compared futures
     */
    static class TimestampedFutureComparator<T>
    implements Comparator<TimestampedFuture<T>>,
    Serializable {
        private static final long serialVersionUID = 1L;

        TimestampedFutureComparator() {
        }

        /**
         * Compares two futures by the sign of their timestamp difference.
         *
         * <p>The difference is a {@code long}; narrowing it to {@code int} would discard the upper
         * 32 bits, so timestamps roughly 2.1 seconds or more apart (in nanoseconds) could compare
         * with the wrong sign or as equal. Taking the signum of the full difference avoids that.
         *
         * @return a negative value, zero, or a positive value if {@code o1} is older than, as old
         *         as, or newer than {@code o2}
         */
        @Override
        public int compare(TimestampedFuture<T> o1, TimestampedFuture<T> o2) {
            return Long.signum(o1.getTimestamp() - o2.getTimestamp());
        }
    }

    /**
     * Couples a submitted callable with its future, the call type label of its batch and a call
     * number. The timestamp is taken from the callable's creation time.
     *
     * @param <T> result type of the future
     */
    static class TimestampedFuture<T> {
        private final Future<T> _future;
        private final String _callType;
        private final long _callNum;
        private final ConsumerCallable<T> _callable;

        /**
         * @param callable the callable that was submitted
         * @param future   future returned by the executor for {@code callable}
         * @param callType label describing the kind of call, used in log messages
         * @param callNum  number assigned to this call
         */
        public TimestampedFuture(ConsumerCallable<T> callable, Future<T> future, String callType, long callNum) {
            _callable = callable;
            _future = future;
            _callType = callType;
            _callNum = callNum;
        }

        /** @return the future of the submitted call */
        public Future<T> getFuture() {
            return _future;
        }

        /** @return the creation time reported by the underlying callable */
        public long getTimestamp() {
            return _callable.getCreationTime();
        }

        /** @return the number assigned to this call */
        public long getCallNum() {
            return _callNum;
        }

        /** @return the call type label */
        public String getCallType() {
            return _callType;
        }

        /** @return the callable that was submitted */
        public ConsumerCallable<T> getCallable() {
            return _callable;
        }
    }
}

