/*
 * Decompiled with CFR 0.152.
 */
package com.olacabs.fabric.compute;

import com.codahale.metrics.ConsoleReporter;
import com.codahale.metrics.CsvReporter;
import com.codahale.metrics.MetricFilter;
import com.codahale.metrics.MetricRegistry;
import com.codahale.metrics.ScheduledReporter;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Preconditions;
import com.google.common.base.Strings;
import com.google.common.collect.ImmutableList;
import com.olacabs.fabric.common.util.metrics.MetricFactory;
import com.olacabs.fabric.compute.EventCollector;
import com.olacabs.fabric.compute.ProcessingContext;
import com.olacabs.fabric.compute.processor.ProcessingException;
import com.olacabs.fabric.compute.processor.ProcessorBase;
import com.olacabs.fabric.compute.processor.ScheduledProcessor;
import com.olacabs.fabric.compute.processor.StreamingProcessor;
import com.olacabs.fabric.model.event.EventSet;
import java.io.File;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Test harness that feeds caller-supplied {@link EventSet}s to a processor and, optionally,
 * records how long each {@code process} call takes in a {@code "consume-timer"} metric that is
 * reported to the console or to CSV files.
 *
 * <p>NOTE(review): the metrics reporter is stopped at the end of every metered run; confirm
 * whether a single bench instance is expected to be reused for several metered runs.
 */
public class ProcessorTestBench {
    private static final Logger log = LoggerFactory.getLogger(ProcessorTestBench.class);

    /** Topology name handed to every {@link ProcessingContext} created by the bench. */
    private static final String TOPOLOGY_NAME = "test-topology";
    /** Name of the timer that records the duration of each {@code process} call. */
    private static final String CONSUME_TIMER = "consume-timer";
    private static final String EVENTS_REQUIRED_MESSAGE =
            "Please provide events to be sent to the processor.consume() method";

    /** Reporter for the collected metrics; {@code null} when metrics are disabled. */
    private final ScheduledReporter reporter;
    /** Registry holding the consume timer; {@code null} when metrics are disabled. */
    private final MetricRegistry metricRegistry;

    /**
     * Creates a bench that optionally reports metrics to the console.
     *
     * @param metricsEnabled when {@code true}, the consume timer is registered and a console
     *                       reporter is prepared; when {@code false}, no metrics are recorded
     */
    public ProcessorTestBench(boolean metricsEnabled) {
        if (metricsEnabled) {
            this.metricRegistry = MetricFactory.getMetricRegistry();
            this.metricRegistry.timer(CONSUME_TIMER);
            this.reporter = ConsoleReporter.forRegistry(this.metricRegistry)
                    .convertDurationsTo(TimeUnit.MILLISECONDS)
                    .convertRatesTo(TimeUnit.SECONDS)
                    .filter(MetricFilter.ALL)
                    .build();
        } else {
            this.metricRegistry = null;
            this.reporter = null;
        }
    }

    /**
     * Creates a bench that reports metrics as CSV files into the given directory.
     *
     * @param dirPath directory for the CSV output; created if it does not exist
     * @throws IllegalArgumentException if {@code dirPath} is null or empty, or the directory
     *                                  neither exists nor can be created
     */
    public ProcessorTestBench(String dirPath) {
        Preconditions.checkArgument(!Strings.isNullOrEmpty(dirPath),
                "Provide a non-null and non-empty filePath");
        File dir = new File(dirPath);
        Preconditions.checkArgument(dir.exists() || dir.mkdirs(),
                "Provide a directory path which either exists or can be created");
        this.metricRegistry = MetricFactory.getMetricRegistry();
        this.metricRegistry.timer(CONSUME_TIMER);
        this.reporter = CsvReporter.forRegistry(this.metricRegistry)
                .convertRatesTo(TimeUnit.SECONDS)
                .convertDurationsTo(TimeUnit.MILLISECONDS)
                .filter(MetricFilter.ALL)
                .build(dir);
    }

    /** Starts the once-per-second metrics reporter, if metrics are enabled. */
    private void startReporter() {
        if (this.reporter != null) {
            this.reporter.start(1L, TimeUnit.SECONDS);
            log.info("Metrics reporter started");
        }
    }

