/*
 * Decompiled with CFR 0.152.
 */
package com.flipkart.aesop.runtime.producer;

import com.flipkart.aesop.runtime.producer.AbstractEventProducer;
import com.flipkart.aesop.runtime.producer.ReadEventCycleSummary;
import com.linkedin.databus.core.DatabusComponentStatus;
import com.linkedin.databus.core.DbusEventInfo;
import com.linkedin.databus.core.DbusEventKey;
import com.linkedin.databus.core.DbusOpcode;
import com.linkedin.databus2.producers.EventCreationException;
import com.linkedin.databus2.schemas.utils.SchemaHelper;
import org.apache.avro.generic.GenericRecord;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.util.Assert;
import org.trpr.platform.core.impl.logging.LogFactory;
import org.trpr.platform.core.spi.logging.Logger;

public abstract class AbstractCallbackEventProducer<S extends GenericRecord>
extends AbstractEventProducer
implements InitializingBean {
    private static final Logger LOGGER = LogFactory.getLogger(AbstractCallbackEventProducer.class);
    private static final int ACTIVE = 0;
    private static final int PAUSED = 1;
    private static final int EXIT = 2;
    protected EventProducerThread eventThread;
    protected volatile int eventThreadState = 0;
    protected DatabusComponentStatus status;

    public void afterPropertiesSet() throws Exception {
        Assert.notNull((Object)this.physicalSourceConfig, (String)"'physicalSourceConfig' cannot be null. This Event producer will not be initialized");
        this.status = new DatabusComponentStatus(String.valueOf(this.name) + ".callbackEventProducer", this.physicalSourceConfig.getRetries().build());
    }

    public void start(long sinceSCN) {
        this.eventThreadState = 0;
        this.sinceSCN.set(sinceSCN);
        this.eventThread = new EventProducerThread(this.name);
        this.eventThread.setDaemon(true);
        this.eventThread.start();
        LOGGER.info("Started callback event producer : {} from SCN ; {}", (Object)this.getName(), (Object)sinceSCN);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public void shutdown() {
        AbstractCallbackEventProducer abstractCallbackEventProducer = this;
        synchronized (abstractCallbackEventProducer) {
            this.eventThreadState = 2;
            this.notifyAll();
            if (this.eventThread != null) {
                this.eventThread.interrupt();
            }
        }
    }

    public boolean isPaused() {
        return this.eventThreadState == 1;
    }

    public boolean isRunning() {
        return this.eventThread != null && this.eventThreadState != 1 && this.eventThreadState != 2;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void pause() {
        AbstractCallbackEventProducer abstractCallbackEventProducer = this;
        synchronized (abstractCallbackEventProducer) {
            this.eventThreadState = 1;
            this.notifyAll();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void unpause() {
        AbstractCallbackEventProducer abstractCallbackEventProducer = this;
        synchronized (abstractCallbackEventProducer) {
            this.eventThreadState = 0;
            this.notifyAll();
        }
    }

    public void waitForShutdown() throws InterruptedException, IllegalStateException {
        while (this.eventThread != null && this.eventThread.isAlive()) {
            this.eventThread.join();
        }
    }

    public void waitForShutdown(long time) throws InterruptedException, IllegalStateException {
        if (this.eventThread != null && this.eventThread.isAlive()) {
            this.eventThread.join(time);
        }
        if (this.eventThread != null && this.eventThread.isAlive()) {
            throw new IllegalStateException("Shutdown not successful on event producer thread for :" + this.name + " after timeout of : " + time);
        }
    }

    protected abstract ReadEventCycleSummary<S> readEventsFromAllSources(long var1) throws EventCreationException;

    protected abstract Object getEventKey(S var1);

    protected abstract Long getSequenceId(S var1);

    private class EventProducerThread
    extends Thread {
        public EventProducerThread(String producerName) {
            super("EventProducerThread_" + producerName);
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        @Override
        public void run() {
            while (true) {
                EventProducerThread eventProducerThread = this;
                synchronized (eventProducerThread) {
                    switch (AbstractCallbackEventProducer.this.eventThreadState) {
                        case 1: {
                            LOGGER.info("EventProducerThread for {} is pausing because a pause was requested.", (Object)this.getName());
                            try {
                                this.wait();
                            }
                            catch (InterruptedException ex) {
                                LOGGER.info("Ignoring thread interrupt on EventProducerThread for {}", (Object)this.getName());
                            }
                            break;
                        }
                        case 2: {
                            LOGGER.info("EventProducerThread for {} is stopping because a shutdown was requested.", (Object)this.getName());
                            AbstractCallbackEventProducer.this.eventBuffer.rollbackEvents();
                            return;
                        }
                        case 0: {
                            try {
                                ReadEventCycleSummary readEventCycleSummary = AbstractCallbackEventProducer.this.readEventsFromAllSources(AbstractCallbackEventProducer.this.sinceSCN.get());
                                if (readEventCycleSummary.getChangeEvents().size() > 0) {
                                    AbstractCallbackEventProducer.this.eventBuffer.startEvents();
                                    for (GenericRecord changeEvent : readEventCycleSummary.getChangeEvents()) {
                                        byte[] schemaId = SchemaHelper.getSchemaId((String)changeEvent.getSchema().toString());
                                        byte[] serializedEvent = AbstractCallbackEventProducer.this.serializeEvent(changeEvent);
                                        DbusEventKey eventKey = new DbusEventKey(AbstractCallbackEventProducer.this.getEventKey(changeEvent));
                                        DbusEventInfo eventInfo = new DbusEventInfo(DbusOpcode.UPSERT, AbstractCallbackEventProducer.this.getSequenceId(changeEvent).longValue(), (short)AbstractCallbackEventProducer.this.physicalSourceStaticConfig.getId(), (short)AbstractCallbackEventProducer.this.physicalSourceStaticConfig.getId(), System.nanoTime(), AbstractCallbackEventProducer.this.physicalSourceStaticConfig.getSources()[0].getId(), schemaId, serializedEvent, false, true);
                                        AbstractCallbackEventProducer.this.eventBuffer.appendEvent(eventKey, eventInfo, AbstractCallbackEventProducer.this.dbusEventsStatisticsCollector);
                                    }
                                    long endOfWindowScn = readEventCycleSummary.getSinceSCN();
                                    long newSinceSCN = Math.max(endOfWindowScn, AbstractCallbackEventProducer.this.sinceSCN.get());
                                    AbstractCallbackEventProducer.this.sinceSCN.set(newSinceSCN);
                                    AbstractCallbackEventProducer.this.eventBuffer.endEvents(AbstractCallbackEventProducer.this.sinceSCN.get(), AbstractCallbackEventProducer.this.dbusEventsStatisticsCollector);
                                    AbstractCallbackEventProducer.this.maxScnReaderWriter.saveMaxScn(AbstractCallbackEventProducer.this.sinceSCN.get());
                                    LOGGER.info("Added {} change events to event buffer for : {} . New SCN is : " + newSinceSCN, (Object)readEventCycleSummary.getChangeEvents().size(), (Object)this.getName());
                                }
                                if (AbstractCallbackEventProducer.this.status.getRetriesNum() > 0) {
                                    AbstractCallbackEventProducer.this.status.resume();
                                }
                                AbstractCallbackEventProducer.this.status.getRetriesCounter().reset();
                                AbstractCallbackEventProducer.this.status.getRetriesCounter().sleep();
                                break;
                            }
                            catch (Exception e) {
                                LOGGER.error("Event creation exception occurred reading events from : {}. Error is : " + e.getMessage(), (Object)this.getName(), (Object)e);
                                AbstractCallbackEventProducer.this.status.retryOnError(String.valueOf(this.getName()) + " error: " + e.getMessage());
                            }
                        }
                    }
                }
            }
        }
    }
}

