/*
 * Decompiled with CFR 0.152.
 */
package com.olacabs.fabric.compute.pipelined;

import com.olacabs.fabric.compute.builder.Linker;
import com.olacabs.fabric.compute.builder.Loader;
import com.olacabs.fabric.compute.builder.impl.RegisteringLoader;
import com.olacabs.fabric.compute.pipeline.ComputationPipeline;
import com.olacabs.fabric.compute.pipelined.EventGeneratorProcessor;
import com.olacabs.fabric.compute.pipelined.MemoryBasedPipelineStreamPipelineSource;
import com.olacabs.fabric.compute.pipelined.PrinterStreamingProcessor;
import com.olacabs.fabric.compute.processor.ProcessorBase;
import com.olacabs.fabric.compute.source.PipelineSource;
import com.olacabs.fabric.model.common.ComponentMetadata;
import com.olacabs.fabric.model.common.ComponentType;
import com.olacabs.fabric.model.computation.ComponentInstance;
import com.olacabs.fabric.model.computation.ComputationSpec;
import com.olacabs.fabric.model.computation.Connection;
import java.util.Properties;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import junit.framework.Assert;
import org.junit.Ignore;
import org.junit.Test;

/**
 * Smoke test that wires an in-memory source, an event generator stage and a
 * printer stage into a {@link ComputationPipeline}, runs it briefly and
 * checks that the pipeline reports healthy.
 */
public class ForkedComputationPipelineTest {
    /** Instance id of the source component in the computation spec. */
    private static final String SOURCE_ID = "source_1";
    /** Instance id of the generator processor in the computation spec. */
    private static final String GENERATOR_ID = "generator_1";
    /** Instance id of the printer processor in the computation spec. */
    private static final String PRINTER_ID = "printer_1";

    /**
     * Builds the source -> generator -> printer pipeline, starts it on a
     * background thread, asserts the health check ten times, lets it run for
     * two seconds and then stops it.
     *
     * <p>The test is currently disabled through {@link Ignore}.
     *
     * @throws Exception if the pipeline cannot be built or initialized, or if
     *                   the test thread is interrupted while sleeping
     */
    @Test
    @Ignore
    public void testCheck() throws Exception {
        Properties properties = new Properties();
        properties.setProperty("computation.shutdown.wait_time_in_seconds", "5");
        // NOTE(review): the leading space in " disruptor" and the trailing space in
        // "Yield " are kept exactly as found; presumably they exercise trimming of
        // configuration values - confirm, or remove them if they are accidental.
        properties.put("computation.channel.channel_type", " disruptor");
        properties.put("computation.disruptor.buffer_size", "64");
        properties.put("computation.disruptor.wait_strategy", "Yield ");

        // Typed locals keep the static argument types the registration calls
        // were originally invoked with.
        PipelineSource memorySource = new MemoryBasedPipelineStreamPipelineSource();
        ProcessorBase generatorStage = new EventGeneratorProcessor();
        ProcessorBase printerStage = new PrinterStreamingProcessor();
        Loader loader = RegisteringLoader.builder()
                .source("memory", memorySource)
                .stage("generator", generatorStage)
                .stage("printer", printerStage)
                .build();
        Linker linker = new Linker(loader);

        ComputationSpec spec = ComputationSpec.builder()
                .name("test-pipeline")
                .source(component(ComponentType.SOURCE, SOURCE_ID, "memory"))
                .processor(component(ComponentType.PROCESSOR, GENERATOR_ID, "generator"))
                .processor(component(ComponentType.PROCESSOR, PRINTER_ID, "printer"))
                .connection(Connection.builder()
                        .fromType(ComponentType.SOURCE)
                        .from(SOURCE_ID)
                        .to(GENERATOR_ID)
                        .build())
                .connection(Connection.builder()
                        .fromType(ComponentType.PROCESSOR)
                        .from(GENERATOR_ID)
                        .to(PRINTER_ID)
                        .build())
                .properties(properties)
                .build();

        ComputationPipeline pipeline = linker.build(spec);
        pipeline.initialize(properties);

        ExecutorService executor = Executors.newSingleThreadExecutor();
        try {
            executor.submit(() -> pipeline.start());
            for (int i = 0; i < 10; ++i) {
                Assert.assertTrue(pipeline.healthcheck());
            }
            // Let the pipeline run for a short while before shutting it down.
            Thread.sleep(2000L);
        } finally {
            // Always release the pipeline and the worker thread, even when an
            // assertion fails or the sleep is interrupted.
            try {
                pipeline.stop();
            } finally {
                executor.shutdownNow();
            }
        }
    }

    /**
     * Creates a component instance whose instance id and metadata id are the same.
     *
     * @param type kind of component (source or processor)
     * @param id   id used for both the instance and its metadata
     * @param name name the component was registered under in the loader
     * @return the assembled component instance
     */
    private static ComponentInstance component(ComponentType type, String id, String name) {
        return ComponentInstance.builder()
                .id(id)
                .meta(ComponentMetadata.builder()
                        .type(type)
                        .id(id)
                        .name(name)
                        .build())
                .build();
    }
}

