/*
 * Decompiled with CFR 0.152.
 */
package com.linkedin.databus.bootstrap.server;

import com.linkedin.databus.bootstrap.api.BootstrapEventCallback;
import com.linkedin.databus.bootstrap.api.BootstrapEventProcessResult;
import com.linkedin.databus.bootstrap.api.BootstrapProcessingException;
import com.linkedin.databus.bootstrap.common.BootstrapConn;
import com.linkedin.databus.bootstrap.common.BootstrapDBMetaDataDAO;
import com.linkedin.databus.bootstrap.common.BootstrapDBTimedQuery;
import com.linkedin.databus.bootstrap.server.BootstrapServerStaticConfig;
import com.linkedin.databus.core.Checkpoint;
import com.linkedin.databus.core.DbusClientMode;
import com.linkedin.databus.core.monitoring.mbean.DbusEventsStatisticsCollector;
import com.linkedin.databus2.core.DatabusException;
import com.linkedin.databus2.core.container.request.BootstrapDatabaseTooOldException;
import com.linkedin.databus2.core.container.request.BootstrapDatabaseTooYoungException;
import com.linkedin.databus2.core.filter.DbusFilter;
import com.linkedin.databus2.core.filter.DbusKeyFilter;
import com.linkedin.databus2.core.filter.FilterToSQL;
import com.linkedin.databus2.util.DBHelper;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.ArrayList;
import org.apache.log4j.Logger;

public class BootstrapProcessor {
    public static final String MODULE = BootstrapProcessor.class.getName();
    public static final Logger LOG = Logger.getLogger((String)MODULE);
    public static final String EVENT_COLUMNS = "val";
    public static final String PHASE_COMPLETED_HEADER_NAME = "PhaseCompleted";
    public static final String PHASE_COMPLETED_HEADER_TRUE = "TRUE";
    public static final String EMPTY_STRING = "";
    private final long _maxSnapshotRowsPerFetch;
    private final long _maxCatchupRowsPerFetch;
    private final int _queryTimeInSec;
    private BootstrapDBMetaDataDAO _dbDao;
    private final DbusEventsStatisticsCollector _curStatsCollector;
    private DbusKeyFilter keyFilter;
    BootstrapServerStaticConfig config;

    public BootstrapProcessor(BootstrapServerStaticConfig config, DbusEventsStatisticsCollector curStatsCollector) throws InstantiationException, IllegalAccessException, ClassNotFoundException, SQLException, DatabusException {
        this._curStatsCollector = curStatsCollector;
        BootstrapConn dbConn = new BootstrapConn();
        this.config = config;
        boolean autoCommit = true;
        dbConn.initBootstrapConn(true, config.getDb().getBootstrapDBUsername(), config.getDb().getBootstrapDBPassword(), config.getDb().getBootstrapDBHostname(), config.getDb().getBootstrapDBName());
        this._dbDao = new BootstrapDBMetaDataDAO(dbConn, config.getDb().getBootstrapDBHostname(), config.getDb().getBootstrapDBUsername(), config.getDb().getBootstrapDBPassword(), config.getDb().getBootstrapDBName(), true);
        this._maxSnapshotRowsPerFetch = config.getDb().getBootstrapSnapshotBatchSize();
        this._maxCatchupRowsPerFetch = config.getDb().getBootstrapCatchupBatchSize();
        this._queryTimeInSec = config.getQueryTimeoutInSec();
        if (LOG.isDebugEnabled()) {
            LOG.debug((Object)("BootstrapProcessor: config=" + config + ", dbConn=" + dbConn));
        }
    }

    protected BootstrapProcessor() {
        this._curStatsCollector = null;
        this._maxSnapshotRowsPerFetch = -1L;
        this._maxCatchupRowsPerFetch = -1L;
        this._queryTimeInSec = -1;
    }

    public DbusKeyFilter getKeyFilter() {
        return this.keyFilter;
    }

