/*
 * Decompiled with CFR 0.152.
 */
package com.linkedin.databus2.relay;

import com.linkedin.databus.core.ConcurrentAppendableCompositeFileInputStream;
import com.linkedin.databus.core.DbusEventBufferAppendable;
import com.linkedin.databus.core.DbusEventInfo;
import com.linkedin.databus.core.DbusEventKey;
import com.linkedin.databus.core.DbusOpcode;
import com.linkedin.databus.core.TrailFilePositionSetter;
import com.linkedin.databus.core.UnsupportedKeyException;
import com.linkedin.databus.core.monitoring.mbean.DbusEventsStatisticsCollector;
import com.linkedin.databus.core.util.InvalidConfigException;
import com.linkedin.databus.core.util.RateControl;
import com.linkedin.databus.monitoring.mbean.EventSourceStatistics;
import com.linkedin.databus.monitoring.mbean.GGParserStatistics;
import com.linkedin.databus.monitoring.mbean.GGParserStatisticsMBean;
import com.linkedin.databus2.core.DatabusException;
import com.linkedin.databus2.core.seq.MaxSCNReaderWriter;
import com.linkedin.databus2.ggParser.XmlStateMachine.ColumnsState;
import com.linkedin.databus2.ggParser.XmlStateMachine.DbUpdateState;
import com.linkedin.databus2.ggParser.XmlStateMachine.TransactionState;
import com.linkedin.databus2.ggParser.XmlStateMachine.TransactionSuccessCallBack;
import com.linkedin.databus2.ggParser.staxparser.StaxBuilder;
import com.linkedin.databus2.ggParser.staxparser.XmlParser;
import com.linkedin.databus2.producers.AbstractEventProducer;
import com.linkedin.databus2.producers.EventCreationException;
import com.linkedin.databus2.producers.PartitionFunction;
import com.linkedin.databus2.producers.db.EventReaderSummary;
import com.linkedin.databus2.producers.db.EventSourceStatisticsIface;
import com.linkedin.databus2.producers.db.GGMonitoredSourceInfo;
import com.linkedin.databus2.producers.db.GGXMLTrailTransactionFinder;
import com.linkedin.databus2.producers.db.ReadEventCycleSummary;
import com.linkedin.databus2.producers.gg.DBUpdatesMergeUtils;
import com.linkedin.databus2.producers.gg.GGEventGenerationFactory;
import com.linkedin.databus2.relay.config.LogicalSourceStaticConfig;
import com.linkedin.databus2.relay.config.PhysicalSourceStaticConfig;
import com.linkedin.databus2.schemas.SchemaId;
import com.linkedin.databus2.schemas.SchemaRegistryService;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.io.SequenceInputStream;
import java.lang.management.ManagementFactory;
import java.nio.charset.Charset;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Enumeration;
import java.util.HashMap;
import java.util.Hashtable;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import javax.management.InstanceNotFoundException;
import javax.management.MBeanRegistrationException;
import javax.management.MBeanServer;
import javax.management.ObjectName;
import javax.xml.stream.XMLStreamException;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericDatumWriter;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.BinaryEncoder;
import org.apache.avro.io.Encoder;
import org.apache.commons.lang.NotImplementedException;
import org.apache.log4j.Logger;

