/*
 * Decompiled with CFR 0.152.
 */
package com.flipkart.aesop.runtime.producer.hbase;

import com.flipkart.aesop.runtime.producer.AbstractEventProducer;
import com.flipkart.aesop.runtime.producer.hbase.SepEventMapper;
import com.linkedin.databus.core.DbusEventInfo;
import com.linkedin.databus.core.DbusEventKey;
import com.linkedin.databus.core.DbusOpcode;
import com.linkedin.databus2.core.DatabusException;
import com.linkedin.databus2.schemas.utils.SchemaHelper;
import com.ngdata.sep.EventListener;
import com.ngdata.sep.SepEvent;
import com.ngdata.sep.impl.SepConsumer;
import com.ngdata.sep.impl.SepModelImpl;
import com.ngdata.sep.util.zookeeper.ZkUtil;
import com.ngdata.sep.util.zookeeper.ZooKeeperItf;
import java.net.InetAddress;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.avro.generic.GenericRecord;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.KeyValue;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.util.Assert;
import org.trpr.platform.core.PlatformException;
import org.trpr.platform.core.impl.logging.LogFactory;
import org.trpr.platform.core.spi.logging.Logger;

public class HBaseEventProducer<T extends GenericRecord>
extends AbstractEventProducer
implements InitializingBean {
    private static final Logger LOGGER = LogFactory.getLogger(HBaseEventProducer.class);
    private static final String HBASE_REPLICATION_CONFIG = "hbase.replication";
    private static final int WORKER_THREADS = 1;
    protected SepConsumer sepConsumer;
    private String localHost;
    protected String zkQuorum;
    protected Integer zkPort;
    protected int workerThreads = 1;
    protected SepEventMapper<T> sepEventMapper;
    private volatile AtomicBoolean shutdownRequested = new AtomicBoolean(false);

    public void afterPropertiesSet() throws Exception {
        Assert.notNull((Object)this.zkQuorum, (String)"'zkQuorum' cannot be null. Zookeeper quorum list must be specified. This HBase Events producer will not be initialized");
        Assert.notNull((Object)this.zkPort, (String)"'zkPort' cannot be null. Zookeeper port must be specified. This HBase Events producer will not be initialized");
        Assert.notNull(this.sepEventMapper, (String)"'sepEventMapper' cannot be null. No WAL edits event mapper found. This HBase Events producer will not be initialized");
        this.localHost = InetAddress.getLocalHost().getHostName();
    }

    public void start(long sinceSCN) {
        this.shutdownRequested.set(false);
        this.sinceSCN.set(sinceSCN);
        LOGGER.info("Starting SEP subscription : " + this.getName());
        LOGGER.info("ZK connection details [host:port] = {} : {}", (Object)this.zkQuorum, (Object)this.zkPort);
        LOGGER.info("Using hostname to bind to : " + this.localHost);
        LOGGER.info("Using worker threads : " + this.workerThreads);
        LOGGER.info("Listening to WAL edits from : " + this.sinceSCN);
        try {
            Configuration conf = HBaseConfiguration.create();
            conf.setBoolean(HBASE_REPLICATION_CONFIG, true);
            ZooKeeperItf zk = ZkUtil.connect((String)this.zkQuorum, (int)this.zkPort);
            SepModelImpl sepModel = new SepModelImpl(zk, conf);
            String subscriptionName = this.getName();
            if (!sepModel.hasSubscription(subscriptionName)) {
                sepModel.addSubscriptionSilent(subscriptionName);
            }
            this.sepConsumer = new SepConsumer(subscriptionName, this.sinceSCN.get(), (EventListener)new RelayAppender(), this.workerThreads, this.localHost, zk, conf);
            this.sepConsumer.start();
        }
        catch (Exception e) {
            LOGGER.error("Error starting WAL edits consumer. Producer not started!. Error message : " + e.getMessage(), (Throwable)e);
        }
    }

    public void shutdown() {
        LOGGER.info("Shutdown has been requested. HBaseEventProducer shutting down");
        this.shutdownRequested.set(true);
        this.sepConsumer.stop();
        super.shutdown();
        LOGGER.info("HBaseEventProducer shut down complete");
    }

    public boolean isPaused() {
        return !this.isRunning();
    }

    public boolean isRunning() {
        return this.sepConsumer.isRunning();
    }

    public void pause() {
        throw new UnsupportedOperationException("'pause' is not supported on this event producer");
    }

    public void unpause() {
        throw new UnsupportedOperationException("'unpause' is not supported on this event producer");
    }

    public void waitForShutdown() throws InterruptedException, IllegalStateException {
        throw new UnsupportedOperationException("'waitForShutdown' is not supported on this event producer");
    }

    public void waitForShutdown(long time) throws InterruptedException, IllegalStateException {
        throw new UnsupportedOperationException("'waitForShutdown(long time)' is not supported on this event producer");
    }

    public String getZkQuorum() {
        return this.zkQuorum;
    }

    public void setZkQuorum(String zkQuorum) {
        this.zkQuorum = zkQuorum;
    }

    public Integer getZkPort() {
        return this.zkPort;
    }

    public void setZkPort(Integer zkPort) {
        this.zkPort = zkPort;
    }

    public int getWorkerThreads() {
        return this.workerThreads;
    }

    public void setWorkerThreads(int workerThreads) {
        this.workerThreads = workerThreads;
    }

    public SepEventMapper<T> getSepEventMapper() {
        return this.sepEventMapper;
    }

    public void setSepEventMapper(SepEventMapper<T> sepEventMapper) {
        this.sepEventMapper = sepEventMapper;
    }

    class RelayAppender
    implements EventListener {
        RelayAppender() {
        }

        public void processEvents(List<SepEvent> sepEvents) {
            if (HBaseEventProducer.this.shutdownRequested.get()) {
                return;
            }
            long lastSavedSCN = HBaseEventProducer.this.sinceSCN.get();
            HBaseEventProducer.this.eventBuffer.startEvents();
            for (SepEvent sepEvent : sepEvents) {
                Object changeEvent = HBaseEventProducer.this.sepEventMapper.mapSepEvent(sepEvent);
                byte[] schemaId = SchemaHelper.getSchemaId((String)changeEvent.getSchema().toString());
                byte[] serializedEvent = HBaseEventProducer.this.serializeEvent(changeEvent);
                long latestTimestamp = 0L;
                for (KeyValue kv : sepEvent.getKeyValues()) {
                    latestTimestamp = Math.max(latestTimestamp, kv.getTimestamp());
                }
                DbusEventKey eventKey = new DbusEventKey(sepEvent.getRow());
                DbusEventInfo eventInfo = new DbusEventInfo(DbusOpcode.UPSERT, latestTimestamp, (short)HBaseEventProducer.this.physicalSourceStaticConfig.getId(), (short)HBaseEventProducer.this.physicalSourceStaticConfig.getId(), System.nanoTime(), HBaseEventProducer.this.physicalSourceStaticConfig.getSources()[0].getId(), schemaId, serializedEvent, false, true);
                HBaseEventProducer.this.eventBuffer.appendEvent(eventKey, eventInfo, HBaseEventProducer.this.dbusEventsStatisticsCollector);
                HBaseEventProducer.this.sinceSCN.set(Math.max(lastSavedSCN, latestTimestamp));
            }
            HBaseEventProducer.this.eventBuffer.endEvents(HBaseEventProducer.this.sinceSCN.get(), HBaseEventProducer.this.dbusEventsStatisticsCollector);
            try {
                HBaseEventProducer.this.maxScnReaderWriter.saveMaxScn(HBaseEventProducer.this.sinceSCN.get());
            }
            catch (DatabusException e) {
                LOGGER.error("Unable to persist last processed SCN. SCN value is stale. Error is : " + e.getMessage(), (Throwable)e);
                throw new PlatformException("Unable to write last processed SCN to log. Signalling for re-delivery of WAL edits from : " + lastSavedSCN);
            }
            LOGGER.debug("Processed SEP event count : " + sepEvents.size());
        }
    }
}