    public void setKeyFilter(DbusKeyFilter keyFilter) {
        this.keyFilter = keyFilter;
    }

    public boolean streamCatchupRows(Checkpoint currState, BootstrapEventCallback callBack) throws SQLException, BootstrapProcessingException, BootstrapDatabaseTooOldException {
        assert (currState.getConsumptionMode() == DbusClientMode.BOOTSTRAP_CATCHUP);
        boolean foundRows = false;
        BootstrapDBMetaDataDAO.SourceStatusInfo srcIdStatusPair = this._dbDao.getSrcIdStatusFromDB(currState.getCatchupSource(), true);
        if (!srcIdStatusPair.isValidSource()) {
            throw new BootstrapProcessingException("Bootstrap DB not servicing source :" + currState.getCatchupSource());
        }
        int curSrcId = srcIdStatusPair.getSrcId();
        int curLogId = this._dbDao.getLogIdToCatchup(curSrcId, currState.getWindowScn());
        int targetLogId = this._dbDao.getLogIdToCatchup(curSrcId, currState.getBootstrapTargetScn().longValue());
        boolean phaseCompleted = false;
        Statement stmt = null;
        try {
            ResultSet rs = null;
            while (!foundRows && curLogId <= targetLogId) {
                stmt = this.createCatchupStatement(curSrcId, curLogId, currState);
                rs = new BootstrapDBTimedQuery((PreparedStatement)stmt, this._queryTimeInSec).executeQuery();
                foundRows = rs.isBeforeFirst();
                if (foundRows) continue;
                currState.setCatchupOffset(Integer.valueOf(0));
                LOG.info((Object)("Moving to next log table log_" + curSrcId + "_" + ++curLogId + " because current log table exhausted!"));
            }
            phaseCompleted = this.streamOutRows(currState, rs, callBack, this._maxCatchupRowsPerFetch);
        }
        catch (SQLException e) {
            LOG.error((Object)("Exception occured during fetching catchup rows" + e));
            throw e;
        }
        finally {
            if (stmt != null) {
                stmt.close();
                stmt = null;
            }
            this.mergeAndResetStats();
        }
        return phaseCompleted;
    }

    private String getFilterSQL() {
        if (this.keyFilter == null) {
            return EMPTY_STRING;
        }
        ArrayList filters = this.keyFilter.getFilters();
        ArrayList<String> filterStrings = new ArrayList<String>(filters.size());
        for (int i = 0; i < filters.size(); ++i) {
            String filterStringTemp = FilterToSQL.convertToSQL((DbusFilter)((DbusFilter)filters.get(i)));
            if (filterStringTemp == EMPTY_STRING) continue;
            filterStrings.add(filterStringTemp);
        }
        if (filterStrings.size() == 0) {
            return EMPTY_STRING;
        }
        StringBuilder filterSqlBuilder = new StringBuilder();
        filterSqlBuilder.append(" ( ");
        for (int i = 0; i < filterStrings.size(); ++i) {
            filterSqlBuilder.append((String)filterStrings.get(i));
            if (i == filterStrings.size() - 1) continue;
            filterSqlBuilder.append(" OR ");
        }
        filterSqlBuilder.append(" ) ");
        return filterSqlBuilder.toString();
    }

    public String getCatchupSQLString(String catchupTab) {
        return this.getCatchupSQLString(catchupTab, null);
    }

    public String getCatchupSQLString(String catchupTab, String source) {
        StringBuilder sql = new StringBuilder();
        String filterSql = this.getFilterSQL();
        boolean predicatePushDown = this.config.isPredicatePushDownEnabled(source) && !filterSql.isEmpty() && filterSql != null;
        sql.append("Select ");
        sql.append("id, ");
        sql.append("scn, ");
        sql.append("windowscn, ");
        sql.append(EVENT_COLUMNS);
        if (predicatePushDown) {
            sql.append(", CAST(srckey as SIGNED) as srckey");
        }
        sql.append(" from ");
        sql.append(catchupTab);
        sql.append(" where ");
        sql.append(" id > ? ");
        sql.append(" and windowscn >= ? and windowscn <= ? ");
        sql.append(" and windowscn >= ? ");
        if (predicatePushDown) {
            sql.append("AND " + filterSql);
        }
        sql.append(" order by id limit ?");
        return sql.toString();
    }

