/*
 * Decompiled with CFR 0.152.
 */
package com.linkedin.databus2.producers;

import com.linkedin.databus.client.ConnectionStateFactory;
import com.linkedin.databus.client.DatabusBootstrapConnectionFactory;
import com.linkedin.databus.client.DatabusRelayConnectionFactory;
import com.linkedin.databus.client.DatabusSourcesConnection;
import com.linkedin.databus.client.consumer.DatabusConsumerEventBuffer;
import com.linkedin.databus.client.consumer.DatabusV2ConsumerRegistration;
import com.linkedin.databus.client.netty.NettyHttpConnectionFactory;
import com.linkedin.databus.client.pub.DatabusCombinedConsumer;
import com.linkedin.databus.client.pub.ServerInfo;
import com.linkedin.databus.client.pub.mbean.ConsumerCallbackStats;
import com.linkedin.databus.client.pub.mbean.UnifiedClientStats;
import com.linkedin.databus.core.Checkpoint;
import com.linkedin.databus.core.DbusClientMode;
import com.linkedin.databus.core.DbusEventBuffer;
import com.linkedin.databus.core.DbusEventBufferAppendable;
import com.linkedin.databus.core.DbusEventFactory;
import com.linkedin.databus.core.DbusEventV2Factory;
import com.linkedin.databus.core.data_model.DatabusSubscription;
import com.linkedin.databus.core.monitoring.mbean.DbusEventsStatisticsCollector;
import com.linkedin.databus.core.util.InvalidConfigException;
import com.linkedin.databus.core.util.NamedThreadFactory;
import com.linkedin.databus.core.util.RngUtils;
import com.linkedin.databus.monitoring.mbean.EventSourceStatistics;
import com.linkedin.databus2.core.DatabusException;
import com.linkedin.databus2.core.seq.MaxSCNReaderWriter;
import com.linkedin.databus2.core.seq.MaxSCNWriter;
import com.linkedin.databus2.producers.EventProducer;
import com.linkedin.databus2.producers.RelayStatsAdapter;
import com.linkedin.databus2.producers.db.ReadEventCycleSummary;
import com.linkedin.databus2.relay.config.LogicalSourceStaticConfig;
import com.linkedin.databus2.relay.config.PhysicalSourceStaticConfig;
import java.lang.management.ManagementFactory;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import org.apache.log4j.Logger;
import org.jboss.netty.channel.group.ChannelGroup;
import org.jboss.netty.channel.group.DefaultChannelGroup;
import org.jboss.netty.handler.execution.OrderedMemoryAwareThreadPoolExecutor;
import org.jboss.netty.util.HashedWheelTimer;
import org.jboss.netty.util.Timer;