public class GoldenGateEventProducer
extends AbstractEventProducer {
    private final PhysicalSourceStaticConfig _pConfig;
    private final SchemaRegistryService _schemaRegistryService;
    private final DbusEventsStatisticsCollector _statsCollector;
    private final AtomicLong _scn = new AtomicLong(-1L);
    private final AtomicLong _startPrevScn = new AtomicLong(-1L);
    State _currentState = State.INIT;
    private WorkerThread _worker;
    private final Lock _pauseLock = new ReentrantLock(true);
    private final Condition _pausedCondition = this._pauseLock.newCondition();
    private final HashMap<Integer, PartitionFunction> _partitionFunctionHashMap;
    private RateControl _rc;
    private final GGParserStatistics _ggParserStats;
    private final Map<Short, GGMonitoredSourceInfo> _monitoredSources = new HashMap<Short, GGMonitoredSourceInfo>();
    public static final short GLOBAL_SOURCE_ID = 0;
    private final List<ObjectName> _registeredMbeans = new ArrayList<ObjectName>();
    private final MBeanServer _mbeanServer = ManagementFactory.getPlatformMBeanServer();
    private final Logger _eventsLog;
    public final Logger _log;

    public GoldenGateEventProducer(PhysicalSourceStaticConfig pConfig, SchemaRegistryService schemaRegistryService, DbusEventBufferAppendable dbusEventBuffer, DbusEventsStatisticsCollector statsCollector, MaxSCNReaderWriter maxScnReaderWriters) throws DatabusException {
        super(dbusEventBuffer, maxScnReaderWriters, pConfig, null);
        this._pConfig = pConfig;
        this._schemaRegistryService = schemaRegistryService;
        this._statsCollector = statsCollector;
        this._currentState = State.INIT;
        this._partitionFunctionHashMap = new HashMap();
        this._eventsLog = Logger.getLogger((String)("com.linkedin.databus2.producers.db.events." + pConfig.getName()));
        if (this._pConfig != null) {
            long eventRatePerSec = pConfig.getEventRatePerSec();
            long maxThrottleDurationInSecs = pConfig.getMaxThrottleDurationInSecs();
            this._rc = eventRatePerSec > 0L && maxThrottleDurationInSecs > 0L ? new RateControl(eventRatePerSec, maxThrottleDurationInSecs) : new RateControl(Long.MIN_VALUE, Long.MIN_VALUE);
        }
        String MODULE = GoldenGateEventProducer.class.getName();
        this._log = Logger.getLogger((String)(MODULE + "." + this.getName()));
        for (int i = 0; i < this._pConfig.getSources().length; ++i) {
            LogicalSourceStaticConfig logicalSourceStaticConfig = this._pConfig.getSources()[i];
            GGMonitoredSourceInfo source = this.buildGGMonitoredSourceInfo(logicalSourceStaticConfig, this._pConfig);
            this._monitoredSources.put(source.getSourceId(), source);
        }
        LogicalSourceStaticConfig logicalSourceStaticConfig = new LogicalSourceStaticConfig(0, this._pConfig.getName(), "", "constant:1", 0, false, null, null, null);
        GGMonitoredSourceInfo source = this.buildGGMonitoredSourceInfo(logicalSourceStaticConfig, this._pConfig);
        this._monitoredSources.put(source.getSourceId(), source);
        this._ggParserStats = new GGParserStatistics(this._pConfig.getName());
        this.registerParserMbean((GGParserStatisticsMBean)this._ggParserStats);
    }

    public GGMonitoredSourceInfo buildGGMonitoredSourceInfo(LogicalSourceStaticConfig sourceConfig, PhysicalSourceStaticConfig pConfig) throws DatabusException, InvalidConfigException {
        PartitionFunction partitionFunction = GGEventGenerationFactory.buildPartitionFunction(sourceConfig);
        this._partitionFunctionHashMap.put(Integer.valueOf(sourceConfig.getId()), partitionFunction);
        EventSourceStatistics statisticsBean = new EventSourceStatistics(sourceConfig.getName());
        GGMonitoredSourceInfo sourceInfo = new GGMonitoredSourceInfo(sourceConfig.getId(), sourceConfig.getName(), statisticsBean);
        this.registerMbeans(sourceInfo);
        return sourceInfo;
    }

    private void registerParserMbean(GGParserStatisticsMBean parserBean) throws DatabusException {
        try {
            Hashtable<String, String> props = new Hashtable<String, String>();
            props.put("type", "GGParserStatistics");
            props.put("name", this._pConfig.getName());
            ObjectName objectName = new ObjectName("com.linkedin.databus2", props);
            if (this._mbeanServer.isRegistered(objectName)) {
                this._log.warn((Object)("Unregistering old ggparser statistics mbean: " + objectName));
                this._mbeanServer.unregisterMBean(objectName);
            }
            this._mbeanServer.registerMBean(parserBean, objectName);
            this._log.info((Object)("Registered gg-parser statistics mbean: " + objectName));
            this._registeredMbeans.add(objectName);
        }
        catch (Exception ex) {
            this._log.error((Object)("Failed to register the GGparser statistics mbean for db = " + this._pConfig.getName() + " due to an exception."), (Throwable)ex);
            throw new DatabusException("Failed to initialize GGparser statistics mbean.", (Throwable)ex);
        }
    }

    private void registerMbeans(GGMonitoredSourceInfo source) throws DatabusException {
        try {
            Hashtable<String, String> props = new Hashtable<String, String>();
            props.put("type", "SourceStatistics");
            props.put("name", source.getSourceName());
            ObjectName objectName = new ObjectName("com.linkedin.databus2", props);
            if (this._mbeanServer.isRegistered(objectName)) {
                this._log.warn((Object)("Unregistering old gg-source statistics mbean: " + objectName));
                this._mbeanServer.unregisterMBean(objectName);
            }
            this._mbeanServer.registerMBean(source.getStatisticsBean(), objectName);
            this._log.info((Object)("Registered gg-source statistics mbean: " + objectName));
            this._registeredMbeans.add(objectName);
        }
        catch (Exception ex) {
            this._log.error((Object)("Failed to register the gg-source statistics mbean for source (" + source.getSourceName() + ") due to an exception."), (Throwable)ex);
            throw new DatabusException("Failed to initialize gg event statistics mbeans.", (Throwable)ex);
        }
    }

    public GGParserStatistics getParserStats() {
        return this._ggParserStats;
    }

    public String getName() {
        return this._pConfig != null ? this._pConfig.getName() : "NONE";
    }

    public long getSCN() {
        return this._scn.get();
    }

    public synchronized void start(long sinceSCN) {
        this._log.info((Object)"Start golden gate evert producer requested.");
        if (this._currentState == State.RUNNING) {
            this._log.error((Object)"Thread already running! ");
            return;
        }
        this._scn.set(-1L);
        if (sinceSCN > 0L) {
            this._scn.set(sinceSCN);
        } else if (this.getMaxScnReaderWriter() != null) {
            try {
                long scn = this.getMaxScnReaderWriter().getMaxScn();
                if (scn > 0L) {
                    long newScn = scn >= this._pConfig.getRestartScnOffset() ? scn - this._pConfig.getRestartScnOffset() : 0L;
                    this._log.info((Object)("Checkpoint read = " + scn + " restartScnOffset= " + this._pConfig.getRestartScnOffset() + " Adjusted SCN= " + newScn));
                    if (newScn > 0L) {
                        this._scn.set(newScn);
                    }
                } else {
                    this._log.info((Object)("Overridding default behaviour (start with latest scn), using scn : " + scn + " to start the relay"));
                    if (scn != -2L && scn != -1L) {
                        throw new DatabusException("The scn you have passed is neither EARLIEST or LATEST  setting, cannot proceed with using this scn");
                    }
                    this._scn.set(scn);
                }
            }
            catch (DatabusException e) {
                this._log.warn((Object)("Could not read saved maxScn: Defaulting to startSCN=" + this._scn.get()));
            }
        }
        if (this._worker == null) {
            this._log.info((Object)("Starting with scn = " + this._scn.get()));
            this._worker = new WorkerThread();
            this._worker.setDaemon(true);
            this._worker.start();
        }
    }

    public boolean isRunning() {
        return this._currentState == State.RUNNING;
    }

    public boolean isPaused() {
        return this._currentState == State.PAUSED;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void unpause() {
        this._log.info((Object)"Golden gate evert producer unpause requested.");
        this._pauseLock.lock();
        try {
            this._pauseRequested = false;
            this._pausedCondition.signalAll();
        }
        catch (Exception e) {
            this._log.error((Object)("Error while unpausing the golden gate event producer: " + e));
        }
        finally {
            this._pauseLock.unlock();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void pause() {
        this._log.info((Object)"Golden gate evert producer pause requested.");
        this._pauseLock.lock();
        try {
            this._pauseRequested = true;
        }
        catch (Exception e) {
            this._log.error((Object)("Error while unpausing the golden gate event producer: " + e));
        }
        finally {
            this._pauseLock.unlock();
        }
    }

    private synchronized boolean isPauseRequested() {
        return this._pauseRequested;
    }

    public synchronized void shutdown() {
        this._log.info((Object)"Golden gate evert producer shutdown requested.");
        this._shutdownRequested = true;
        for (ObjectName name : this._registeredMbeans) {
            try {
                this._mbeanServer.unregisterMBean(name);
                this._log.info((Object)("Unregistered gg-source mbean: " + name));
            }
            catch (MBeanRegistrationException e) {
                this._log.warn((Object)("Exception when unregistering gg-source statistics mbean: " + name + e));
            }
            catch (InstanceNotFoundException e) {
                this._log.warn((Object)("Exception when unregistering gg-source statistics mbean: " + name + e));
            }
        }
        if (this._worker != null) {
            if (this._worker._parser == null) {
                this._log.error((Object)"The parser is null, unable to shutdown the event producer");
                return;
            }
            this._worker._parser.setShutDownRequested(true);
            this._worker.interrupt();
        }
        this._log.warn((Object)"Shut down request sent to thread");
    }

    public synchronized void waitForShutdown() throws InterruptedException, IllegalStateException {
        if (this._currentState != State.SHUTDOWN && this._worker != null) {
            this._worker.join();
        }
    }

    public synchronized void waitForShutdown(long timeout) throws InterruptedException, IllegalStateException {
        if (this._currentState != State.SHUTDOWN && this._worker != null) {
            this._worker.join(timeout);
        }
    }

    protected ReadEventCycleSummary readEventsFromAllSources(long sinceSCN) throws DatabusException, EventCreationException, UnsupportedKeyException {
        throw new NotImplementedException("Not implemented");
    }

    private InputStream wrapStreamWithXmlTags(InputStream compositeInputStream) {
        String xmlVersion = this._pConfig.getXmlVersion();
        String xmlEncoding = this._pConfig.getXmlEncoding();
        String xmlStart = "<?xml version=\"" + xmlVersion + "\" encoding=\"" + xmlEncoding + "\"?>\n<root>";
        String xmlEnd = "</root>";
        this._log.info((Object)("The xml start tag used is:" + xmlStart));
        List<InputStream> xmlTagsList = Arrays.asList(new ByteArrayInputStream(xmlStart.getBytes(Charset.forName(xmlEncoding))), compositeInputStream, new ByteArrayInputStream(xmlEnd.getBytes(Charset.forName(xmlEncoding))));
        Enumeration<InputStream> streams = Collections.enumeration(xmlTagsList);
        SequenceInputStream seqStream = new SequenceInputStream(streams);
        return seqStream;
    }

    private ConcurrentAppendableCompositeFileInputStream locateScnInTrailFile(String xmlDir, String xmlPrefix) throws Exception {
        ConcurrentAppendableCompositeFileInputStream compositeInputStream = null;
        TrailFilePositionSetter.FilePositionResult filePositionResult = null;
        TrailFilePositionSetter trailFilePositionSetter = null;
        block6: while (compositeInputStream == null) {
            this._log.info((Object)("Requesting trail file position setter for scn: " + this._scn.get()));
            trailFilePositionSetter = new TrailFilePositionSetter(xmlDir, xmlPrefix, this.getName());
            filePositionResult = trailFilePositionSetter.locateFilePosition(this._scn.get(), new GGXMLTrailTransactionFinder());
            this._log.info((Object)("File position at : " + filePositionResult));
            switch (filePositionResult.getStatus()) {
                case ERROR: {
                    this._log.fatal((Object)"Unable to locate the scn in the trail file.");
                    throw new DatabusException("Unable to find the given scn " + this._scn.get() + " in the trail files");
                }
                case NO_TXNS_FOUND: {
                    if (this._scn.get() == -1L) {
                        this._log.info((Object)"Switching from USE_LATEST_SCN to USE_EARLIEST_SCN because no trail files were not found");
                        this._scn.set(-2L);
                    }
                    long noTxnsFoundSleepTime = 500L;
                    this._log.info((Object)("NO_TXNS_FOUND, sleeping for " + noTxnsFoundSleepTime + " ms before retrying"));
                    Thread.sleep(noTxnsFoundSleepTime);
                    continue block6;
                }
                case EXACT_SCN_NOT_FOUND: {
                    this._log.info((Object)("Exact SCN was not found, the closest scn found was: " + filePositionResult.getTxnPos().getMinScn()));
                    compositeInputStream = new ConcurrentAppendableCompositeFileInputStream(xmlDir, filePositionResult.getTxnPos().getFile(), filePositionResult.getTxnPos().getFileOffset(), new TrailFilePositionSetter.FileFilter(new File(xmlDir), xmlPrefix), false);
                    long foundScn = filePositionResult.getTxnPos().getMaxScn();
                    if (foundScn <= this._scn.get()) {
                        throw new DatabusException("EXACT_SCN_NOT_FOUND, but foundScn is <= _scn ");
                    }
                    this._startPrevScn.set(this._scn.get());
                    this._log.info((Object)("Changing current scn from " + this._scn.get() + " to " + foundScn));
                    this._log.info((Object)("Planning to use prevScn " + this._startPrevScn));
                    this._scn.set(foundScn);
                    continue block6;
                }
                case FOUND: {
                    this._log.info((Object)("Exact SCN was  found" + filePositionResult.getTxnPos().getMaxScn()));
                    compositeInputStream = new ConcurrentAppendableCompositeFileInputStream(xmlDir, filePositionResult.getTxnPos().getFile(), filePositionResult.getTxnPos().getFileOffset(), new TrailFilePositionSetter.FileFilter(new File(xmlDir), xmlPrefix), false);
                    long foundScn = filePositionResult.getTxnPos().getMaxScn();
                    if (this._scn.get() >= 0L && this._scn.get() != foundScn) {
                        throw new DatabusException("The exact scn was not found, but the trail file position setter has returned FOUND!");
                    }
                    this._startPrevScn.set(foundScn - 1L);
                    this._scn.set(foundScn);
                    continue block6;
                }
            }
            throw new DatabusException("Unhandled file position result in switch case, terminating producer.");
        }
        if (filePositionResult == null) {
            this._log.info(trailFilePositionSetter);
            throw new DatabusException("file position Result returned by TrailFilePositionSetter is null!");
        }
        if (this._scn.get() <= 0L) {
            this._log.info((Object)("The scn is <=0, using scn from file position setter:" + filePositionResult));
            this._scn.set(filePositionResult.getTxnPos().getMaxScn());
        }
        return compositeInputStream;
    }

    /*
     * Enabled force condition propagation
     * Lifted jumps to return sites
     */
    protected static Object obtainKey(DbUpdateState.DBUpdateImage dbUpdate) throws DatabusException {
        if (null == dbUpdate) {
            throw new DatabusException("DBUpdateImage is null");
        }
        ArrayList<ColumnsState.KeyPair> pairs = dbUpdate.getKeyPairs();
        if (null == pairs) throw new DatabusException("There do not seem to be any keys");
        if (pairs.size() == 0) {
            throw new DatabusException("There do not seem to be any keys");
        }
        if (pairs.size() == 1) {
            Object key = dbUpdate.getKeyPairs().get(0).getKey();
            Schema.Type pKeyType = dbUpdate.getKeyPairs().get(0).getKeyType();
            Object keyObj = null;
            if (pKeyType == Schema.Type.INT) {
                if (!(key instanceof Integer)) throw new DatabusException("Schema.Type does not match actual key type (INT) " + key.getClass().getName());
                return key;
            }
            if (pKeyType != Schema.Type.LONG) return key;
            if (!(key instanceof Long)) throw new DatabusException("Schema.Type does not match actual key type (LONG) " + key.getClass().getName());
            keyObj = key;
            return key;
        }
        Iterator li = pairs.iterator();
        String compositeKey = "";
        while (li.hasNext()) {
            ColumnsState.KeyPair kp = (ColumnsState.KeyPair)li.next();
            Schema.Type pKeyType = kp.getKeyType();
            Object key = kp.getKey();
            if (pKeyType == Schema.Type.INT) {
                if (!(key instanceof Integer)) throw new DatabusException("Schema.Type does not match actual key type (INT) " + key.getClass().getName());
                compositeKey = compositeKey + kp.getKey().toString();
            } else if (pKeyType == Schema.Type.LONG) {
                if (!(key instanceof Long)) throw new DatabusException("Schema.Type does not match actual key type (LONG) " + key.getClass().getName());
                compositeKey = compositeKey + key.toString();
            } else {
                compositeKey = compositeKey + key;
            }
            if (!li.hasNext()) continue;
            compositeKey = compositeKey + "\t";
        }
        return compositeKey;
    }

    protected void addEventToBuffer(List<TransactionState.PerSourceTransactionalUpdate> dbUpdates, GGParserStatistics.TransactionInfo ti) throws DatabusException, UnsupportedKeyException {
        if (dbUpdates.size() == 0) {
            throw new DatabusException("Cannot handle empty dbUpdates");
        }
        long scn = ti.getScn();
        long timestamp = ti.getTransactionTimeStampNs();
        EventSourceStatistics globalStats = this.getSource((short)0).getStatisticsBean();
        if (scn == this._startPrevScn.get()) {
            this._log.info((Object)"Skipping this transaction, EOP already send for this event");
            return;
        }
        this.getEventBuffer().startEvents();
        int eventsInTransactionCount = 0;
        ArrayList<EventReaderSummary> summaries = new ArrayList<EventReaderSummary>();
        for (int i = 0; i < dbUpdates.size(); ++i) {
            GenericRecord record = null;
            TransactionState.PerSourceTransactionalUpdate perSourceUpdate = dbUpdates.get(i);
            short sourceId = (short)perSourceUpdate.getSourceId();
            EventSourceStatistics perSourceStats = this.getSource(sourceId).getStatisticsBean();
            Iterator<DbUpdateState.DBUpdateImage> dbUpdateIterator = perSourceUpdate.getDbUpdatesSet().iterator();
            int eventsInDbUpdate = 0;
            long dbUpdatesEventsSize = 0L;
            long startDbUpdatesMs = System.currentTimeMillis();
            while (dbUpdateIterator.hasNext()) {
                DbUpdateState.DBUpdateImage dbUpdate = dbUpdateIterator.next();
                Object keyObj = GoldenGateEventProducer.obtainKey(dbUpdate);
                DbusEventKey eventKey = new DbusEventKey(keyObj);
                PartitionFunction partitionFunction = this._partitionFunctionHashMap.get(sourceId);
                short lPartitionId = partitionFunction.getPartition(eventKey);
                record = dbUpdate.getGenericRecord();
                if (record == null) {
                    throw new DatabusException("Cannot write event to buffer because record = " + record);
                }
                if (record.getSchema() == null) {
                    throw new DatabusException("The record does not have a schema (null schema)");
                }
                try {
                    DbusOpcode opCode;
                    ++eventsInDbUpdate;
                    ++eventsInTransactionCount;
                    ByteArrayOutputStream bos = new ByteArrayOutputStream();
                    BinaryEncoder encoder = new BinaryEncoder((OutputStream)bos);
                    GenericDatumWriter writer = new GenericDatumWriter(record.getSchema());
                    writer.write((Object)record, (Encoder)encoder);
                    byte[] serializedValue = bos.toByteArray();
                    SchemaId schemaId = SchemaId.createWithMd5((Schema)dbUpdate.getSchema());
                    if (dbUpdate.getOpType() == DbUpdateState.DBUpdateImage.OpType.INSERT || dbUpdate.getOpType() == DbUpdateState.DBUpdateImage.OpType.UPDATE) {
                        opCode = DbusOpcode.UPSERT;
                        if (this._log.isDebugEnabled()) {
                            this._log.debug((Object)("The event with scn " + scn + " is INSERT/UPDATE"));
                        }
                    } else if (dbUpdate.getOpType() == DbUpdateState.DBUpdateImage.OpType.DELETE) {
                        opCode = DbusOpcode.DELETE;
                        if (this._log.isDebugEnabled()) {
                            this._log.debug((Object)("The event with scn " + scn + " is DELETE"));
                        }
                    } else {
                        throw new DatabusException("Unknown opcode from dbUpdate for event with scn:" + scn);
                    }
                    DbusEventInfo dbusEventInfo = new DbusEventInfo(opCode, scn, (short)this._pConfig.getId(), lPartitionId, timestamp, sourceId, schemaId.getByteArray(), serializedValue, false, false);
                    dbusEventInfo.setReplicated(dbUpdate.isReplicated());
                    perSourceStats.addEventCycle(1, ti.getTransactionTimeRead(), (long)serializedValue.length, scn);
                    globalStats.addEventCycle(1, ti.getTransactionTimeRead(), (long)serializedValue.length, scn);
                    long tsEnd = System.currentTimeMillis();
                    perSourceStats.addTimeOfLastDBAccess(tsEnd);
                    globalStats.addTimeOfLastDBAccess(tsEnd);
                    this.getEventBuffer().appendEvent(eventKey, dbusEventInfo, this._statsCollector);
                    this._rc.incrementEventCount();
                    dbUpdatesEventsSize += (long)serializedValue.length;
                }
                catch (IOException io) {
                    perSourceStats.addError();
                    globalStats.addEmptyEventCycle();
                    this._log.error((Object)("Cannot create byte stream payload: " + dbUpdates.get(i).getSourceId()));
                }
            }
            long endDbUpdatesMs = System.currentTimeMillis();
            long dbUpdatesElapsedTimeMs = endDbUpdatesMs - startDbUpdatesMs;
            EventReaderSummary summary = new EventReaderSummary(sourceId, this._monitoredSources.get(sourceId).getSourceName(), scn, eventsInDbUpdate, dbUpdatesEventsSize, -1L, dbUpdatesElapsedTimeMs, timestamp, timestamp, -1L);
            if (this._eventsLog.isInfoEnabled()) {
                this._eventsLog.info((Object)summary.toString());
            }
            summaries.add(summary);
            if (!this._log.isDebugEnabled()) continue;
            this._log.debug((Object)("There are " + eventsInDbUpdate + " events seen in the current dbUpdate"));
        }
        ReadEventCycleSummary summary = new ReadEventCycleSummary(this._pConfig.getName(), summaries, scn, -1L);
        if (this._eventsLog.isInfoEnabled()) {
            this._eventsLog.info((Object)summary.toString());
        }
        this._log.info((Object)("Writing " + eventsInTransactionCount + " events from transaction with scn: " + scn));
        if (scn <= 0L) {
            throw new DatabusException("Unable to write events to buffer because of negative/zero scn: " + scn);
        }
        this.getEventBuffer().endEvents(scn, this._statsCollector);
        this._scn.set(scn);
        if (this.getMaxScnReaderWriter() != null) {
            try {
                this.getMaxScnReaderWriter().saveMaxScn(this._scn.get());
            }
            catch (DatabusException e) {
                this._log.error((Object)("Cannot save scn = " + this._scn + " for physical source = " + this.getName()), (Throwable)e);
            }
        }
    }

    protected RateControl getRateControl() {
        return this._rc;
    }

    public List<? extends EventSourceStatisticsIface> getSources() {
        return new ArrayList<GGMonitoredSourceInfo>(this._monitoredSources.values());
    }

    public GGMonitoredSourceInfo getSource(short sourceId) {
        return this._monitoredSources.get(sourceId);
    }

    private static class MergeDbResult {
        private final boolean _doAppendToBuffer;
        private final List<TransactionState.PerSourceTransactionalUpdate> _mergedDbUpdates;
        private final GGParserStatistics.TransactionInfo _mergedTxnInfo;
        private final GGParserStatistics.TransactionInfo _lastParsedTxnInfo;
        private final int _numEventsInLastParsedTxn;

        protected static MergeDbResult createDoNotAppendResult(GGParserStatistics.TransactionInfo lastParsedTxnInfo, int numEventsInLastParsedTxn) {
            return new MergeDbResult(false, null, null, lastParsedTxnInfo, numEventsInLastParsedTxn);
        }

        protected static MergeDbResult createAppendResult(List<TransactionState.PerSourceTransactionalUpdate> mergedDbUpdates, GGParserStatistics.TransactionInfo mergedTxnInfo, GGParserStatistics.TransactionInfo lastParsedTxnInfo, int numEventsInLastParsedTxn) {
            return new MergeDbResult(true, mergedDbUpdates, mergedTxnInfo, lastParsedTxnInfo, numEventsInLastParsedTxn);
        }

        private MergeDbResult(boolean doAppendToBuffer, List<TransactionState.PerSourceTransactionalUpdate> mergedDbUpdates, GGParserStatistics.TransactionInfo mergedTxnInfo, GGParserStatistics.TransactionInfo lastParsedTxnInfo, int numEventsInLastParsedTxn) {
            this._doAppendToBuffer = doAppendToBuffer;
            this._mergedDbUpdates = mergedDbUpdates;
            this._mergedTxnInfo = mergedTxnInfo;
            this._lastParsedTxnInfo = lastParsedTxnInfo;
            this._numEventsInLastParsedTxn = numEventsInLastParsedTxn;
        }

        public boolean isDoAppendToBuffer() {
            return this._doAppendToBuffer;
        }

        public List<TransactionState.PerSourceTransactionalUpdate> getMergedDbUpdates() {
            return this._mergedDbUpdates;
        }

        public GGParserStatistics.TransactionInfo getMergedTxnInfo() {
            return this._mergedTxnInfo;
        }

        public GGParserStatistics.TransactionInfo getLastParsedTxnInfo() {
            return this._lastParsedTxnInfo;
        }

        public int getNumEventsInLastParsedTxn() {
            return this._numEventsInLastParsedTxn;
        }

        public String toString() {
            return "MergeDbResult [doAppendToBuffer=" + this._doAppendToBuffer + ", mergedDbUpdates=" + this._mergedDbUpdates + ", mergedTxnInfo=" + this._mergedTxnInfo + ", lastParsedTxnInfo=" + this._lastParsedTxnInfo + ", numEventsInLastParsedTxn=" + this._numEventsInLastParsedTxn + "]";
        }
    }

    private class WorkerThread
    extends Thread {
        private HandleXmlCallback _xmlCallback;
        private XmlParser _parser;
        private int nullTransactions = 0;

        private WorkerThread() {
        }

        private void checkAndInsertEOP(long scn) {
            GoldenGateEventProducer.this._scn.set(scn);
            ++this.nullTransactions;
            if (this.nullTransactions >= 100) {
                GoldenGateEventProducer.this._log.info((Object)("Inserting EOP in the buffer after " + this.nullTransactions + " empty transactions at scn = " + scn));
                GoldenGateEventProducer.this.getEventBuffer().startEvents();
                GoldenGateEventProducer.this.getEventBuffer().endEvents(scn, GoldenGateEventProducer.this._statsCollector);
                this.nullTransactions = 0;
            }
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        private boolean isReadyToRun() {
            if (GoldenGateEventProducer.this._shutdownRequested) {
                GoldenGateEventProducer.this._log.info((Object)"The parser is already shutdown");
                GoldenGateEventProducer.this._currentState = State.SHUTDOWN;
                return false;
            }
            GoldenGateEventProducer.this._pauseLock.lock();
            try {
                if (GoldenGateEventProducer.this.isPauseRequested() && GoldenGateEventProducer.this._currentState != State.PAUSED) {
                    GoldenGateEventProducer.this._currentState = State.PAUSED;
                    GoldenGateEventProducer.this._log.warn((Object)"Pausing event generator");
                    while (GoldenGateEventProducer.this._currentState == State.PAUSED && !GoldenGateEventProducer.this._shutdownRequested && GoldenGateEventProducer.this.isPauseRequested()) {
                        try {
                            GoldenGateEventProducer.this._pausedCondition.await();
                        }
                        catch (InterruptedException e) {
                            GoldenGateEventProducer.this._log.warn((Object)("Paused thread interrupted! Shutdown requested=" + GoldenGateEventProducer.this._shutdownRequested));
                        }
                    }
                }
            }
            finally {
                GoldenGateEventProducer.this._pauseLock.unlock();
            }
            if (!GoldenGateEventProducer.this._shutdownRequested) {
                GoldenGateEventProducer.this._currentState = State.RUNNING;
            }
            return true;
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        @Override
        public void run() {
            ConcurrentAppendableCompositeFileInputStream compositeInputStream = null;
            try {
                if (this._xmlCallback == null) {
                    this._xmlCallback = new HandleXmlCallback();
                }
                String xmlDir = GGEventGenerationFactory.uriToGGDir(GoldenGateEventProducer.this._pConfig.getUri());
                String xmlPrefix = GGEventGenerationFactory.uriToXmlPrefix(GoldenGateEventProducer.this._pConfig.getUri());
                File file = new File(xmlDir);
                if (!file.exists() || !file.isDirectory()) {
                    GoldenGateEventProducer.this._log.fatal((Object)("Unable to load the directory: " + xmlDir + " it doesn't seem to be a valid directory"));
                    throw new DatabusException("Invalid trail file directory");
                }
                boolean parseError = false;
                do {
                    try {
                        GoldenGateEventProducer.this._log.info((Object)("Using xml directory : " + xmlDir + " and using the xml Prefix : " + xmlPrefix));
                        compositeInputStream = GoldenGateEventProducer.this.locateScnInTrailFile(xmlDir, xmlPrefix);
                        compositeInputStream.setGGParserStats(GoldenGateEventProducer.this._ggParserStats);
                        GoldenGateEventProducer.this._log.info((Object)"Attempting to start the parser...");
                        if (!parseError) {
                            GoldenGateEventProducer.this._log.info((Object)("Starting dbusEventBuffer with _scn : " + GoldenGateEventProducer.this._startPrevScn.get()));
                            GoldenGateEventProducer.this.getEventBuffer().start(GoldenGateEventProducer.this._startPrevScn.get());
                        } else {
                            GoldenGateEventProducer.this._log.warn((Object)("Umm, looks like the parser had failed, this is an retry attempt using _scn: " + GoldenGateEventProducer.this._scn.get()));
                            GoldenGateEventProducer.this._log.info((Object)("CompositeInputStream used:" + compositeInputStream));
                        }
                        StaxBuilder builder = new StaxBuilder(GoldenGateEventProducer.this._schemaRegistryService, GoldenGateEventProducer.this.wrapStreamWithXmlTags(compositeInputStream), GoldenGateEventProducer.this._pConfig, this._xmlCallback);
                        if (GoldenGateEventProducer.this._log.isDebugEnabled()) {
                            GoldenGateEventProducer.this._log.debug((Object)("CompositeInputStream used:" + compositeInputStream));
                        }
                        this._parser = builder.getParser();
                        builder.processXml();
                        parseError = false;
                    }
                    catch (XMLStreamException e) {
                        GoldenGateEventProducer.this._ggParserStats.addParsingError();
                        if (GoldenGateEventProducer.this._shutdownRequested) {
                            parseError = false;
                            continue;
                        }
                        GoldenGateEventProducer.this._log.error((Object)"Error while parsing the xml, will retry loading the parser", (Throwable)e);
                        GoldenGateEventProducer.this._log.info((Object)("Last scn seen before the crash: " + GoldenGateEventProducer.this._scn.get()));
                        GoldenGateEventProducer.this._log.info((Object)("CompositeInputStream used:" + compositeInputStream));
                        parseError = true;
                    }
                    finally {
                        if (compositeInputStream != null) {
                            compositeInputStream.close();
                        }
                    }
                } while (parseError);
            }
            catch (RuntimeException e) {
                GoldenGateEventProducer.this._log.info((Object)("CompositeInputStream used:" + compositeInputStream));
                GoldenGateEventProducer.this._log.error((Object)"Error while parsing data, compositeInputStream shutting down the relay", (Throwable)e);
                GoldenGateEventProducer.this._currentState = State.SHUTDOWN;
                throw e;
            }
            catch (Exception e) {
                GoldenGateEventProducer.this._log.info((Object)("CompositeInputStream used:" + compositeInputStream));
                GoldenGateEventProducer.this._log.error((Object)"Error while parsing data, compositeInputStream shutting down the relay", (Throwable)e);
                GoldenGateEventProducer.this._currentState = State.SHUTDOWN;
                return;
            }
        }

        private class HandleXmlCallback
        implements TransactionSuccessCallBack {
            private List<TransactionState.PerSourceTransactionalUpdate> _pendingDbUpdatesBuffer = null;
            private GGParserStatistics.TransactionInfo _pendingTxnInfo = null;
            private long _lastSeenScn = -1L;

            private HandleXmlCallback() {
            }

            private MergeDbResult mergeTransactions(List<TransactionState.PerSourceTransactionalUpdate> dbUpdates, GGParserStatistics.TransactionInfo txnInfo) {
                MergeDbResult result = null;
                if (this._pendingTxnInfo == null) {
                    this._pendingTxnInfo = txnInfo;
                    this._pendingDbUpdatesBuffer = null != dbUpdates ? new ArrayList<TransactionState.PerSourceTransactionalUpdate>(dbUpdates) : new ArrayList<TransactionState.PerSourceTransactionalUpdate>();
                    result = MergeDbResult.createDoNotAppendResult(txnInfo, this.getNumEventsInTxn(dbUpdates));
                } else if (txnInfo.getScn() == this._pendingTxnInfo.getScn()) {
                    this._pendingDbUpdatesBuffer = DBUpdatesMergeUtils.mergeTransactionData(dbUpdates, this._pendingDbUpdatesBuffer);
                    this._pendingTxnInfo = new GGParserStatistics.TransactionInfo(this._pendingTxnInfo.getTransactionSize() + txnInfo.getTransactionSize(), this._pendingTxnInfo.getTransactionTimeRead() + txnInfo.getTransactionTimeRead(), txnInfo.getTransactionTimeStampNs(), txnInfo.getScn());
                    result = MergeDbResult.createDoNotAppendResult(txnInfo, this.getNumEventsInTxn(dbUpdates));
                } else if (txnInfo.getScn() > this._pendingTxnInfo.getScn()) {
                    result = MergeDbResult.createAppendResult(this._pendingDbUpdatesBuffer, this._pendingTxnInfo, txnInfo, this.getNumEventsInTxn(dbUpdates));
                    this._pendingDbUpdatesBuffer = null != dbUpdates ? new ArrayList<TransactionState.PerSourceTransactionalUpdate>(dbUpdates) : new ArrayList<TransactionState.PerSourceTransactionalUpdate>();
                    this._pendingTxnInfo = txnInfo;
                } else {
                    GoldenGateEventProducer.this._log.error((Object)("Last Read Transaction's SCN is lower than that of previously read. Skipping this Transaction. Last Read SCN :" + txnInfo.getScn() + " Previously Read SCN : " + this._pendingTxnInfo.getScn()));
                    result = MergeDbResult.createDoNotAppendResult(txnInfo, 0);
                    if (this._lastSeenScn > txnInfo.getScn()) {
                        GoldenGateEventProducer.this._ggParserStats.addScnRegression(txnInfo.getScn());
                    }
                }
                this._lastSeenScn = txnInfo.getScn();
                return result;
            }

            private int getNumEventsInTxn(List<TransactionState.PerSourceTransactionalUpdate> dbUpdates) {
                if (null == dbUpdates) {
                    return 0;
                }
                int numEvents = 0;
                for (TransactionState.PerSourceTransactionalUpdate d : dbUpdates) {
                    numEvents += d.getNumDbUpdates();
                }
                return numEvents;
            }

            @Override
            public void onTransactionEnd(List<TransactionState.PerSourceTransactionalUpdate> newDbUpdates, GGParserStatistics.TransactionInfo newTxnInfo) throws DatabusException, UnsupportedKeyException {
                long scn = newTxnInfo.getScn();
                if (newDbUpdates == null) {
                    GoldenGateEventProducer.this._log.info((Object)("Received empty transaction callback with no DbUpdates with scn " + scn));
                }
                if (!WorkerThread.this.isReadyToRun()) {
                    return;
                }
                MergeDbResult result = this.mergeTransactions(newDbUpdates, newTxnInfo);
                List<TransactionState.PerSourceTransactionalUpdate> dbUpdates = result.getMergedDbUpdates();
                GGParserStatistics.TransactionInfo txnInfo = result.getMergedTxnInfo();
                if (!result.isDoAppendToBuffer()) {
                    GoldenGateEventProducer.this._ggParserStats.addTransactionInfo(result.getLastParsedTxnInfo(), result.getNumEventsInLastParsedTxn());
                    return;
                }
                scn = txnInfo.getScn();
                try {
                    if (dbUpdates == null || dbUpdates.isEmpty()) {
                        WorkerThread.this.checkAndInsertEOP(scn);
                    } else {
                        GoldenGateEventProducer.this.addEventToBuffer(dbUpdates, txnInfo);
                    }
                    GoldenGateEventProducer.this._ggParserStats.addTransactionInfo(result.getLastParsedTxnInfo(), result.getNumEventsInLastParsedTxn());
                }
                catch (DatabusException e) {
                    GoldenGateEventProducer.this._ggParserStats.addError();
                    GoldenGateEventProducer.this._log.error((Object)("Error while adding events to buffer: " + (Object)((Object)e)));
                    throw e;
                }
                catch (UnsupportedKeyException e) {
                    GoldenGateEventProducer.this._ggParserStats.addError();
                    GoldenGateEventProducer.this._log.error((Object)("Error while adding events to buffer: " + (Object)((Object)e)));
                    throw e;
                }
            }
        }
    }

    private static enum State {
        INIT,
        PAUSED,
        RUNNING,
        SHUTDOWN;

    }
}