    public String getSnapshotSQLString(String snapShotTable) {
        return this.getSnapshotSQLString(snapShotTable, null);
    }

    public String getSnapshotSQLString(String snapShotTable, String source) {
        StringBuilder sql = new StringBuilder();
        String filterSql = this.getFilterSQL();
        boolean predicatePushDown = this.config.isPredicatePushDownEnabled(source) && !filterSql.isEmpty() && filterSql != null;
        sql.append("Select ");
        sql.append("id, ");
        sql.append("scn, ");
        if (predicatePushDown) {
            sql.append(" CAST(srckey as SIGNED) as srckey, ");
        } else {
            sql.append("srckey, ");
        }
        sql.append(EVENT_COLUMNS);
        sql.append(" from ");
        sql.append(snapShotTable);
        sql.append(" where ");
        sql.append(" id > ? ");
        sql.append(" and scn < ? ");
        sql.append(" and scn >= ? ");
        if (predicatePushDown) {
            sql.append("AND " + filterSql);
        }
        sql.append(" order by id limit ?");
        return sql.toString();
    }

    private PreparedStatement createCatchupStatement(int srcId, int logId, Checkpoint currState) throws SQLException {
        Connection conn = this._dbDao.getBootstrapConn().getDBConn();
        String catchupTab = "log_" + srcId + "_" + logId;
        PreparedStatement stmt = null;
        String catchUpString = this.getCatchupSQLString(catchupTab, currState.getCatchupSource());
        long offset = -1L;
        try {
            stmt = conn.prepareStatement(catchUpString);
            offset = currState.getWindowOffset();
            int i = 1;
            stmt.setLong(i++, offset);
            stmt.setLong(i++, currState.getBootstrapStartScn());
            stmt.setLong(i++, currState.getBootstrapTargetScn());
            stmt.setLong(i++, currState.getBootstrapSinceScn());
            stmt.setLong(i++, this._maxCatchupRowsPerFetch);
        }
        catch (SQLException ex) {
            DBHelper.close((Statement)stmt);
        }
        LOG.info((Object)("Catchup SQL String: " + catchUpString + ", " + offset + ", " + currState.getBootstrapStartScn() + " , " + currState.getBootstrapTargetScn() + " , " + currState.getBootstrapSinceScn() + " , " + this._maxCatchupRowsPerFetch));
        return stmt;
    }

