/*
 * Decompiled with CFR 0.152.
 */
package com.linkedin.databus.client.consumer;

import com.linkedin.databus.client.consumer.AbstractDatabusCombinedConsumer;
import com.linkedin.databus.client.pub.ConsumerCallbackResult;
import com.linkedin.databus.client.pub.DbusEventDecoder;
import com.linkedin.databus.client.pub.SCN;
import com.linkedin.databus.core.DbusEvent;
import com.linkedin.databus.core.util.ConfigBuilder;
import com.linkedin.databus.core.util.InvalidConfigException;
import java.util.ArrayList;
import java.util.Formatter;
import java.util.List;
import org.apache.avro.Schema;
import org.apache.avro.specific.SpecificRecord;
import org.apache.log4j.Logger;

public abstract class BatchingDatabusCombinedConsumer<T extends SpecificRecord>
extends AbstractDatabusCombinedConsumer {
    public static final String MODULE = BatchingDatabusCombinedConsumer.class.getName();
    public static final Logger LOG = Logger.getLogger((String)MODULE);
    private final StaticConfig _staticConfig;
    private final Class<T> _payloadClass;
    private final ArrayList<T> _streamEvents;
    private final ArrayList<T> _bootstrapEvents;

    public BatchingDatabusCombinedConsumer(Config config, Class<T> payloadClass) throws InvalidConfigException {
        this(config.build(), payloadClass);
    }

    public BatchingDatabusCombinedConsumer(StaticConfig config, Class<T> payloadClass) {
        this._staticConfig = config;
        this._payloadClass = payloadClass;
        this._streamEvents = new ArrayList(config.getStreamBatchSize());
        this._bootstrapEvents = new ArrayList(config.getBootstrapBatchSize());
    }

    @Override
    public ConsumerCallbackResult onStopConsumption() {
        ConsumerCallbackResult result = this.flushStreamEvents();
        return result;
    }

    @Override
    public ConsumerCallbackResult onEndDataEventSequence(SCN endScn) {
        ConsumerCallbackResult result = ConsumerCallbackResult.SUCCESS;
        switch (this._staticConfig.getBatchingLevel()) {
            case SOURCES: 
            case WINDOWS: {
                result = this.flushStreamEvents();
                break;
            }
            case FULL: {
                result = ConsumerCallbackResult.SUCCESS;
            }
        }
        return result;
    }

    @Override
    public ConsumerCallbackResult onRollback(SCN rollbackScn) {
        ConsumerCallbackResult result = this.flushStreamEvents();
        return result;
    }

    @Override
    public ConsumerCallbackResult onEndSource(String source, Schema sourceSchema) {
        ConsumerCallbackResult result = ConsumerCallbackResult.SUCCESS;
        switch (this._staticConfig.getBatchingLevel()) {
            case SOURCES: {
                result = this.flushStreamEvents();
                break;
            }
            case WINDOWS: 
            case FULL: {
                result = ConsumerCallbackResult.SUCCESS;
            }
        }
        return result;
    }

    @Override
    public ConsumerCallbackResult onCheckpoint(SCN checkpointScn) {
        ConsumerCallbackResult result = this.flushStreamEvents();
        return result;
    }

    @Override
    public ConsumerCallbackResult onStopBootstrap() {
        ConsumerCallbackResult result = this.flushBootstrapEvents();
        return result;
    }

    @Override
    public ConsumerCallbackResult onEndBootstrapSequence(SCN endScn) {
        ConsumerCallbackResult result = ConsumerCallbackResult.SUCCESS;
        switch (this._staticConfig.getBatchingLevel()) {
            case SOURCES: 
            case WINDOWS: {
                result = this.flushBootstrapEvents();
                break;
            }
            case FULL: {
                result = ConsumerCallbackResult.SUCCESS;
            }
        }
        return result;
    }

    @Override
    public ConsumerCallbackResult onEndBootstrapSource(String name, Schema sourceSchema) {
        ConsumerCallbackResult result = ConsumerCallbackResult.SUCCESS;
        switch (this._staticConfig.getBatchingLevel()) {
            case SOURCES: {
                result = this.flushBootstrapEvents();
                break;
            }
            case WINDOWS: 
            case FULL: {
                result = ConsumerCallbackResult.SUCCESS;
            }
        }
        return result;
    }

    @Override
    public ConsumerCallbackResult onBootstrapEvent(DbusEvent e, DbusEventDecoder eventDecoder) {
        SpecificRecord eventObj = eventDecoder.getTypedValue(e, (SpecificRecord)null, this._payloadClass);
        ConsumerCallbackResult result = this.addBootstrapEvent(eventObj);
        return result;
    }

    @Override
    public ConsumerCallbackResult onBootstrapRollback(SCN batchCheckpointScn) {
        ConsumerCallbackResult result = this.flushBootstrapEvents();
        return result;
    }

    @Override
    public ConsumerCallbackResult onBootstrapCheckpoint(SCN batchCheckpointScn) {
        ConsumerCallbackResult result = this.flushBootstrapEvents();
        return result;
    }

    @Override
    public ConsumerCallbackResult onDataEvent(DbusEvent e, DbusEventDecoder eventDecoder) {
        SpecificRecord eventObj = eventDecoder.getTypedValue(e, (SpecificRecord)null, this._payloadClass);
        ConsumerCallbackResult result = this.addDataEvent(eventObj);
        return result;
    }

    private ConsumerCallbackResult addDataEvent(T eventObj) {
        ConsumerCallbackResult result = ConsumerCallbackResult.SUCCESS;
        this._streamEvents.add(eventObj);
        if (this._streamEvents.size() >= this._staticConfig.getStreamBatchSize()) {
            result = this.flushStreamEvents();
        }
        return result;
    }

    private ConsumerCallbackResult addBootstrapEvent(T eventObj) {
        ConsumerCallbackResult result = ConsumerCallbackResult.SUCCESS;
        this._bootstrapEvents.add(eventObj);
        if (this._bootstrapEvents.size() >= this._staticConfig.getBootstrapBatchSize()) {
            result = this.flushBootstrapEvents();
        }
        return result;
    }

    private ConsumerCallbackResult flushStreamEvents() {
        ConsumerCallbackResult result;
        if (0 == this._streamEvents.size()) {
            result = ConsumerCallbackResult.SUCCESS;
        } else {
            result = this.onDataEventsBatch(this._streamEvents);
            this._streamEvents.clear();
        }
        return result;
    }

    private ConsumerCallbackResult flushBootstrapEvents() {
        ConsumerCallbackResult result;
        if (0 == this._bootstrapEvents.size()) {
            result = ConsumerCallbackResult.SUCCESS;
        } else {
            result = this.onBootstrapEventsBatch(this._bootstrapEvents);
            this._bootstrapEvents.clear();
        }
        return result;
    }

    protected abstract ConsumerCallbackResult onDataEventsBatch(List<T> var1);

    protected abstract ConsumerCallbackResult onBootstrapEventsBatch(List<T> var1);

    public StaticConfig getStaticConfig() {
        return this._staticConfig;
    }

    public static class Config
    implements ConfigBuilder<StaticConfig> {
        private int _streamBatchSize = 10;
        private int _bootstrapBatchSize = 20;
        private String _batchingLevel = StaticConfig.BatchingLevel.SOURCES.toString();

        public StaticConfig build() throws InvalidConfigException {
            StaticConfig.BatchingLevel batchingLevel = null;
            try {
                batchingLevel = StaticConfig.BatchingLevel.valueOf(this._batchingLevel);
            }
            catch (Exception e) {
                throw new InvalidConfigException("invalid batchingLevel:" + this._batchingLevel);
            }
            if (this._streamBatchSize <= 0) {
                throw new InvalidConfigException("invalid streamBatchSize:" + this._streamBatchSize);
            }
            if (this._bootstrapBatchSize <= 0) {
                throw new InvalidConfigException("invalid bootstrapBatchSize:" + this._bootstrapBatchSize);
            }
            StaticConfig newConfig = new StaticConfig(batchingLevel, this._streamBatchSize, this._bootstrapBatchSize);
            LOG.info((Object)(BatchingDatabusCombinedConsumer.class.getSimpleName() + ".Config:" + newConfig));
            return newConfig;
        }

        public int getStreamBatchSize() {
            return this._streamBatchSize;
        }

        public void setStreamBatchSize(int streamBatchSize) {
            this._streamBatchSize = streamBatchSize;
        }

        public int getBootstrapBatchSize() {
            return this._bootstrapBatchSize;
        }

        public void setBootstrapBatchSize(int bootstrapBatchSize) {
            this._bootstrapBatchSize = bootstrapBatchSize;
        }

        public String getBatchingLevel() {
            return this._batchingLevel;
        }

        public void setBatchingLevel(String batchingLevel) {
            this._batchingLevel = batchingLevel;
        }
    }

    public static class StaticConfig {
        private final int _streamBatchSize;
        private final int _bootstrapBatchSize;
        private final BatchingLevel _batchingLevel;

        public StaticConfig(BatchingLevel batchingLevel, int streamBatchSize, int bootstrapBatchSize) {
            this._batchingLevel = batchingLevel;
            this._streamBatchSize = streamBatchSize;
            this._bootstrapBatchSize = bootstrapBatchSize;
        }

        public int getStreamBatchSize() {
            return this._streamBatchSize;
        }

        public int getBootstrapBatchSize() {
            return this._bootstrapBatchSize;
        }

        public BatchingLevel getBatchingLevel() {
            return this._batchingLevel;
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        public String toString() {
            StringBuilder resBuilder = new StringBuilder(100);
            Formatter fmt = new Formatter(resBuilder);
            try {
                fmt.format("{\"batchingLevel\":\"%s\",\"streamBatchSize\":%d,\"bootstrapBatchSize\":%d}", this._batchingLevel.toString(), this._streamBatchSize, this._bootstrapBatchSize);
                fmt.flush();
                String string = fmt.toString();
                return string;
            }
            finally {
                fmt.close();
            }
        }

        public static enum BatchingLevel {
            SOURCES,
            WINDOWS,
            FULL;

        }
    }
}

