/*
 * Decompiled with CFR 0.152.
 */
package com.linkedin.databus.bootstrap.producer;

import com.linkedin.databus.bootstrap.api.BootstrapProducerStatus;
import com.linkedin.databus.bootstrap.common.BootstrapConn;
import com.linkedin.databus.bootstrap.common.BootstrapDBCleaner;
import com.linkedin.databus.bootstrap.common.BootstrapDBMetaDataDAO;
import com.linkedin.databus.bootstrap.common.BootstrapProducerStatsCollector;
import com.linkedin.databus.bootstrap.common.BootstrapReadOnlyConfig;
import com.linkedin.databus.bootstrap.producer.BootstrapApplierThread;
import com.linkedin.databus.bootstrap.producer.BootstrapDBDiskSpaceTriggerThread;
import com.linkedin.databus.bootstrap.producer.BootstrapDBPeriodicTriggerThread;
import com.linkedin.databus.bootstrap.producer.BootstrapProducerCallback;
import com.linkedin.databus.bootstrap.producer.BootstrapProducerConfig;
import com.linkedin.databus.bootstrap.producer.BootstrapProducerStaticConfig;
import com.linkedin.databus.client.DatabusHttpClientImpl;
import com.linkedin.databus.client.pub.CheckpointPersistenceProvider;
import com.linkedin.databus.client.pub.DatabusClientException;
import com.linkedin.databus.client.pub.DatabusStreamConsumer;
import com.linkedin.databus.client.pub.ServerInfo;
import com.linkedin.databus.core.Checkpoint;
import com.linkedin.databus.core.DatabusThreadBase;
import com.linkedin.databus.core.DbusClientMode;
import com.linkedin.databus.core.data_model.DatabusSubscription;
import com.linkedin.databus.core.monitoring.mbean.StatsCollectorMergeable;
import com.linkedin.databus.core.monitoring.mbean.StatsCollectors;
import com.linkedin.databus.core.util.ConfigBuilder;
import com.linkedin.databus.core.util.ConfigLoader;
import com.linkedin.databus.core.util.InvalidConfigException;
import com.linkedin.databus2.core.DatabusException;
import com.linkedin.databus2.core.container.netty.ServerContainer;
import com.linkedin.databus2.core.container.request.BootstrapDBException;
import com.linkedin.databus2.core.container.request.BootstrapDatabaseTooOldException;
import java.io.IOException;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import org.apache.log4j.Logger;

