/*
 * Decompiled with CFR 0.152.
 */
package com.linkedin.databus2.producers.db;

import com.linkedin.databus.core.DbusEventBufferAppendable;
import com.linkedin.databus.core.UnsupportedKeyException;
import com.linkedin.databus.core.monitoring.mbean.DbusEventsStatisticsCollector;
import com.linkedin.databus2.core.DatabusException;
import com.linkedin.databus2.core.seq.MaxSCNWriter;
import com.linkedin.databus2.producers.EventCreationException;
import com.linkedin.databus2.producers.db.EventReaderSummary;
import com.linkedin.databus2.producers.db.OracleTriggerMonitoredSourceInfo;
import com.linkedin.databus2.producers.db.ReadEventCycleSummary;
import com.linkedin.databus2.producers.db.SourceDBEventReader;
import com.linkedin.databus2.relay.config.PhysicalSourceStaticConfig;
import com.linkedin.databus2.util.DBHelper;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import javax.sql.DataSource;
import org.apache.log4j.Logger;

/**
 * Reads change events for a set of Oracle trigger-monitored sources and appends them to an
 * event buffer.
 *
 * <p>Each source is queried by joining its {@code sy$<eventView>} view with the
 * {@code sy$txlog} table on {@code txn}. Depending on the configured chunking type the
 * reader issues either the plain per-source query or an SCN-/transaction-chunked variant
 * while it is catching up to {@code _catchupTargetMaxScn}.
 */