    public boolean streamSnapShotRows(Checkpoint currState, BootstrapEventCallback callBack) throws SQLException, BootstrapProcessingException, BootstrapDatabaseTooOldException, BootstrapDatabaseTooYoungException {
        long sinceSCN;
        assert (currState.getConsumptionMode() == DbusClientMode.BOOTSTRAP_SNAPSHOT);
        boolean phaseCompleted = false;
        long startSCN = currState.getBootstrapStartScn();
        if (startSCN <= (sinceSCN = currState.getBootstrapSinceScn().longValue())) {
            LOG.info((Object)("StartSCN is less than or equal to sinceSCN. Bypassing snapshot phase !! startSCN:" + startSCN + ",sinceSCN:" + sinceSCN));
            return true;
        }
        Connection conn = this._dbDao.getBootstrapConn().getDBConn();
        BootstrapDBMetaDataDAO.SourceStatusInfo srcIdStatusPair = this._dbDao.getSrcIdStatusFromDB(currState.getSnapshotSource(), true);
        if (!srcIdStatusPair.isValidSource()) {
            throw new BootstrapProcessingException("Bootstrap DB not servicing source :" + currState.getCatchupSource());
        }
        Statement stmt = null;
        ResultSet rs = null;
        try {
            if (this.config.isEnableMinScnCheck()) {
                long minScn = this._dbDao.getMinScnOfSnapshots(new int[]{srcIdStatusPair.getSrcId()});
                LOG.info((Object)("Min scn for tab tables is: " + minScn));
                if (minScn == -1L) {
                    throw new BootstrapDatabaseTooYoungException("BootstrapDB has no minScn for these sources, but minScn check is enabled! minScn=" + minScn);
                }
                if (sinceSCN <= minScn && (sinceSCN != 0L || minScn != 0L)) {
                    LOG.error((Object)("Bootstrap Snapshot doesn't have requested data . sinceScn too old! sinceScn is " + sinceSCN + " but minScn available is " + minScn));
                    throw new BootstrapDatabaseTooYoungException("Min scn=" + minScn + " Since scn=" + sinceSCN);
                }
            } else {
                LOG.debug((Object)"Bypassing minScn check!");
            }
            String snapshotSQL = this.getSnapshotSQLString(this._dbDao.getBootstrapConn().getSrcTableName(srcIdStatusPair.getSrcId()), currState.getSnapshotSource());
            stmt = conn.prepareStatement(snapshotSQL);
            long offset = currState.getSnapshotOffset();
            int i = 1;
            stmt.setLong(i++, offset);
            stmt.setLong(i++, currState.getBootstrapStartScn());
            stmt.setLong(i++, currState.getBootstrapSinceScn());
            stmt.setLong(i++, this._maxSnapshotRowsPerFetch);
            LOG.info((Object)("SnapshotSQL string: " + snapshotSQL + ", " + offset + ", " + currState.getBootstrapStartScn() + ", " + currState.getBootstrapSinceScn() + ", " + this._maxSnapshotRowsPerFetch));
            rs = new BootstrapDBTimedQuery((PreparedStatement)stmt, this._queryTimeInSec).executeQuery();
            phaseCompleted = this.streamOutRows(currState, rs, callBack, this._maxSnapshotRowsPerFetch);
        }
        catch (SQLException e) {
            DBHelper.close(rs, (Statement)stmt, null);
            LOG.error((Object)("Exception occurred when getting snapshot rows" + e));
            throw e;
        }
        finally {
            if (stmt != null) {
                stmt.close();
                stmt = null;
            }
            this.mergeAndResetStats();
        }
        return phaseCompleted;
    }

    private boolean streamOutRows(Checkpoint ckpt, ResultSet rs, BootstrapEventCallback callback, long maxRowsPerFetch) throws SQLException, BootstrapProcessingException {
        BootstrapEventProcessResult result = null;
        long windowScn = Long.MIN_VALUE;
        long numRowsReadFromDb = 0L;
        while (rs.next()) {
            ++numRowsReadFromDb;
            long rid = rs.getLong(1);
            result = callback.onEvent(rs, this._curStatsCollector);
            if (result.isClientBufferLimitExceeded() || result.isError()) break;
            if (DbusClientMode.BOOTSTRAP_SNAPSHOT == ckpt.getConsumptionMode()) {
                ckpt.onSnapshotEvent(rid);
                continue;
            }
            if (DbusClientMode.BOOTSTRAP_CATCHUP == ckpt.getConsumptionMode()) {
                windowScn = rs.getLong(3);
                ckpt.onCatchupEvent(windowScn, rid);
                continue;
            }
            String errMsg = "The checkpoint received by bootstrap server is neither SNAPSHOT nor CATCHUP" + ckpt;
            LOG.error((Object)errMsg);
            throw new RuntimeException(errMsg);
        }
        if (numRowsReadFromDb > maxRowsPerFetch) {
            String errMsg = "Number of rows read from DB = " + numRowsReadFromDb + " are greater than sepcfied maxRowsPerFetch = " + maxRowsPerFetch;
            LOG.error((Object)errMsg);
            throw new RuntimeException(errMsg);
        }
        this.writeCkptIfAppropriate(result, callback, numRowsReadFromDb, ckpt, rs.getStatement().toString());
        boolean isPhaseCompleted = this.computeIsPhaseCompleted(result, ckpt, numRowsReadFromDb, maxRowsPerFetch, windowScn);
        if (DbusClientMode.BOOTSTRAP_CATCHUP == ckpt.getConsumptionMode() && ckpt.getBootstrapTargetScn() < windowScn) {
            throw new RuntimeException("Events with higher windowscn delivered: bootstrapTargetScn=" + ckpt.getBootstrapTargetScn() + " event windowscn=" + windowScn);
        }
        return isPhaseCompleted;
    }

