/*
 * Decompiled with CFR 0.152.
 */
package com.linkedin.databus.client.bootstrap;

import com.linkedin.databus.client.DatabusHttpClientImpl;
import com.linkedin.databus.client.DatabusSourcesConnection;
import com.linkedin.databus.client.SingleSourceSCN;
import com.linkedin.databus.client.bootstrap.DatabusBootstrapDummyConsumer;
import com.linkedin.databus.client.generic.ConsumerPauseRequestProcessor;
import com.linkedin.databus.client.generic.DatabusConsumerPauseInterface;
import com.linkedin.databus.client.pub.ConsumerCallbackResult;
import com.linkedin.databus.client.pub.DatabusBootstrapConsumer;
import com.linkedin.databus.client.pub.DatabusClientException;
import com.linkedin.databus.client.pub.DatabusStreamConsumer;
import com.linkedin.databus.client.pub.DbusEventDecoder;
import com.linkedin.databus.client.pub.SCN;
import com.linkedin.databus.client.pub.ServerInfo;
import com.linkedin.databus.client.pub.mbean.UnifiedClientStats;
import com.linkedin.databus.core.DbusEvent;
import com.linkedin.databus.core.DbusEventBuffer;
import com.linkedin.databus.core.FileBasedEventTrackingCallback;
import com.linkedin.databus.core.monitoring.mbean.StatsCollectors;
import com.linkedin.databus.core.util.InvalidConfigException;
import com.linkedin.databus2.core.DatabusException;
import com.linkedin.databus2.core.container.request.ProcessorRegistrationConflictException;
import com.linkedin.databus2.core.container.request.RequestProcessor;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import org.apache.avro.Schema;
import org.apache.log4j.Logger;

