/*
 * Decompiled with CFR 0.152.
 */
package com.flipkart.aesop.bootstrap.mysql;

import com.flipkart.aesop.bootstrap.mysql.eventlistener.OpenReplicationListener;
import com.flipkart.aesop.bootstrap.mysql.eventprocessor.BinLogEventProcessor;
import com.flipkart.aesop.bootstrap.mysql.txnprocessor.MysqlTransactionManager;
import com.flipkart.aesop.bootstrap.mysql.txnprocessor.impl.MysqlTransactionManagerImpl;
import com.flipkart.aesop.event.AbstractEvent;
import com.flipkart.aesop.runtime.bootstrap.producer.BlockingEventProducer;
import com.google.code.or.OpenReplicator;
import com.google.code.or.binlog.BinlogEventListener;
import com.linkedin.databus.core.util.InvalidConfigException;
import com.linkedin.databus2.relay.config.LogicalSourceStaticConfig;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import org.trpr.platform.core.impl.logging.LogFactory;
import org.trpr.platform.core.spi.logging.Logger;

/*
 * This class specifies class file version 49.0 but uses Java 6 signatures.  Assumed Java 6.
 */
/**
 * Blocking event producer that reads a MySQL binary log through an
 * {@link OpenReplicator} and hands the binlog events to an
 * {@link OpenReplicationListener} backed by a {@link MysqlTransactionManager}.
 *
 * <p>The SCN handled by this class packs two values into one {@code long}:
 * the upper 32 bits are the binlog file number (see {@link #logid(long)}) and the
 * lower 32 bits are the position inside that file (see {@link #offset(long)}).
 *
 * <p>The fields {@code sinceSCN}, {@code physicalSourceStaticConfig},
 * {@code schemaRegistryService}, {@code sourceEventConsumer}, {@code metricsCollector}
 * and {@code name} are not declared here; they are presumably inherited from
 * {@link BlockingEventProducer}.
 *
 * @param <T> event type; not referenced inside this class
 */