public class OracleTxlogEventReader
implements SourceDBEventReader {
    /** Fully-qualified name of this class. */
    public static final String MODULE = OracleTxlogEventReader.class.getName();
    /** Reader name; used as a suffix of the logger names and in the cycle summary. */
    private final String _name;
    /** Unmodifiable copy of the sources handed to the constructor. */
    private final List<OracleTriggerMonitoredSourceInfo> _sources;
    /** Schema prefix ("<schema>." or the empty string) prepended to table names in generated SQL. */
    private final String _selectSchema;
    /** Supplies the JDBC connection used for all event queries. */
    private final DataSource _dataSource;
    // Fetch-size bounds. The decompiled readEventsFromOneSource uses the literals 100 and 1000
    // directly, which match these two constants.
    private static final int DEFAULT_STMT_FETCH_SIZE = 100;
    private static final int MAX_STMT_FETCH_SIZE = 1000;
    /** Buffer that receives the events read from the database. */
    private final DbusEventBufferAppendable _eventBuffer;
    /** Passed through to the event factory when events are created. */
    private final boolean _enableTracing;
    // Pre-generated SQL per source id: plain, SCN-chunked and txn-chunked variants.
    private final Map<Short, String> _eventQueriesBySource;
    private final Map<Short, String> _eventChunkedScnQueriesBySource;
    private final Map<Short, String> _eventChunkedTxnQueriesBySource;
    /** Lazily prepared statement used by getMaxScnSkippedForTxnChunked. */
    private PreparedStatement _txnChunkJumpScnStmt;
    /** Per-reader logger (class name + "." + reader name). */
    private final Logger _log;
    /** Logger for per-source read summaries. */
    private final Logger _eventsLog;
    /** Connection used for event queries; (re)created by resetConnections, nulled on errors. */
    private Connection _eventSelectConnection;
    /** Statistics collector handed to the event buffer and event factory. */
    private final DbusEventsStatisticsCollector _relayInboundStatsCollector;
    /** Optional (may be null) sink for persisting the end-of-period SCN. */
    private final MaxSCNWriter _maxScnWriter;
    /** Wall-clock time (ms) of the last cycle that advanced or probed the SCN. */
    private long _lastquerytime;
    /** Wall-clock time (ms) of the last getMaxTxlogSCN call. */
    private long _lastMaxScnTime;
    /** Idle time (ms) after which an empty cycle triggers a max-SCN probe. */
    private final long _slowQuerySourceThreshold;
    /** Configured chunking strategy. */
    private final PhysicalSourceStaticConfig.ChunkingType _chunkingType;
    /** Row-number limit bound into the txn-chunked queries. */
    private final long _txnsPerChunk;
    /** SCN window size used by SCN chunking. */
    private final long _scnChunkSize;
    /** Chunked queries are used while sinceScn + this threshold <= _catchupTargetMaxScn. */
    private final long _chunkedScnThreshold;
    /** Minimum age (ms) of the last max-SCN probe before the catch-up target is reset. */
    private final long _maxScnDelayMs;
    /** Highest end-of-period SCN (or sinceSCN) observed so far; -1 until the first cycle. */
    private long _lastSeenEOP = -1L;
    /** True while the reader is using chunked queries to catch up. */
    private volatile boolean _inChunkingMode = false;
    /** SCN the reader is catching up to; values <= 0 mean "not set". */
    private volatile long _catchupTargetMaxScn = -1L;

    /**
     * Creates a reader for the given logical sources.
     *
     * <p>All sources must share the same event schema (compared case-insensitively; a
     * {@code null} schema only matches another {@code null} schema). The plain, txn-chunked
     * and SCN-chunked queries for every source are generated up front.
     *
     * @param name                          reader name, used for logger names and summaries
     * @param sources                       logical sources to read; must contain at least one entry
     * @param dataSource                    provider of the JDBC connection
     * @param eventBuffer                   buffer the events are appended to
     * @param enableTracing                 passed to the event factory for every event
     * @param dbusEventsStatisticsCollector statistics collector passed to buffer and factory
     * @param maxScnWriter                  optional writer for the end-of-period SCN (may be null)
     * @param slowQuerySourceThreshold      idle time in ms before an empty cycle probes the max SCN
     * @param chunkingType                  chunking strategy
     * @param txnsPerChunk                  transactions per chunk for txn chunking
     * @param scnChunkSize                  SCN window per chunk for SCN chunking
     * @param chunkedScnThreshold           SCN lag at which chunked queries are used
     * @param maxScnDelayMs                 minimum age in ms of the max-SCN probe before the catch-up target is reset
     * @throws IllegalArgumentException if {@code sources} is empty or the sources do not share one schema
     */
    public OracleTxlogEventReader(String name, List<OracleTriggerMonitoredSourceInfo> sources, DataSource dataSource, DbusEventBufferAppendable eventBuffer, boolean enableTracing, DbusEventsStatisticsCollector dbusEventsStatisticsCollector, MaxSCNWriter maxScnWriter, long slowQuerySourceThreshold, PhysicalSourceStaticConfig.ChunkingType chunkingType, long txnsPerChunk, long scnChunkSize, long chunkedScnThreshold, long maxScnDelayMs) {
        // Fail with a descriptive error instead of an IndexOutOfBoundsException from get(0) below.
        if (sources.isEmpty()) {
            throw new IllegalArgumentException("At least one logical source is required to create an OracleTxlogEventReader");
        }
        ArrayList<OracleTriggerMonitoredSourceInfo> sourcesTemp = new ArrayList<OracleTriggerMonitoredSourceInfo>(sources);
        this._name = name;
        this._sources = Collections.unmodifiableList(sourcesTemp);
        this._dataSource = dataSource;
        this._eventBuffer = eventBuffer;
        this._enableTracing = enableTracing;
        this._relayInboundStatsCollector = dbusEventsStatisticsCollector;
        this._maxScnWriter = maxScnWriter;
        this._slowQuerySourceThreshold = slowQuerySourceThreshold;
        this._log = Logger.getLogger(this.getClass().getName() + "." + this._name);
        this._eventsLog = Logger.getLogger("com.linkedin.databus2.producers.db.events." + this._name);
        this._chunkingType = chunkingType;
        this._txnsPerChunk = txnsPerChunk;
        this._scnChunkSize = scnChunkSize;
        this._chunkedScnThreshold = chunkedScnThreshold;
        this._maxScnDelayMs = maxScnDelayMs;
        this._lastquerytime = System.currentTimeMillis();

        OracleTriggerMonitoredSourceInfo firstSource = sourcesTemp.get(0);
        String firstSchema = firstSource.getEventSchema();
        for (OracleTriggerMonitoredSourceInfo source : sourcesTemp) {
            String schema = source.getEventSchema();
            // Null-safe: a null schema is legal (see _selectSchema below) and must not cause an NPE here.
            boolean sameSchema = schema == null ? firstSchema == null : schema.equalsIgnoreCase(firstSchema);
            if (sameSchema) {
                continue;
            }
            throw new IllegalArgumentException("All logical sources must have the same Oracle schema:\n   " + source.getSourceName() + " (id " + source.getSourceId() + ") schema = " + schema + ";\n   " + firstSource.getSourceName() + " (id " + firstSource.getSourceId() + ") schema = " + firstSchema);
        }
        this._selectSchema = firstSchema == null ? "" : firstSchema + ".";

        this._eventQueriesBySource = new HashMap<Short, String>();
        for (OracleTriggerMonitoredSourceInfo sourceInfo : sourcesTemp) {
            String eventQuery = this.generateEventQuery(sourceInfo);
            this._log.info("Generated events query. source: " + sourceInfo + " ; eventQuery: " + eventQuery);
            this._eventQueriesBySource.put(sourceInfo.getSourceId(), eventQuery);
        }
        this._eventChunkedTxnQueriesBySource = new HashMap<Short, String>();
        for (OracleTriggerMonitoredSourceInfo sourceInfo : sourcesTemp) {
            String eventQuery = OracleTxlogEventReader.generateTxnChunkedQuery(sourceInfo, this._selectSchema);
            this._log.info("Generated Chunked Txn events query. source: " + sourceInfo + " ; chunkTxnEventQuery: " + eventQuery);
            this._eventChunkedTxnQueriesBySource.put(sourceInfo.getSourceId(), eventQuery);
        }
        this._eventChunkedScnQueriesBySource = new HashMap<Short, String>();
        for (OracleTriggerMonitoredSourceInfo sourceInfo : sourcesTemp) {
            String eventQuery = OracleTxlogEventReader.generateScnChunkedQuery(sourceInfo);
            this._log.info("Generated Chunked Scn events query. source: " + sourceInfo + " ; chunkScnEventQuery: " + eventQuery);
            this._eventChunkedScnQueriesBySource.put(sourceInfo.getSourceId(), eventQuery);
        }
    }

    /**
     * Runs one read cycle: queries every source for events newer than {@code sinceSCN},
     * appends them to the event buffer and closes the window at the resulting
     * end-of-period SCN.
     *
     * <p>If no source returned events, the end-of-period SCN may still be advanced: by one
     * SCN chunk (SCN chunking), by jumping over a batch of transactions (txn chunking), or
     * to the current max txlog SCN once the slow-source threshold has elapsed. If the SCN
     * does not advance, the event buffer is rolled back.
     *
     * @param sinceSCN SCN after which events are read; values {@code <= 0} are replaced by
     *                 the current max SCN of the txlog table
     * @return summary of the cycle, including per-source summaries
     * @throws DatabusException wrapping any exception raised during the cycle; the select
     *                          connection is closed and discarded before it is thrown
     */
    @Override
    public ReadEventCycleSummary readEventsFromAllSources(long sinceSCN) throws DatabusException, EventCreationException, UnsupportedKeyException {
        boolean eventBufferNeedsRollback = true;
        boolean debugEnabled = this._log.isDebugEnabled();
        ArrayList<EventReaderSummary> summaries = new ArrayList<EventReaderSummary>();
        try {
            long cycleStartTS = System.currentTimeMillis();
            this._eventBuffer.startEvents();
            if (this._eventSelectConnection == null || this._eventSelectConnection.isClosed()) {
                this.resetConnections();
            }
            if (sinceSCN <= 0L) {
                sinceSCN = this.getMaxTxlogSCN(this._eventSelectConnection);
                this._catchupTargetMaxScn = sinceSCN;
                this._log.debug("sinceSCN was <= 0. Overriding with the current max SCN=" + sinceSCN);
                this._eventBuffer.setStartSCN(sinceSCN);
                try {
                    DBHelper.commit(this._eventSelectConnection);
                }
                catch (SQLException commitEx) {
                    // Best effort: fall back to a rollback, but leave a trace of why the commit failed.
                    this._log.warn("Commit after max SCN lookup failed; rolling back: " + commitEx.getMessage(), commitEx);
                    DBHelper.rollback(this._eventSelectConnection);
                }
            } else if (this._chunkingType.isChunkingEnabled() && this._catchupTargetMaxScn <= 0L) {
                this._catchupTargetMaxScn = this.getMaxTxlogSCN(this._eventSelectConnection);
                this._log.debug("catchupTargetMaxScn was <= 0. Overriding with the current max SCN=" + this._catchupTargetMaxScn);
            }
            if (this._catchupTargetMaxScn <= 0L) {
                this._inChunkingMode = false;
            }

            List<OracleTriggerMonitoredSourceInfo> filteredSources = this.filterSources(sinceSCN);
            long endOfPeriodScn = -1L;
            for (OracleTriggerMonitoredSourceInfo source : this._sources) {
                if (!filteredSources.contains(source)) {
                    // Filtered-out sources still record an (empty) cycle.
                    source.getStatisticsBean().addEmptyEventCycle();
                    continue;
                }
                long startTS = System.currentTimeMillis();
                EventReaderSummary summary = this.readEventsFromOneSource(this._eventSelectConnection, source, sinceSCN);
                summaries.add(summary);
                endOfPeriodScn = Math.max(endOfPeriodScn, summary.getEndOfPeriodSCN());
                long endTS = System.currentTimeMillis();
                source.getStatisticsBean().addTimeOfLastDBAccess(endTS);
                if (this._eventsLog.isDebugEnabled() || this._eventsLog.isInfoEnabled() && summary.getNumberOfEvents() > 0) {
                    this._eventsLog.info(summary.toString());
                }
                if (summary.getNumberOfEvents() > 0) {
                    source.getStatisticsBean().addEventCycle(summary.getNumberOfEvents(), endTS - startTS, summary.getSizeOfSerializedEvents(), summary.getEndOfPeriodSCN());
                } else {
                    source.getStatisticsBean().addEmptyEventCycle();
                }
            }
            this._lastSeenEOP = Math.max(this._lastSeenEOP, Math.max(endOfPeriodScn, sinceSCN));

            long curtime = System.currentTimeMillis();
            if (endOfPeriodScn == -1L) {
                // No source produced events; decide whether the SCN can be advanced anyway.
                if (sinceSCN + this._scnChunkSize <= this._catchupTargetMaxScn && PhysicalSourceStaticConfig.ChunkingType.SCN_CHUNKING == this._chunkingType) {
                    endOfPeriodScn = sinceSCN + this._scnChunkSize;
                    this._lastquerytime = curtime;
                } else if (PhysicalSourceStaticConfig.ChunkingType.TXN_CHUNKING == this._chunkingType && this._inChunkingMode) {
                    long nextBatchScn = this.getMaxScnSkippedForTxnChunked(this._eventSelectConnection, sinceSCN, this._txnsPerChunk);
                    this._log.info("No events while in txn chunking. CurrScn : " + sinceSCN + ", jumping to :" + nextBatchScn);
                    endOfPeriodScn = nextBatchScn;
                    this._lastquerytime = curtime;
                } else if (curtime - this._lastquerytime > this._slowQuerySourceThreshold) {
                    this._lastquerytime = curtime;
                    long maxTxlogSCN = this.getMaxTxlogSCN(this._eventSelectConnection);
                    endOfPeriodScn = Math.max(maxTxlogSCN, sinceSCN);
                    this._log.info("SlowSourceQueryThreshold hit. currScn : " + sinceSCN + ". Advanced endOfPeriodScn to " + endOfPeriodScn + " and added the event to relay");
                    if (debugEnabled) {
                        this._log.debug("No events processed. Read max SCN from txlog table for endOfPeriodScn. endOfPeriodScn=" + endOfPeriodScn);
                    }
                }
                if (endOfPeriodScn != -1L && endOfPeriodScn > sinceSCN) {
                    // The original message omitted the starting SCN ("advanced from to ...").
                    this._log.info("The endOfPeriodScn has advanced from " + sinceSCN + " to " + endOfPeriodScn);
                    this._eventBuffer.endEvents(endOfPeriodScn, this._relayInboundStatsCollector);
                    eventBufferNeedsRollback = false;
                } else {
                    eventBufferNeedsRollback = true;
                }
            } else {
                this._lastquerytime = curtime;
                this._eventBuffer.endEvents(endOfPeriodScn, this._relayInboundStatsCollector);
                if (debugEnabled) {
                    this._log.debug("End of events: " + endOfPeriodScn + " windown range= " + this._eventBuffer.getMinScn() + "," + this._eventBuffer.lastWrittenScn());
                }
                eventBufferNeedsRollback = false;
            }

            if (endOfPeriodScn != -1L) {
                if (null != this._maxScnWriter && endOfPeriodScn != sinceSCN) {
                    this._maxScnWriter.saveMaxScn(endOfPeriodScn);
                }
                for (OracleTriggerMonitoredSourceInfo source : this._sources) {
                    source.getStatisticsBean().addMaxDBScn(endOfPeriodScn);
                    source.getStatisticsBean().addTimeOfLastDBAccess(System.currentTimeMillis());
                }
            }
            long cycleEndTS = System.currentTimeMillis();

            // Once caught up (and the max-SCN probe is old enough) forget the catch-up target so
            // that a fresh one is looked up on the next cycle.
            if (this._chunkingType.isChunkingEnabled() && this._lastSeenEOP >= this._catchupTargetMaxScn && curtime - this._lastMaxScnTime >= this._maxScnDelayMs) {
                this._catchupTargetMaxScn = -1L;
            }
            boolean chunkMode = this._chunkingType.isChunkingEnabled() && this._catchupTargetMaxScn > 0L && this._lastSeenEOP < this._catchupTargetMaxScn;
            if (!chunkMode && this._inChunkingMode) {
                this._log.info("Disabling chunking for sources !!");
            }
            this._inChunkingMode = chunkMode;
            if (this._inChunkingMode && debugEnabled) {
                this._log.debug("_inChunkingMode = true, _catchupTargetMaxScn=" + this._catchupTargetMaxScn + ", endOfPeriodScn=" + endOfPeriodScn + ", _lastSeenEOP=" + this._lastSeenEOP);
            }

            ReadEventCycleSummary cycleSummary = new ReadEventCycleSummary(this._name, summaries, Math.max(endOfPeriodScn, sinceSCN), cycleEndTS - cycleStartTS);
            DBHelper.commit(this._eventSelectConnection);
            return cycleSummary;
        }
        catch (SQLException ex) {
            try {
                DBHelper.rollback(this._eventSelectConnection);
            }
            catch (SQLException rollbackEx) {
                // A failed rollback must neither mask the original error nor skip the cleanup
                // below (closing the connection and recording the error on every source).
                this._log.error("Rollback failed while handling readEventsFromAllSources exception: " + rollbackEx.getMessage(), rollbackEx);
            }
            this.handleExceptionInReadEvents(ex);
            throw new DatabusException((Throwable)ex);
        }
        catch (Exception e) {
            this.handleExceptionInReadEvents(e);
            throw new DatabusException((Throwable)e);
        }
        finally {
            if (eventBufferNeedsRollback) {
                if (this._log.isDebugEnabled()) {
                    this._log.debug("Rolling back the event buffer because eventBufferNeedsRollback is true.");
                }
                this._eventBuffer.rollbackEvents();
            }
        }
    }

    /**
     * Cleans up after a failed read cycle: closes and forgets the select connection, resets
     * the catch-up target when chunking is enabled but not currently active, logs the
     * failure and records an error on every source's statistics bean.
     *
     * @param e the exception that aborted the cycle
     */
    private void handleExceptionInReadEvents(Exception e) {
        DBHelper.close(this._eventSelectConnection);
        this._eventSelectConnection = null;

        // While actively chunking the catch-up target is kept; otherwise it is cleared.
        final boolean notChunkingRightNow = !this._inChunkingMode;
        if (notChunkingRightNow && this._chunkingType.isChunkingEnabled()) {
            this._catchupTargetMaxScn = -1L;
        }

        final String message = "readEventsFromAllSources exception:" + e.getMessage();
        this._log.error(message, e);

        for (OracleTriggerMonitoredSourceInfo failedSource : this._sources) {
            failedSource.getStatisticsBean().addError();
        }
    }

    /**
     * Prepares and binds the event query for one source.
     *
     * <p>The plain query is used unless {@code useChunking} is set and chunking is enabled;
     * in that case the SCN- or txn-chunked query is selected according to the configured
     * chunking type. If binding fails after the statement was prepared, the statement is
     * closed before the exception propagates so it cannot leak.
     *
     * @param conn             connection to prepare the statement on
     * @param source           source whose pre-generated query is used
     * @param sinceScn         lower (exclusive) SCN bound
     * @param currentFetchSize fetch size applied to the non-chunked statement
     * @param useChunking      whether the caller wants a chunked query
     * @return the prepared, fully bound statement; the caller owns it and must close it
     * @throws SQLException if preparing or binding the statement fails
     */
    private PreparedStatement createQueryStatement(Connection conn, OracleTriggerMonitoredSourceInfo source, long sinceScn, int currentFetchSize, boolean useChunking) throws SQLException {
        PhysicalSourceStaticConfig.ChunkingType type = this._chunkingType;
        boolean chunked = useChunking && type.isChunkingEnabled();

        String eventQuery;
        if (!chunked) {
            eventQuery = this._eventQueriesBySource.get(source.getSourceId());
        } else if (type == PhysicalSourceStaticConfig.ChunkingType.SCN_CHUNKING) {
            eventQuery = this._eventChunkedScnQueriesBySource.get(source.getSourceId());
        } else {
            eventQuery = this._eventChunkedTxnQueriesBySource.get(source.getSourceId());
        }
        if (this._log.isDebugEnabled()) {
            this._log.debug("source[" + source.getEventView() + "]: " + eventQuery + "; skipInfinityScn=" + source.isSkipInfinityScn() + " ; sinceScn=" + sinceScn);
        }

        PreparedStatement pStmt = conn.prepareStatement(eventQuery);
        boolean bound = false;
        try {
            if (!chunked) {
                pStmt.setFetchSize(currentFetchSize);
                pStmt.setLong(1, sinceScn);
                // The no-skip query has a second placeholder (tx.ora_rowscn > ?).
                if (!source.isSkipInfinityScn()) {
                    pStmt.setLong(2, sinceScn);
                }
            } else {
                int i = 1;
                pStmt.setLong(i++, sinceScn);
                pStmt.setLong(i++, sinceScn);
                if (PhysicalSourceStaticConfig.ChunkingType.TXN_CHUNKING == type) {
                    pStmt.setLong(i++, this._txnsPerChunk);
                } else {
                    long untilScn = sinceScn + this._scnChunkSize;
                    this._log.info("SCN chunking mode, next target SCN is: " + untilScn);
                    pStmt.setLong(i++, untilScn);
                }
            }
            bound = true;
            return pStmt;
        }
        finally {
            if (!bound) {
                // The caller never receives the handle on failure, so release it here.
                try {
                    pStmt.close();
                }
                catch (SQLException closeEx) {
                    this._log.warn("Failed to close statement after binding error: " + closeEx.getMessage(), closeEx);
                }
            }
        }
    }

    /**
     * Reads all events newer than {@code sinceScn} for a single source and appends them to
     * the event buffer.
     *
     * <p>When chunking is enabled, the chunked query is used while
     * {@code sinceScn + _chunkedScnThreshold <= _catchupTargetMaxScn}; doing so also switches
     * the reader into chunking mode. The statement fetch size starts at
     * {@code DEFAULT_STMT_FETCH_SIZE} and is doubled, up to {@code MAX_STMT_FETCH_SIZE}, each
     * time more rows than the current fetch size have been read.
     *
     * @param con      connection to run the query on
     * @param source   source to read
     * @param sinceScn lower (exclusive) SCN bound
     * @return summary of the read; its end-of-period SCN is -1 if no rows were returned
     * @throws SQLException            if the query fails
     * @throws UnsupportedKeyException if raised by the event factory
     * @throws EventCreationException  if raised by the event factory
     */
    private EventReaderSummary readEventsFromOneSource(Connection con, OracleTriggerMonitoredSourceInfo source, long sinceScn) throws SQLException, UnsupportedKeyException, EventCreationException {
        boolean useChunking = false;
        if (this._chunkingType.isChunkingEnabled()) {
            useChunking = sinceScn + this._chunkedScnThreshold <= this._catchupTargetMaxScn;
            if (useChunking && !this._inChunkingMode) {
                this._log.info("Enabling chunking for sources !!");
            }
            this._log.debug("SinceScn :" + sinceScn + ", _ChunkedScnThreshold :" + this._chunkedScnThreshold + ", _catchupTargetMaxScn:" + this._catchupTargetMaxScn + ", useChunking :" + useChunking);
        }
        this._inChunkingMode |= useChunking;

        PreparedStatement pstmt = null;
        ResultSet rs = null;
        try {
            final long startTS = System.currentTimeMillis();
            int fetchSize = DEFAULT_STMT_FETCH_SIZE;
            pstmt = this.createQueryStatement(con, source, sinceScn, fetchSize, useChunking);

            final long queryStartTS = System.currentTimeMillis();
            rs = pstmt.executeQuery();
            final long queryExecTime = System.currentTimeMillis() - queryStartTS;

            long maxScnSeen = -1L;
            int rowCount = 0;
            long serializeMillis = 0L;
            long serializedBytes = 0L;
            long tsWindowStart = Long.MAX_VALUE;
            long tsWindowEnd = Long.MIN_VALUE;
            while (rs.next()) {
                // Column 1 is the SCN and column 2 the event timestamp in every generated query.
                long scn = rs.getLong(1);
                long timestamp = rs.getTimestamp(2).getTime();
                tsWindowEnd = Math.max(timestamp, tsWindowEnd);
                tsWindowStart = Math.min(timestamp, tsWindowStart);

                long appendStartTS = System.currentTimeMillis();
                long eventSize = source.getFactory().createAndAppendEvent(scn, timestamp, rs, this._eventBuffer, this._enableTracing, this._relayInboundStatsCollector);
                serializeMillis += System.currentTimeMillis() - appendStartTS;
                serializedBytes += eventSize;
                maxScnSeen = Math.max(maxScnSeen, scn);

                ++rowCount;
                if (rowCount > fetchSize && fetchSize != MAX_STMT_FETCH_SIZE) {
                    fetchSize = Math.min(2 * fetchSize, MAX_STMT_FETCH_SIZE);
                    pstmt.setFetchSize(fetchSize);
                }
            }
            final long endTS = System.currentTimeMillis();

            if (this._inChunkingMode && PhysicalSourceStaticConfig.ChunkingType.TXN_CHUNKING == this._chunkingType) {
                this._log.info("txn chunking mode: since=" + sinceScn + " eop=" + maxScnSeen);
            }
            return new EventReaderSummary(source.getSourceId(), source.getSourceName(), maxScnSeen, rowCount, serializedBytes, endTS - startTS, serializeMillis, tsWindowStart, tsWindowEnd, queryExecTime);
        }
        finally {
            // Result set and statement are released on both the normal and the failure path.
            DBHelper.close(rs, pstmt, null);
        }
    }

    /**
     * Selects the sources to query in a cycle. This implementation applies no filtering
     * and ignores {@code startSCN}.
     *
     * @param startSCN SCN the cycle starts from (unused here)
     * @return all configured sources
     * @throws DatabusException declared in the signature; never thrown by this implementation
     */
    List<OracleTriggerMonitoredSourceInfo> filterSources(long startSCN) throws DatabusException {
        final List<OracleTriggerMonitoredSourceInfo> allSources = this._sources;
        return allSources;
    }

    /**
     * Obtains a fresh select connection from the data source and configures it with
     * auto-commit off and {@link Connection#TRANSACTION_SERIALIZABLE} isolation.
     *
     * <p>The new connection is only published once it is fully configured; if configuration
     * fails it is closed and the exception propagates. The previously held connection (if
     * any, and if different) is closed, and the cached txn-chunk statement, which belongs to
     * the previous connection, is discarded so that it is re-prepared on demand.
     *
     * @throws SQLException if a connection cannot be obtained or configured
     */
    public void resetConnections() throws SQLException {
        Connection fresh = this._dataSource.getConnection();
        boolean configured = false;
        try {
            this._log.info("JDBC Version is: " + fresh.getMetaData().getDriverVersion());
            fresh.setAutoCommit(false);
            fresh.setTransactionIsolation(Connection.TRANSACTION_SERIALIZABLE);
            configured = true;
        }
        finally {
            if (!configured) {
                // Do not leave a half-configured connection behind.
                DBHelper.close(fresh);
            }
        }

        // Statements prepared on the old connection are unusable after the switch.
        PreparedStatement staleStmt = this._txnChunkJumpScnStmt;
        this._txnChunkJumpScnStmt = null;
        if (staleStmt != null) {
            try {
                staleStmt.close();
            }
            catch (SQLException ignored) {
                // The owning connection may already be closed; nothing more to release.
                if (this._log.isDebugEnabled()) {
                    this._log.debug("Ignoring failure to close stale txn chunk statement: " + ignored.getMessage());
                }
            }
        }

        Connection previous = this._eventSelectConnection;
        this._eventSelectConnection = fresh;
        if (previous != null && previous != fresh) {
            DBHelper.close(previous);
        }
    }

    /**
     * Builds the plain (non-chunked) event query for a source using this reader's schema
     * prefix and logs the resulting SQL.
     *
     * @param sourceInfo source to build the query for
     * @return the generated SQL
     */
    String generateEventQuery(OracleTriggerMonitoredSourceInfo sourceInfo) {
        final String query = OracleTxlogEventReader.generateEventQuery(sourceInfo, this._selectSchema);
        this._log.info("EventQuery=" + query);
        return query;
    }

    /**
     * Builds the event query variant that reads {@code tx.scn} directly and excludes rows
     * whose SCN is at or above the 9999999999999999999999999999 sentinel. It has a single
     * bind parameter: the lower (exclusive) SCN bound.
     *
     * @param sourceInfo   source whose event view and optional query hints are used
     * @param selectSchema schema prefix prepended to the view and txlog table names
     * @return the generated SQL
     */
    static String generateSkipInfScnQuery(OracleTriggerMonitoredSourceInfo sourceInfo, String selectSchema) {
        StringBuilder query = new StringBuilder("select ");

        // Source-specific hints win over the default hint.
        if (sourceInfo.hasEventQueryHints()) {
            query.append(sourceInfo.getEventQueryHints()).append(" ");
        } else {
            query.append("/*+ first_rows LEADING(tx) */").append(" ");
        }

        query.append(" tx.scn scn, tx.ts event_timestamp, src.* ")
             .append("from ")
             .append(selectSchema + "sy$")
             .append(sourceInfo.getEventView())
             .append(" src, " + selectSchema + "sy$txlog tx ")
             .append("where ")
             .append("src.txn=tx.txn and ")
             .append("tx.scn > ? and tx.scn < 9999999999999999999999999999");
        return query.toString();
    }

    /**
     * Builds the event query variant that derives the SCN via
     * {@code sync_core.getScn(tx.scn, tx.ora_rowscn)} and filters on both {@code tx.scn} and
     * {@code tx.ora_rowscn}. It has two bind parameters, one for each lower bound.
     *
     * @param sourceInfo   source whose event view and optional query hints are used
     * @param selectSchema schema prefix prepended to the function, view and txlog table names
     * @return the generated SQL
     */
    static String generateNoSkipInfScnQuery(OracleTriggerMonitoredSourceInfo sourceInfo, String selectSchema) {
        StringBuilder query = new StringBuilder("select ");

        // Source-specific hints win over the default hint.
        if (sourceInfo.hasEventQueryHints()) {
            query.append(sourceInfo.getEventQueryHints()).append(" ");
        } else {
            query.append("/*+ first_rows LEADING(tx) */").append(" ");
        }

        query.append(selectSchema + "sync_core.getScn(tx.scn, tx.ora_rowscn) scn, tx.ts event_timestamp, src.* ")
             .append("from ")
             .append(selectSchema + "sy$")
             .append(sourceInfo.getEventView())
             .append(" src, " + selectSchema + "sy$txlog tx ")
             .append("where ")
             .append("src.txn=tx.txn and ")
             .append("tx.scn > ? and ")
             .append("tx.ora_rowscn > ?");
        return query.toString();
    }

    /**
     * Builds the plain event query for a source, choosing the skip-infinity-SCN variant
     * when the source is configured for it and the no-skip variant otherwise.
     *
     * @param sourceInfo   source to build the query for
     * @param selectSchema schema prefix prepended to table names
     * @return the generated SQL
     */
    static String generateEventQuery(OracleTriggerMonitoredSourceInfo sourceInfo, String selectSchema) {
        return sourceInfo.isSkipInfinityScn()
                ? OracleTxlogEventReader.generateSkipInfScnQuery(sourceInfo, selectSchema)
                : OracleTxlogEventReader.generateNoSkipInfScnQuery(sourceInfo, selectSchema);
    }

    /**
     * Builds the transaction-chunked event query: an inner select numbers txlog rows by
     * {@code tx.scn}, and the outer select joins the source view to the first {@code ?} of
     * them. Bind parameters, in order: lower bound for {@code tx.scn}, lower bound for
     * {@code tx.ora_rowscn}, and the row-number limit.
     *
     * @param sourceInfo   source whose event view and txn-chunked query hints are used
     * @param selectSchema schema prefix prepended to the function, view and txlog table names
     * @return the generated SQL
     */
    static String generateTxnChunkedQuery(OracleTriggerMonitoredSourceInfo sourceInfo, String selectSchema) {
        StringBuilder query = new StringBuilder();

        // Outer select over the source view.
        query.append("SELECT scn, event_timestamp, src.* ");
        query.append("FROM ");
        query.append(selectSchema);
        query.append("sy$");
        query.append(sourceInfo.getEventView());
        query.append(" src, ");

        // Inner select: candidate transactions, numbered in SCN order.
        query.append("( SELECT ");
        String txnHints = sourceInfo.getEventTxnChunkedQueryHints();
        query.append(txnHints);
        query.append(" ");
        query.append(selectSchema + "sync_core.getScn(tx.scn, tx.ora_rowscn) scn, tx.ts event_timestamp, ");
        query.append("tx.txn, row_number() OVER (ORDER BY TX.SCN) r ");
        query.append("FROM ");
        query.append(selectSchema + "sy$txlog tx ");
        query.append("WHERE tx.scn > ? AND tx.ora_rowscn > ? AND tx.scn < 9999999999999999999999999999) t ");

        query.append("WHERE src.txn = t.txn AND r<= ? ");
        query.append("ORDER BY r ");
        return query.toString();
    }

    /**
     * Builds the SCN-chunked event query. Bind parameters, in order: lower bound for
     * {@code tx.scn}, lower bound for {@code tx.ora_rowscn}, and the inclusive upper bound
     * for {@code tx.ora_rowscn}.
     *
     * <p>NOTE(review): unlike the other query generators in this class, this one takes no
     * schema prefix, so {@code sync_core}, the source view and {@code sy$txlog} are
     * referenced unqualified. Confirm whether that is intentional.
     *
     * @param sourceInfo source whose event view and SCN-chunked query hints are used
     * @return the generated SQL
     */
    static String generateScnChunkedQuery(OracleTriggerMonitoredSourceInfo sourceInfo) {
        StringBuilder query = new StringBuilder("SELECT ");

        String scnHints = sourceInfo.getEventScnChunkedQueryHints();
        query.append(scnHints);
        query.append(" ");

        query.append("sync_core.getScn(tx.scn, tx.ora_rowscn) scn, tx.ts event_timestamp, src.* ");
        query.append("FROM sy$");
        query.append(sourceInfo.getEventView());
        query.append(" src, sy$txlog tx ");
        query.append("WHERE src.txn=tx.txn AND tx.scn > ? AND tx.ora_rowscn > ? AND ");
        query.append(" tx.ora_rowscn <= ?");
        return query.toString();
    }

    /**
     * Determines the SCN to jump to when a txn-chunked cycle returned no events: the max
     * SCN among the next {@code txnsPerChunk} txlog rows after {@code currScn}.
     *
     * <p>{@code currScn} is returned unchanged when the query yields no row, yields 0, or
     * yields a value smaller than {@code currScn} (the latter is logged as an error).
     *
     * @param db           connection used to prepare the (cached) statement
     * @param currScn      SCN the reader is currently at
     * @param txnsPerChunk number of txlog rows to skip over
     * @return the SCN to advance to, or {@code currScn} if no usable value was found
     * @throws SQLException if the query fails
     */
    private long getMaxScnSkippedForTxnChunked(Connection db, long currScn, long txnsPerChunk) throws SQLException {
        this.generateMaxScnSkippedForTxnChunkedQuery(db);
        PreparedStatement stmt = this._txnChunkJumpScnStmt;
        long retScn = currScn;
        if (this._log.isDebugEnabled()) {
            this._log.debug("Executing MaxScnSkippedForTxnChunked query with currScn :" + currScn + " and txnsPerChunk :" + txnsPerChunk);
        }

        ResultSet rs = null;
        try {
            stmt.setLong(1, currScn);
            stmt.setLong(2, txnsPerChunk);
            rs = stmt.executeQuery();
            if (rs.next()) {
                long scnFromQuery = rs.getLong(1);
                if (scnFromQuery == 0L) {
                    if (this._log.isDebugEnabled()) {
                        this._log.debug("Ignoring SCN obtained from txn chunked query as there may be no update. currScn = " + currScn + " scnFromQuery = " + scnFromQuery);
                    }
                } else if (scnFromQuery < currScn) {
                    this._log.error("ERROR: SCN obtained from txn chunked query is less than currScn. currScn = " + currScn + " scnFromQuery = " + scnFromQuery);
                } else {
                    retScn = rs.getLong(1);
                }
            }
        }
        finally {
            // Only the result set is closed; the statement is cached for reuse.
            DBHelper.close(rs);
        }
        return retScn;
    }

    /** Connection the cached {@code _txnChunkJumpScnStmt} was prepared on; null while nothing is cached. */
    private Connection _txnChunkJumpScnStmtConnection;

    /**
     * Ensures {@code _txnChunkJumpScnStmt} holds a statement prepared on {@code db}.
     *
     * <p>The statement is cached across calls. Because the select connection is closed and
     * replaced after errors, a statement cached for an earlier connection is discarded and
     * re-prepared here; otherwise every later call would execute on a dead statement.
     *
     * @param db connection the statement must belong to
     * @throws SQLException if preparing the statement fails
     */
    private void generateMaxScnSkippedForTxnChunkedQuery(Connection db) throws SQLException {
        if (null != this._txnChunkJumpScnStmt && this._txnChunkJumpScnStmtConnection != db) {
            PreparedStatement stale = this._txnChunkJumpScnStmt;
            this._txnChunkJumpScnStmt = null;
            this._txnChunkJumpScnStmtConnection = null;
            try {
                stale.close();
            }
            catch (SQLException ignored) {
                // The owning connection is typically already closed; nothing more to release.
                if (this._log.isDebugEnabled()) {
                    this._log.debug("Ignoring failure to close stale txn chunk statement: " + ignored.getMessage());
                }
            }
        }
        if (null == this._txnChunkJumpScnStmt) {
            StringBuilder sql = new StringBuilder();
            sql.append("SELECT max(t.scn) from (");
            sql.append("select /*+ index(tx) */ tx.scn, row_number() OVER (ORDER BY tx.scn) r FROM ");
            sql.append(this._selectSchema + "sy$txlog tx ");
            sql.append("WHERE tx.scn >  ? AND tx.scn < 9999999999999999999999999999) t ");
            sql.append("WHERE r <= ?");
            this._txnChunkJumpScnStmt = db.prepareStatement(sql.toString());
            this._txnChunkJumpScnStmtConnection = db;
        }
    }

    /**
     * Reads the highest SCN below the 9999999999999999999999999999 sentinel from the txlog
     * table and records the time of this lookup in {@code _lastMaxScnTime}.
     *
     * @param db connection to run the query on
     * @return the max SCN, or -1 if the query returned no row or a value of 0
     * @throws SQLException if the query fails
     */
    private long getMaxTxlogSCN(Connection db) throws SQLException {
        this._lastMaxScnTime = System.currentTimeMillis();

        final String sql = "select max(scn)from " + this._selectSchema + "sy$txlog where " + "scn < 9999999999999999999999999999";
        if (this._log.isDebugEnabled()) {
            this._log.debug(sql);
        }

        long maxScn = -1L;
        PreparedStatement pstmt = null;
        ResultSet rs = null;
        try {
            pstmt = db.prepareStatement(sql);
            rs = pstmt.executeQuery();
            if (rs.next()) {
                long scnFromDb = rs.getLong(1);
                // getLong yields 0 for SQL NULL (empty table); keep -1 in that case.
                if (scnFromDb != 0L) {
                    maxScn = scnFromDb;
                }
            }
        }
        finally {
            DBHelper.close(rs, pstmt, null);
        }

        if (this._log.isDebugEnabled()) {
            this._log.debug("MaxSCN Query :" + sql + ", MaxSCN :" + maxScn);
        }
        return maxScn;
    }

    /**
     * Returns the sources this reader was created with.
     *
     * @return the unmodifiable list of configured sources
     */
    @Override
    public List<OracleTriggerMonitoredSourceInfo> getSources() {
        final List<OracleTriggerMonitoredSourceInfo> configuredSources = this._sources;
        return configuredSources;
    }

    /**
     * Closes the select connection if one is currently held. The field itself is left
     * untouched.
     */
    public void close() {
        if (this._eventSelectConnection == null) {
            return;
        }
        DBHelper.close(this._eventSelectConnection);
    }

    /**
     * Overrides the SCN the reader treats as its catch-up target.
     *
     * @param catchupTargetMaxScn new target; values {@code <= 0} are treated as "not set" by
     *                            the read cycle
     */
    public void setCatchupTargetMaxScn(long catchupTargetMaxScn) {
        final long newTarget = catchupTargetMaxScn;
        this._catchupTargetMaxScn = newTarget;
    }
}

