/*
 * Decompiled with CFR 0.152.
 */
package com.flipkart.aesop.runtime.producer.txnprocessor.impl;

import com.flipkart.aesop.runtime.producer.MysqlEventProducer;
import com.flipkart.aesop.runtime.producer.avro.MysqlAvroEventManager;
import com.flipkart.aesop.runtime.producer.mapper.BinLogEventMapper;
import com.flipkart.aesop.runtime.producer.txnprocessor.MysqlTransactionManager;
import com.google.code.or.binlog.BinlogEventV4Header;
import com.google.code.or.common.glossary.Row;
import com.linkedin.databus.core.DatabusRuntimeException;
import com.linkedin.databus.core.DbusEventBufferAppendable;
import com.linkedin.databus.core.DbusOpcode;
import com.linkedin.databus.core.UnsupportedKeyException;
import com.linkedin.databus.core.monitoring.mbean.DbusEventsStatisticsCollector;
import com.linkedin.databus2.core.DatabusException;
import com.linkedin.databus2.core.seq.MaxSCNReaderWriter;
import com.linkedin.databus2.producers.EventCreationException;
import com.linkedin.databus2.producers.ds.DbChangeEntry;
import com.linkedin.databus2.producers.ds.PerSourceTransaction;
import com.linkedin.databus2.producers.ds.Transaction;
import com.linkedin.databus2.schemas.SchemaRegistryService;
import com.linkedin.databus2.schemas.VersionedSchema;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import org.trpr.platform.core.impl.logging.LogFactory;
import org.trpr.platform.core.spi.logging.Logger;

