/*
 * Decompiled with CFR 0.152.
 */
package com.olacabs.fabric.compute.pipeline;

import com.google.common.collect.Lists;
import com.olacabs.fabric.common.util.PropertyReader;
import com.olacabs.fabric.compute.pipeline.MessageSource;
import com.olacabs.fabric.compute.pipeline.NotificationBus;
import com.olacabs.fabric.compute.pipeline.PipelineStage;
import com.olacabs.fabric.compute.processor.InitializationException;
import com.olacabs.fabric.compute.source.PipelineStreamSource;
import com.olacabs.fabric.compute.util.MetaConstants;
import com.olacabs.fabric.model.common.ComponentMetadata;
import java.util.List;
import java.util.Properties;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Holds the stream sources and processing stages of a computation together
 * with the {@link NotificationBus} that connects them, and drives their
 * lifecycle: {@link #initialize(Properties)}, {@link #start()},
 * {@link #healthcheck()} and {@link #stop()}.
 *
 * <p>Instances are configured fluently: every configuration method returns
 * {@code this}. The class performs no synchronization of its own.
 */
public class ComputationPipeline {
    private static final Logger LOGGER = LoggerFactory.getLogger(ComputationPipeline.class);

    /** Shutdown grace period used when the properties do not configure one. */
    private static final long DEFAULT_WAIT_TIME_IN_SECONDS = 30L;

    /** Property under which the computation name is published to components. */
    private static final String COMPUTATION_NAME_PROPERTY = "computation.name";

    /** Key (resolved through {@link MetaConstants#getComputationKey}) of the shutdown grace period. */
    private static final String SHUTDOWN_WAIT_TIME_KEY = "shutdown.wait_time_in_seconds";

    private final List<PipelineStreamSource> sources = Lists.newArrayList();
    private final List<PipelineStage> stages = Lists.newArrayList();
    private NotificationBus notificationBus;
    private long waitTimeInSeconds;
    private String computationName;

    /**
     * Creates a new, empty pipeline to be configured fluently.
     *
     * @return a fresh pipeline with no sources, stages, bus or name
     */
    public static ComputationPipeline builder() {
        return new ComputationPipeline();
    }

    /**
     * Sets the bus used by {@link #connect}, {@link #start()} and {@link #stop()}.
     *
     * @param notificationBusArg the bus to use
     * @return this pipeline
     */
    public ComputationPipeline notificationBus(NotificationBus notificationBusArg) {
        this.notificationBus = notificationBusArg;
        return this;
    }

    /**
     * Sets the computation name that {@link #initialize(Properties)} publishes
     * to the components.
     *
     * @param computationNameArg the computation name
     * @return this pipeline
     */
    public ComputationPipeline computationName(String computationNameArg) {
        this.computationName = computationNameArg;
        return this;
    }

    /**
     * Registers a source; sources are processed in registration order.
     *
     * @param streamSource the source to add
     * @return this pipeline
     */
    public ComputationPipeline addSource(PipelineStreamSource streamSource) {
        this.sources.add(streamSource);
        return this;
    }

    /**
     * Registers a stage; stages are processed in registration order.
     *
     * @param pipelineStage the stage to add
     * @return this pipeline
     */
    public ComputationPipeline addPipelineStage(PipelineStage pipelineStage) {
        this.stages.add(pipelineStage);
        return this;
    }

    /**
     * Delegates to {@link NotificationBus#connect} on the configured bus.
     * The bus must have been set with {@link #notificationBus} beforehand.
     *
     * @param to the message source to connect
     * @param pipelineStages the stages to connect it with
     * @return this pipeline
     */
    public ComputationPipeline connect(MessageSource to, PipelineStage... pipelineStages) {
        this.notificationBus.connect(to, pipelineStages);
        return this;
    }

    /**
     * Publishes the computation name into {@code properties}, reads the
     * shutdown grace period, and then initializes every source followed by
     * every stage with those properties.
     *
     * @param properties configuration passed to each component; it is mutated
     *     by adding the {@code computation.name} entry
     * @return this pipeline
     * @throws NullPointerException if no computation name has been set
     * @throws RuntimeException wrapping the failure of the first source or
     *     stage that could not be initialized
     */
    public ComputationPipeline initialize(Properties properties) {
        // Properties rejects null values with a message-less NPE; fail with the
        // same exception type but say what is actually missing.
        if (this.computationName == null) {
            throw new NullPointerException("computationName must be set before initialize()");
        }
        properties.put(COMPUTATION_NAME_PROPERTY, this.computationName);
        this.waitTimeInSeconds = PropertyReader.readLong(
                properties,
                properties,
                MetaConstants.getComputationKey(SHUTDOWN_WAIT_TIME_KEY),
                DEFAULT_WAIT_TIME_IN_SECONDS);

        for (PipelineStreamSource streamSource : this.sources) {
            try {
                streamSource.initialize(properties);
                ComponentMetadata componentMetadata = streamSource.getSourceMetadata();
                LOGGER.info("Initialized source: {}:{}:{}->{}", new Object[]{componentMetadata.getNamespace(), componentMetadata.getName(), componentMetadata.getVersion(), streamSource.getInstanceId()});
            } catch (Exception e) {
                throw new RuntimeException(String.format("Error initializing source: %s", streamSource.getInstanceId()), e);
            }
        }
        for (PipelineStage stage : this.stages) {
            try {
                stage.initialize(properties);
                ComponentMetadata componentMetadata = stage.getProcessorMetadata();
                LOGGER.info("Initialized processor: {}:{}:{}->{}", new Object[]{componentMetadata.getNamespace(), componentMetadata.getName(), componentMetadata.getVersion(), stage.getInstanceId()});
            } catch (InitializationException e) {
                throw new RuntimeException(String.format("Error initializing processor: %s", stage.getInstanceId()), e);
            }
        }
        return this;
    }

    /**
     * Checks the sources and then the stages, stopping at the first component
     * that reports unhealthy.
     *
     * @return {@code true} only if every source and stage reports healthy;
     *     {@code false} if any reports unhealthy or if a check throws (the
     *     throwable is logged and never propagated)
     */
    public boolean healthcheck() {
        try {
            for (PipelineStreamSource source : this.sources) {
                if (!source.healthcheck()) {
                    return false;
                }
            }
            for (PipelineStage stage : this.stages) {
                if (!stage.healthcheck()) {
                    return false;
                }
            }
            return true;
        } catch (Throwable t) {
            // Deliberately broad: a failing check is reported as "unhealthy"
            // instead of being thrown at the caller.
            LOGGER.error("Error when calling healthcheck on one of the components: ", t);
            return false;
        }
    }

    /**
     * Starts the notification bus, then all stages, then all sources.
     *
     * @return this pipeline
     */
    public ComputationPipeline start() {
        LOGGER.info("Starting pipeline...");
        this.notificationBus.start();
        this.stages.forEach(PipelineStage::start);
        this.sources.forEach(PipelineStreamSource::start);
        return this;
    }

    /**
     * Stops all sources, waits for the configured grace period, then stops all
     * stages and finally the notification bus.
     *
     * <p>If the wait is interrupted, shutdown continues immediately and the
     * thread's interrupt status is restored once the stages and bus have been
     * stopped. A non-positive grace period skips the wait.
     */
    public void stop() {
        LOGGER.info("Stopping pipeline...");
        this.sources.forEach(PipelineStreamSource::stop);

        boolean interrupted = false;
        // Thread.sleep rejects negative durations; a misconfigured wait time
        // must not prevent the stages and the bus from being stopped.
        if (this.waitTimeInSeconds > 0L) {
            try {
                Thread.sleep(this.waitTimeInSeconds * 1000L);
            } catch (InterruptedException iEx) {
                interrupted = true;
                LOGGER.warn("Sleep was interrupted: {}", iEx.getMessage());
            }
        }

        try {
            this.stages.forEach(PipelineStage::stop);
            this.notificationBus.stop();
        } finally {
            // Restore the flag only after shutdown work so that stopping the
            // components is not itself cut short by the pending interrupt.
            if (interrupted) {
                Thread.currentThread().interrupt();
            }
        }
    }

    /**
     * Returns the computation name set through {@link #computationName(String)}.
     *
     * @return the computation name, or {@code null} if none was set
     */
    public String getComputationName() {
        return this.computationName;
    }
}