public class RelayEventProducer
implements EventProducer {
    private DatabusSourcesConnection _dbusConnection = null;
    private String _name = null;
    private DbusEventsStatisticsCollector _statsCollector = null;
    private DatabusConsumerEventBuffer _consumerEventBuffer = null;
    private MaxSCNReaderWriter _scnReaderWriter;
    private Logger _eventsLog;
    private RelayStatsAdapter _relayStatsAdapter;
    private RelayLogger _relayLogger;
    private DatabusClientNettyThreadPools _nettyThreadPools;
    private long _restartScnOffset = 0L;
    public static final String MODULE = RelayEventProducer.class.getName();
    public static final Logger LOG = Logger.getLogger((String)MODULE);

    public RelayEventProducer(PhysicalSourceStaticConfig config, DbusEventBufferAppendable consumerBuffer, DbusEventsStatisticsCollector statsCollector, MaxSCNReaderWriter scnReaderWriter) {
        this(config, consumerBuffer, statsCollector, scnReaderWriter, null);
    }

    public RelayEventProducer(PhysicalSourceStaticConfig config, DbusEventBufferAppendable consumerBuffer, DbusEventsStatisticsCollector statsCollector, MaxSCNReaderWriter scnReaderWriter, DatabusClientNettyThreadPools nettyThreadpools) {
        try {
            this._name = config.getName();
            this._statsCollector = statsCollector;
            this._scnReaderWriter = scnReaderWriter;
            this._relayStatsAdapter = new RelayStatsAdapter(this._name, statsCollector);
            this._eventsLog = Logger.getLogger((String)("com.linkedin.databus2.producers.db.events." + this._name));
            this._restartScnOffset = config.getRestartScnOffset();
            int largestEventSize = config.getLargestEventSizeInBytes();
            long internalBufferSize = 2L * config.getLargestWindowSizeInBytes();
            int averageEventSize = Math.max((int)((double)largestEventSize * 0.1), 20480);
            long consumerTimeoutMs = 10000L;
            long connTimeoutMs = 15000L;
            long pollIntervalMs = config.getRetries().getInitSleep();
            int consumerParallelism = 1;
            this._relayLogger = new RelayLogger(5000, "RelayLogger");
            this._consumerEventBuffer = new DatabusConsumerEventBuffer(consumerBuffer, statsCollector, (MaxSCNWriter)scnReaderWriter);
            String subscriptionString = RelayEventProducer.createSubscriptionString(config);
            LOG.info((Object)("Subscription string=" + subscriptionString));
            int id = RngUtils.randomPositiveInt() % 10000 + 1;
            this._nettyThreadPools = this._nettyThreadPools == null ? DatabusClientNettyThreadPools.createNettyThreadPools(id) : nettyThreadpools;
            this._dbusConnection = RelayEventProducer.createDatabusSourcesConnection(this._name, id, config.getUri(), subscriptionString, (DatabusCombinedConsumer)this._consumerEventBuffer, internalBufferSize, largestEventSize, consumerTimeoutMs, pollIntervalMs, connTimeoutMs, consumerParallelism, true, this._nettyThreadPools, 900, 2, averageEventSize);
        }
        catch (InvalidConfigException e) {
            LOG.fatal((Object)("Invalid config in creating a relay event producer" + (Object)((Object)e)));
        }
    }

    public static DatabusSourcesConnection createDatabusSourcesConnection(String producerName, String serverName, String subscriptionString, DatabusCombinedConsumer consumer, long internalBufferMaxSize, int largestEventSize, long consumerTimeoutMs, long pollIntervalMs, long connTimeoutMs, int consumerParallelism, boolean blockingBuffer, int initReadBufferSize) throws InvalidConfigException {
        int id = RngUtils.randomPositiveInt() % 10000 + 1;
        return RelayEventProducer.createDatabusSourcesConnection(producerName, id, serverName, subscriptionString, consumer, internalBufferMaxSize, largestEventSize, consumerTimeoutMs, pollIntervalMs, connTimeoutMs, consumerParallelism, blockingBuffer, DatabusClientNettyThreadPools.createNettyThreadPools(id), 0, 2, initReadBufferSize);
    }

    public static DatabusSourcesConnection createDatabusSourcesConnection(String producerName, String serverName, String subscriptionString, DatabusCombinedConsumer consumer, long internalBufferMaxSize, int largestEventSize, long consumerTimeoutMs, long pollIntervalMs, long connTimeoutMs, int consumerParallelism, boolean blockingBuffer) throws InvalidConfigException {
        int id = RngUtils.randomPositiveInt() % 10000 + 1;
        return RelayEventProducer.createDatabusSourcesConnection(producerName, id, serverName, subscriptionString, consumer, internalBufferMaxSize, largestEventSize, consumerTimeoutMs, pollIntervalMs, connTimeoutMs, consumerParallelism, blockingBuffer, DatabusClientNettyThreadPools.createNettyThreadPools(id), 0, 2, 0);
    }

    public static DatabusSourcesConnection createDatabusSourcesConnection(String producerName, int id, String serverName, String subscriptionString, DatabusCombinedConsumer consumer, long internalBufferMaxSize, int largestEventSize, long consumerTimeoutMs, long pollIntervalMs, long connTimeoutMs, int consumerParallelism, boolean blockingBuffer, DatabusClientNettyThreadPools nettyThreadPools, int noEventsTimeoutSec, int maxEventVersion, int initReadBufferSize) throws InvalidConfigException {
        Set<ServerInfo> relayServices = RelayEventProducer.createServerInfo(serverName, subscriptionString);
        Set bootstrapServices = null;
        String[] subscriptionList = subscriptionString.split(",");
        List subsList = DatabusSubscription.createSubscriptionList(Arrays.asList(subscriptionList));
        List sourcesStrList = DatabusSubscription.getStrList((List)subsList);
        LOG.info((Object)("The sourcesList is " + sourcesStrList));
        List<DatabusV2ConsumerRegistration> relayConsumers = RelayEventProducer.createDatabusV2ConsumerRegistration(consumer, sourcesStrList);
        List bstConsumers = null;
        DatabusSourcesConnection.Config confBuilder = new DatabusSourcesConnection.Config();
        confBuilder.setId(id);
        confBuilder.setConsumeCurrent(true);
        confBuilder.setReadLatestScnOnError(false);
        confBuilder.setConsumerTimeBudgetMs(consumerTimeoutMs);
        confBuilder.getPullerRetries().setMaxRetryNum(-1);
        confBuilder.getPullerRetries().setInitSleep(pollIntervalMs);
        confBuilder.setConsumerParallelism(consumerParallelism);
        confBuilder.getDispatcherRetries().setMaxRetryNum(1);
        DbusEventBuffer.Config bufferConf = new DbusEventBuffer.Config();
        bufferConf.setMaxSize(internalBufferMaxSize);
        bufferConf.setAllocationPolicy("DIRECT_MEMORY");
        if (initReadBufferSize > 0) {
            bufferConf.setAverageEventSize(initReadBufferSize);
        }
        bufferConf.setMaxEventSize(largestEventSize);
        bufferConf.setScnIndexSize(65536);
        String queuePolicy = blockingBuffer ? "BLOCK_ON_WRITE" : "OVERWRITE_ON_WRITE";
        bufferConf.setQueuePolicy(queuePolicy);
        double newCkptPct = confBuilder.computeSafeCheckpointThresholdPct(bufferConf);
        if (newCkptPct < 5.0 || newCkptPct > 95.0) {
            LOG.warn((Object)("Not setting required checkpointThresholdPct : " + newCkptPct + "to  accommodate largestEventSize= " + largestEventSize + " in buffer of size " + bufferConf.getMaxSize()));
            if (newCkptPct <= 0.0) {
                newCkptPct = confBuilder.getCheckpointThresholdPct();
            }
            if (newCkptPct < 5.0) {
                newCkptPct = 5.0;
            } else if (newCkptPct > 95.0) {
                newCkptPct = 95.0;
            }
        }
        LOG.info((Object)("Setting checkpointThresholdPct:" + newCkptPct));
        confBuilder.setCheckpointThresholdPct(newCkptPct);
        confBuilder.setEventBuffer(bufferConf);
        confBuilder.setNoEventsConnectionResetTimeSec(noEventsTimeoutSec);
        DatabusSourcesConnection.StaticConfig connConfig = confBuilder.build();
        DbusEventBuffer buffer = new DbusEventBuffer(connConfig.getEventBuffer());
        buffer.start(0L);
        DbusEventBuffer bootstrapBuffer = null;
        long readTimeoutMs = connTimeoutMs;
        long writeTimeoutMs = connTimeoutMs;
        long bstReadTimeoutMs = connTimeoutMs;
        int protocolVersion = 2;
        NettyHttpConnectionFactory defaultConnFactory = new NettyHttpConnectionFactory(nettyThreadPools.getBossExecutorService(), nettyThreadPools.getIoExecutorService(), null, nettyThreadPools.getTimer(), writeTimeoutMs, readTimeoutMs, bstReadTimeoutMs, protocolVersion, maxEventVersion, nettyThreadPools.getChannelGroup());
        int maxThreadsNum = 1;
        int keepAliveMs = 1000;
        OrderedMemoryAwareThreadPoolExecutor defaultExecutorService = new OrderedMemoryAwareThreadPoolExecutor(maxThreadsNum, 0L, 0L, (long)keepAliveMs, TimeUnit.MILLISECONDS);
        ConsumerCallbackStats relayConsumerStats = new ConsumerCallbackStats(id, producerName + ".inbound.cons", producerName + ".inbound.cons", true, false, null, ManagementFactory.getPlatformMBeanServer());
        ConsumerCallbackStats bootstrapConsumerStats = new ConsumerCallbackStats(id, producerName + ".inbound.bs.cons", producerName + ".inbound.bs.cons", true, false, null, ManagementFactory.getPlatformMBeanServer());
        UnifiedClientStats unifiedClientStats = new UnifiedClientStats(id, producerName + ".inbound.unified.cons", producerName + ".inbound.unified.cons", true, false, 300000L, null, ManagementFactory.getPlatformMBeanServer());
        NettyHttpConnectionFactory relayConnFactory = defaultConnFactory;
        NettyHttpConnectionFactory bootstrapConnFactory = defaultConnFactory;
        ConnectionStateFactory connStateFactory = new ConnectionStateFactory(sourcesStrList);
        DatabusSourcesConnection conn = new DatabusSourcesConnection(connConfig, subsList, relayServices, bootstrapServices, relayConsumers, bstConsumers, buffer, bootstrapBuffer, (ExecutorService)defaultExecutorService, null, null, null, relayConsumerStats, bootstrapConsumerStats, unifiedClientStats, null, (DatabusRelayConnectionFactory)relayConnFactory, (DatabusBootstrapConnectionFactory)bootstrapConnFactory, null, null, null, (DbusEventFactory)new DbusEventV2Factory(), connStateFactory);
        return conn;
    }

    static String createSubscriptionString(PhysicalSourceStaticConfig config) {
        StringBuilder s = new StringBuilder();
        for (LogicalSourceStaticConfig sourceConfig : config.getSources()) {
            if (s.length() > 0) {
                s.append(",");
            }
            s.append(sourceConfig.getName());
        }
        return s.toString();
    }

    static Set<ServerInfo> createServerInfo(String serverName, String subscriptions) throws InvalidConfigException {
        HashSet<ServerInfo> serverInfo = new HashSet<ServerInfo>();
        ServerInfo.ServerInfoBuilder sBuilder = new ServerInfo.ServerInfoBuilder();
        sBuilder.setAddress(serverName + ":" + subscriptions);
        serverInfo.add(sBuilder.build());
        return serverInfo;
    }

    static List<DatabusV2ConsumerRegistration> createDatabusV2ConsumerRegistration(DatabusCombinedConsumer consumer, List<String> sourcesStrList) {
        ArrayList<DatabusV2ConsumerRegistration> regs = new ArrayList<DatabusV2ConsumerRegistration>();
        regs.add(new DatabusV2ConsumerRegistration(consumer, sourcesStrList, null));
        return regs;
    }

    public synchronized void shutdown() {
        if (this._dbusConnection != null) {
            this._dbusConnection.stop();
        }
        this._relayLogger.shutdown();
        this._relayLogger.interrupt();
    }

    public synchronized void waitForShutdown() throws InterruptedException, IllegalStateException {
        if (this._dbusConnection != null) {
            this._dbusConnection.await();
        }
    }

    public void waitForShutdown(long timeout) throws InterruptedException, IllegalStateException {
        if (this._dbusConnection != null) {
            this._dbusConnection.await();
        }
    }

    public String getName() {
        return this._name;
    }

    public long getSCN() {
        if (this._statsCollector != null) {
            return this._statsCollector.getTotalStats().getMaxScn();
        }
        return 0L;
    }

    public synchronized void start(long sinceSCN) {
        if (this._dbusConnection != null && !this._dbusConnection.getConnectionStatus().isRunningStatus()) {
            LOG.info((Object)("In RelayEventProducer start:  running =" + this._dbusConnection.getConnectionStatus().isRunningStatus()));
            LOG.info((Object)("Requested sinceSCN = " + sinceSCN));
            Checkpoint cp = this.getCheckpoint(sinceSCN, this._scnReaderWriter);
            if (cp != null && this._consumerEventBuffer.getStartSCN() < 0L) {
                long savedScn = cp.getWindowScn();
                LOG.info((Object)("Checkpoint read = " + savedScn + " restartScnOffset=" + this._restartScnOffset));
                long newScn = savedScn >= this._restartScnOffset ? savedScn - this._restartScnOffset : 0L;
                cp.setWindowScn(Long.valueOf(newScn));
                LOG.info((Object)("Setting start scn of event buffer to " + cp.getWindowScn()));
                this._consumerEventBuffer.setStartSCN(cp.getWindowScn());
            }
            LOG.info((Object)("Eventbuffer start scn = " + this._consumerEventBuffer.getStartSCN()));
            this._dbusConnection.getRelayPullThread().getConnectionState().setCheckpoint(cp);
            this._dbusConnection.start();
            this._relayLogger.setDaemon(true);
            this._relayLogger.start();
        } else if (this._dbusConnection == null) {
            LOG.error((Object)"Not started! Connection is null");
        } else {
            LOG.warn((Object)("dbusConnection status=" + this._dbusConnection.getConnectionStatus().getStatus()));
        }
    }

    protected Checkpoint getCheckpoint(long sinceSCN, MaxSCNReaderWriter scnReaderWriter) {
        long scn = sinceSCN;
        if (scn < 0L && this._scnReaderWriter != null) {
            try {
                scn = this._scnReaderWriter.getMaxScn();
            }
            catch (DatabusException e) {
                LOG.info((Object)("Cannot read persisted SCN " + (Object)((Object)e)));
                scn = -1L;
            }
        }
        if (scn <= 0L) {
            return null;
        }
        Checkpoint cp = new Checkpoint();
        cp.setConsumptionMode(DbusClientMode.ONLINE_CONSUMPTION);
        cp.setWindowOffset(-1L);
        cp.setWindowScn(Long.valueOf(scn));
        return cp;
    }

    public synchronized boolean isRunning() {
        if (this._dbusConnection != null) {
            return this._dbusConnection.isRunning();
        }
        return false;
    }

    public synchronized boolean isPaused() {
        if (this._dbusConnection != null) {
            return this._dbusConnection.getConnectionStatus().isPausedStatus();
        }
        return false;
    }

    public synchronized void unpause() {
        if (this._dbusConnection != null) {
            this._dbusConnection.getConnectionStatus().resume();
        }
    }

    public synchronized void pause() {
        if (this._dbusConnection != null) {
            this._dbusConnection.getConnectionStatus().pause();
        }
    }

    public EventSourceStatistics[] getEventSourceStats() {
        return this._relayStatsAdapter.getEventSourceStatistics();
    }

    public static class DatabusClientNettyThreadPools {
        private final Timer _timer;
        private final ExecutorService _bossExecutorService;
        private final ExecutorService _ioExecutorService;
        private final ChannelGroup _channelGroup;

        public static DatabusClientNettyThreadPools createNettyThreadPools(int id) {
            return new DatabusClientNettyThreadPools(id);
        }

        public DatabusClientNettyThreadPools(int id) {
            this._timer = new HashedWheelTimer(5L, TimeUnit.MILLISECONDS);
            this._ioExecutorService = Executors.newCachedThreadPool((ThreadFactory)new NamedThreadFactory("io" + id));
            this._bossExecutorService = Executors.newCachedThreadPool((ThreadFactory)new NamedThreadFactory("boss" + id));
            this._channelGroup = new DefaultChannelGroup();
        }

        public DatabusClientNettyThreadPools(int id, Timer timer, ExecutorService bossExecutorService, ExecutorService ioExecutorService, ChannelGroup channelGroup) {
            this._timer = timer != null ? timer : new HashedWheelTimer(5L, TimeUnit.MILLISECONDS);
            this._bossExecutorService = bossExecutorService != null ? bossExecutorService : Executors.newCachedThreadPool((ThreadFactory)new NamedThreadFactory("io" + id));
            this._ioExecutorService = ioExecutorService != null ? ioExecutorService : Executors.newCachedThreadPool((ThreadFactory)new NamedThreadFactory("boss" + id));
            this._channelGroup = channelGroup != null ? channelGroup : new DefaultChannelGroup();
        }

        public Timer getTimer() {
            return this._timer;
        }

        public ExecutorService getBossExecutorService() {
            return this._bossExecutorService;
        }

        public ExecutorService getIoExecutorService() {
            return this._ioExecutorService;
        }

        public ChannelGroup getChannelGroup() {
            return this._channelGroup;
        }
    }

    private class RelayLogger
    extends Thread {
        private final int _logIntervalMs;
        private boolean _shutdown;

        public RelayLogger(int logIntervalMs, String relayLogger) {
            super(relayLogger);
            this._shutdown = false;
            this._logIntervalMs = logIntervalMs;
        }

        public void shutdown() {
            this._shutdown = true;
        }

        @Override
        public void run() {
            LOG.info((Object)"Started RelayLogger");
            while (!this._shutdown) {
                try {
                    ReadEventCycleSummary readEventCycle = RelayEventProducer.this._relayStatsAdapter.getReadEventCycleSummary();
                    if (readEventCycle != null && (RelayEventProducer.this._eventsLog.isDebugEnabled() || RelayEventProducer.this._eventsLog.isInfoEnabled() && readEventCycle.getTotalEventNum() > 0)) {
                        RelayEventProducer.this._eventsLog.info((Object)readEventCycle.toString());
                    }
                    Thread.sleep(this._logIntervalMs);
                }
                catch (InterruptedException e) {
                    LOG.info((Object)"RelayLogger interrupted! Will shutdown");
                    this._shutdown = true;
                }
            }
        }
    }
}