public class DatabusBootstrapProducer
extends DatabusHttpClientImpl
implements BootstrapProducerCallback.ErrorCaseHandler {
    public static final String MODULE = DatabusBootstrapProducer.class.getName();
    public static final Logger LOG = Logger.getLogger((String)MODULE);
    private final HashSet<SourceInfo> _registeredPhysicalSources = new HashSet();
    private final List<String> _registeredSources;
    private final Map<String, DatabusThreadBase> _applierThreads;
    private final BootstrapDBPeriodicTriggerThread _dbPeriodicTriggerThread;
    private final BootstrapDBDiskSpaceTriggerThread _dbDiskSpaceTriggerThread;
    private final BootstrapDBCleaner _dbCleaner;
    private final BootstrapDBMetaDataDAO _dbDao;
    private final Map<String, Integer> _srcNameIdMap;
    protected final StatsCollectors<BootstrapProducerStatsCollector> _bootstrapProducerStatsCollectors;
    private final BootstrapProducerStatsCollector _applierStatsCollector;
    private final BootstrapProducerStaticConfig _bootstrapProducerStaticConfig;

    public DatabusBootstrapProducer(BootstrapProducerConfig config) throws IOException, InvalidConfigException, InstantiationException, IllegalAccessException, ClassNotFoundException, SQLException, DatabusClientException, DatabusException, BootstrapDBException {
        this(config.build());
    }

    public DatabusBootstrapProducer(BootstrapProducerStaticConfig bootstrapProducerStaticConfig) throws IOException, InvalidConfigException, InstantiationException, IllegalAccessException, ClassNotFoundException, SQLException, DatabusClientException, DatabusException, BootstrapDBException {
        super(bootstrapProducerStaticConfig.getClient());
        this.decouplePhysicalSources();
        this._bootstrapProducerStaticConfig = bootstrapProducerStaticConfig;
        this._registeredSources = new ArrayList<String>();
        this._applierStatsCollector = new BootstrapProducerStatsCollector(this.getContainerStaticConfig().getId(), "bootstrapApplier", true, true, this.getMbeanServer(), null);
        this._bootstrapProducerStatsCollectors = new StatsCollectors();
        this._applierThreads = new HashMap<String, DatabusThreadBase>();
        BootstrapConn conn = new BootstrapConn();
        boolean autoCommit = false;
        this._dbDao = new BootstrapDBMetaDataDAO(conn, bootstrapProducerStaticConfig.getBootstrapDBHostname(), bootstrapProducerStaticConfig.getBootstrapDBUsername(), bootstrapProducerStaticConfig.getBootstrapDBPassword(), bootstrapProducerStaticConfig.getBootstrapDBName(), false);
        this._srcNameIdMap = new HashMap<String, Integer>();
        conn.initBootstrapConn(false, bootstrapProducerStaticConfig.getBootstrapDBUsername(), bootstrapProducerStaticConfig.getBootstrapDBPassword(), bootstrapProducerStaticConfig.getBootstrapDBHostname(), bootstrapProducerStaticConfig.getBootstrapDBName());
        if (!this._dbDao.doesMinScnTableExist()) {
            LOG.error((Object)"Bootstrap table not found! Please create table: bootstrap_tab_minscn (srcid int(11) NOT NULL,minscn bigint(20) NOT NULL default -1, PRIMARY KEY  (srcid)) ENGINE=InnoDB;");
            throw new BootstrapDBException("Bootstrap DB does not have necessary meta data table! bootstrap_tab_minscn");
        }
        this.initBootstrapDBMetadata();
        LOG.info((Object)("The Bootstrap Producer is configured for " + this._registeredPhysicalSources.size() + " sources"));
        for (SourceInfo sourceInfo : this._registeredPhysicalSources) {
            LOG.info((Object)("Creating BootstrapProducer callback for PhysicalSource: " + sourceInfo.getPhysicalSourceName()));
            BootstrapProducerStatsCollector producerStatsCollector = new BootstrapProducerStatsCollector(this.getContainerStaticConfig().getId(), sourceInfo.getPhysicalSourceName(), true, true, this.getMbeanServer(), sourceInfo.getLogicalSources());
            this._bootstrapProducerStatsCollectors.addStatsCollector(sourceInfo.getPhysicalSourceName(), (StatsCollectorMergeable)producerStatsCollector);
            this.registerProducerCallback(sourceInfo.getLogicalSources(), producerStatsCollector);
        }
        this.validateAndRepairBootstrapDBCheckpoint();
        for (String source : this._registeredSources) {
            LOG.info((Object)("Creating ApplierThread for source = " + source));
            String name = source + "BootstrapApplier";
            BootstrapApplierThread applierThread = new BootstrapApplierThread(name, source, this._bootstrapProducerStaticConfig, this._applierStatsCollector);
            this._applierThreads.put(source, applierThread);
        }
        String dbCleanerName = "DBCleaner";
        this._dbCleaner = new BootstrapDBCleaner("DBCleaner", this._bootstrapProducerStaticConfig.getCleaner(), (BootstrapReadOnlyConfig)this._bootstrapProducerStaticConfig, this._applierThreads, this._registeredSources);
        this._dbPeriodicTriggerThread = new BootstrapDBPeriodicTriggerThread(this._dbCleaner, this._bootstrapProducerStaticConfig.getCleaner().getPeriodSpaceTrigger());
        this._dbDiskSpaceTriggerThread = new BootstrapDBDiskSpaceTriggerThread(this._dbCleaner, this._bootstrapProducerStaticConfig.getCleaner().getDiskSpaceTrigger());
    }

    private void decouplePhysicalSources() {
        DatabusHttpClientImpl.RuntimeConfig clientRtConfig = (DatabusHttpClientImpl.RuntimeConfig)this.getClientConfigManager().getReadOnlyConfig();
        for (ServerInfo relayInfo : clientRtConfig.getRelays()) {
            if (relayInfo == null || relayInfo.getSources() == null) {
                LOG.error((Object)"No sources specified in the client config for the bootstrap producer");
            }
            if (relayInfo.getPhysicalSourceName() == null) {
                LOG.error((Object)"PhysicalSource name not specified");
            }
            SourceInfo sourceInfo = new SourceInfo(relayInfo.getPhysicalSourceName(), relayInfo.getSources());
            this._registeredPhysicalSources.add(sourceInfo);
        }
    }

    private void validateAndRepairBootstrapDBCheckpoint() throws SQLException, BootstrapDBException, IOException {
        LOG.info((Object)"Validating bootstrap DB checkpoints !!");
        for (List subsList : this._relayGroups.keySet()) {
            String msg;
            List sourceNames = DatabusSubscription.getStrList((List)subsList);
            long scn = -1L;
            try {
                scn = this._dbDao.getMinWindowSCNFromStateTable(sourceNames, "bootstrap_producer_state");
            }
            catch (BootstrapDBException ex) {
                LOG.error((Object)("Got exception while trying to fetch SCN from bootstrap_producer_state for sources :" + sourceNames), (Throwable)ex);
                throw ex;
            }
            CheckpointPersistenceProvider provider = this.getCheckpointPersistenceProvider();
            Checkpoint cp = provider.loadCheckpoint(sourceNames);
            LOG.info((Object)("Bootstrap Producer SCN :" + scn + ", Checkpoint :" + cp));
            if (null != cp) {
                if (cp.getConsumptionMode() != DbusClientMode.ONLINE_CONSUMPTION) {
                    msg = "Bootstrap Producer starting from non-online consumption mode for sources :" + sourceNames + ", Ckpt :" + cp;
                    LOG.error((Object)msg);
                    throw new BootstrapDBException(msg);
                }
                msg = null;
                if ((cp.getWindowScn() <= scn || scn <= -1L) && (cp.getWindowScn() >= scn || scn <= -1L)) continue;
                if (cp.getWindowScn() > scn && scn > -1L) {
                    LOG.warn((Object)("Non-Empty checkpint. Bootstrap Producer is at SCN:" + scn + ", while checkpoint is :" + cp + ", Could result in gap in event consumption. Repairing ckpt !!"));
                } else {
                    LOG.info((Object)("Non-Empty checkpoint. Bootstrap Producer is at SCN:" + scn + ", while checkpoint is :" + cp + ", Copying producer Scn to checkpoint !!"));
                }
                cp.setWindowScn(Long.valueOf(scn));
                cp.setWindowOffset(-1L);
                try {
                    provider.removeCheckpoint(sourceNames);
                    provider.storeCheckpoint(sourceNames, cp);
                    cp = provider.loadCheckpoint(sourceNames);
                    if (null != cp && cp.getWindowScn() == scn && cp.getWindowOffset() == -1L && cp.getConsumptionMode() == DbusClientMode.ONLINE_CONSUMPTION) continue;
                    msg = "Unable to repair and store the new checkpoint (" + cp + ") to make it same as producer SCN (" + scn + ") !!";
                    LOG.fatal((Object)msg);
                    throw new BootstrapDBException(msg);
                }
                catch (IOException ex) {
                    msg = "Unable to repair and store the new checkpoint (" + cp + ") to make it same as producer SCN (" + scn + ") !!";
                    LOG.fatal((Object)msg, (Throwable)ex);
                    throw new BootstrapDBException(msg);
                }
            }
            if (scn <= -1L) continue;
            msg = "Empty checkpoint. Bootstrap Producer SCN is at SCN:" + scn + ", while checkpoint is null !! Could result in gap in event consumption. Repairing ckpt !!";
            LOG.warn((Object)msg);
            cp = new Checkpoint();
            cp.setWindowScn(Long.valueOf(scn));
            cp.setWindowOffset(-1L);
            cp.setConsumptionMode(DbusClientMode.ONLINE_CONSUMPTION);
            try {
                provider.removeCheckpoint(sourceNames);
                provider.storeCheckpoint(sourceNames, cp);
                cp = provider.loadCheckpoint(sourceNames);
                if (null != cp && cp.getWindowScn() == scn && cp.getWindowOffset() == -1L && cp.getConsumptionMode() == DbusClientMode.ONLINE_CONSUMPTION) continue;
                LOG.fatal((Object)("Unable to repair and store the checkpoint (" + cp + ") to make it same as producer SCN (" + scn + ") !!"));
                throw new BootstrapDBException(msg);
            }
            catch (IOException ex) {
                msg = "Unable to repair and store the checkpoint (" + cp + ") to make it same as producer SCN (" + scn + ") !!";
                LOG.fatal((Object)msg, (Throwable)ex);
                throw new BootstrapDBException(msg);
            }
        }
        LOG.info((Object)"Validating bootstrap DB checkpoints done successfully!!");
    }

    private void registerProducerCallback(List<String> logicalSourceList, BootstrapProducerStatsCollector statsCollector) throws SQLException, DatabusClientException, DatabusException {
        BootstrapProducerCallback bootstrapCallback = new BootstrapProducerCallback(this._bootstrapProducerStaticConfig, statsCollector, this, logicalSourceList);
        this.registerDatabusStreamListener((DatabusStreamConsumer)bootstrapCallback, logicalSourceList, null);
    }

    private void initBootstrapDBMetadata() throws SQLException, BootstrapDatabaseTooOldException {
        DatabusHttpClientImpl.RuntimeConfig clientRtConfig = (DatabusHttpClientImpl.RuntimeConfig)this.getClientConfigManager().getReadOnlyConfig();
        for (ServerInfo relayInfo : clientRtConfig.getRelays()) {
            this._registeredSources.addAll(relayInfo.getSources());
            for (String source : this._registeredSources) {
                BootstrapDBMetaDataDAO.SourceStatusInfo srcIdStatus = this._dbDao.getSrcIdStatusFromDB(source, false);
                if (0 > srcIdStatus.getSrcId()) {
                    int newState = 1;
                    if (!this._bootstrapProducerStaticConfig.isBootstrapDBStateCheck()) {
                        newState = 4;
                    }
                    this._dbDao.addNewSourceInDB(source, newState);
                }
                srcIdStatus = this._dbDao.getSrcIdStatusFromDB(source, false);
                this._srcNameIdMap.put(source, srcIdStatus.getSrcId());
                if (!this._bootstrapProducerStaticConfig.isBootstrapDBStateCheck() || BootstrapProducerStatus.isReadyForConsumption((int)srcIdStatus.getStatus())) continue;
                throw new BootstrapDatabaseTooOldException("Bootstrap DB is not ready to read from relay !! Status :" + srcIdStatus);
            }
        }
    }

    public List<String> getRegisteredSources() {
        return this._registeredSources;
    }

    public void doStart() {
        super.doStart();
        if (this._bootstrapProducerStaticConfig.getRunApplierThreadOnStart()) {
            for (Map.Entry<String, DatabusThreadBase> applierThreadEntry : this._applierThreads.entrySet()) {
                LOG.info((Object)("Starting applier thread for source = " + applierThreadEntry.getValue()));
                applierThreadEntry.getValue().start();
            }
        } else {
            LOG.info((Object)"Not starting any applier threads because the config getRunApplierThreadOnStart is false");
        }
        if (this._bootstrapProducerStaticConfig.getCleaner().getDiskSpaceTrigger().isEnable()) {
            LOG.info((Object)"Starting disk space trigger thread");
            this._dbDiskSpaceTriggerThread.start();
        }
        if (this._bootstrapProducerStaticConfig.getCleaner().getPeriodSpaceTrigger().isEnable()) {
            LOG.info((Object)"Starting periodic trigger thread");
            this._dbPeriodicTriggerThread.start();
        }
        LOG.info((Object)(DatabusBootstrapProducer.class.getName() + " is running ..."));
    }

    public static void main(String[] args) throws Exception {
        Properties startupProps = ServerContainer.processCommandLineArgs((String[])args);
        BootstrapProducerConfig producerConfig = new BootstrapProducerConfig();
        ConfigLoader staticProducerConfigLoader = new ConfigLoader("databus.bootstrap.", (ConfigBuilder)producerConfig);
        BootstrapProducerStaticConfig staticProducerConfig = (BootstrapProducerStaticConfig)((Object)staticProducerConfigLoader.loadConfig((Map)startupProps));
        DatabusBootstrapProducer bootstrapProducer = new DatabusBootstrapProducer(staticProducerConfig);
        bootstrapProducer.registerShutdownHook();
        bootstrapProducer.startAndBlock();
    }

    protected void doShutdown() {
        super.doShutdown();
        for (Map.Entry<String, DatabusThreadBase> applierThreadEntry : this._applierThreads.entrySet()) {
            DatabusThreadBase applierThread = applierThreadEntry.getValue();
            if (!applierThread.isAlive()) continue;
            applierThread.shutdownAsynchronously();
            applierThread.interrupt();
            applierThread.awaitShutdownUniteruptibly();
        }
        if (this._dbDiskSpaceTriggerThread.isAlive()) {
            this._dbDiskSpaceTriggerThread.shutdownAsynchronously();
            this._dbDiskSpaceTriggerThread.interrupt();
            this._dbDiskSpaceTriggerThread.awaitShutdownUniteruptibly();
        }
        if (this._dbPeriodicTriggerThread.isAlive()) {
            this._dbPeriodicTriggerThread.shutdownAsynchronously();
            this._dbPeriodicTriggerThread.interrupt();
            this._dbPeriodicTriggerThread.awaitShutdownUniteruptibly();
        }
    }

    public StatsCollectors<BootstrapProducerStatsCollector> getProducerStatsCollectors() {
        return this._bootstrapProducerStatsCollectors;
    }

    public BootstrapProducerStatsCollector getApplierStatsCollector() {
        return this._applierStatsCollector;
    }

    @Override
    public void onErrorRetryLimitExceeded(String message, Throwable exception) {
        LOG.fatal((Object)("Error Retry Limit reached. Message :(" + message + "). Stopping Bootstrap Producer Service. Exception Received :"), exception);
        this.doShutdown();
    }

    private static class SourceInfo {
        private String PhysicalSourceName;
        private List<String> logicalSources;

        public List<String> getLogicalSources() {
            return this.logicalSources;
        }

        public void setLogicalSources(List<String> logicalSources) {
            this.logicalSources = logicalSources;
        }

        public String getPhysicalSourceName() {
            return this.PhysicalSourceName;
        }

        public void setPhysicalSourceName(String physicalSourceName) {
            this.PhysicalSourceName = physicalSourceName;
        }

        private SourceInfo(String physicalSourceName, List<String> logicalSources) {
            this.PhysicalSourceName = physicalSourceName;
            this.logicalSources = logicalSources;
        }

        public boolean equals(Object obj) {
            if (null == obj) {
                return false;
            }
            if (!(obj instanceof SourceInfo)) {
                return false;
            }
            SourceInfo other = (SourceInfo)obj;
            return this.PhysicalSourceName.equals(other.getPhysicalSourceName());
        }

        public int hashCode() {
            return this.PhysicalSourceName != null ? this.PhysicalSourceName.hashCode() : 0;
        }
    }
}