    /** Stops the metrics reporter, if metrics are enabled. */
    private void stopReporter() {
        if (this.reporter != null) {
            this.reporter.stop();
            log.info("Metrics reporter stopped");
        }
    }

    /** Builds the processing context shared by all calls within a single run. */
    private static ProcessingContext newProcessingContext() {
        return new ProcessingContext(TOPOLOGY_NAME, new ObjectMapper());
    }

    /** Validates the processor and event list arguments common to the list-based runs. */
    private static void checkProcessorAndEvents(Object processor, List<EventSet> incomingEvents) {
        Preconditions.checkNotNull(processor, "Processor can't be null!!");
        Preconditions.checkNotNull(incomingEvents, EVENTS_REQUIRED_MESSAGE);
        Preconditions.checkArgument(!incomingEvents.isEmpty(), EVENTS_REQUIRED_MESSAGE);
    }

    /**
     * Feeds {@code incomingEvents} to a scheduled processor while a background thread invokes
     * its time trigger every {@code pulseDelay} milliseconds, starting immediately.
     *
     * <p>Event delivery and pulses are serialized on the processor's monitor. The call blocks
     * until exactly {@code numPulses} pulses have been delivered; pulses scheduled after that
     * are ignored so the returned list is not modified while it is being built.
     *
     * @param processor      processor under test
     * @param pulseDelay     period between pulses in milliseconds; must be positive
     * @param numPulses      number of pulses to deliver; must be positive
     * @param incomingEvents event sets to process; must be non-null and non-empty
     * @return one aggregate {@link EventSet} per delivered pulse, in delivery order
     * @throws Exception if processing an event set fails, if the calling thread is interrupted
     *                   while waiting, or (as an {@code ExecutionException}) if a pulse fails
     */
    public List<EventSet> runScheduledProcessor(ScheduledProcessor processor, long pulseDelay, long numPulses, List<EventSet> incomingEvents) throws Exception {
        checkProcessorAndEvents(processor, incomingEvents);
        Preconditions.checkArgument(pulseDelay > 0L, "Please provide a positive pulse delay");
        Preconditions.checkArgument(numPulses > 0L, "Please provide a proper number of pulses to be delivered");
        ProcessingContext processingContext = newProcessingContext();
        ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
        AtomicLong numGeneratedPulses = new AtomicLong(0L);
        ImmutableList.Builder<EventSet> listBuilder = ImmutableList.builder();
        CompletableFuture<Void> pulsesDelivered = new CompletableFuture<>();
        ScheduledFuture<?> pulseGenFuture = executor.scheduleAtFixedRate(() -> {
            if (pulsesDelivered.isDone()) {
                // Enough pulses were delivered (or one failed); the caller may already be
                // building the result, so the builder must not be touched any more.
                return;
            }
            try {
                synchronized (processor) {
                    listBuilder.add(EventSet.eventFromEventBuilder()
                            .events(processor.timeTriggerHandler(processingContext))
                            .isAggregate(true)
                            .build());
                }
            } catch (Throwable t) {
                // Throwing from a periodic task silently cancels later runs, which would leave
                // the caller blocked forever; hand the failure to the waiting thread instead.
                pulsesDelivered.completeExceptionally(t);
                return;
            }
            if (numGeneratedPulses.incrementAndGet() >= numPulses) {
                pulsesDelivered.complete(null);
            }
        }, 0L, pulseDelay, TimeUnit.MILLISECONDS);
        try {
            EventCollector eventCollector = new EventCollector(processingContext);
            for (EventSet eventSet : incomingEvents) {
                synchronized (processor) {
                    this.process(processor, processingContext, eventCollector, eventSet);
                }
            }
            pulsesDelivered.get();
            return listBuilder.build();
        } finally {
            // Always release the pulse thread, including when processing or waiting fails.
            pulseGenFuture.cancel(true);
            executor.shutdownNow();
        }
    }