public class MysqlTransactionManagerImpl
implements MysqlTransactionManager {
    private static final Logger LOGGER = LogFactory.getLogger(MysqlTransactionManagerImpl.class);
    private final DbusEventBufferAppendable eventBuffer;
    private final MaxSCNReaderWriter maxSCNReaderWriter;
    private final DbusEventsStatisticsCollector dbusEventsStatisticsCollector;
    private final Map<Integer, MysqlAvroEventManager> eventManagerMap;
    private final AtomicLong sinceSCN;
    private final Map<String, Short> tableUriToSrcIdMap;
    private final Map<String, String> tableUriToSrcNameMap;
    private String currTableName = "";
    private Transaction transaction = null;
    private long currTableId = -1L;
    private PerSourceTransaction perSourceTransaction = null;
    private boolean beginTxnSeen = false;
    private long currTxnSizeInBytes = 0L;
    private long currTxnTimestamp = 0L;
    private long currTxnStartReadTimestamp = 0L;
    private int currFileNum;
    private Map<Integer, BinLogEventMapper> binLogEventMappers;
    private SchemaRegistryService schemaRegistryService;
    private Map<Long, String> mysqlTableIdToTableNameMap;
    private MysqlEventProducer mySqlEventProducer;
    private volatile AtomicBoolean shutdownRequested = new AtomicBoolean(false);

    public MysqlTransactionManagerImpl(DbusEventBufferAppendable eventBuffer, MaxSCNReaderWriter maxSCNReaderWriter, DbusEventsStatisticsCollector dbusEventsStatisticsCollector, Map<Integer, MysqlAvroEventManager> eventManagerMap, int currFileNum, Map<String, Short> tableUriToSrcIdMap, Map<String, String> tableUriToSrcNameMap, SchemaRegistryService schemaRegistryService, AtomicLong sinceSCN, Map<Integer, BinLogEventMapper> binLogEventMappers, MysqlEventProducer mySqlEventProducer) {
        this.eventBuffer = eventBuffer;
        this.maxSCNReaderWriter = maxSCNReaderWriter;
        this.dbusEventsStatisticsCollector = dbusEventsStatisticsCollector;
        this.eventManagerMap = eventManagerMap;
        this.currFileNum = currFileNum;
        this.tableUriToSrcIdMap = tableUriToSrcIdMap;
        this.tableUriToSrcNameMap = tableUriToSrcNameMap;
        this.schemaRegistryService = schemaRegistryService;
        this.sinceSCN = sinceSCN;
        this.binLogEventMappers = binLogEventMappers;
        this.mysqlTableIdToTableNameMap = new HashMap<Long, String>();
        this.mySqlEventProducer = mySqlEventProducer;
    }

    @Override
    public void startXtion() {
        this.currTxnStartReadTimestamp = System.nanoTime();
        if (this.transaction != null) {
            LOGGER.warn("Illegal Start Transaction State");
            throw new DatabusRuntimeException("Got startXtion without an endXtion for previous transaction");
        }
        this.transaction = new Transaction();
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     * Enabled force condition propagation
     * Lifted jumps to return sites
     */
    @Override
    public void endXtion(long eventTimeStamp) {
        if (!this.shutdownRequested.get()) {
            this.currTxnTimestamp = eventTimeStamp * 1000000L;
            long txnReadLatency = System.nanoTime() - this.currTxnStartReadTimestamp;
            try {
                if (this.transaction.getScn() == -1L) return;
                this.transaction.setSizeInBytes(this.currTxnSizeInBytes);
                this.transaction.setTxnNanoTimestamp(this.currTxnTimestamp);
                this.transaction.setTxnReadLatencyNanos(txnReadLatency);
                try {
                    this.onEndTransaction(this.transaction);
                    return;
                }
                catch (DatabusException e3) {
                    LOGGER.error("Got exception in the transaction handler ", (Throwable)e3);
                    throw new DatabusRuntimeException((Throwable)e3);
                }
            }
            finally {
                this.resetTxn();
            }
        } else {
            LOGGER.info("Not writing event to buffer as shutdown has been requested");
        }
    }

    @Override
    public void resetTxn() {
        this.currTableName = "";
        this.currTableId = -1L;
        this.perSourceTransaction = null;
        this.transaction = null;
        this.currTxnSizeInBytes = 0L;
    }

    private void startSource(String newTableName, long newTableId) {
        Short srcId;
        this.currTableName = newTableName;
        this.currTableId = newTableId;
        if (this.perSourceTransaction == null || this.transaction == null) {
            srcId = this.tableUriToSrcIdMap.get(this.currTableName);
            if (null == srcId) {
                LOGGER.warn("Could not find a matching logical source for table Uri (" + this.currTableName + ")");
                return;
            }
        } else {
            String errorMessage = "Seems like a startSource has been received without an endSource for previous source";
            LOGGER.error(errorMessage);
            throw new DatabusRuntimeException(errorMessage);
        }
        this.perSourceTransaction = new PerSourceTransaction((int)srcId.shortValue());
        this.transaction.mergePerSourceTransaction(this.perSourceTransaction);
    }

    private void endSource() {
        this.perSourceTransaction = null;
    }

    @Override
    public void setSource(long newTableId) {
        String newTableName = this.mysqlTableIdToTableNameMap.get(newTableId);
        if (null == newTableName) {
            LOGGER.error("TableMap Event not received for the change event tableId: " + newTableId);
            throw new DatabusRuntimeException("TableMap Event not received for the change event tableId: " + newTableId);
        }
        if (this.currTableName.isEmpty() && this.currTableId == -1L) {
            this.startSource(newTableName, newTableId);
        } else if (!this.currTableName.equals(newTableName) || this.currTableId != newTableId) {
            LOGGER.debug("Table name changed from " + this.currTableName + " to " + newTableName);
            this.endSource();
            this.startSource(newTableName, newTableId);
        }
    }

    @Override
    public Map<Long, String> getMysqlTableIdToTableNameMap() {
        return this.mysqlTableIdToTableNameMap;
    }

    @Override
    public void performChanges(long tableId, BinlogEventV4Header eventHeader, List<Row> rowList, DbusOpcode databusOpcode) {
        try {
            this.setSource(tableId);
            VersionedSchema schema = this.schemaRegistryService.fetchLatestVersionedSchemaBySourceName(this.tableUriToSrcNameMap.get(this.currTableName));
            LOGGER.debug("Schema obtained for table " + this.currTableName + " = " + schema);
            if (schema != null) {
                List<DbChangeEntry> entries = this.eventManagerMap.get(this.tableUriToSrcIdMap.get(this.currTableName)).frameAvroRecord(eventHeader, rowList, databusOpcode, this.binLogEventMappers, schema.getSchema(), this.frameSCN(this.currFileNum, (int)eventHeader.getPosition()));
                for (DbChangeEntry entry : entries) {
                    this.perSourceTransaction.mergeDbChangeEntrySet(entry);
                }
            } else {
                LOGGER.info("Events recieved from uninterested sources " + this.currTableName);
            }
        }
        catch (Exception e) {
            e.printStackTrace();
            LOGGER.error("Exception occurred while persisting changes to transaction " + e.getMessage());
        }
    }

    @Override
    public String getCurrTableName() {
        return this.currTableName;
    }

    @Override
    public void setCurrFileNum(int currFileNum) {
        this.currFileNum = currFileNum;
    }

    @Override
    public long getCurrTableId() {
        return this.currTableId;
    }

    public void setCurrTableName(String currTableName) {
        this.currTableName = currTableName;
    }

    public Transaction getTransaction() {
        return this.transaction;
    }

    public void setTransaction(Transaction transaction) {
        this.transaction = transaction;
    }

    public void setCurrTableId(long currTableId) {
        this.currTableId = currTableId;
    }

    public PerSourceTransaction getPerSourceTransaction() {
        return this.perSourceTransaction;
    }

    public void setPerSourceTransaction(PerSourceTransaction perSourceTransaction) {
        this.perSourceTransaction = perSourceTransaction;
    }

    @Override
    public boolean isBeginTxnSeen() {
        return this.beginTxnSeen;
    }

    @Override
    public void setBeginTxnSeen(boolean beginTxnSeen) {
        this.beginTxnSeen = beginTxnSeen;
    }

    public long getCurrTxnSizeInBytes() {
        return this.currTxnSizeInBytes;
    }

    public void setCurrTxnSizeInBytes(long currTxnSizeInBytes) {
        this.currTxnSizeInBytes = currTxnSizeInBytes;
    }

    public long getCurrTxnTimestamp() {
        return this.currTxnTimestamp;
    }

    public void setCurrTxnTimestamp(long currTxnTimestamp) {
        this.currTxnTimestamp = currTxnTimestamp;
    }

    public long getCurrTxnStartReadTimestamp() {
        return this.currTxnStartReadTimestamp;
    }

    public void setCurrTxnStartReadTimestamp(long currTxnStartReadTimestamp) {
        this.currTxnStartReadTimestamp = currTxnStartReadTimestamp;
    }

    public int getCurrFileNum() {
        return this.currFileNum;
    }

    public Map<Integer, BinLogEventMapper> getBinLogEventMappers() {
        return this.binLogEventMappers;
    }

    public void setBinLogEventMappers(Map<Integer, BinLogEventMapper> binLogEventMapper) {
        this.binLogEventMappers = binLogEventMapper;
    }

    public SchemaRegistryService getSchemaRegistryService() {
        return this.schemaRegistryService;
    }

    public void setSchemaRegistryService(SchemaRegistryService schemaRegistryService) {
        this.schemaRegistryService = schemaRegistryService;
    }

    public DbusEventBufferAppendable getEventBuffer() {
        return this.eventBuffer;
    }

    public MaxSCNReaderWriter getMaxSCNReaderWriter() {
        return this.maxSCNReaderWriter;
    }

    public DbusEventsStatisticsCollector getDbusEventsStatisticsCollector() {
        return this.dbusEventsStatisticsCollector;
    }

    public Map<Integer, MysqlAvroEventManager> getEventFactoryMap() {
        return this.eventManagerMap;
    }

    public AtomicLong getSinceSCN() {
        return this.sinceSCN;
    }

    public Map<String, Short> getTableUriToSrcIdMap() {
        return this.tableUriToSrcIdMap;
    }

    public Map<String, String> getTableUriToSrcNameMap() {
        return this.tableUriToSrcNameMap;
    }

    private long frameSCN(int logId, int offset) {
        long scn = logId;
        scn <<= 32;
        return scn |= (long)offset;
    }

    private void onEndTransaction(Transaction txn) throws DatabusException {
        try {
            this.addTxnToBuffer(txn);
            this.maxSCNReaderWriter.saveMaxScn(txn.getScn());
            this.mySqlEventProducer.updateSCN(txn.getScn());
        }
        catch (UnsupportedKeyException e) {
            LOGGER.error("Got UnsupportedKeyException exception while adding txn (" + txn + ") to the buffer", (Throwable)e);
            throw new DatabusException((Throwable)e);
        }
        catch (EventCreationException e) {
            LOGGER.error("Got EventCreationException exception while adding txn (" + txn + ") to the buffer", (Throwable)e);
            throw new DatabusException((Throwable)e);
        }
    }

    private void addTxnToBuffer(Transaction txn) throws DatabusException, UnsupportedKeyException, EventCreationException {
        this.eventBuffer.startEvents();
        long scn = txn.getScn();
        for (PerSourceTransaction t : txn.getOrderedPerSourceTransactions()) {
            for (DbChangeEntry changeEntry : t.getDbChangeEntrySet()) {
                int length = 0;
                try {
                    length = this.eventManagerMap.get(t.getSrcId()).createAndAppendEvent(changeEntry, this.eventBuffer, false, this.dbusEventsStatisticsCollector);
                    if (length < 0) {
                        LOGGER.error("Unable to append DBChangeEntry (" + changeEntry + ") to event buffer !! EVB State : " + this.eventBuffer);
                        throw new DatabusException("Unable to append DBChangeEntry (" + changeEntry + ") to event buffer !!");
                    }
                    LOGGER.debug("Added entry " + changeEntry + " successfully to event buffer");
                }
                catch (DatabusException e) {
                    LOGGER.error("Got databus exception :", (Throwable)e);
                    throw e;
                }
                catch (UnsupportedKeyException e) {
                    LOGGER.error("Got UnsupportedKeyException :", (Throwable)e);
                    throw e;
                }
                catch (EventCreationException e) {
                    LOGGER.error("Got EventCreationException :", (Throwable)e);
                    throw e;
                }
            }
        }
        this.eventBuffer.endEvents(scn, this.dbusEventsStatisticsCollector);
    }

    @Override
    public void setShutdownRequested(boolean shutdownRequested) {
        this.shutdownRequested.set(shutdownRequested);
    }
}