public class IntegratedDummyDatabusConsumer
extends DatabusBootstrapDummyConsumer
implements DatabusStreamConsumer,
DatabusConsumerPauseInterface {
    public static final String MODULE = IntegratedDummyDatabusConsumer.class.getName();
    public static final Logger LOG = Logger.getLogger((String)MODULE);
    private static final int FIFTEEN_HUNDRED_KILOBYTES_IN_BYTES = 1500000;
    private static final int TEN_MEGABYTES_IN_BYTES = 10000000;
    private DatabusHttpClientImpl _dbusClient;
    private FileBasedEventTrackingCallback _fileBasedCallback;
    private long _maxEventBufferSize;
    private int _maxReadBufferSize;
    private long _maxBootstrapWindownScn;
    private long _maxRelayWindowScn;
    private boolean _isPaused;
    private boolean _useConsumerTimeout;
    private int _numUserSpecifiedErrorResults;

    public IntegratedDummyDatabusConsumer(String outputFilename, long maxEventBufferSize, int maxReadBufferSize, boolean useConsumerTimeout) {
        this._fileBasedCallback = new FileBasedEventTrackingCallback(outputFilename, false);
        this._maxEventBufferSize = maxEventBufferSize;
        this._maxReadBufferSize = maxReadBufferSize;
        this._useConsumerTimeout = useConsumerTimeout;
    }

    public IntegratedDummyDatabusConsumer(String outputFilename, long maxEventBufferSize, int maxReadBufferSize) {
        this(outputFilename, maxEventBufferSize, maxReadBufferSize, true);
    }

    public IntegratedDummyDatabusConsumer(String outputFilename) {
        this(outputFilename, 10000000L, 1500000);
    }

    @Override
    public ConsumerCallbackResult onBootstrapEvent(DbusEvent e, DbusEventDecoder eventDecoder) {
        this.waitIfPaused();
        this._fileBasedCallback.onEvent(e);
        this.printBootstrapEventInfo(DatabusBootstrapDummyConsumer.BootstrapStage.OnBootstrapEvent, e.toString());
        return this.getUserSpecifiedCallbackResult();
    }

    @Override
    public ConsumerCallbackResult onEndBootstrapSequence(SCN endScn) {
        this.waitIfPaused();
        this._maxBootstrapWindownScn = ((SingleSourceSCN)endScn).getSequence();
        this.printBootstrapEventInfo(DatabusBootstrapDummyConsumer.BootstrapStage.EndBootstrapSequence, endScn.toString());
        return this.getUserSpecifiedCallbackResult();
    }

    public ConsumerCallbackResult onCheckpoint(SCN checkpointScn) {
        this.waitIfPaused();
        this.printStreamEventInfo(StreamStage.OnCheckpointEvent, checkpointScn.toString());
        return this.getUserSpecifiedCallbackResult();
    }

    public ConsumerCallbackResult onDataEvent(DbusEvent e, DbusEventDecoder eventDecoder) {
        this.waitIfPaused();
        this._fileBasedCallback.onEvent(e);
        this.printStreamEventInfo(StreamStage.OnStreamEvent, e.toString());
        return this.getUserSpecifiedCallbackResult();
    }

    public ConsumerCallbackResult onEndDataEventSequence(SCN endScn) {
        this.waitIfPaused();
        this._maxRelayWindowScn = ((SingleSourceSCN)endScn).getSequence();
        this.printStreamEventInfo(StreamStage.EndDataEventSequence, endScn.toString());
        return this.getUserSpecifiedCallbackResult();
    }

    public ConsumerCallbackResult onEndSource(String source, Schema sourceSchema) {
        this.printStreamEventInfo(StreamStage.EndStreamSource, " source=" + source + " schema=" + (null == sourceSchema ? "null" : sourceSchema.getFullName()));
        return this.getUserSpecifiedCallbackResult();
    }

    public ConsumerCallbackResult onRollback(SCN startScn) {
        this.printStreamEventInfo(StreamStage.InvalidStage, "rollback not implemented");
        return this.getUserSpecifiedCallbackResult();
    }

    public ConsumerCallbackResult onStartDataEventSequence(SCN startScn) {
        this.printStreamEventInfo(StreamStage.StartDataEventSequence, startScn.toString());
        return this.getUserSpecifiedCallbackResult();
    }

    public ConsumerCallbackResult onStartSource(String source, Schema sourceSchema) {
        this.printStreamEventInfo(StreamStage.StartStreamSource, " source=" + source + " schema=" + (null == sourceSchema ? "null" : sourceSchema.getFullName()));
        return this.getUserSpecifiedCallbackResult();
    }

    public ConsumerCallbackResult onStartConsumption() {
        return this.getUserSpecifiedCallbackResult();
    }

    public ConsumerCallbackResult onStopConsumption() {
        return this.getUserSpecifiedCallbackResult();
    }

    public ConsumerCallbackResult onError(Throwable err) {
        return this.getUserSpecifiedCallbackResult();
    }

    public void initConn(List<String> sources) throws IOException, InvalidConfigException, DatabusClientException, DatabusException {
        StringBuilder sourcesString = new StringBuilder();
        boolean firstSrc = true;
        for (String source : sources) {
            if (!firstSrc) {
                sourcesString.append(",");
            }
            firstSrc = false;
            sourcesString.append(source);
        }
        this._fileBasedCallback.init();
        ArrayList<IntegratedDummyDatabusConsumer> bootstrapCallbacks = new ArrayList<IntegratedDummyDatabusConsumer>();
        bootstrapCallbacks.add(this);
        ArrayList<IntegratedDummyDatabusConsumer> streamCallbacks = new ArrayList<IntegratedDummyDatabusConsumer>();
        streamCallbacks.add(this);
        DatabusHttpClientImpl.Config clientConfigBuilder = new DatabusHttpClientImpl.Config();
        clientConfigBuilder.getContainer().getJmx().setJmxServicePort(5555);
        clientConfigBuilder.getContainer().setId(545454);
        clientConfigBuilder.getContainer().setHttpPort(8082);
        clientConfigBuilder.getCheckpointPersistence().setType(DatabusHttpClientImpl.CheckpointPersistenceStaticConfig.ProviderType.FILE_SYSTEM.toString());
        clientConfigBuilder.getCheckpointPersistence().getFileSystem().setRootDirectory("./integratedconsumer-checkpoints");
        clientConfigBuilder.getCheckpointPersistence().setClearBeforeUse(true);
        clientConfigBuilder.getRuntime().getBootstrap().setEnabled(true);
        DatabusSourcesConnection.Config srcDefaultConfig = new DatabusSourcesConnection.Config();
        srcDefaultConfig.setFreeBufferThreshold((int)((double)this._maxEventBufferSize * 0.05));
        srcDefaultConfig.setCheckpointThresholdPct(80.0);
        srcDefaultConfig.setConsumerTimeBudgetMs(this._useConsumerTimeout ? 60000L : 0L);
        srcDefaultConfig.getDispatcherRetries().setMaxRetryNum(3);
        clientConfigBuilder.setConnectionDefaults(srcDefaultConfig);
        DbusEventBuffer.Config eventBufferConfig = clientConfigBuilder.getConnectionDefaults().getEventBuffer();
        eventBufferConfig.setMaxSize(this._maxEventBufferSize);
        eventBufferConfig.setAverageEventSize(this._maxReadBufferSize);
        DatabusHttpClientImpl.StaticConfig clientConfig = clientConfigBuilder.build();
        ServerInfo.ServerInfoBuilder relayBuilder = clientConfig.getRuntime().getRelay("1");
        relayBuilder.setName("DefaultRelay");
        relayBuilder.setHost("localhost");
        relayBuilder.setPort(9000);
        relayBuilder.setSources(sourcesString.toString());
        ServerInfo.ServerInfoBuilder bootstrapBuilder = clientConfig.getRuntime().getBootstrap().getService("2");
        bootstrapBuilder.setName("DefaultBootstrapServices");
        bootstrapBuilder.setHost("localhost");
        bootstrapBuilder.setPort(6060);
        bootstrapBuilder.setSources(sourcesString.toString());
        this._dbusClient = new DatabusHttpClientImpl(clientConfig);
        this._dbusClient.registerDatabusStreamListener((DatabusStreamConsumer)this, sources, null);
        this._dbusClient.registerDatabusBootstrapListener((DatabusBootstrapConsumer)this, sources, null);
        try {
            this._dbusClient.getProcessorRegistry().register("pauseConsumer", (RequestProcessor)new ConsumerPauseRequestProcessor(null, (DatabusConsumerPauseInterface)this));
        }
        catch (ProcessorRegistrationConflictException e) {
            LOG.error((Object)"Failed to register pauseConsumer");
        }
    }

    protected void printStreamEventInfo(StreamStage stage, String info) {
        LOG.debug((Object)((Object)((Object)stage) + ": " + info));
    }

    public synchronized void start() throws Exception {
        this._isPaused = false;
        this._dbusClient.start();
        LOG.info((Object)(MODULE + " started!"));
    }

    public synchronized void shutdown() {
        this._isPaused = false;
        this._dbusClient.shutdown();
    }

    public long getMaxBootstrapWindowScn() {
        return this._maxBootstrapWindownScn;
    }

    public long getMaxRelayWindowScn() {
        return this._maxRelayWindowScn;
    }

    public StatsCollectors<UnifiedClientStats> getUnifiedClientStatsCollectors() {
        return this._dbusClient.getUnifiedClientStatsCollectors();
    }

    public List<DatabusSourcesConnection> getRelayConnections() {
        return this._dbusClient.getRelayConnections();
    }

    public synchronized void pause() {
        this._isPaused = true;
        LOG.info((Object)"Consumer is set to pause!");
    }

    public synchronized void resume() {
        this._isPaused = false;
        ((Object)((Object)this)).notifyAll();
        LOG.info((Object)"Consumer is set to resume!");
    }

    public synchronized void waitIfPaused() {
        while (this._isPaused) {
            LOG.info((Object)"Consumer is paused!");
            try {
                ((Object)((Object)this)).wait();
            }
            catch (InterruptedException interruptedException) {}
        }
    }

    public synchronized void setNumUserSpecifiedErrorResults(int numErrorResults) {
        LOG.info((Object)("Will return " + numErrorResults + " ConsumerCallbackResult.ERROR as requested."));
        this._numUserSpecifiedErrorResults = numErrorResults;
    }

    protected synchronized ConsumerCallbackResult getUserSpecifiedCallbackResult() {
        if (this._numUserSpecifiedErrorResults > 0) {
            --this._numUserSpecifiedErrorResults;
            if (this._numUserSpecifiedErrorResults > 0) {
                LOG.info((Object)("Returning ConsumerCallbackResult.ERROR as requested; still have " + this._numUserSpecifiedErrorResults + " more to go."));
            } else {
                LOG.info((Object)"Returning ConsumerCallbackResult.ERROR as requested (last one).");
            }
            return ConsumerCallbackResult.ERROR;
        }
        return ConsumerCallbackResult.SUCCESS;
    }

    public static void main(String[] args) throws Exception {
        IntegratedDummyDatabusConsumer consumer = new IntegratedDummyDatabusConsumer("IntegratedDummyDatabusConsumerMain");
        ArrayList<String> sources = new ArrayList<String>();
        sources.add("source1");
        consumer.initConn(sources);
        consumer.start();
        consumer.shutdown();
    }

    static enum StreamStage {
        StartDataEventSequence,
        EndDataEventSequence,
        OnStreamEvent,
        OnCheckpointEvent,
        StartStreamSource,
        EndStreamSource,
        InvalidStage;

    }
}