    protected void writeCkptIfAppropriate(BootstrapEventProcessResult result, BootstrapEventCallback callback, long numRowsReadFromDb, Checkpoint ckpt, String resultSetStmtStr) throws SQLException {
        assert (null != callback);
        assert (numRowsReadFromDb >= 0L);
        assert (null != ckpt);
        if (null != result && !result.isError()) {
            assert (numRowsReadFromDb >= result.getNumRowsWritten());
            if (result.getNumRowsWritten() > 0L) {
                callback.onCheckpointEvent(ckpt, this._curStatsCollector);
            } else if (result.getNumRowsWritten() == 0L && numRowsReadFromDb > 0L) {
                if (!result.isClientBufferLimitExceeded()) {
                    LOG.info((Object)("All the rows read from DB have been filtered out by user-level filter. numRowsReadFromDb = " + numRowsReadFromDb + " sending checkpoint = " + ckpt));
                    callback.onCheckpointEvent(ckpt, this._curStatsCollector);
                } else {
                    LOG.info((Object)("There have been rowsReadFromDb that could not be written as the clientBufferLimit has been exceeded. A checkpoint will not be sent, but pendingEvent header will be set. numRowsReadFromDb = " + numRowsReadFromDb + " checkpoint = " + ckpt));
                }
            } else {
                String errMsg = "This is an error-case that should not happen. First, there were no rows in the resultSet  Second, this is not a case where all the events have been filtered out. Both of these cannot happen simulateneously.  Debug information is logged below.  numRowsReadFromDb = " + numRowsReadFromDb + " result = " + result + " resultSet statement " + resultSetStmtStr;
                LOG.error((Object)errMsg);
                throw new RuntimeException(errMsg);
            }
        }
    }

    protected boolean computeIsPhaseCompleted(BootstrapEventProcessResult result, Checkpoint ckpt, long numRowsReadFromDb, long maxRowsPerFetch, long windowScn) {
        assert (numRowsReadFromDb <= maxRowsPerFetch);
        boolean isPhaseCompleted = false;
        if (null == result) {
            assert (numRowsReadFromDb == 0L);
            isPhaseCompleted = true;
        } else {
            if (numRowsReadFromDb < maxRowsPerFetch && DbusClientMode.BOOTSTRAP_SNAPSHOT == ckpt.getConsumptionMode()) {
                isPhaseCompleted = true;
            } else if (numRowsReadFromDb < maxRowsPerFetch && DbusClientMode.BOOTSTRAP_CATCHUP == ckpt.getConsumptionMode() && ckpt.getBootstrapTargetScn() == windowScn) {
                isPhaseCompleted = true;
            }
            LOG.info((Object)("Terminating batch with result: " + result));
            if (result.isError() || result.isClientBufferLimitExceeded()) {
                isPhaseCompleted = false;
            }
        }
        return isPhaseCompleted;
    }

    private void mergeAndResetStats() {
    }

    public void shutdown() {
        if (null != this._dbDao) {
            this._dbDao.getBootstrapConn().close();
            this._dbDao = null;
        }
    }
}