public class MysqlEventProducer<T extends AbstractEvent>
extends BlockingEventProducer {
    /** {@link System#nanoTime()} reading taken at class initialisation; used to log elapsed time on shutdown. */
    private static final long START_TIME_NANOS = System.nanoTime();
    private static final Logger LOGGER = LogFactory.getLogger(MysqlEventProducer.class);
    /** Port used when the configured URI does not carry one. */
    private static final int DEFAULT_MYSQL_PORT = 3306;
    /**
     * Expected URI path shape: {@code /<numeric server id>/<binlog file prefix>}.
     * NOTE(review): inside a character class {@code |} is a literal, so the prefix
     * also accepts the pipe character - probably unintended, left as is to stay
     * backward-compatible.
     */
    private static final Pattern PATH_PATTERN = Pattern.compile("/([0-9]+)/[a-z|A-Z|0-9|-]+");
    /** Index of the server id within the path split on {@code "/"} (index 0 is the empty leading segment). */
    private static final int SERVER_ID = 1;
    /** Index of the binlog file prefix within the path split on {@code "/"}. */
    private static final int BIN_LOG_PREFIX = 2;

    /** Replication client; created by {@link #start(long)} and {@code null} until then. */
    protected OpenReplicator openReplicator;
    /** Transaction manager created by {@link #start(long)}. */
    protected MysqlTransactionManager mysqlTxnManager;
    /** Event processors handed to the replication listener; injected through {@link #setEventsMap(Map)}. */
    protected Map<Integer, BinLogEventProcessor> eventsMap;

    /**
     * Configures a new {@link OpenReplicator} from the physical source URI and starts
     * reading the binlog from the file/position encoded in {@code sinceSCN}.
     *
     * <p>Any failure is logged (with its stack trace) and the method returns without
     * throwing; in that case the replicator is left not running.
     *
     * @param sinceSCN SCN to start from; {@code -1} and {@code 0} map to file 1, position 4
     */
    public void start(long sinceSCN) {
        this.sinceSCN.set(sinceSCN);
        this.openReplicator = new OpenReplicator();
        String binlogFile;
        try {
            // processUri also pushes user, password, host, port and server id into openReplicator.
            String binlogFilePrefix = this.processUri(new URI(this.physicalSourceStaticConfig.getUri()));
            int offset = MysqlEventProducer.offset(sinceSCN);
            int logid = MysqlEventProducer.logid(sinceSCN);
            LOGGER.info("SCN : " + offset + " logid : " + logid);
            binlogFile = String.format("%s.%06d", binlogFilePrefix, logid);
            LOGGER.info("Bin Log File Name : " + binlogFile);

            // Lookup tables keyed by the lower-cased logical source URI.
            Map<String, Short> tableUriToSrcIdMap = new HashMap<String, Short>();
            Map<String, String> tableUriToSrcNameMap = new HashMap<String, String>();
            for (LogicalSourceStaticConfig sourceConfig : this.physicalSourceStaticConfig.getSources()) {
                String tableUri = sourceConfig.getUri().toLowerCase();
                tableUriToSrcIdMap.put(tableUri, sourceConfig.getId());
                tableUriToSrcNameMap.put(tableUri, sourceConfig.getName());
            }

            this.mysqlTxnManager = new MysqlTransactionManagerImpl(logid, tableUriToSrcIdMap, tableUriToSrcNameMap, this.schemaRegistryService, this, this.sourceEventConsumer);
            this.mysqlTxnManager.setShutdownRequested(false);
            OpenReplicationListener orl = new OpenReplicationListener(this.mysqlTxnManager, this.eventsMap, binlogFilePrefix);
            this.openReplicator.setBinlogFileName(binlogFile);
            this.openReplicator.setBinlogPosition(offset);
            this.openReplicator.setBinlogEventListener((BinlogEventListener)orl);
            this.openReplicator.start();
        }
        catch (URISyntaxException u) {
            // Pass the throwable as well so the stack trace is not lost.
            LOGGER.error("Exception occurred while processing uri : " + u, u);
            return;
        }
        catch (InvalidConfigException e) {
            LOGGER.error("Exception occurred while processing uri : " + e, e);
            return;
        }
        catch (Exception e) {
            LOGGER.error("Error occurred while starting open replication.." + e, e);
            return;
        }
        LOGGER.info("Open Replicator has been started successfully for the file " + binlogFile);
    }

    /**
     * Validates a source URI of the form
     * {@code scheme://user[/password]@host[:port]/<serverId>/<binlogPrefix>} and, when
     * {@link #openReplicator} is set, copies user, password, host, port and server id
     * into it.
     *
     * @param uri physical source URI
     * @return the binlog file prefix (last path segment)
     * @throws InvalidConfigException if user info or path is missing, the path does not
     *         match the expected shape, or the server id does not fit an {@code int}
     */
    protected String processUri(URI uri) throws InvalidConfigException {
        String userInfo = uri.getUserInfo();
        if (null == userInfo) {
            String errorMessage = "missing user info in: " + uri;
            LOGGER.error(errorMessage);
            throw new InvalidConfigException(errorMessage);
        }
        // User info is "<user>" or "<user>/<password>".
        int slashPos = userInfo.indexOf('/');
        if (slashPos < 0) {
            slashPos = userInfo.length();
        } else if (0 == slashPos) {
            String errorMessage = "missing user name in user info: " + userInfo;
            LOGGER.error(errorMessage);
            throw new InvalidConfigException(errorMessage);
        }
        String userName = userInfo.substring(0, slashPos);
        // A missing or empty password (no slash, or a trailing slash) is treated as "no password".
        String userPass = slashPos < userInfo.length() - 1 ? userInfo.substring(slashPos + 1) : null;
        String hostName = uri.getHost();
        int port = uri.getPort();
        if (port < 0) {
            port = DEFAULT_MYSQL_PORT;
        }
        String path = uri.getPath();
        if (null == path) {
            String errorMessage = "missing path: " + uri;
            LOGGER.error(errorMessage);
            throw new InvalidConfigException(errorMessage);
        }
        Matcher matcher = PATH_PATTERN.matcher(path);
        if (!matcher.matches()) {
            String errorMessage = "invalid path:" + path;
            LOGGER.error(errorMessage);
            throw new InvalidConfigException(errorMessage);
        }
        // "/<id>/<prefix>" splits into ["", "<id>", "<prefix>"].
        String[] group = matcher.group().split("/");
        if (group.length != 3) {
            String errorMessage = "Invalid format " + Arrays.toString(group);
            LOGGER.error(errorMessage);
            throw new InvalidConfigException(errorMessage);
        }
        String serverIdStr = group[SERVER_ID];
        int serverId;
        try {
            serverId = Integer.parseInt(serverIdStr);
        }
        catch (NumberFormatException e) {
            // The pattern guarantees digits only, so this is reached on int overflow.
            // Report the offending text rather than a placeholder value.
            String errorMessage = "incorrect mysql serverid:" + serverIdStr;
            LOGGER.error(errorMessage);
            throw new InvalidConfigException(errorMessage);
        }
        if (null != this.openReplicator) {
            this.openReplicator.setUser(userName);
            if (null != userPass) {
                this.openReplicator.setPassword(userPass);
            }
            this.openReplicator.setHost(hostName);
            this.openReplicator.setPort(port);
            this.openReplicator.setServerId(serverId);
        }
        String binlogPrefix = group[BIN_LOG_PREFIX];
        LOGGER.debug("Extracted bin log prefix is " + binlogPrefix);
        return binlogPrefix;
    }

    /**
     * Extracts the binlog file number from an SCN.
     *
     * @param scn packed SCN
     * @return the upper 32 bits of {@code scn}, or {@code 1} when {@code scn} is {@code -1} or {@code 0}
     */
    public static int logid(long scn) {
        if (scn == -1L || scn == 0L) {
            return 1;
        }
        // The narrowing cast keeps exactly the 32 bits shifted down from the top half.
        return (int) (scn >> 32);
    }

    /**
     * Extracts the position within the binlog file from an SCN.
     *
     * @param scn packed SCN
     * @return the lower 32 bits of {@code scn}, or {@code 4} when {@code scn} is {@code -1} or {@code 0}
     *         (presumably the first event position after the binlog file header - confirm
     *         against the MySQL binlog format)
     */
    public static int offset(long scn) {
        if (scn == -1L || scn == 0L) {
            return 4;
        }
        // The narrowing cast keeps exactly the low 32 bits.
        return (int) scn;
    }

    /**
     * Sets the event processors passed to the replication listener in {@link #start(long)}.
     *
     * @param eventsMap event processors keyed by an integer (presumably the binlog event type - confirm)
     */
    public void setEventsMap(Map<Integer, BinLogEventProcessor> eventsMap) {
        this.eventsMap = eventsMap;
    }

    /**
     * @return the fully-qualified runtime class name of this producer
     */
    public String getName() {
        return this.getClass().getName();
    }

    /**
     * @return the SCN currently held in {@code sinceSCN}
     */
    public long getSCN() {
        return this.sinceSCN.get();
    }

    /**
     * Records a new SCN and reports it to the metrics collector under {@code this.name}.
     *
     * @param latestScn SCN to store
     */
    public void updateSCN(long latestScn) {
        this.sinceSCN.set(latestScn);
        this.metricsCollector.setProducerSCN(this.name, latestScn);
    }

    /**
     * @return {@code true} if the replicator exists and reports itself running;
     *         {@code false} before {@link #start(long)} has been called
     */
    public boolean isRunning() {
        return this.openReplicator != null && this.openReplicator.isRunning();
    }

    /**
     * @return {@code true} whenever the replicator is not running (including before
     *         {@link #start(long)} has been called)
     */
    public boolean isPaused() {
        return this.openReplicator == null || !this.openReplicator.isRunning();
    }

    /**
     * Not supported.
     *
     * @throws UnsupportedOperationException always
     */
    public void unpause() {
        throw new UnsupportedOperationException("'unpause' is not supported on this event producer");
    }

    /**
     * Not supported.
     *
     * @throws UnsupportedOperationException always
     */
    public void pause() {
        throw new UnsupportedOperationException("'pause' is not supported on this event producer");
    }

    /**
     * Stops the replicator, waiting up to five seconds, and logs the time elapsed since
     * this class was initialised. Any exception raised while stopping (including the
     * {@link NullPointerException} that results from calling this before
     * {@link #start(long)}) is logged and not propagated.
     */
    public void shutdown() {
        try {
            LOGGER.info("Shutdown has been requested. MYSQLEventProducer shutting down");
            this.openReplicator.stop(5L, TimeUnit.SECONDS);
            LOGGER.info("### Bootstrap Process completed successfully ###");
            // Value is in nanoseconds.
            LOGGER.info("Time Taken:" + (System.nanoTime() - START_TIME_NANOS));
        }
        catch (Exception e) {
            LOGGER.error("Error while stopping mysql bootstrap", e);
        }
    }

    /**
     * Not supported.
     *
     * @throws UnsupportedOperationException always
     */
    public void waitForShutdown() throws InterruptedException, IllegalStateException {
        throw new UnsupportedOperationException("'waitForShutdown' is not supported on this event producer");
    }

    /**
     * Not supported.
     *
     * @param l ignored
     * @throws UnsupportedOperationException always
     */
    public void waitForShutdown(long l) throws InterruptedException, IllegalStateException {
        throw new UnsupportedOperationException("'waitForShutdown' is not supported on this event producer");
    }
}

