/*
 * Decompiled with CFR 0.152.
 */
package com.flipkart.aesop.runtime.producer;

import com.flipkart.aesop.runtime.producer.AbstractEventProducer;
import com.flipkart.aesop.runtime.producer.avro.MysqlAvroEventManager;
import com.flipkart.aesop.runtime.producer.eventlistener.OpenReplicationListener;
import com.flipkart.aesop.runtime.producer.eventprocessor.BinLogEventProcessor;
import com.flipkart.aesop.runtime.producer.mapper.BinLogEventMapper;
import com.flipkart.aesop.runtime.producer.schema.eventprocessor.SchemaChangeEventProcessor;
import com.flipkart.aesop.runtime.producer.txnprocessor.MysqlTransactionManager;
import com.flipkart.aesop.runtime.producer.txnprocessor.impl.MysqlTransactionManagerImpl;
import com.google.code.or.OpenReplicator;
import com.google.code.or.binlog.BinlogEventListener;
import com.linkedin.databus.core.UnsupportedKeyException;
import com.linkedin.databus.core.util.InvalidConfigException;
import com.linkedin.databus2.core.DatabusException;
import com.linkedin.databus2.producers.EventCreationException;
import com.linkedin.databus2.relay.config.LogicalSourceStaticConfig;
import com.linkedin.databus2.relay.config.PhysicalSourceStaticConfig;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.util.Assert;
import org.trpr.platform.core.impl.logging.LogFactory;
import org.trpr.platform.core.spi.logging.Logger;

