/*
 * Decompiled with CFR 0.152.
 */
package com.linkedin.databus.bootstrap.producer;

import com.linkedin.databus.bootstrap.common.BootstrapConn;
import com.linkedin.databus.bootstrap.common.BootstrapDBMetaDataDAO;
import com.linkedin.databus.bootstrap.common.BootstrapProducerStatsCollector;
import com.linkedin.databus.bootstrap.common.BootstrapReadOnlyConfig;
import com.linkedin.databus.bootstrap.common.SourceInfo;
import com.linkedin.databus.client.consumer.AbstractDatabusStreamConsumer;
import com.linkedin.databus.client.pub.ConsumerCallbackResult;
import com.linkedin.databus.client.pub.DbusEventDecoder;
import com.linkedin.databus.client.pub.SCN;
import com.linkedin.databus.core.DbusEvent;
import com.linkedin.databus.core.DbusEventInternalWritable;
import com.linkedin.databus.core.DbusPrettyLogUtils;
import com.linkedin.databus.core.ScnNotFoundException;
import com.linkedin.databus.core.util.RateMonitor;
import com.linkedin.databus.core.util.StringUtils;
import com.linkedin.databus2.core.BackoffTimer;
import com.linkedin.databus2.core.DatabusException;
import com.linkedin.databus2.util.DBHelper;
import java.nio.ByteBuffer;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import org.apache.avro.Schema;
import org.apache.log4j.Logger;