    /**
     * Processes each event set once and collects what the event collector holds after each call.
     *
     * <p>NOTE(review): the result builder rejects {@code null} elements, so this presumably
     * expects the processor to publish to the collector for every event set — confirm.
     *
     * @param processor      processor under test
     * @param incomingEvents event sets to process; must be non-null and non-empty
     * @return the collector's events after each processed event set, in input order
     * @throws Exception if processing an event set fails
     */
    public List<EventSet> runStreamingProcessor(StreamingProcessor processor, List<EventSet> incomingEvents) throws Exception {
        checkProcessorAndEvents(processor, incomingEvents);
        ProcessingContext processingContext = newProcessingContext();
        ImmutableList.Builder<EventSet> listBuilder = ImmutableList.builder();
        EventCollector eventCollector = new EventCollector(processingContext);
        for (EventSet eventSet : incomingEvents) {
            this.process(processor, processingContext, eventCollector, eventSet);
            listBuilder.add(eventCollector.getEvents());
        }
        return listBuilder.build();
    }

    /**
     * Processes the whole event list {@code n} times with the metrics reporter running.
     *
     * @param processor      processor under test
     * @param incomingEvents event sets to process; must be non-null and non-empty
     * @param n              number of passes over {@code incomingEvents}; no pass is made when
     *                       {@code n} is not positive
     * @throws Exception if processing an event set fails
     */
    public void runStreamingProcessor(StreamingProcessor processor, List<EventSet> incomingEvents, int n) throws Exception {
        checkProcessorAndEvents(processor, incomingEvents);
        this.startReporter();
        try {
            ProcessingContext processingContext = newProcessingContext();
            EventCollector eventCollector = new EventCollector(processingContext);
            for (int i = 0; i < n; ++i) {
                for (EventSet eventSet : incomingEvents) {
                    this.process(processor, processingContext, eventCollector, eventSet);
                }
            }
        } finally {
            // Stop reporting even when a processor call throws.
            this.stopReporter();
        }
    }

    /**
     * Starts a background consumer that takes event sets from {@code queue} and processes them
     * until its thread is interrupted (for example via {@code cancel(true)} on the returned
     * future) or processing fails. The metrics reporter runs for the lifetime of the consumer.
     *
     * @param processor processor under test
     * @param queue     source of event sets
     * @return future for the consumer task; a processing failure surfaces from it wrapped in a
     *         {@link RuntimeException}
     */
    public Future<Void> runStreamingProcessor(StreamingProcessor processor, LinkedBlockingQueue<EventSet> queue) {
        Preconditions.checkNotNull(processor, "Processor cannot be null");
        Preconditions.checkNotNull(queue, "Queue cannot be null");
        this.startReporter();
        ProcessingContext processingContext = newProcessingContext();
        EventCollector eventCollector = new EventCollector(processingContext);
        ExecutorService executorService = Executors.newSingleThreadExecutor();
        Future<Void> consumerFuture = executorService.submit(() -> {
            try {
                while (!Thread.currentThread().isInterrupted()) {
                    EventSet eventSet = queue.take();
                    this.process(processor, processingContext, eventCollector, eventSet);
                }
            } catch (InterruptedException e) {
                // Interruption is the stop signal; keep the flag set for the executor.
                Thread.currentThread().interrupt();
            } catch (Exception e) {
                throw new RuntimeException(e);
            } finally {
                // Stop the reporter once, when the consumer ends, not after every event.
                this.stopReporter();
            }
            return null;
        });
        // No further tasks are submitted; the worker thread exits when the consumer ends.
        executorService.shutdown();
        return consumerFuture;
    }

    /**
     * Invokes the processor for one event set and, when metrics are enabled, records the
     * elapsed time of a successful call in the consume timer.
     */
    private void process(ProcessorBase processor, ProcessingContext processingContext, EventCollector eventCollector, EventSet eventSet) throws ProcessingException {
        // nanoTime keeps sub-millisecond calls from being recorded as zero.
        long start = System.nanoTime();
        processor.process(processingContext, eventCollector, eventSet);
        long elapsedNanos = System.nanoTime() - start;
        if (null != this.metricRegistry) {
            this.metricRegistry.timer(CONSUME_TIMER).update(elapsedNanos, TimeUnit.NANOSECONDS);
        }
    }
}