public class MysqlEventProducer
extends AbstractEventProducer
implements InitializingBean {
    private static final Logger LOGGER = LogFactory.getLogger(MysqlEventProducer.class);
    private static final Integer DEFAULT_MYSQL_PORT = 3306;
    private static final Pattern PATH_PATTERN = Pattern.compile("/([0-9]+)/[a-z|A-Z|0-9|-]+");
    private static final int SERVER_ID = 1;
    private static final int BIN_LOG_PREFIX = 2;
    protected Map<Integer, BinLogEventMapper> binLogEventMappers;
    protected OpenReplicator openReplicator;
    protected MysqlTransactionManager mysqlTxnManager;
    protected Map<Integer, BinLogEventProcessor> eventsMap;
    protected SchemaChangeEventProcessor schemaChangeEventProcessor;

    public void afterPropertiesSet() throws Exception {
        Assert.notNull(this.binLogEventMappers, (String)"'binLogEventMapper' cannot be null. No bin log event mapper found. This Mysql Events producer will not be initialized");
        Assert.notNull(this.eventsMap, (String)"'eventsMap' cannot be null. eventsMap is not initialized properly.This Mysql Events producer will not be initialized");
    }

    public void start(long sinceSCN) {
        String binlogFile;
        this.sinceSCN.set(sinceSCN);
        this.openReplicator = new OpenReplicator();
        try {
            LogicalSourceStaticConfig sourceConfig;
            String binlogFilePrefix = this.processUri(new URI(this.physicalSourceStaticConfig.getUri()));
            int offset = MysqlEventProducer.offset(sinceSCN);
            int logid = MysqlEventProducer.logid(sinceSCN);
            LOGGER.debug("SCN : " + sinceSCN + " logid : " + logid);
            binlogFile = String.format("%s.%06d", binlogFilePrefix, logid);
            LOGGER.debug("Bin Log File Name : " + binlogFile);
            HashMap<String, Short> tableUriToSrcIdMap = new HashMap<String, Short>();
            HashMap<String, String> tableUriToSrcNameMap = new HashMap<String, String>();
            HashMap<Integer, MysqlAvroEventManager> eventManagersMap = new HashMap<Integer, MysqlAvroEventManager>();
            LogicalSourceStaticConfig[] logicalSourceStaticConfigArray = this.physicalSourceStaticConfig.getSources();
            int n = logicalSourceStaticConfigArray.length;
            int n2 = 0;
            while (n2 < n) {
                sourceConfig = logicalSourceStaticConfigArray[n2];
                tableUriToSrcIdMap.put(sourceConfig.getUri().toLowerCase(), sourceConfig.getId());
                tableUriToSrcNameMap.put(sourceConfig.getUri().toLowerCase(), sourceConfig.getName());
                MysqlAvroEventManager manager = null;
                try {
                    manager = this.buildEventManagers(sourceConfig, this.physicalSourceStaticConfig);
                }
                catch (Exception ex) {
                    LOGGER.error("Got exception while building monitored sources for config :" + sourceConfig, (Throwable)ex);
                    throw new InvalidConfigException((Throwable)ex);
                }
                eventManagersMap.put(Integer.valueOf(sourceConfig.getId()), manager);
                ++n2;
            }
            this.schemaChangeEventProcessor.setSchemaRegistryService(this.schemaRegistryService);
            this.schemaChangeEventProcessor.setTableUriToSrcNameMap(tableUriToSrcNameMap);
            logicalSourceStaticConfigArray = this.physicalSourceStaticConfig.getSources();
            n = logicalSourceStaticConfigArray.length;
            n2 = 0;
            while (n2 < n) {
                sourceConfig = logicalSourceStaticConfigArray[n2];
                String[] parts = sourceConfig.getUri().split("\\.");
                this.schemaChangeEventProcessor.process(parts[0], parts[1]);
                ++n2;
            }
            this.mysqlTxnManager = new MysqlTransactionManagerImpl(this.eventBuffer, this.maxScnReaderWriter, this.dbusEventsStatisticsCollector, eventManagersMap, logid, tableUriToSrcIdMap, tableUriToSrcNameMap, this.schemaRegistryService, this.sinceSCN, this.binLogEventMappers);
            this.mysqlTxnManager.setShutdownRequested(false);
            OpenReplicationListener orl = new OpenReplicationListener(this.mysqlTxnManager, this.eventsMap, this.schemaChangeEventProcessor, binlogFilePrefix);
            this.openReplicator.setBinlogFileName(binlogFile);
            this.openReplicator.setBinlogPosition((long)offset);
            this.openReplicator.setBinlogEventListener((BinlogEventListener)orl);
            this.openReplicator.start();
        }
        catch (URISyntaxException u) {
            LOGGER.error("Exception occurred while processing uri : " + u);
            return;
        }
        catch (InvalidConfigException e) {
            LOGGER.error("Exception occurred while processing uri : " + (Object)((Object)e));
            return;
        }
        catch (Exception e) {
            LOGGER.error("Error occurred while starting open replication.." + e);
            return;
        }
        LOGGER.info("Open Replicator has been started successfully for the file " + binlogFile);
    }

    public MysqlAvroEventManager buildEventManagers(LogicalSourceStaticConfig sourceConfig, PhysicalSourceStaticConfig pConfig) throws DatabusException, EventCreationException, UnsupportedKeyException, InvalidConfigException {
        MysqlAvroEventManager manager = new MysqlAvroEventManager(sourceConfig.getId(), (short)pConfig.getId());
        return manager;
    }

    public static int logid(long scn) {
        if (scn == -1L || scn == 0L) {
            return 1;
        }
        return (int)(scn >> 32 & 0xFFFFFFFFFFFFFFFFL);
    }

    public static int offset(long scn) {
        if (scn == -1L || scn == 0L) {
            return 4;
        }
        return (int)(scn & 0xFFFFFFFFFFFFFFFFL);
    }

    public String getName() {
        return ((Object)((Object)this)).getClass().getName();
    }

    public long getSCN() {
        return this.sinceSCN.get();
    }

    public boolean isPaused() {
        return !this.openReplicator.isRunning();
    }

    public boolean isRunning() {
        return this.openReplicator.isRunning();
    }

    public void shutdown() {
        LOGGER.info("Shutdown has been requested. MYSQLEventProducer shutttng down");
        try {
            this.mysqlTxnManager.setShutdownRequested(true);
            LOGGER.info("Open Replicator Shutting down");
            this.openReplicator.stop(10L, TimeUnit.SECONDS);
            LOGGER.info("Open Replicator shutdown complete");
            super.shutdown();
        }
        catch (Exception e) {
            LOGGER.error("Error while stopping open replicator", (Throwable)e);
        }
        LOGGER.info("MYSQLEventProducer shutdown completed");
    }

    public void pause() {
        throw new UnsupportedOperationException("'pause' is not supported on this event producer");
    }

    public void unpause() {
        throw new UnsupportedOperationException("'unpause' is not supported on this event producer");
    }

    public void waitForShutdown() throws InterruptedException, IllegalStateException {
        throw new UnsupportedOperationException("'waitForShutdown' is not supported on this event producer");
    }

    public void waitForShutdown(long time) throws InterruptedException, IllegalStateException {
        throw new UnsupportedOperationException("'waitForShutdown(long time)' is not supported on this event producer");
    }

    public Map<Integer, BinLogEventMapper> getBinLogEventMappers() {
        return this.binLogEventMappers;
    }

    public void setBinLogEventMappers(Map<Integer, BinLogEventMapper> binLogEventMapper) {
        this.binLogEventMappers = binLogEventMapper;
    }

    public MysqlTransactionManager getMysqlTxnManager() {
        return this.mysqlTxnManager;
    }

    public void setMysqlTxnManager(MysqlTransactionManager mysqlTxnManager) {
        this.mysqlTxnManager = mysqlTxnManager;
    }

    public Map<Integer, BinLogEventProcessor> getEventsMap() {
        return this.eventsMap;
    }

    public void setEventsMap(Map<Integer, BinLogEventProcessor> eventsMap) {
        this.eventsMap = eventsMap;
    }

    public SchemaChangeEventProcessor getSchemaChangeEventProcessor() {
        return this.schemaChangeEventProcessor;
    }

    public void setSchemaChangeEventProcessor(SchemaChangeEventProcessor schemaChangeEventProcessor) {
        this.schemaChangeEventProcessor = schemaChangeEventProcessor;
    }

    protected String processUri(URI uri) throws InvalidConfigException {
        String path;
        String userInfo = uri.getUserInfo();
        if (userInfo == null) {
            String errorMessage = "missing user info in: " + uri;
            LOGGER.error(errorMessage);
            throw new InvalidConfigException(errorMessage);
        }
        int slashPos = userInfo.indexOf(47);
        if (slashPos < 0) {
            slashPos = userInfo.length();
        } else if (slashPos == 0) {
            String errorMessage = "missing user name in user info: " + userInfo;
            LOGGER.error(errorMessage);
            throw new InvalidConfigException(errorMessage);
        }
        String userName = userInfo.substring(0, slashPos);
        String userPass = slashPos < userInfo.length() - 1 ? userInfo.substring(slashPos + 1) : null;
        String hostName = uri.getHost();
        int port = uri.getPort();
        if (port < 0) {
            port = DEFAULT_MYSQL_PORT;
        }
        if ((path = uri.getPath()) == null) {
            String errorMessage = "missing path: " + uri;
            LOGGER.error(errorMessage);
            throw new InvalidConfigException(errorMessage);
        }
        Matcher matcher = PATH_PATTERN.matcher(path);
        if (!matcher.matches()) {
            String errorMessage = "invalid path:" + path;
            LOGGER.error(errorMessage);
            throw new InvalidConfigException(errorMessage);
        }
        Object[] group = matcher.group().split("/");
        if (group.length != 3) {
            String errorMessage = "Invalid format " + Arrays.toString(group);
            LOGGER.error(errorMessage);
            throw new InvalidConfigException(errorMessage);
        }
        String serverIdStr = group[1];
        int serverId = -1;
        try {
            serverId = Integer.parseInt(serverIdStr);
        }
        catch (NumberFormatException e) {
            String errorMessage = "incorrect mysql serverid:" + serverId;
            LOGGER.error(errorMessage);
            throw new InvalidConfigException(errorMessage);
        }
        if (this.openReplicator != null) {
            this.openReplicator.setUser(userName);
            if (userPass != null) {
                this.openReplicator.setPassword(userPass);
            }
            this.openReplicator.setHost(hostName);
            this.openReplicator.setPort(port);
            this.openReplicator.setServerId(serverId);
        }
        LOGGER.debug("Extracted bin log prefix is " + (String)group[2]);
        return group[2];
    }
}