public class BootstrapProducerCallback
extends AbstractDatabusStreamConsumer {
    // Fully-qualified class name; used as the log4j logger name.
    public static final String MODULE = BootstrapProducerCallback.class.getName();
    public static final Logger LOG = Logger.getLogger((String)MODULE);
    // DAO wrapping the bootstrap DB connection; created lazily by getConnection().
    private BootstrapDBMetaDataDAO _bootstrapDao = null;
    // Insert statement for the current source's log table; prepared in onStartSource, closed in onEndSource.
    private PreparedStatement _stmt = null;
    // Cached "update bootstrap_producer_state" statement; see getLogPositionStmt().
    private PreparedStatement _logScnStmt = null;
    // Events accepted for the source currently being processed (reset per source).
    private int _numEvents = 0;
    // Events accepted across all sources in the current window (reset in onStartDataEventSequence).
    private int _totalNumEvents = 0;
    // max(endscn) from bootstrap_seeder_state, loaded by initWindowScn().
    private long _seedCatchupScn = -1L;
    // Numeric status shared by all tracked sources (assigned in init()). This class compares it
    // against 3 and 6 and writes 4 when marking sources active; the symbolic names of these
    // codes are not visible in this file.
    private int _state = 4;
    // Window SCN as of the last completed window; -1 makes onStartDataEventSequence re-run initWindowScn().
    private long _oldWindowScn = -1L;
    // SCN of the most recent event accepted by onDataEvent (or loaded by initWindowScn()).
    private long _newWindowScn = -1L;
    // Window SCN that was read from the DB by initWindowScn().
    private long _producerStartScn = -1L;
    // Source name -> SourceInfo for the configured logical sources, loaded in init().
    private Map<String, SourceInfo> _trackedSources = null;
    // Source id -> source name, derived from _trackedSources in init().
    private Map<Integer, String> _trackedSrcIdsToNames = null;
    private BootstrapReadOnlyConfig _config = null;
    // Name of the source passed to the latest onStartSource call.
    private String _currentSource = null;
    // Backoff policy used by reset() while re-establishing the DB connection.
    private BackoffTimer _retryTimer = null;
    private List<String> _logicalSources = null;
    // Optional metrics sink; every use in this class is null-checked.
    private BootstrapProducerStatsCollector _statsCollector = null;
    // Per-source timer; stopped and reported in onEndSource.
    private final RateMonitor _srcRm = new RateMonitor("ProducerSourceRateMonitor");
    // Per-window timer; stopped and reported in onEndDataEventSequence.
    private final RateMonitor _totalRm = new RateMonitor("ProducerTotalRateMonitor");
    // Log id / row id last recorded by updateProducerSourceMetaData().
    private int _currentLogId;
    private int _currentRowId;
    // Row-count threshold (from config.getBootstrapLogSize()) at which updateSourcesInDB() switches log tables.
    private final int _maxRowsInLog;
    // Set by reset() once the retry limit is hit; makes onRollback return ERROR_FATAL.
    private boolean _errorRetriesExceeded;
    // May be null: the two-argument constructor passes null for it.
    private ErrorCaseHandler _errorHandler = null;

    /**
     * Creates a producer callback without a stats collector or error handler.
     *
     * @param config         bootstrap DB and retry configuration
     * @param logicalSources names of the sources this producer should track
     * @throws Exception if the connection or source metadata cannot be initialised
     */
    public BootstrapProducerCallback(BootstrapReadOnlyConfig config, List<String> logicalSources) throws Exception {
        this(config, (BootstrapProducerStatsCollector) null, (ErrorCaseHandler) null, logicalSources);
    }

    /**
     * Creates a producer callback, opens the bootstrap DB connection and loads
     * the tracked-source metadata via {@link #init()}.
     *
     * @param config         bootstrap DB and retry configuration
     * @param statsCollector metrics sink, may be {@code null}
     * @param errorHandler   notified when connection-reset retries are exhausted, may be {@code null}
     * @param logicalSources names of the sources this producer should track
     * @throws SQLException     if a database call fails
     * @throws DatabusException if source metadata cannot be loaded
     */
    public BootstrapProducerCallback(BootstrapReadOnlyConfig config, BootstrapProducerStatsCollector statsCollector, ErrorCaseHandler errorHandler, List<String> logicalSources) throws SQLException, DatabusException {
        // Plain collaborator wiring first.
        this._config = config;
        this._logicalSources = logicalSources;
        this._statsCollector = statsCollector;
        this._errorHandler = errorHandler;
        this._errorRetriesExceeded = false;

        // Values derived from the configuration.
        this._maxRowsInLog = config.getBootstrapLogSize();
        this._retryTimer = new BackoffTimer("BootstrapProducer", config.getRetryConfig());

        // Open the connection (creates the DAO) before reading metadata through it.
        this.getConnection();
        this.init();
    }

    /**
     * Loads the tracked sources for the configured logical sources, rebuilds the
     * id-to-name index, verifies that every tracked source reports the same
     * status, and finally reloads the window SCNs from the database.
     *
     * @throws SQLException     if a database call fails
     * @throws DatabusException if the DAO cannot resolve the tracked sources
     * @throws RuntimeException if two tracked sources report different statuses
     */
    public void init() throws SQLException, DatabusException {
        this._trackedSources = this._bootstrapDao.getDBTrackedSources(new HashSet<String>(this._logicalSources));

        this._trackedSrcIdsToNames = new HashMap<Integer, String>();
        for (Map.Entry<String, SourceInfo> tracked : this._trackedSources.entrySet()) {
            this._trackedSrcIdsToNames.put(tracked.getValue().getSrcId(), tracked.getKey());
        }

        LOG.info("maxRowsInLog=" + this._maxRowsInLog);
        LOG.info("trackedSources: ");

        // The first source seen defines the expected status; all others must match it.
        int commonStatus = 0;
        boolean haveReference = false;
        for (SourceInfo info : this._trackedSources.values()) {
            if (!haveReference) {
                commonStatus = info.getStatus();
                haveReference = true;
            } else if (info.getStatus() != commonStatus) {
                String msg = "Bootstrap Source state does not seem to be consistent for all the sources that this producer listens to.  Found atleast 2 different states :" + commonStatus + "," + info.getStatus();
                LOG.error(msg);
                throw new RuntimeException(msg);
            }
            LOG.info(info.toString());
        }

        this._state = commonStatus;
        this.initWindowScn();
    }

    /**
     * Called at the start of an event window. Starts the window and source
     * timers, clears the per-window event counter and, if the window SCN is
     * unknown ({@code -1}), reloads it from the database.
     *
     * @param startScn SCN at which the window starts (not used by this implementation)
     * @return {@code SUCCESS} normally; {@code ERROR} if loading the window SCN
     *         failed and the connection was reset; {@code ERROR_FATAL} if the
     *         reset itself failed with a {@link SQLException}
     */
    public ConsumerCallbackResult onStartDataEventSequence(SCN startScn) {
        // The window-level timer is stopped and its duration reported in
        // onEndDataEventSequence, so it must be started when the window begins.
        this._totalRm.start();
        this._srcRm.start();
        this._totalNumEvents = 0;
        try {
            if (this._oldWindowScn == -1L) {
                this.initWindowScn();
            }
        }
        catch (SQLException e) {
            if (null != this._statsCollector) {
                this._statsCollector.registerSQLException();
            }
            LOG.error((Object)"Got SQLException in startDataEventSequence Hanlder!! Connections will be reset !!", (Throwable)e);
            try {
                this.reset();
            }
            catch (DatabusException e2) {
                DbusPrettyLogUtils.logExceptionAtError((String)"Unable to reset connection", (Throwable)e2, (Logger)LOG);
            }
            catch (SQLException sqlEx) {
                DbusPrettyLogUtils.logExceptionAtError((String)"Got exception while resetting connections. Stopping Client !!", (Throwable)sqlEx, (Logger)LOG);
                return ConsumerCallbackResult.ERROR_FATAL;
            }
            return ConsumerCallbackResult.ERROR;
        }
        return ConsumerCallbackResult.SUCCESS;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Called at the end of an event window. Persists the producer position for
     * every tracked source, saves the source rows, optionally marks the sources
     * active (status 4) and commits the transaction. The window timer is
     * stopped and reported to the stats collector on every exit path.
     *
     * @param endScn SCN at which the window ends (not used by this implementation)
     * @return {@code SUCCESS} after a successful commit; {@code ERROR} if a
     *         {@link SQLException} occurred and the connection was reset;
     *         {@code ERROR_FATAL} if the reset itself failed with a {@link SQLException}
     */
    public ConsumerCallbackResult onEndDataEventSequence(SCN endScn) {
        try {
            this.updateAllProducerSourcesMetaData();
            this._oldWindowScn = this._newWindowScn;
            this.updateSourcesInDB();

            // Sources become active once the producer has moved past the relevant SCN
            // for the state it is currently in (3 or 6).
            final boolean seedCatchupDone = this._state == 3 && this._newWindowScn > this._seedCatchupScn;
            final boolean resumedAfterFallOff = this._state == 6 && this._newWindowScn > this._producerStartScn;
            if (seedCatchupDone) {
                LOG.info("Bootstrap DB for sources (" + this._trackedSources.values() + ") has completed the seeding catchup phase. Marking them active in bootstrap_sources table !! SeedCatchupSCN was :" + this._seedCatchupScn);
            } else if (resumedAfterFallOff) {
                LOG.info("Bootstrap DB for sources (" + this._trackedSources.values() + ") has started getting events since last fell-off relay !! Marking them active !!");
            }
            final boolean markActive = seedCatchupDone || resumedAfterFallOff;

            if (markActive) {
                this._bootstrapDao.updateSourcesStatus(this._trackedSources.keySet(), 4);
            }

            final Connection conn = this.getConnection();
            try {
                DBHelper.commit(conn);
            }
            catch (SQLException commitFailure) {
                DBHelper.rollback(conn);
                throw commitFailure;
            }

            // Only reflect the new status in memory once the commit has gone through.
            if (markActive) {
                this._state = 4;
                for (SourceInfo tracked : this._trackedSources.values()) {
                    tracked.setStatus(4);
                }
            }
            LOG.info("bootstrap producer upto scn " + this._newWindowScn);
            return ConsumerCallbackResult.SUCCESS;
        }
        catch (SQLException e) {
            if (null != this._statsCollector) {
                this._statsCollector.registerSQLException();
            }
            LOG.error("Got SQLException in endDataEventSequence Handler !! Connections will be reset !!", e);
            try {
                this.reset();
            }
            catch (DatabusException resetFailure) {
                DbusPrettyLogUtils.logExceptionAtError("Unable to reset connection", (Throwable) resetFailure, LOG);
            }
            catch (SQLException sqlEx) {
                LOG.error("Got exception while resetting connections. Stopping Client !!", sqlEx);
                return ConsumerCallbackResult.ERROR_FATAL;
            }
            return ConsumerCallbackResult.ERROR;
        }
        finally {
            this._totalRm.stop();
            final long latency = this._totalRm.getDuration() / 1000000L;
            if (null != this._statsCollector) {
                this._statsCollector.registerEndWindow(latency, (long) this._totalNumEvents, this._newWindowScn);
            }
        }
    }

    /**
     * Called when the client rolls back to an earlier SCN.
     *
     * @param startScn SCN being rolled back to (not used by this implementation)
     * @return {@code ERROR_FATAL} once connection-reset retries have been
     *         exhausted, otherwise {@code SUCCESS}
     */
    public ConsumerCallbackResult onRollback(SCN startScn) {
        if (this._errorRetriesExceeded) {
            return ConsumerCallbackResult.ERROR_FATAL;
        }
        return ConsumerCallbackResult.SUCCESS;
    }

    /**
     * Called when events for a source start arriving. Resets the per-source
     * event counter, starts the source timer and prepares the insert statement
     * for the source's log table.
     *
     * @param source       name of the source
     * @param sourceSchema schema of the source (not used by this implementation)
     * @return {@code SUCCESS} when the statement was prepared; {@code ERROR} if
     *         the source is not tracked or a {@link SQLException} occurred and
     *         the connection was reset; {@code ERROR_FATAL} if the reset itself
     *         failed with a {@link SQLException}
     */
    public ConsumerCallbackResult onStartSource(String source, Schema sourceSchema) {
        this._numEvents = 0;
        this._currentSource = source;
        this._srcRm.start();

        final SourceInfo tracked = this._trackedSources.get(source);
        if (tracked == null) {
            LOG.error("Source :" + source + " not managed in this bootstrap DB instance !! Managed Sources : (" + this._trackedSources + ")");
            return ConsumerCallbackResult.ERROR;
        }

        boolean prepared;
        try {
            prepared = this.prepareStatement(tracked.getSrcId());
        }
        catch (SQLException e) {
            if (null != this._statsCollector) {
                this._statsCollector.registerSQLException();
            }
            LOG.error("Got SQLException in startSource Hanlder!! Connections will be reset !!", e);
            try {
                this.reset();
            }
            catch (DatabusException resetFailure) {
                DbusPrettyLogUtils.logExceptionAtError("Unable to reset connection", (Throwable) resetFailure, LOG);
            }
            catch (SQLException sqlEx) {
                LOG.error("Got exception while resetting connections. Stopping Client !!", sqlEx);
                return ConsumerCallbackResult.ERROR_FATAL;
            }
            return ConsumerCallbackResult.ERROR;
        }

        if (prepared) {
            return ConsumerCallbackResult.SUCCESS;
        }
        return ConsumerCallbackResult.ERROR;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Called when all events of a source in the current window have been
     * delivered. Records the producer position for the source and closes the
     * insert statement. The source timer is stopped, the batch is reported to
     * the stats collector and the per-source counter is folded into the window
     * total on every exit path.
     *
     * @param source       name of the source
     * @param sourceSchema schema of the source (not used by this implementation)
     * @return {@code SUCCESS} normally; {@code ERROR} if a {@link SQLException}
     *         occurred and the connection was reset; {@code ERROR_FATAL} if the
     *         reset itself failed with a {@link SQLException}
     */
    public ConsumerCallbackResult onEndSource(String source, Schema sourceSchema) {
        try {
            this.updateProducerSourceMetaData(source);
            final PreparedStatement insert = this._stmt;
            if (insert != null) {
                insert.close();
                this._stmt = null;
            }
            return ConsumerCallbackResult.SUCCESS;
        }
        catch (SQLException e) {
            if (null != this._statsCollector) {
                this._statsCollector.registerSQLException();
            }
            LOG.error("Got SQLException in endSource Hanlder!! Connections will be reset !!", e);
            try {
                this.reset();
            }
            catch (DatabusException resetFailure) {
                DbusPrettyLogUtils.logExceptionAtError("Unable to reset connection", (Throwable) resetFailure, LOG);
            }
            catch (SQLException sqlEx) {
                LOG.error("Got exception while resetting connections. Stopping Client !!", sqlEx);
                return ConsumerCallbackResult.ERROR_FATAL;
            }
            return ConsumerCallbackResult.ERROR;
        }
        finally {
            this._srcRm.stop();
            final long latency = this._srcRm.getDuration() / 1000000L;
            if (null != this._statsCollector) {
                this._statsCollector.registerBatch(this._currentSource, latency, (long) this._numEvents, this._newWindowScn, (long) this._currentLogId, (long) this._currentRowId);
            }
            this._totalNumEvents += this._numEvents;
            this._numEvents = 0;
        }
    }

    /**
     * Stores a single event in the current source's log table.
     *
     * <p>Events whose sequence is lower than the current window SCN are logged
     * and dropped. Otherwise the event's sequence becomes the new window SCN and
     * a row (scn, windowscn, srckey, val) is inserted using the statement
     * prepared in {@code onStartSource}.
     *
     * @param e            the event to store
     * @param eventDecoder decoder for the event payload (not used by this implementation)
     * @return {@code SUCCESS} if the event was stored or dropped as old;
     *         {@code ERROR} if no insert statement is available, the key type is
     *         unsupported, or a {@link SQLException} occurred and the connection
     *         was reset; {@code ERROR_FATAL} if the reset itself failed with a
     *         {@link SQLException}
     * @throws UnsupportedClassVersionError if the event does not expose its raw bytes
     */
    public ConsumerCallbackResult onDataEvent(DbusEvent e, DbusEventDecoder eventDecoder) {
        if (e.sequence() < this._newWindowScn) {
            LOG.warn((Object)("Seeing an Old event. Dropping it !! Current SCN : " + this._newWindowScn + ". Event :" + e.toString()));
            return ConsumerCallbackResult.SUCCESS;
        }
        // reset() clears _stmt and onStartSource may return before preparing it.
        // Fail with ERROR before touching any counters or SCN state instead of
        // hitting a NullPointerException half-way through.
        if (null == this._stmt) {
            LOG.error((Object)("No insert statement prepared for source " + this._currentSource + "; cannot store event: " + e));
            return ConsumerCallbackResult.ERROR;
        }
        ++this._numEvents;
        this._newWindowScn = e.sequence();
        try {
            // The event's sequence is written to both the scn and windowscn columns.
            this._stmt.setLong(1, this._newWindowScn);
            this._stmt.setLong(2, this._newWindowScn);
            String keyStr = null;
            if (e.isKeyNumber()) {
                keyStr = Long.toString(e.key());
            } else if (e.isKeyString()) {
                keyStr = StringUtils.bytesToString((byte[])e.keyBytes());
            } else if (e.isKeySchema()) {
                LOG.error((Object)("schema key type not supported: " + e));
                return ConsumerCallbackResult.ERROR;
            } else {
                LOG.error((Object)("unknown event key type: " + e));
                return ConsumerCallbackResult.ERROR;
            }
            this._stmt.setString(3, keyStr);
            if (!(e instanceof DbusEventInternalWritable)) {
                throw new UnsupportedClassVersionError("Cannot get raw bytes out of DbusEvent");
            }
            // Copy the remaining bytes of the event buffer into the val column.
            ByteBuffer bytebuff = ((DbusEventInternalWritable)e).getRawBytes();
            byte[] val = new byte[bytebuff.remaining()];
            bytebuff.get(val);
            this._stmt.setBytes(4, val);
            this._stmt.executeUpdate();
        }
        catch (SQLException e1) {
            if (null != this._statsCollector) {
                this._statsCollector.registerSQLException();
            }
            LOG.error((Object)"Got SQLException in dataEvent Hanlder!! Connections will be reset !!", (Throwable)e1);
            try {
                this.reset();
            }
            catch (DatabusException e2) {
                DbusPrettyLogUtils.logExceptionAtError((String)"Unable to reset connection", (Throwable)e2, (Logger)LOG);
            }
            catch (SQLException sqlEx) {
                LOG.error((Object)"Got exception while resetting connections. Stopping Client !!", (Throwable)sqlEx);
                return ConsumerCallbackResult.ERROR_FATAL;
            }
            return ConsumerCallbackResult.ERROR;
        }
        return ConsumerCallbackResult.SUCCESS;
    }

    /**
     * Checkpoint notification; this implementation performs no work.
     *
     * @param checkpointScn SCN of the checkpoint (not used)
     * @return always {@code SUCCESS}
     */
    public ConsumerCallbackResult onCheckpoint(SCN checkpointScn) {
        return ConsumerCallbackResult.SUCCESS;
    }

    /**
     * Error notification from the client. For a {@link ScnNotFoundException}
     * the current transaction is rolled back (a rollback failure is only
     * logged), all tracked sources are set to status 6 and the event is
     * reported to the stats collector. Other errors are ignored.
     *
     * @param err the error reported by the client
     * @return {@code SUCCESS} unless handling the error itself threw, in which
     *         case {@code ERROR}
     */
    public ConsumerCallbackResult onError(Throwable err) {
        try {
            if (err instanceof ScnNotFoundException) {
                try {
                    this.getConnection().rollback();
                }
                catch (SQLException rollbackFailure) {
                    if (null != this._statsCollector) {
                        this._statsCollector.registerSQLException();
                    }
                    LOG.error("Got exception while rolling back transaction !!", rollbackFailure);
                }
                this._bootstrapDao.updateSourcesStatus(this._trackedSources.keySet(), 6);
                if (null != this._statsCollector) {
                    this._statsCollector.registerFellOffRelay();
                }
            }
            return ConsumerCallbackResult.SUCCESS;
        }
        catch (Exception handlingFailure) {
            LOG.error("Got exception onError() ", handlingFailure);
            return ConsumerCallbackResult.ERROR;
        }
    }

    /**
     * Closes the cached statements, re-opens the bootstrap DB connection and
     * re-runs {@link #init()}, retrying with backoff on {@link SQLException}.
     *
     * <p>When the retry timer reports no remaining retries, the failure is
     * logged as fatal, {@code _errorRetriesExceeded} is set, the error handler
     * (if one was supplied) is notified, and the last {@link SQLException} is
     * rethrown so that callers can stop the client instead of this method
     * looping forever.
     *
     * @throws SQLException     if the connection could not be re-established
     *                          within the retry limit
     * @throws DatabusException if {@link #init()} fails to load source metadata
     */
    private void reset() throws SQLException, DatabusException {
        this._retryTimer.reset();
        while (true) {
            try {
                DBHelper.close((Statement)this._stmt);
                this._stmt = null;
                DBHelper.close((Statement)this._logScnStmt);
                this._logScnStmt = null;
                this._bootstrapDao.getBootstrapConn().close();
                this._bootstrapDao.getBootstrapConn().getDBConn();
                this._bootstrapDao.getBootstrapConn().executeDummyBootstrapDBQuery();
                this.init();
                return;
            }
            catch (SQLException sqlEx) {
                if (null != this._statsCollector) {
                    this._statsCollector.registerSQLException();
                }
                LOG.error((Object)"Unable to reset the Bootstrap DB connection !!", (Throwable)sqlEx);
                if (this._retryTimer.getRemainingRetriesNum() <= 0) {
                    String message = "Producer Thread reached max retries trying to reset the MySQL Connections. Stopping !!";
                    LOG.fatal((Object)message);
                    this._errorRetriesExceeded = true;
                    // The handler is optional: the two-argument constructor passes null.
                    if (null != this._errorHandler) {
                        this._errorHandler.onErrorRetryLimitExceeded(message, sqlEx);
                    }
                    // Give up: propagate so callers can report ERROR_FATAL.
                    throw sqlEx;
                }
                this._retryTimer.backoffAndSleep();
            }
        }
    }

    /**
     * Records the current producer position for every tracked source.
     *
     * @throws SQLException if any of the underlying database calls fails
     */
    private void updateAllProducerSourcesMetaData() throws SQLException {
        for (String sourceName : this._trackedSources.keySet()) {
            this.updateProducerSourceMetaData(sourceName);
        }
    }

    /**
     * Reads the highest row id of the source's log table, writes the resulting
     * (log id, row id, window SCN) position to the producer state table and
     * mirrors the row id and window SCN into the in-memory {@code SourceInfo}.
     *
     * @param source name of a tracked source
     * @throws SQLException if a database call fails
     */
    private void updateProducerSourceMetaData(String source) throws SQLException {
        final SourceInfo tracked = this._trackedSources.get(source);

        // The row id is looked up first, then the log id is taken from the tracked info.
        this._currentRowId = this.getLastLogEntry(source);
        this._currentLogId = tracked.getCurrLogId();

        final long windowScn = this._newWindowScn;
        this.setLogPosition(this._currentLogId, this._currentRowId, windowScn, source);

        tracked.setMaxRowId(this._currentRowId);
        tracked.setWindowScn(this._newWindowScn);
    }

    /**
     * Prepares the insert statement for the log table of the given source and
     * stores it in {@code _stmt}.
     *
     * @param srcId id of the source whose log table will receive the events
     * @return always {@code true}; failures are reported by exception
     * @throws SQLException if the table name cannot be resolved or the
     *                      statement cannot be prepared (logged before rethrow)
     */
    private boolean prepareStatement(int srcId) throws SQLException {
        try {
            final Connection conn = this.getConnection();
            final String insertSql = "insert into "
                + this.getTableName(srcId)
                + "(scn, windowscn, srckey, val) "
                + " values(?,?,?,?)";
            this._stmt = conn.prepareStatement(insertSql);
            return true;
        }
        catch (SQLException e) {
            LOG.error("Got SQLException in prepareStatement!! ", e);
            throw e;
        }
    }

    /**
     * Loads the producer's window SCN and the seeder's end SCN for the tracked
     * sources from {@code bootstrap_producer_state} / {@code bootstrap_seeder_state}.
     *
     * <p>On success {@code _oldWindowScn}, {@code _newWindowScn} and
     * {@code _producerStartScn} are set to {@code max(windowscn)} and
     * {@code _seedCatchupScn} to {@code max(endscn)}. On a {@link SQLException}
     * the three window SCN fields are set to {@code -1} and the exception is
     * rethrown. The statement and result set are closed on every exit path.
     *
     * @throws SQLException if the query cannot be created or executed
     */
    private void initWindowScn() throws SQLException {
        ResultSet rs = null;
        StringBuilder sql = new StringBuilder();
        Statement stmt = null;
        try {
            sql.append("select max(p.windowscn), max(s.endscn) from bootstrap_producer_state p, bootstrap_seeder_state s ");
            sql.append("where p.srcid = s.srcid and p.srcid in (");
            // Comma-separated list of the tracked source ids.
            int count = this._trackedSources.size();
            for (SourceInfo srcInfo : this._trackedSources.values()) {
                sql.append(srcInfo.getSrcId());
                if (--count <= 0) continue;
                sql.append(",");
            }
            sql.append(")");
            stmt = this.getConnection().createStatement();
            LOG.info((Object)("sql query = " + sql.toString()));
            rs = stmt.executeQuery(sql.toString());
            if (rs.next()) {
                this._oldWindowScn = this._newWindowScn = rs.getLong(1);
                this._producerStartScn = this._newWindowScn;
                this._seedCatchupScn = rs.getLong(2);
            }
        }
        catch (SQLException e) {
            if (null != this._statsCollector) {
                this._statsCollector.registerSQLException();
            }
            LOG.error((Object)"Unable to select producer's max windowscn. Setting windowscn to -1", (Throwable)e);
            this._oldWindowScn = -1L;
            this._newWindowScn = -1L;
            this._producerStartScn = -1L;
            throw e;
        }
        finally {
            // Release JDBC resources on success, SQLException and unchecked exceptions alike.
            DBHelper.close(rs, stmt, null);
        }
    }

    /**
     * Returns {@code max(id)} of the log table currently being produced to for
     * the given source.
     *
     * @param source name of a tracked source
     * @return the value of {@code max(id)}, or {@code 0} if the query returns no row
     * @throws SQLException if the query cannot be created or executed (logged before rethrow)
     */
    private int getLastLogEntry(String source) throws SQLException {
        int rid = 0;
        Statement stmt = null;
        ResultSet rs = null;
        int srcId = this._trackedSources.get(source).getSrcId();
        try {
            stmt = this.getConnection().createStatement();
            rs = stmt.executeQuery("select max(id) from " + this.getTableName(srcId));
            if (rs.next()) {
                rid = rs.getInt(1);
            }
        }
        catch (SQLException e) {
            LOG.error((Object)"Unable to find max. rid. Setting current rid to -1", (Throwable)e);
            throw e;
        }
        finally {
            // Same cleanup helper as initWindowScn(): result set first, then statement,
            // so a failure closing one does not leave the other open.
            DBHelper.close(rs, stmt, null);
        }
        return rid;
    }

    /**
     * Writes the producer position of a source to {@code bootstrap_producer_state}.
     *
     * @param logid     log id to record
     * @param logrid    row id within that log to record
     * @param windowscn window SCN to record
     * @param source    source name used to look up the source id
     * @throws SQLException if the update statement cannot be obtained or executed
     */
    private void setLogPosition(int logid, int logrid, long windowscn, String source) throws SQLException {
        final PreparedStatement positionUpdate = this.getLogPositionStmt();
        // Parameter order follows the statement text: logid, rid, windowscn, src.
        positionUpdate.setInt(1, logid);
        positionUpdate.setInt(2, logrid);
        positionUpdate.setLong(3, windowscn);
        positionUpdate.setString(4, source);
        positionUpdate.executeUpdate();
    }

    /**
     * Returns the cached statement that updates {@code bootstrap_producer_state},
     * preparing it on first use.
     *
     * @return the prepared update statement
     * @throws SQLException if the statement cannot be prepared (counted and logged before rethrow)
     */
    private PreparedStatement getLogPositionStmt() throws SQLException {
        if (this._logScnStmt == null) {
            try {
                final Connection conn = this.getConnection();
                final String updateSql = "update bootstrap_producer_state set logid = ?, rid = ? , windowscn = ? where srcid = (select id from bootstrap_sources where src = ?)";
                this._logScnStmt = conn.prepareStatement(updateSql);
            }
            catch (SQLException e) {
                if (null != this._statsCollector) {
                    this._statsCollector.registerSQLException();
                }
                LOG.error("Exception occurred while getting the bootstrap_producer statement", e);
                throw e;
            }
        }
        return this._logScnStmt;
    }

    /**
     * Saves every tracked source to the database and then, for each source
     * whose row count has reached {@code _maxRowsInLog}, switches to a new log:
     * the source's log is switched, the producer position is reset to row 0 of
     * the new log id, and the new log table is created.
     *
     * @throws SQLException if a database call fails
     */
    private void updateSourcesInDB() throws SQLException {
        // Pass 1: persist all sources.
        for (SourceInfo tracked : this._trackedSources.values()) {
            tracked.saveToDB(this.getConnection());
        }

        // Pass 2: roll over the logs that are full.
        for (Map.Entry<String, SourceInfo> entry : this._trackedSources.entrySet()) {
            final SourceInfo tracked = entry.getValue();
            if (tracked.getNumRows() >= this._maxRowsInLog) {
                tracked.switchLogFile(this.getConnection());
                this.setLogPosition(tracked.getCurrLogId(), 0, this._newWindowScn, entry.getKey());
                this._bootstrapDao.createNewLogTable(tracked.getSrcId());
            }
        }
    }

    /**
     * Returns the bootstrap DB connection, creating the connection wrapper and
     * the metadata DAO on first use (with auto-commit disabled).
     *
     * @return the JDBC connection obtained from the bootstrap connection wrapper
     * @throws SQLException if the connection wrapper or DAO cannot be
     *                      initialised, or if the JDBC connection cannot be obtained
     */
    private Connection getConnection() throws SQLException {
        if (this._bootstrapDao == null) {
            final boolean autoCommit = false;
            BootstrapConn dbConn = new BootstrapConn();
            try {
                dbConn.initBootstrapConn(autoCommit, this._config.getBootstrapDBUsername(), this._config.getBootstrapDBPassword(), this._config.getBootstrapDBHostname(), this._config.getBootstrapDBName());
                this._bootstrapDao = new BootstrapDBMetaDataDAO(dbConn, this._config.getBootstrapDBHostname(), this._config.getBootstrapDBUsername(), this._config.getBootstrapDBPassword(), this._config.getBootstrapDBName(), autoCommit);
            }
            catch (Exception e) {
                LOG.fatal((Object)"Unable to open BootstrapDB Connection !!", (Throwable)e);
                // Returning null here would leave _bootstrapDao unset and surface later as a
                // NullPointerException; report the failure through the declared exception instead.
                if (e instanceof SQLException) {
                    throw (SQLException)e;
                }
                throw new SQLException("Unable to open BootstrapDB Connection", e);
            }
        }
        Connection conn = null;
        try {
            conn = this._bootstrapDao.getBootstrapConn().getDBConn();
        }
        catch (SQLException e) {
            if (null != this._statsCollector) {
                this._statsCollector.registerSQLException();
            }
            LOG.fatal((Object)"Not able to open BootstrapDB Connection !!", (Throwable)e);
            throw e;
        }
        return conn;
    }

    /**
     * Resolves the name of the log table to produce to for the given source by
     * delegating to the bootstrap connection's {@code getLogTableNameToProduce}.
     *
     * @param srcId id of the source
     * @return the log table name returned by the bootstrap connection
     * @throws SQLException if the lookup fails
     */
    private String getTableName(int srcId) throws SQLException {
        return this._bootstrapDao.getBootstrapConn().getLogTableNameToProduce(srcId);
    }

    /**
     * Callback invoked by the producer when it has exhausted its retries while
     * trying to re-establish the bootstrap DB connection.
     */
    public interface ErrorCaseHandler {
        /**
         * Notifies the handler that the retry limit has been exceeded.
         *
         * @param message human-readable description of the failure
         * @param cause   the last exception seen while retrying
         */
        void onErrorRetryLimitExceeded(String message, Throwable cause);
    }
}

