/*
 * Decompiled with CFR 0.152.
 */
package com.flipkart.aesop.relay.sample;

import com.flipkart.aesop.events.sample.person.FieldChange;
import com.flipkart.aesop.events.sample.person.Person;
import com.flipkart.aesop.runtime.producer.AbstractEventProducer;
import com.linkedin.databus.core.DbusEventInfo;
import com.linkedin.databus.core.DbusEventKey;
import com.linkedin.databus.core.DbusOpcode;
import com.linkedin.databus2.core.DatabusException;
import com.linkedin.databus2.schemas.utils.SchemaHelper;
import java.util.LinkedList;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.avro.generic.GenericRecord;
import org.trpr.platform.core.impl.logging.LogFactory;
import org.trpr.platform.core.spi.logging.Logger;

/**
 * Sample event producer that generates synthetic {@link Person} change events and appends them
 * to the event buffer inherited from {@link AbstractEventProducer}.
 *
 * <p>Calling {@link #start(long)} spawns a background thread that performs a fixed number of
 * production runs. Each run appends {@code numberOfEventsPerRun} UPSERT events whose keys and
 * sequence numbers start at the current SCN, closes the event window, persists a max SCN and
 * then sleeps before the next run.
 */
public class PersonEventProducer
extends AbstractEventProducer {
    private static final Logger LOGGER = LogFactory.getLogger(PersonEventProducer.class);

    /** Default number of events generated in a single production run. */
    private static final int NUM_EVENTS = 100;

    /** Number of production runs a single producer thread performs before it exits. */
    private static final int MAX_RUNS = 2;

    /** Pause between two consecutive production runs, in milliseconds. */
    private static final long SLEEP_BETWEEN_RUNS_MS = 10000L;

    /** Number of events generated per run; configurable through the setter. */
    private int numberOfEventsPerRun = NUM_EVENTS;

    /**
     * Set by {@link #shutdown()} and polled by the producer thread at the start of each run.
     * The reference never changes, so it is final; the AtomicBoolean itself provides visibility.
     */
    private final AtomicBoolean shutdownRequested = new AtomicBoolean(false);

    /**
     * Returns the fixed name of this producer.
     *
     * @return the string {@code "PersonEventProducer"}
     */
    public String getName() {
        return "PersonEventProducer";
    }

    /**
     * Clears any earlier shutdown request, records the starting SCN and launches a new
     * producer thread.
     *
     * @param sinceSCN the SCN from which event generation starts
     */
    public void start(long sinceSCN) {
        this.shutdownRequested.set(false);
        this.sinceSCN.set(sinceSCN);
        EventProducerThread thread = new EventProducerThread();
        thread.start();
    }

    /**
     * Returns the current value of the SCN counter.
     *
     * @return the current SCN
     */
    public long getSCN() {
        return this.sinceSCN.get();
    }

    /**
     * Pausing is not implemented by this sample producer.
     *
     * @return always {@code false}
     */
    public boolean isPaused() {
        return false;
    }

    /**
     * Reports the running state.
     *
     * <p>NOTE(review): this always returns {@code true}, even before {@link #start(long)} and
     * after {@link #shutdown()} - confirm whether callers rely on that.
     *
     * @return always {@code true}
     */
    public boolean isRunning() {
        return true;
    }

    /** No-op: pausing is not implemented by this sample producer. */
    public void pause() {
    }

    /**
     * Requests the producer thread to stop before its next run and delegates to the
     * superclass shutdown. A run that is already in progress is not cut short.
     */
    public void shutdown() {
        LOGGER.info("Shutdown has been requested. PersonEventProducer shutttng down");
        this.shutdownRequested.set(true);
        super.shutdown();
        LOGGER.info("PersonEventProducer shutdown completed");
    }

    /** No-op: pausing is not implemented by this sample producer. */
    public void unpause() {
    }

    /**
     * No-op: returns immediately without waiting for the producer thread.
     *
     * @throws InterruptedException never thrown by this implementation
     * @throws IllegalStateException never thrown by this implementation
     */
    public void waitForShutdown() throws InterruptedException, IllegalStateException {
    }

    /**
     * No-op: returns immediately without waiting for the producer thread.
     *
     * @param time ignored by this implementation
     * @throws InterruptedException never thrown by this implementation
     * @throws IllegalStateException never thrown by this implementation
     */
    public void waitForShutdown(long time) throws InterruptedException, IllegalStateException {
    }

    /**
     * Sets how many events each production run generates.
     *
     * @param numberOfEventsPerRun number of events per run
     */
    public void setNumberOfEventsPerRun(int numberOfEventsPerRun) {
        this.numberOfEventsPerRun = numberOfEventsPerRun;
    }

    /**
     * Background thread that generates the sample events. It stops after {@code MAX_RUNS}
     * runs, when a shutdown has been requested, or when it is interrupted while sleeping.
     */
    private class EventProducerThread
    extends Thread {
        /** Number of production runs completed so far. */
        int count = 0;

        private EventProducerThread() {
        }

        @Override
        public void run() {
            while (this.count < MAX_RUNS && !PersonEventProducer.this.shutdownRequested.get()) {
                PersonEventProducer.this.eventBuffer.startEvents();
                // Both loop bounds are captured before the loop, so advancing sinceSCN inside
                // the loop body does not change how many events this run produces.
                long endValue = PersonEventProducer.this.sinceSCN.longValue() + (long)PersonEventProducer.this.numberOfEventsPerRun;
                for (long i = PersonEventProducer.this.sinceSCN.longValue(); i < endValue; ++i) {
                    Person person = new Person(i, "Aesop " + i, "Mr. " + i, i, "false", new LinkedList<FieldChange>());
                    byte[] serializedEvent = PersonEventProducer.this.serializeEvent((GenericRecord)person);
                    byte[] schemaId = SchemaHelper.getSchemaId((String)person.getSchema().toString());
                    DbusEventKey eventKey = new DbusEventKey(i);
                    DbusEventInfo eventInfo = new DbusEventInfo(DbusOpcode.UPSERT, i, (short)PersonEventProducer.this.physicalSourceStaticConfig.getId(), (short)PersonEventProducer.this.physicalSourceStaticConfig.getId(), System.nanoTime(), PersonEventProducer.this.physicalSourceStaticConfig.getSources()[0].getId(), schemaId, serializedEvent, false, true);
                    PersonEventProducer.this.eventBuffer.appendEvent(eventKey, eventInfo, PersonEventProducer.this.dbusEventsStatisticsCollector);
                    PersonEventProducer.this.sinceSCN.getAndIncrement();
                    LOGGER.info("Added an event : Aesop Mr. " + i);
                }
                PersonEventProducer.this.eventBuffer.endEvents(PersonEventProducer.this.sinceSCN.longValue(), PersonEventProducer.this.dbusEventsStatisticsCollector);
                try {
                    // NOTE(review): sinceSCN has already been advanced by numberOfEventsPerRun in
                    // the loop above, so the persisted value is one further run ahead of the last
                    // appended event - confirm this is intended.
                    PersonEventProducer.this.maxScnReaderWriter.saveMaxScn(PersonEventProducer.this.sinceSCN.longValue() + (long)PersonEventProducer.this.numberOfEventsPerRun);
                }
                catch (DatabusException e) {
                    // Best effort: a failure to persist the SCN does not stop event generation.
                    LOGGER.error("Error persisting Max SCN : " + e.getMessage(), (Throwable)e);
                }
                ++this.count;
                try {
                    Thread.sleep(SLEEP_BETWEEN_RUNS_MS);
                }
                catch (InterruptedException e) {
                    // Restore the interrupt flag and stop producing instead of swallowing the
                    // interruption and carrying on with the next run.
                    Thread.currentThread().interrupt();
                    LOGGER.info("PersonEventProducer thread interrupted while sleeping between runs. Stopping event generation");
                    return;
                }
            }
        }
    }
}

